Posted to commits@hbase.apache.org by mb...@apache.org on 2012/04/14 04:45:37 UTC

svn commit: r1326043 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/

Author: mbautin
Date: Sat Apr 14 02:45:36 2012
New Revision: 1326043

URL: http://svn.apache.org/viewvc?rev=1326043&view=rev
Log:
[jira] [HBASE-5104] [89-fb] Intra-row pagination by adding row offset per CF.

Summary: Added row offset per CF to Get and Scan.

Test Plan: Added a unit test (testGetRowOffset in TestFromClientSide2).

Reviewers: kannan, mbautin

Reviewed By: mbautin

Differential Revision: https://reviews.facebook.net/D2385
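
For context, a minimal usage sketch of the API this change adds. It is illustrative only: ht is assumed to be an existing HTable, row, startRow and stopRow are assumed byte[] keys, and setMaxResultsPerColumnFamily is the pre-existing per-CF limit that the new offset composes with.

    Get get = new Get(row);
    get.setRowOffsetPerColumnFamily(2);    // skip the first 2 kvs of each column family
    get.setMaxResultsPerColumnFamily(5);   // then return at most 5 kvs per column family
    Result result = ht.get(get);

    Scan scan = new Scan(startRow, stopRow);
    scan.setRowOffsetPerColumnFamily(2);
    scan.setMaxResultsPerColumnFamily(5);
    for (Result r : ht.getScanner(scan)) {
      // each Result carries at most 5 kvs per column family,
      // starting from the 3rd kv of that family within the row
    }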

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java Sat Apr 14 02:45:36 2012
@@ -67,12 +67,13 @@ import java.util.TreeSet;
  * To add a filter, execute {@link #setFilter(Filter) setFilter}.
  */
 public class Get extends Operation implements Writable, Row, Comparable<Row> {
-  private static final byte GET_VERSION = (byte)2;
+  private static final byte GET_VERSION = (byte)3;
 
   private byte [] row = null;
   private long lockId = -1L;
   private int maxVersions = 1;
   private int storeLimit = -1;
+  private int storeOffset = 0;
   private Filter filter = null;
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
@@ -200,6 +201,16 @@ public class Get extends Operation imple
   }
 
   /**
+   * Set the offset into the row, applied per column family.
+   * @param offset the number of kvs that will be skipped in each column family
+   * @return this for invocation chaining
+   */
+  public Get setRowOffsetPerColumnFamily(int offset) {
+    this.storeOffset = offset;
+    return this;
+  }
+
+  /**
    * Apply the specified server-side filter when performing the Get.
    * Only {@link Filter#filterKeyValue(KeyValue)} is called AFTER all tests
    * for ttl, column match, deletes and max versions have been run.
@@ -262,6 +273,15 @@ public class Get extends Operation imple
   }
 
   /**
+   * Method for retrieving the get's offset per row per column
+   * family (#kvs to be skipped)
+   * @return the row offset
+   */
+  public int getRowOffsetPerColumnFamily() {
+    return this.storeOffset;
+  }
+
+  /**
    * Method for retrieving the get's TimeRange
    * @return timeRange
    */
@@ -389,6 +409,9 @@ public class Get extends Operation imple
     if (version > 1) {
       this.storeLimit = in.readInt();
     }
+    if (version > 2) {
+      this.storeOffset = in.readInt();
+    }
     boolean hasFilter = in.readBoolean();
     if (hasFilter) {
       this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in)));
@@ -417,17 +440,22 @@ public class Get extends Operation imple
 
   public void write(final DataOutput out)
   throws IOException {
-    if (this.storeLimit != -1) {
-      out.writeByte(GET_VERSION);
-    } else {
-      out.writeByte((byte)1);
+    byte version = (byte)1;
+    if (this.storeOffset != 0) {
+      version = GET_VERSION;
+    } else if (this.storeLimit != -1) {
+      version = (byte)2;
     }
+    out.writeByte(version);
     Bytes.writeByteArray(out, this.row);
     out.writeLong(this.lockId);
     out.writeInt(this.maxVersions);
-    if (this.storeLimit != -1) {
+    if (version > 1) {
       out.writeInt(this.storeLimit);
     }
+    if (version > 2) {
+      out.writeInt(this.storeOffset);
+    }
     if(this.filter == null) {
       out.writeBoolean(false);
     } else {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat Apr 14 02:45:36 2012
@@ -83,13 +83,14 @@ import java.util.TreeSet;
  * execute {@link #setCacheBlocks(boolean)}.
  */
 public class Scan extends Operation implements Writable {
-  private static final byte SCAN_VERSION = (byte)2;
+  private static final byte SCAN_VERSION = (byte)3;
 
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
   private int batch = -1;
   private int storeLimit = -1;
+  private int storeOffset = 0;
   private int caching = -1;
   private boolean cacheBlocks = true;
   private Filter filter = null;
@@ -140,6 +141,7 @@ public class Scan extends Operation impl
     maxVersions = scan.getMaxVersions();
     batch = scan.getBatch();
     storeLimit = scan.getMaxResultsPerColumnFamily();
+    storeOffset = scan.getRowOffsetPerColumnFamily();
     caching = scan.getCaching();
     cacheBlocks = scan.getCacheBlocks();
     filter = scan.getFilter(); // clone?
@@ -169,6 +171,7 @@ public class Scan extends Operation impl
     this.filter = get.getFilter();
     this.maxVersions = get.getMaxVersions();
     this.storeLimit = get.getMaxResultsPerColumnFamily();
+    this.storeOffset = get.getRowOffsetPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
   }
@@ -308,6 +311,14 @@ public class Scan extends Operation impl
   }
 
   /**
+   * Set the offset into the row, applied per column family.
+   * @param offset the number of kvs that will be skipped in each column family
+   */
+  public void setRowOffsetPerColumnFamily(int offset) {
+    this.storeOffset = offset;
+  }
+
+  /**
    * Set the number of rows for caching that will be passed to scanners.
    * If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
    * Higher caching values will enable faster scanners but will use more memory.
@@ -408,6 +419,15 @@ public class Scan extends Operation impl
   }
 
   /**
+   * Method for retrieving the scan's offset per row per column
+   * family (#kvs to be skipped)
+   * @return row offset
+   */
+  public int getRowOffsetPerColumnFamily() {
+    return this.storeOffset;
+  }
+
+  /**
    * @return caching the number of rows fetched when calling next on a scanner
    */
   public int getCaching() {
@@ -562,6 +582,9 @@ public class Scan extends Operation impl
     if (version > 1) {
       this.storeLimit = in.readInt();
     }
+    if (version > 2) {
+      this.storeOffset = in.readInt();
+    }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -587,18 +610,23 @@ public class Scan extends Operation impl
 
   public void write(final DataOutput out)
   throws IOException {
-    if (this.storeLimit != -1) {
-      out.writeByte(SCAN_VERSION);
-    } else {
-      out.writeByte((byte)1);
+    byte version = (byte)1;
+    if (this.storeOffset != 0) {
+      version = SCAN_VERSION;
+    } else if (this.storeLimit != -1) {
+      version = 2;
     }
+    out.writeByte(version);
     Bytes.writeByteArray(out, this.startRow);
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
     out.writeInt(this.batch);
-    if (this.storeLimit != -1) {
+    if (version > 1) {
       out.writeInt(this.storeLimit);
     }
+    if (version > 2) {
+      out.writeInt(this.storeOffset);
+    }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Apr 14 02:45:36 2012
@@ -45,7 +45,8 @@ class StoreScanner extends NonLazyKeyVal
   private KeyValueHeap heap;
   private boolean cacheBlocks;
   private int countPerRow = 0;
-  private int storeLimit;
+  private int storeLimit = -1;
+  private int storeOffset = 0;
   private String metricNamePrefix;
   // Used to indicate that the scanner has closed (see HBASE-1107)
   // Doesnt need to be volatile because it's always accessed via synchronized methods
@@ -125,6 +126,9 @@ class StoreScanner extends NonLazyKeyVal
     // set storeLimit
     this.storeLimit = scan.getMaxResultsPerColumnFamily();
 
+    // set rowOffset
+    this.storeOffset = scan.getRowOffsetPerColumnFamily();
+
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(scanners, store.comparator);
 
@@ -332,7 +336,8 @@ class StoreScanner extends NonLazyKeyVal
         case INCLUDE_AND_SEEK_NEXT_ROW:
         case INCLUDE_AND_SEEK_NEXT_COL:
           this.countPerRow++;
-          if (storeLimit > 0 && this.countPerRow > storeLimit) {
+          if (storeLimit > -1 &&
+              this.countPerRow > (storeLimit + storeOffset)) {
             // do what SEEK_NEXT_ROW does.
             if (!matcher.moreRowsMayExistAfter(kv)) {
               outResult.addAll(results);
@@ -342,13 +347,16 @@ class StoreScanner extends NonLazyKeyVal
             break LOOP;
           }
 
-          if (metric != null) {
-            HRegion.incrNumericMetric(this.metricNamePrefix + metric,
+          // add to results only if we have skipped #rowOffset kvs
+          // also update metric accordingly
+          if (this.countPerRow > storeOffset) {
+            if (metric != null) {
+              HRegion.incrNumericMetric(this.metricNamePrefix + metric,
                 copyKv.getLength());
+            }
+            results.add(copyKv);
           }
 
-          results.add(copyKv);
-
           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
             if (!matcher.moreRowsMayExistAfter(kv)) {
               outResult.addAll(results);

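For reference, the StoreScanner change above turns the existing per-CF limit into an (offset, offset + limit] window over the kvs counted for the current row: a kv is added to the results only once countPerRow has passed storeOffset, and the scanner seeks to the next row once countPerRow exceeds storeLimit + storeOffset (storeLimit == -1 still means unlimited). A standalone sketch of that predicate (the helper name is illustrative; the field names follow the patch):

    // Illustrative helper: true if the kv at 1-based position countPerRow
    // (counted per row and column family) should be returned.
    static boolean inWindow(int countPerRow, int storeOffset, int storeLimit) {
      boolean pastOffset = countPerRow > storeOffset;        // skip the first storeOffset kvs
      boolean underLimit = storeLimit == -1                  // -1 means no per-CF limit
          || countPerRow <= storeLimit + storeOffset;
      return pastOffset && underLimit;
    }
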
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java Sat Apr 14 02:45:36 2012
@@ -182,10 +182,11 @@ public class TestFromClientSide2 {
     kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
     verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
 
-    // Filters: ColumnRangeFiltero
+    // Filters: ColumnRangeFilter
     get = new Get(ROW);
     get.setMaxResultsPerColumnFamily(5);
-    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
+    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
+                                        true));
     result = ht.get(get);
     kvListExp = new ArrayList<KeyValue>();
     kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
@@ -310,7 +311,104 @@ public class TestFromClientSide2 {
 
   }
 
-  private void verifyResult(Result result, List<KeyValue> kvList, boolean toLog, String msg) {
+  /**
+   * Test from client side for get with rowOffset
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testGetRowOffset() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testGetRowOffset");
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 20);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+    Get get;
+    Put put;
+    Result result;
+    boolean toLog = true;
+    List<KeyValue> kvListExp;
+
+    // Insert one CF for row
+    kvListExp = new ArrayList<KeyValue>();
+    put = new Put(ROW);
+    for (int i=0; i < 10; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+      // skipping first two kvs
+      if (i < 2) continue;
+      kvListExp.add(kv);
+    }
+    ht.put(put);
+
+    //setting offset to 2
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(2);
+    result = ht.get(get);
+    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
+
+    //setting offset to 20
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(20);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
+
+    //offset + maxResultPerCF
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(4);
+    get.setMaxResultsPerColumnFamily(5);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    for (int i=4; i < 9; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog,
+      "Testing offset + setMaxResultsPerCF");
+
+    // Filters: ColumnRangeFilter
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(1);
+    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
+                                        true));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
+    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
+
+    // Insert into two more CFs for row
+    // 10 columns for CF2, 10 columns for CF1
+    for(int j=2; j > 0; j--) {
+      put = new Put(ROW);
+      for (int i=0; i < 10; i++) {
+        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
+        put.add(kv);
+      }
+      ht.put(put);
+    }
+
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(4);
+    get.setMaxResultsPerColumnFamily(2);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    //Exp: CF1:q4, q5, CF2: q4, q5
+    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
+    verifyResult(result, kvListExp, toLog,
+       "Testing offset + multiple CFs + maxResults");
+
+  }
+
+  private void verifyResult(Result result, List<KeyValue> kvList, boolean toLog,
+    String msg) {
     int i =0;
 
     LOG.info(msg);
@@ -318,6 +416,8 @@ public class TestFromClientSide2 {
     LOG.info("True cnt is: " + result.size());
     assertEquals(kvList.size(), result.size());
 
+    if (kvList.size() == 0) return;
+
     for (KeyValue kv : result.sorted()) {
       KeyValue kvExp = kvList.get(i++);
       if (toLog) {