You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:31 UTC

[29/50] [abbrv] hbase git commit: HBASE-17376 ClientAsyncPrefetchScanner may fail due to too many rows (ChiaPing Tsai)

HBASE-17376 ClientAsyncPrefetchScanner may fail due to too many rows (ChiaPing Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e18e9a22
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e18e9a22
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e18e9a22

Branch: refs/heads/hbase-12439
Commit: e18e9a22daf32306f966641ea02a72fca96dee32
Parents: 463ffa7
Author: tedyu <yu...@gmail.com>
Authored: Mon Dec 26 15:55:22 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Mon Dec 26 15:55:22 2016 -0800

----------------------------------------------------------------------
 .../client/ClientAsyncPrefetchScanner.java      |  2 +-
 .../client/TestScannersFromClientSide.java      | 93 ++++++++++++++------
 2 files changed, 69 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e18e9a22/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index ec33dd2..6b70a88 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -76,7 +76,7 @@ public class ClientAsyncPrefetchScanner extends ClientScanner {
   protected void initCache() {
     // concurrent cache
     cacheCapacity = calcCacheCapacity();
-    cache = new LinkedBlockingQueue<Result>(cacheCapacity);
+    cache = new LinkedBlockingQueue<Result>();
     cacheSizeInBytes = new AtomicLong(0);
     exceptionsQueue = new ConcurrentLinkedQueue<Exception>();
     prefetchRunnable = new PrefetchRunnable();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e18e9a22/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 19b06b5..8862109 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -646,45 +649,72 @@ public class TestScannersFromClientSide {
     verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
   }
 
-  /**
-   * Test from client side for async scan
-   *
-   * @throws Exception
-   */
   @Test
-  public void testAsyncScanner() throws Exception {
-    TableName TABLE = TableName.valueOf("testAsyncScan");
-    byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
-    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
-    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
+  public void testAsyncScannerWithSmallData() throws Exception {
+    testAsyncScanner(TableName.valueOf("testAsyncScannerWithSmallData"),
+      2,
+      3,
+      10);
+  }
 
-    Table ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+  @Test
+  public void testAsyncScannerWithManyRows() throws Exception {
+    testAsyncScanner(TableName.valueOf("testAsyncScannerWithManyRows"),
+      30000,
+      1,
+      1);
+  }
 
-    Put put;
-    Scan scan;
-    Result result;
-    boolean toLog = true;
-    List<Cell> kvListExp, kvListScan;
+  private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
+      int qualifierNumber) throws Exception {
+    assert rowNumber > 0;
+    assert familyNumber > 0;
+    assert qualifierNumber > 0;
+    byte[] row = Bytes.toBytes("r");
+    byte[] family = Bytes.toBytes("f");
+    byte[] qualifier = Bytes.toBytes("q");
+    byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
+    byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
+    byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
 
-    kvListExp = new ArrayList<Cell>();
+    Table ht = TEST_UTIL.createTable(table, families);
 
-    for (int r=0; r < ROWS.length; r++) {
-      put = new Put(ROWS[r]);
-      for (int c=0; c < FAMILIES.length; c++) {
-        for (int q=0; q < QUALIFIERS.length; q++) {
-          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
+    boolean toLog = true;
+    List<Cell> kvListExp = new ArrayList<>();
+
+    List<Put> puts = new ArrayList<>();
+    for (byte[] r : rows) {
+      Put put = new Put(r);
+      for (byte[] f : families) {
+        for (byte[] q : qualifiers) {
+          KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
           put.add(kv);
           kvListExp.add(kv);
         }
       }
-      ht.put(put);
+      puts.add(put);
+      if (puts.size() > 1000) {
+        ht.put(puts);
+        puts.clear();
+      }
+    }
+    if (!puts.isEmpty()) {
+      ht.put(puts);
+      puts.clear();
     }
 
-    scan = new Scan();
+    Scan scan = new Scan();
     scan.setAsyncPrefetch(true);
     ResultScanner scanner = ht.getScanner(scan);
-    kvListScan = new ArrayList<Cell>();
+    List<Cell> kvListScan = new ArrayList<>();
+    Result result;
+    boolean first = true;
     while ((result = scanner.next()) != null) {
+      // waiting for cache. see HBASE-17376
+      if (first) {
+        TimeUnit.SECONDS.sleep(1);
+        first = false;
+      }
       for (Cell kv : result.listCells()) {
         kvListScan.add(kv);
       }
@@ -692,7 +722,20 @@ public class TestScannersFromClientSide {
     result = Result.create(kvListScan);
     assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
     verifyResult(result, kvListExp, toLog, "Testing async scan");
+    TEST_UTIL.deleteTable(table);
+  }
 
+  private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
+    int maxLength = Integer.toString(n).length();
+    byte [][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
+      int length = Integer.toString(i).length();
+      StringBuilder buf = new StringBuilder(Integer.toString(i));
+      IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
+      byte[] tail = Bytes.toBytes(buf.toString());
+      ret[i] = Bytes.add(base, tail);
+    }
+    return ret;
   }
 
   static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,