You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:31 UTC
[29/50] [abbrv] hbase git commit: HBASE-17376
ClientAsyncPrefetchScanner may fail due to too many rows (ChiaPing Tsai)
HBASE-17376 ClientAsyncPrefetchScanner may fail due to too many rows (ChiaPing Tsai)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e18e9a22
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e18e9a22
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e18e9a22
Branch: refs/heads/hbase-12439
Commit: e18e9a22daf32306f966641ea02a72fca96dee32
Parents: 463ffa7
Author: tedyu <yu...@gmail.com>
Authored: Mon Dec 26 15:55:22 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Mon Dec 26 15:55:22 2016 -0800
----------------------------------------------------------------------
.../client/ClientAsyncPrefetchScanner.java | 2 +-
.../client/TestScannersFromClientSide.java | 93 ++++++++++++++------
2 files changed, 69 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e18e9a22/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index ec33dd2..6b70a88 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -76,7 +76,7 @@ public class ClientAsyncPrefetchScanner extends ClientScanner {
protected void initCache() {
// concurrent cache
cacheCapacity = calcCacheCapacity();
- cache = new LinkedBlockingQueue<Result>(cacheCapacity);
+ cache = new LinkedBlockingQueue<Result>();
cacheSizeInBytes = new AtomicLong(0);
exceptionsQueue = new ConcurrentLinkedQueue<Exception>();
prefetchRunnable = new PrefetchRunnable();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e18e9a22/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 19b06b5..8862109 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -646,45 +649,72 @@ public class TestScannersFromClientSide {
verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
}
- /**
- * Test from client side for async scan
- *
- * @throws Exception
- */
@Test
- public void testAsyncScanner() throws Exception {
- TableName TABLE = TableName.valueOf("testAsyncScan");
- byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
- byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
- byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
+ public void testAsyncScannerWithSmallData() throws Exception {
+ testAsyncScanner(TableName.valueOf("testAsyncScannerWithSmallData"),
+ 2,
+ 3,
+ 10);
+ }
- Table ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+ @Test
+ public void testAsyncScannerWithManyRows() throws Exception {
+ testAsyncScanner(TableName.valueOf("testAsyncScannerWithManyRows"),
+ 30000,
+ 1,
+ 1);
+ }
- Put put;
- Scan scan;
- Result result;
- boolean toLog = true;
- List<Cell> kvListExp, kvListScan;
+ private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
+ int qualifierNumber) throws Exception {
+ assert rowNumber > 0;
+ assert familyNumber > 0;
+ assert qualifierNumber > 0;
+ byte[] row = Bytes.toBytes("r");
+ byte[] family = Bytes.toBytes("f");
+ byte[] qualifier = Bytes.toBytes("q");
+ byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
+ byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
+ byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
- kvListExp = new ArrayList<Cell>();
+ Table ht = TEST_UTIL.createTable(table, families);
- for (int r=0; r < ROWS.length; r++) {
- put = new Put(ROWS[r]);
- for (int c=0; c < FAMILIES.length; c++) {
- for (int q=0; q < QUALIFIERS.length; q++) {
- KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
+ boolean toLog = true;
+ List<Cell> kvListExp = new ArrayList<>();
+
+ List<Put> puts = new ArrayList<>();
+ for (byte[] r : rows) {
+ Put put = new Put(r);
+ for (byte[] f : families) {
+ for (byte[] q : qualifiers) {
+ KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
put.add(kv);
kvListExp.add(kv);
}
}
- ht.put(put);
+ puts.add(put);
+ if (puts.size() > 1000) {
+ ht.put(puts);
+ puts.clear();
+ }
+ }
+ if (!puts.isEmpty()) {
+ ht.put(puts);
+ puts.clear();
}
- scan = new Scan();
+ Scan scan = new Scan();
scan.setAsyncPrefetch(true);
ResultScanner scanner = ht.getScanner(scan);
- kvListScan = new ArrayList<Cell>();
+ List<Cell> kvListScan = new ArrayList<>();
+ Result result;
+ boolean first = true;
while ((result = scanner.next()) != null) {
+ // waiting for cache. see HBASE-17376
+ if (first) {
+ TimeUnit.SECONDS.sleep(1);
+ first = false;
+ }
for (Cell kv : result.listCells()) {
kvListScan.add(kv);
}
@@ -692,7 +722,20 @@ public class TestScannersFromClientSide {
result = Result.create(kvListScan);
assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
verifyResult(result, kvListExp, toLog, "Testing async scan");
+ TEST_UTIL.deleteTable(table);
+ }
+ private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
+ int maxLength = Integer.toString(n).length();
+ byte [][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ int length = Integer.toString(i).length();
+ StringBuilder buf = new StringBuilder(Integer.toString(i));
+ IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
+ byte[] tail = Bytes.toBytes(buf.toString());
+ ret[i] = Bytes.add(base, tail);
+ }
+ return ret;
}
static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,