Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:20:34 UTC

svn commit: r1181562 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/

Author: nspiegelberg
Date: Tue Oct 11 02:20:33 2011
New Revision: 1181562

URL: http://svn.apache.org/viewvc?rev=1181562&view=rev
Log:
Adding limit / row / CF clause to Get (and Scan)

Summary:
Use setMaxResultsPerColumnFamily to limit the number of values a Get (or
Scan) returns per row per column family (CF).
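
For illustration, a minimal client-side sketch of the new API (table, row,
and column names are made up for the example; the surrounding configuration
and HTable setup is the usual client boilerplate of that era):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class StoreLimitExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable");

        // Get: return at most 2 values per column family for this row.
        Get get = new Get(Bytes.toBytes("row1"));
        get.setMaxResultsPerColumnFamily(2);
        Result getResult = table.get(get);

        // Scan: the same limit applies to every row the scan returns.
        Scan scan = new Scan();
        scan.setMaxResultsPerColumnFamily(2);
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
          // each Result now holds at most 2 KeyValues per column family
        }
        scanner.close();
      }
    }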

Test Plan:
Added unit tests.

Reviewed By: kannan
Reviewers: kannan, gqchen, jfan
Commenters: gqchen, jfan
CC: madhuvaidya, kannan, gqchen, liyintang, jfan, hbase@lists, anirudht
Differential Revision: 250900

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1181562&r1=1181561&r2=1181562&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Get.java Tue Oct 11 02:20:33 2011
@@ -58,14 +58,18 @@ import java.util.TreeSet;
  * To limit the number of versions of each column to be returned, execute
  * {@link #setMaxVersions(int) setMaxVersions}.
  * <p>
+ * To limit the number of values of each column family to be returned, execute
+ * {@link #setMaxResultsPerColumnFamily(int) setMaxResultsPerColumnFamily}.
+ * <p>
  * To add a filter, execute {@link #setFilter(Filter) setFilter}.
  */
 public class Get implements Writable {
-  private static final byte GET_VERSION = (byte)1;
+  private static final byte GET_VERSION = (byte)2;
 
   private byte [] row = null;
   private long lockId = -1L;
   private int maxVersions = 1;
+  private int storeLimit = -1;
   private Filter filter = null;
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
@@ -183,6 +187,16 @@ public class Get implements Writable {
   }
 
   /**
+   * Set the maximum number of values to return per row per Column Family
+   * @param limit the maximum number of values returned / row / CF
+   * @return this for invocation chaining
+   */
+  public Get setMaxResultsPerColumnFamily(int limit) {
+    this.storeLimit = limit;
+    return this;
+  }
+
+  /**
    * Apply the specified server-side filter when performing the Get.
    * Only {@link Filter#filterKeyValue(KeyValue)} is called AFTER all tests
    * for ttl, column match, deletes and max versions have been run.
@@ -236,6 +250,15 @@ public class Get implements Writable {
   }
 
   /**
+   * Method for retrieving the get's maximum number of values
+   * to return per Column Family
+   * @return the maximum number of values to fetch per CF
+   */
+  public int getMaxResultsPerColumnFamily() {
+    return this.storeLimit;
+  }
+
+  /**
    * Method for retrieving the get's TimeRange
    * @return timeRange
    */
@@ -285,6 +308,8 @@ public class Get implements Writable {
     sb.append(Bytes.toString(this.row));
     sb.append(", maxVersions=");
     sb.append("").append(this.maxVersions);
+    sb.append(", storeLimit=");
+    sb.append("").append(this.storeLimit);
     sb.append(", timeRange=");
     sb.append("[").append(this.tr.getMin()).append(",");
     sb.append(this.tr.getMax()).append(")");
@@ -335,6 +360,9 @@ public class Get implements Writable {
     this.row = Bytes.readByteArray(in);
     this.lockId = in.readLong();
     this.maxVersions = in.readInt();
+    if (version > 1) {
+      this.storeLimit = in.readInt();
+    }
     boolean hasFilter = in.readBoolean();
     if (hasFilter) {
       this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in)));
@@ -363,10 +391,17 @@ public class Get implements Writable {
 
   public void write(final DataOutput out)
   throws IOException {
-    out.writeByte(GET_VERSION);
+    if (this.storeLimit != -1) {
+      out.writeByte(GET_VERSION);
+    } else {
+      out.writeByte((byte)1);
+    }
     Bytes.writeByteArray(out, this.row);
     out.writeLong(this.lockId);
     out.writeInt(this.maxVersions);
+    if (this.storeLimit != -1) {
+      out.writeInt(this.storeLimit);
+    }
     if(this.filter == null) {
       out.writeBoolean(false);
     } else {
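
Note the wire-compatibility choice in Get.write() above (Scan.java below does
the same): GET_VERSION is bumped to 2, but a version-1 payload is still
written whenever storeLimit is unset, so callers that never use
setMaxResultsPerColumnFamily keep interoperating with peers that only
understand version 1, and readFields() consumes the extra int only when the
version is greater than 1. A standalone sketch of that pattern (not the
actual HBase classes):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    // Serialize the new optional field only when it is set, and stamp the
    // payload with the old version byte otherwise.
    public class VersionedRequest implements Writable {
      private static final byte CURRENT_VERSION = (byte) 2;
      private int storeLimit = -1;  // new optional field; -1 means "not set"

      public void write(DataOutput out) throws IOException {
        out.writeByte(storeLimit != -1 ? CURRENT_VERSION : (byte) 1);
        if (storeLimit != -1) {
          out.writeInt(storeLimit);
        }
      }

      public void readFields(DataInput in) throws IOException {
        byte version = in.readByte();
        if (version > 1) {  // the extra int exists only in version-2 payloads
          storeLimit = in.readInt();
        }
      }
    }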

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1181562&r1=1181561&r2=1181562&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Scan.java Tue Oct 11 02:20:33 2011
@@ -71,17 +71,21 @@ import java.util.TreeSet;
  * To limit the maximum number of values returned for each call to next(),
  * execute {@link #setBatch(int) setBatch}.
  * <p>
+ * To limit the maximum number of values returned per row per Column Family,
+ * execute {@link #setMaxResultsPerColumnFamily(int) setMaxResultsPerColumnFamily}.
+ * <p>
  * To add a filter, execute {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
  * <p>
  * Expert: To explicitly disable server-side block caching for this scan,
  * execute {@link #setCacheBlocks(boolean)}.
  */
 public class Scan implements Writable {
-  private static final byte SCAN_VERSION = (byte)1;
+  private static final byte SCAN_VERSION = (byte)2;
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
   private int batch = -1;
+  private int storeLimit = -1;
   private int caching = -1;
   private boolean cacheBlocks = true;
   private Filter filter = null;
@@ -131,6 +135,7 @@ public class Scan implements Writable {
     stopRow  = scan.getStopRow();
     maxVersions = scan.getMaxVersions();
     batch = scan.getBatch();
+    storeLimit = scan.getMaxResultsPerColumnFamily();
     caching = scan.getCaching();
     cacheBlocks = scan.getCacheBlocks();
     filter = scan.getFilter(); // clone?
@@ -159,6 +164,7 @@ public class Scan implements Writable {
     this.stopRow = get.getRow();
     this.filter = get.getFilter();
     this.maxVersions = get.getMaxVersions();
+    this.storeLimit = get.getMaxResultsPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
   }
@@ -290,6 +296,14 @@ public class Scan implements Writable {
   }
 
   /**
+   * Set the maximum number of values to return per row per Column Family
+   * @param limit the maximum number of values returned / row / CF
+   */
+  public void setMaxResultsPerColumnFamily(int limit) {
+    this.storeLimit = limit;
+  }
+
+  /**
    * Set the number of rows for caching that will be passed to scanners.
    * If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
    * Higher caching values will enable faster scanners but will use more memory.
@@ -383,6 +397,13 @@ public class Scan implements Writable {
   }
 
   /**
+   * @return maximum number of values to return per row per CF
+   */
+  public int getMaxResultsPerColumnFamily() {
+    return this.storeLimit;
+  }
+
+  /**
    * @return caching the number of rows fetched when calling next on a scanner
    */
   public int getCaching() {
@@ -447,6 +468,8 @@ public class Scan implements Writable {
     sb.append(this.maxVersions);
     sb.append(", batch=");
     sb.append(this.batch);
+    sb.append(", storeLimit=");
+    sb.append(this.storeLimit);
     sb.append(", caching=");
     sb.append(this.caching);
     sb.append(", cacheBlocks=");
@@ -512,6 +535,9 @@ public class Scan implements Writable {
     this.stopRow = Bytes.readByteArray(in);
     this.maxVersions = in.readInt();
     this.batch = in.readInt();
+    if (version > 1) {
+      this.storeLimit = in.readInt();
+    }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -537,11 +563,18 @@ public class Scan implements Writable {
 
   public void write(final DataOutput out)
   throws IOException {
-    out.writeByte(SCAN_VERSION);
+    if (this.storeLimit != -1) {
+      out.writeByte(SCAN_VERSION);
+    } else {
+      out.writeByte((byte)1);
+    }
     Bytes.writeByteArray(out, this.startRow);
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
     out.writeInt(this.batch);
+    if (this.storeLimit != -1) {
+      out.writeInt(this.storeLimit);
+    }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1181562&r1=1181561&r2=1181562&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Oct 11 02:20:33 2011
@@ -40,6 +40,8 @@ class StoreScanner implements KeyValueSc
   private ScanQueryMatcher matcher;
   private KeyValueHeap heap;
   private boolean cacheBlocks;
+  private int countPerRow = 0;
+  private int storeLimit;
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
   // Doesnt need to be volatile because it's always accessed via synchronized methods
@@ -76,6 +78,9 @@ class StoreScanner implements KeyValueSc
       scanner.seek(matcher.getStartKey());
     }
 
+    // set storeLimit
+    this.storeLimit = scan.getMaxResultsPerColumnFamily();
+
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(scanners, store.comparator);
 
@@ -236,6 +241,7 @@ class StoreScanner implements KeyValueSc
     }
 
     if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
+	this.countPerRow = 0;
 	matcher.setRow(peeked.getRow());
     }
     KeyValue kv;
@@ -247,6 +253,17 @@ class StoreScanner implements KeyValueSc
       //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
       switch(qcode) {
         case INCLUDE:
+          this.countPerRow++;
+          if (storeLimit > 0 && this.countPerRow > storeLimit) {
+            // do what SEEK_NEXT_ROW does.
+            if (!matcher.moreRowsMayExistAfter(kv)) {
+              outResult.addAll(results);
+              return false;
+            }
+            reseek(matcher.getKeyForNextRow(kv));
+            break LOOP;
+          }
+
           results.add(copyKv);
           this.heap.next();
           if (limit > 0 && (results.size() == limit)) {
@@ -371,6 +388,7 @@ class StoreScanner implements KeyValueSc
 	kv = lastTopKey;
     }
     if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
+	this.countPerRow = 0;
 	matcher.reset();
 	matcher.setRow(kv.getRow());
     }
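
Each StoreScanner serves a single store (i.e. one column family), so
countPerRow is effectively a per-row, per-CF counter: once more than
storeLimit KeyValues of the current row have matched INCLUDE, the scanner
reseeks straight to the next row instead of walking the remaining columns.
As a rough client-side illustration of the resulting semantics (not the
server code), trimming one row's KeyValues to a per-family limit looks like
this:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class StoreLimitSemantics {
      // Given the KeyValues of a single row in scan order, keep at most
      // 'limit' entries per column family; limit <= 0 means "no limit",
      // mirroring the default storeLimit of -1.
      static List<KeyValue> applyPerFamilyLimit(List<KeyValue> rowKvs, int limit) {
        Map<String, Integer> countPerFamily = new HashMap<String, Integer>();
        List<KeyValue> kept = new ArrayList<KeyValue>();
        for (KeyValue kv : rowKvs) {
          String family = Bytes.toString(kv.getFamily());
          int seen = countPerFamily.containsKey(family) ? countPerFamily.get(family) : 0;
          if (limit <= 0 || seen < limit) {
            kept.add(kv);
          }
          countPerFamily.put(family, seen + 1);
        }
        return kept;
      }
    }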

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java?rev=1181562&r1=1181561&r2=1181562&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java Tue Oct 11 02:20:33 2011
@@ -141,6 +141,175 @@ public class TestFromClientSide2 {
 
   }
 
+  /**
+   * Test from client side for Get with maxResultsPerCF set
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testGetMaxResults() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testGetMaxResults");
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 20);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+    Get get;
+    Put put;
+    Result result;
+    boolean toLog = true;
+    List<KeyValue> kvListExp;
+
+    kvListExp = new ArrayList<KeyValue>();
+    // Insert one CF for row[0]
+    put = new Put(ROW);
+    for (int i=0; i < 10; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+      kvListExp.add(kv);
+    }
+    ht.put(put);
+
+    get = new Get(ROW);
+    result = ht.get(get);
+    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
+
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(2);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
+    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
+
+    // Filters: ColumnRangeFilter
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(5);
+    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
+    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
+
+    // Insert two more CF for row[0]
+    // 20 columns for CF2, 10 columns for CF1
+    put = new Put(ROW);
+    for (int i=0; i < QUALIFIERS.length; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+    }
+    ht.put(put);
+
+    put = new Put(ROW);
+    for (int i=0; i < 10; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+    }
+    ht.put(put);
+
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(12);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
+    for (int i=0; i < 10; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=0; i < 2; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=10; i < 20; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
+
+    // Filters: ColumnRangeFilter and ColumnPrefixFilter
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(3);
+    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    for (int i=2; i < 5; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=2; i < 5; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=2; i < 5; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
+
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(7);
+    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
+    for (int i=10; i < 16; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
+
+  }
+
+  /**
+   * Test from client side for Scan with maxResultsPerCF set
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScanMaxResults() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testScanLimit");
+    byte [][] ROWS= makeNAscii(ROW, 2);
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 10);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+    Put put;
+    Scan scan;
+    Result result;
+    boolean toLog = true;
+    List<KeyValue> kvListExp, kvListScan;
+
+    kvListExp = new ArrayList<KeyValue>();
+
+    for (int r=0; r < ROWS.length; r++) {
+      put = new Put(ROWS[r]);
+      for (int c=0; c < FAMILIES.length; c++) {
+        for (int q=0; q < QUALIFIERS.length; q++) {
+          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
+          put.add(kv);
+          if (q < 4) {
+            kvListExp.add(kv);
+          }
+        }
+      }
+      ht.put(put);
+    }
+
+    scan = new Scan();
+    scan.setMaxResultsPerColumnFamily(4);
+    ResultScanner scanner = ht.getScanner(scan);
+    kvListScan = new ArrayList<KeyValue>();
+    while ((result = scanner.next()) != null) {
+      for (KeyValue kv : result.list()) {
+        kvListScan.add(kv);
+      }
+    }
+    result = new Result(kvListScan);
+    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
+
+  }
+
   private void verifyResult(Result result, List<KeyValue> kvList, boolean toLog, String msg) {
     int i =0;
 
@@ -152,8 +321,8 @@ public class TestFromClientSide2 {
     for (KeyValue kv : result.sorted()) {
       KeyValue kvExp = kvList.get(i++);
       if (toLog) {
-	LOG.info("get kv is: " + kv.toString());
-	LOG.info("exp kv is: " + kvExp.toString());
+        LOG.info("get kv is: " + kv.toString());
+        LOG.info("exp kv is: " + kvExp.toString());
       }
       assertTrue("Not equal", kvExp.equals(kv));
     }