You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/07/09 23:36:41 UTC

svn commit: r962700 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/filter/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/client/

Author: rawson
Date: Fri Jul  9 21:36:40 2010
New Revision: 962700

URL: http://svn.apache.org/viewvc?rev=962700&view=rev
Log:
HBASE-2793  Add ability to extract a specified list of versions of a column in a single roundtrip


Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=962700&r1=962699&r2=962700&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Jul  9 21:36:40 2010
@@ -754,6 +754,8 @@ Release 0.21.0 - Unreleased
                (Nicolas Spiegelberg via JD)
    HBASE-2786  TestHLog.testSplit hangs (Nicolas Spiegelberg via JD)
    HBASE-2790  Purge apache-forrest from TRUNK
+   HBASE-2793  Add ability to extract a specified list of versions of a column 
+   	       in a single roundtrip (Kannan via Ryan)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=962700&r1=962699&r2=962700&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Fri Jul  9 21:36:40 2010
@@ -102,6 +102,10 @@ public interface Filter extends Writable
      */
     SKIP,
     /**
+     * Skip this column. Go to the next column in this row.
+     */
+    NEXT_COL,
+    /**
      * Done with columns, skip to next row. Note that filterRow() will
      * still be called.
      */

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java?rev=962700&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java Fri Jul  9 21:36:40 2010
@@ -0,0 +1,91 @@
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Filter that returns only cells whose timestamp (version) is
+ * in the specified list of timestamps (versions).
+ * <p>
+ * Note: Use of this filter overrides any time range/time stamp
+ * options specified using {@link Get#setTimeRange(long, long)},
+ * {@link Scan#setTimeRange(long, long)}, {@link Get#setTimeStamp(long)},
+ * or {@link Scan#setTimeStamp(long)}.
+ */
+public class TimestampsFilter extends FilterBase {
+
+  TreeSet<Long> timestamps;
+
+  // Used during scans to hint the scan to stop early
+  // once the timestamps fall below the minTimeStamp.
+  long minTimeStamp = Long.MAX_VALUE;
+
+  /**
+   * Used during deserialization. Do not use otherwise.
+   */
+  public TimestampsFilter() {
+    super();
+  }
+
+  /**
+   * Constructor for filter that retains only those
+   * cells whose timestamp (version) is in the specified
+   * list of timestamps.
+   *
+   * @param timestamps
+   */
+  public TimestampsFilter(List<Long> timestamps) {
+    this.timestamps = new TreeSet<Long>(timestamps);
+    init();
+  }
+
+  // Recompute the lower bound; an empty set resets it so a reused instance keeps no stale value.
+  private void init() {
+    minTimeStamp = this.timestamps.isEmpty()
+        ? Long.MAX_VALUE : this.timestamps.first();
+  }
+
+  /**
+   * Gets the minimum timestamp requested by filter.
+   * @return  minimum timestamp requested by filter.
+   */
+  public long getMin() {
+    return minTimeStamp;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    final long ts = v.getTimestamp();
+    if (this.timestamps.contains(ts)) {
+      return ReturnCode.INCLUDE;
+    }
+    // A timestamp below the smallest requested one means the remaining
+    // versions of this column are smaller still (per the original author's
+    // note), so no later cell of the column can match: go to the next column.
+    return ts < minTimeStamp ? ReturnCode.NEXT_COL : ReturnCode.SKIP;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int remaining = in.readInt();  // count prefix emitted by write()
+    this.timestamps = new TreeSet<Long>();
+    while (remaining-- > 0) {
+      this.timestamps.add(in.readLong());
+    }
+    init();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Emit the element count first so readFields() knows how many longs follow.
+    out.writeInt(this.timestamps.size());
+    for (long ts : this.timestamps) {
+      out.writeLong(ts);
+    }
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=962700&r1=962699&r2=962700&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Jul  9 21:36:40 2010
@@ -173,8 +173,10 @@ public class ScanQueryMatcher extends Qu
 
     if (filterResponse == ReturnCode.SKIP)
       return MatchCode.SKIP;
-
+    else if (filterResponse == ReturnCode.NEXT_COL)
+      return MatchCode.SEEK_NEXT_COL;
     // else if (filterResponse == ReturnCode.NEXT_ROW)
+
     stickyNextRow = true;
     return MatchCode.SEEK_NEXT_ROW;
   }

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java?rev=962700&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java Fri Jul  9 21:36:40 2010
@@ -0,0 +1,342 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Run tests related to {@link TimestampsFilter} using HBase client APIs.
+ * Sets up the HBase mini cluster once at start. Each test creates a table
+ * named for the method and runs its checks against that table.
+ */
+public class TestTimestampsFilter {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * Test from client side for TimestampsFilter.
+   *
+   * The TimestampsFilter provides the ability to request cells (KeyValues)
+   * whose timestamp/version is in the specified list of timestamps/version.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testTimestampsFilter() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+    KeyValue kvs[];
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        // insert versions 201..300
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
+        // insert versions 1..100
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
+      }
+    }
+
+    // do some verification before flush
+    verifyInsertedValues(ht, FAMILY);
+
+    flush();
+
+    // do some verification after flush
+    verifyInsertedValues(ht, FAMILY);
+
+    // Insert some more versions after flush. These should be in memstore.
+    // After this we should have data in both memstore & HFiles.
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
+      }
+    }
+
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
+                           Arrays.asList(505L, 5L, 105L, 305L, 205L));
+        assertEquals(4, kvs.length);
+        checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
+        checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
+        checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
+        checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
+      }
+    }
+
+    // Request an empty list of versions using the Timestamps filter;
+    // Should return none.
+    kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
+    assertEquals(0, kvs.length);
+
+    //
+    // Test the filter using a Scan operation
+    // Scan rows 0..4. For each row, get all its columns, but only
+    // those versions of the columns with the specified timestamps.
+    Result[] results = scanNVersions(ht, FAMILY, 0, 4,
+                                     Arrays.asList(6L, 106L, 306L));
+    assertEquals("# of rows returned from scan", 5, results.length);
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      kvs = results[rowIdx].raw();
+      // each row should have 5 columns.
+      // And we have requested 3 versions for each.
+      assertEquals("Number of KeyValues in result for row:" + rowIdx,
+                   3*5, kvs.length);
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        int offset = colIdx * 3;
+        checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
+        checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
+        checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
+      }
+    }
+  }
+
+  /**
+   * Test TimestampsFilter in the presence of version deletes.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testWithVersionDeletes() throws Exception {
+
+    // first test from memstore (without flushing).
+    testWithVersionDeletes(false);
+
+    // run same test against HFiles (by forcing a flush).
+    testWithVersionDeletes(true);
+  }
+
+  private void testWithVersionDeletes(boolean flushTables) throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
+                                   (flushTables ? "flush" : "noflush")); 
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    // delete version 4.
+    deleteOneVersion(ht, FAMILY, 0, 0, 4);
+
+    if (flushTables) {
+      flush();
+    }
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+    assertEquals(3, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
+    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
+    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
+  }
+
+  private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        // ask for versions that exist; they must come back newest first.
+        KeyValue[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
+                                      Arrays.asList(5L, 300L, 6L, 80L));
+        assertEquals(4, kvs.length);
+        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+        checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
+        checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
+        checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
+
+        // ask for versions that do not exist (caller has not written 101/102 yet).
+        kvs = getNVersions(ht, cf, rowIdx, colIdx,
+                           Arrays.asList(101L, 102L));
+        assertEquals(0, kvs.length);
+
+        // ask for some versions that exist and some that do not (105 and 115).
+        kvs = getNVersions(ht, cf, rowIdx, colIdx,
+                           Arrays.asList(1L, 300L, 105L, 70L, 115L));
+        assertEquals(3, kvs.length);
+        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+        checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
+        checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
+      }
+    }
+  }
+
+  // Flush tables. Since flushing is asynchronous, sleep for a bit.
+  private void flush() throws IOException {
+    TEST_UTIL.flush();
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+    }
+  }
+
+  /**
+   * Asserts that kv is the cell written by putNVersions() for the given
+   * row, column and timestamp: "row:N" / "column:N" / "value-version-TS".
+   */
+  private void checkOneCell(KeyValue kv, byte[] cf,
+                             int rowIdx, int colIdx, long ts) {
+    // Shared suffix that identifies the cell in every failure message.
+    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
+
+    assertEquals("Row mismatch while checking: " + ctx,
+                 "row:"+ rowIdx, Bytes.toString(kv.getRow()));
+
+    assertEquals("ColumnFamily mismatch while checking: " + ctx,
+                 Bytes.toString(cf), Bytes.toString(kv.getFamily()));
+
+    assertEquals("Column qualifier mismatch while checking: " + ctx,
+                 "column:" + colIdx,
+                  Bytes.toString(kv.getQualifier()));
+
+    assertEquals("Timestamp mismatch while checking: " + ctx,
+                 ts, kv.getTimestamp());
+
+    assertEquals("Value mismatch while checking: " + ctx,
+                 "value-version-" + ts, Bytes.toString(kv.getValue()));
+  }
+
+  /**
+   * Issues a Get against one row/column with a TimestampsFilter built from
+   * the given versions and returns the raw KeyValues of the Result.
+   */
+  private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, List<Long> versions) throws IOException {
+    // Row and qualifier names use the same "row:N" / "column:N" scheme
+    // as putNVersions().
+    final byte[] rowKey = Bytes.toBytes("row:" + rowIdx);
+    final byte[] qualifier = Bytes.toBytes("column:" + colIdx);
+
+    Get request = new Get(rowKey);
+    request.addColumn(cf, qualifier);
+    request.setFilter(new TimestampsFilter(versions));
+    // NOTE(review): no-arg setMaxVersions() presumably lifts the version
+    // cap so only the filter limits what comes back -- confirm in Get API.
+    request.setMaxVersions();
+    return ht.get(request).raw();
+  }
+
+  /**
+   * Scans rows startRowIdx..endRowIdx (inclusive), keeping only the given versions.
+   */
+  private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
+      int endRowIdx, List<Long> versions) throws IOException {
+    Scan scan = new Scan(Bytes.toBytes("row:" + startRowIdx),
+        Bytes.toBytes("row:" + endRowIdx + 1)); // exclusive; the 1 is appended as text
+    scan.setFilter(new TimestampsFilter(versions));
+    scan.setMaxVersions();
+    ResultScanner scanner = ht.getScanner(scan);
+    try {
+      return scanner.next(endRowIdx - startRowIdx + 1);
+    } finally {
+      scanner.close(); // release the scanner's resources once the rows are read
+    }
+  }
+
+  /**
+   * Writes one cell per timestamp in versionStart..versionEnd (inclusive)
+   * to the given row/column, all batched into a single Put. Each cell's
+   * value is "value-version-" followed by its timestamp.
+   */
+  private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
+      long versionStart, long versionEnd) throws IOException {
+    final byte[] qualifier = Bytes.toBytes("column:" + colIdx);
+    Put batch = new Put(Bytes.toBytes("row:" + rowIdx));
+
+    long ts = versionStart;
+    while (ts <= versionEnd) {
+      batch.add(cf, qualifier, ts, Bytes.toBytes("value-version-" + ts));
+      ts++;
+    }
+    ht.put(batch);
+  }
+
+  /**
+   * Deletes exactly one version (the cell at timestamp {@code version})
+   * of the column identified by rowIdx/colIdx.
+   */
+  private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, long version) throws IOException {
+    final byte[] rowKey = Bytes.toBytes("row:" + rowIdx);
+    final byte[] qualifier = Bytes.toBytes("column:" + colIdx);
+
+    Delete marker = new Delete(rowKey);
+    marker.deleteColumn(cf, qualifier, version);
+    ht.delete(marker);
+  }
+}
+