You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/04/14 04:45:37 UTC
svn commit: r1326043 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/client/
Author: mbautin
Date: Sat Apr 14 02:45:36 2012
New Revision: 1326043
URL: http://svn.apache.org/viewvc?rev=1326043&view=rev
Log:
[jira] [HBASE-5104] [89-fb] Intra-row pagination by adding row offset per CF.
Summary: Added a per-column-family (CF) row offset to Get and Scan, i.e. the number of KeyValues to skip in each column family of a row.
Test Plan: Added unit test testGetRowOffset to TestFromClientSide2.
Reviewers: kannan, mbautin
Reviewed By: mbautin
Differential Revision: https://reviews.facebook.net/D2385
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java Sat Apr 14 02:45:36 2012
@@ -67,12 +67,13 @@ import java.util.TreeSet;
* To add a filter, execute {@link #setFilter(Filter) setFilter}.
*/
public class Get extends Operation implements Writable, Row, Comparable<Row> {
- private static final byte GET_VERSION = (byte)2;
+ private static final byte GET_VERSION = (byte)3;
private byte [] row = null;
private long lockId = -1L;
private int maxVersions = 1;
private int storeLimit = -1;
+ private int storeOffset = 0;
private Filter filter = null;
private TimeRange tr = new TimeRange();
private Map<byte [], NavigableSet<byte []>> familyMap =
@@ -200,6 +201,16 @@ public class Get extends Operation imple
}
/**
+ * Set offset for the row per Column Family.
+ * @param offset is the number of kvs that will be skipped.
+ * @return this for invocation chaining
+ */
+ public Get setRowOffsetPerColumnFamily(int offset) {
+ this.storeOffset = offset;
+ return this;
+ }
+
+ /**
* Apply the specified server-side filter when performing the Get.
* Only {@link Filter#filterKeyValue(KeyValue)} is called AFTER all tests
* for ttl, column match, deletes and max versions have been run.
@@ -262,6 +273,15 @@ public class Get extends Operation imple
}
/**
+ * Method for retrieving the get's offset per row per column
+ * family (#kvs to be skipped)
+ * @return the row offset
+ */
+ public int getRowOffsetPerColumnFamily() {
+ return this.storeOffset;
+ }
+
+ /**
* Method for retrieving the get's TimeRange
* @return timeRange
*/
@@ -389,6 +409,9 @@ public class Get extends Operation imple
if (version > 1) {
this.storeLimit = in.readInt();
}
+ if (version > 2) {
+ this.storeOffset = in.readInt();
+ }
boolean hasFilter = in.readBoolean();
if (hasFilter) {
this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in)));
@@ -417,17 +440,22 @@ public class Get extends Operation imple
public void write(final DataOutput out)
throws IOException {
- if (this.storeLimit != -1) {
- out.writeByte(GET_VERSION);
- } else {
- out.writeByte((byte)1);
+ byte version = (byte)1;
+ if (this.storeOffset != 0) {
+ version = GET_VERSION;
+ } else if (this.storeLimit != -1) {
+ version = (byte)2;
}
+ out.writeByte(version);
Bytes.writeByteArray(out, this.row);
out.writeLong(this.lockId);
out.writeInt(this.maxVersions);
- if (this.storeLimit != -1) {
+ if (version > 1) {
out.writeInt(this.storeLimit);
}
+ if (version > 2) {
+ out.writeInt(this.storeOffset);
+ }
if(this.filter == null) {
out.writeBoolean(false);
} else {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat Apr 14 02:45:36 2012
@@ -83,13 +83,14 @@ import java.util.TreeSet;
* execute {@link #setCacheBlocks(boolean)}.
*/
public class Scan extends Operation implements Writable {
- private static final byte SCAN_VERSION = (byte)2;
+ private static final byte SCAN_VERSION = (byte)3;
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
private int batch = -1;
private int storeLimit = -1;
+ private int storeOffset = 0;
private int caching = -1;
private boolean cacheBlocks = true;
private Filter filter = null;
@@ -140,6 +141,7 @@ public class Scan extends Operation impl
maxVersions = scan.getMaxVersions();
batch = scan.getBatch();
storeLimit = scan.getMaxResultsPerColumnFamily();
+ storeOffset = scan.getRowOffsetPerColumnFamily();
caching = scan.getCaching();
cacheBlocks = scan.getCacheBlocks();
filter = scan.getFilter(); // clone?
@@ -169,6 +171,7 @@ public class Scan extends Operation impl
this.filter = get.getFilter();
this.maxVersions = get.getMaxVersions();
this.storeLimit = get.getMaxResultsPerColumnFamily();
+ this.storeOffset = get.getRowOffsetPerColumnFamily();
this.tr = get.getTimeRange();
this.familyMap = get.getFamilyMap();
}
@@ -308,6 +311,14 @@ public class Scan extends Operation impl
}
/**
+ * Set offset for the row per Column Family.
+ * @param offset is the number of kvs that will be skipped.
+ */
+ public void setRowOffsetPerColumnFamily(int offset) {
+ this.storeOffset = offset;
+ }
+
+ /**
* Set the number of rows for caching that will be passed to scanners.
* If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
* Higher caching values will enable faster scanners but will use more memory.
@@ -408,6 +419,15 @@ public class Scan extends Operation impl
}
/**
+ * Method for retrieving the scan's offset per row per column
+ * family (#kvs to be skipped)
+ * @return row offset
+ */
+ public int getRowOffsetPerColumnFamily() {
+ return this.storeOffset;
+ }
+
+ /**
* @return caching the number of rows fetched when calling next on a scanner
*/
public int getCaching() {
@@ -562,6 +582,9 @@ public class Scan extends Operation impl
if (version > 1) {
this.storeLimit = in.readInt();
}
+ if (version > 2) {
+ this.storeOffset = in.readInt();
+ }
this.caching = in.readInt();
this.cacheBlocks = in.readBoolean();
if(in.readBoolean()) {
@@ -587,18 +610,23 @@ public class Scan extends Operation impl
public void write(final DataOutput out)
throws IOException {
- if (this.storeLimit != -1) {
- out.writeByte(SCAN_VERSION);
- } else {
- out.writeByte((byte)1);
+ byte version = (byte)1;
+ if (this.storeOffset != 0) {
+ version = SCAN_VERSION;
+ } else if (this.storeLimit != -1) {
+ version = 2;
}
+ out.writeByte(version);
Bytes.writeByteArray(out, this.startRow);
Bytes.writeByteArray(out, this.stopRow);
out.writeInt(this.maxVersions);
out.writeInt(this.batch);
- if (this.storeLimit != -1) {
+ if (version > 1) {
out.writeInt(this.storeLimit);
}
+ if (version > 2) {
+ out.writeInt(this.storeOffset);
+ }
out.writeInt(this.caching);
out.writeBoolean(this.cacheBlocks);
if(this.filter == null) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Apr 14 02:45:36 2012
@@ -45,7 +45,8 @@ class StoreScanner extends NonLazyKeyVal
private KeyValueHeap heap;
private boolean cacheBlocks;
private int countPerRow = 0;
- private int storeLimit;
+ private int storeLimit = -1;
+ private int storeOffset = 0;
private String metricNamePrefix;
// Used to indicate that the scanner has closed (see HBASE-1107)
// Doesnt need to be volatile because it's always accessed via synchronized methods
@@ -125,6 +126,9 @@ class StoreScanner extends NonLazyKeyVal
// set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily();
+ // set rowOffset
+ this.storeOffset = scan.getRowOffsetPerColumnFamily();
+
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.comparator);
@@ -332,7 +336,8 @@ class StoreScanner extends NonLazyKeyVal
case INCLUDE_AND_SEEK_NEXT_ROW:
case INCLUDE_AND_SEEK_NEXT_COL:
this.countPerRow++;
- if (storeLimit > 0 && this.countPerRow > storeLimit) {
+ if (storeLimit > -1 &&
+ this.countPerRow > (storeLimit + storeOffset)) {
// do what SEEK_NEXT_ROW does.
if (!matcher.moreRowsMayExistAfter(kv)) {
outResult.addAll(results);
@@ -342,13 +347,16 @@ class StoreScanner extends NonLazyKeyVal
break LOOP;
}
- if (metric != null) {
- HRegion.incrNumericMetric(this.metricNamePrefix + metric,
+ // add to results only if we have skipped #rowOffset kvs
+ // also update metric accordingly
+ if (this.countPerRow > storeOffset) {
+ if (metric != null) {
+ HRegion.incrNumericMetric(this.metricNamePrefix + metric,
copyKv.getLength());
+ }
+ results.add(copyKv);
}
- results.add(copyKv);
-
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (!matcher.moreRowsMayExistAfter(kv)) {
outResult.addAll(results);
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java?rev=1326043&r1=1326042&r2=1326043&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java Sat Apr 14 02:45:36 2012
@@ -182,10 +182,11 @@ public class TestFromClientSide2 {
kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
- // Filters: ColumnRangeFiltero
+ // Filters: ColumnRangeFilter
get = new Get(ROW);
get.setMaxResultsPerColumnFamily(5);
- get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
+ get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
+ true));
result = ht.get(get);
kvListExp = new ArrayList<KeyValue>();
kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
@@ -310,7 +311,104 @@ public class TestFromClientSide2 {
}
- private void verifyResult(Result result, List<KeyValue> kvList, boolean toLog, String msg) {
+ /**
+ * Test from client side for get with rowOffset
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testGetRowOffset() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testGetRowOffset");
+ byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+ byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 20);
+
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+ Get get;
+ Put put;
+ Result result;
+ boolean toLog = true;
+ List<KeyValue> kvListExp;
+
+ // Insert one CF for row
+ kvListExp = new ArrayList<KeyValue>();
+ put = new Put(ROW);
+ for (int i=0; i < 10; i++) {
+ KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
+ put.add(kv);
+ // skipping first two kvs
+ if (i < 2) continue;
+ kvListExp.add(kv);
+ }
+ ht.put(put);
+
+ //setting offset to 2
+ get = new Get(ROW);
+ get.setRowOffsetPerColumnFamily(2);
+ result = ht.get(get);
+ verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
+
+ //setting offset to 20
+ get = new Get(ROW);
+ get.setRowOffsetPerColumnFamily(20);
+ result = ht.get(get);
+ kvListExp = new ArrayList<KeyValue>();
+ verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
+
+ //offset + maxResultPerCF
+ get = new Get(ROW);
+ get.setRowOffsetPerColumnFamily(4);
+ get.setMaxResultsPerColumnFamily(5);
+ result = ht.get(get);
+ kvListExp = new ArrayList<KeyValue>();
+ for (int i=4; i < 9; i++) {
+ kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
+ }
+ verifyResult(result, kvListExp, toLog,
+ "Testing offset + setMaxResultsPerCF");
+
+ // Filters: ColumnRangeFilter
+ get = new Get(ROW);
+ get.setRowOffsetPerColumnFamily(1);
+ get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
+ true));
+ result = ht.get(get);
+ kvListExp = new ArrayList<KeyValue>();
+ kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
+ kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
+ kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
+ verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
+
+ // Insert into two more CFs for row
+ // 10 columns for CF2, 10 columns for CF1
+ for(int j=2; j > 0; j--) {
+ put = new Put(ROW);
+ for (int i=0; i < 10; i++) {
+ KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
+ put.add(kv);
+ }
+ ht.put(put);
+ }
+
+ get = new Get(ROW);
+ get.setRowOffsetPerColumnFamily(4);
+ get.setMaxResultsPerColumnFamily(2);
+ get.addFamily(FAMILIES[1]);
+ get.addFamily(FAMILIES[2]);
+ result = ht.get(get);
+ kvListExp = new ArrayList<KeyValue>();
+ //Exp: CF1:q4, q5, CF2: q4, q5
+ kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
+ kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
+ kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
+ kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
+ verifyResult(result, kvListExp, toLog,
+ "Testing offset + multiple CFs + maxResults");
+
+ }
+
+ private void verifyResult(Result result, List<KeyValue> kvList, boolean toLog,
+ String msg) {
int i =0;
LOG.info(msg);
@@ -318,6 +416,8 @@ public class TestFromClientSide2 {
LOG.info("True cnt is: " + result.size());
assertEquals(kvList.size(), result.size());
+ if (kvList.size() == 0) return;
+
for (KeyValue kv : result.sorted()) {
KeyValue kvExp = kvList.get(i++);
if (toLog) {