You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/18 12:46:48 UTC

[16/26] hbase git commit: HBASE-19522 The complete order may be wrong in AsyncBufferedMutatorImpl

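HBASE-19522 The complete order may be wrong in AsyncBufferedMutatorImpl

Previously the future to complete was taken from the iterator inside the
whenComplete callback, so futures were matched to batch results in the order
the callbacks ran rather than the order the mutations were submitted. The
future is now taken from the iterator before the callback is registered.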


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97976782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97976782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97976782

Branch: refs/heads/HBASE-19397
Commit: 979767824d37df0e05002fa76402ff2b9e534d50
Parents: a651ab9
Author: Guanghao Zhang <zg...@apache.org>
Authored: Fri Dec 15 19:39:16 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Sat Dec 16 14:00:59 2017 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncBufferedMutatorImpl.java    |  6 +++---
 .../hbase/client/TestAsyncBufferMutator.java      | 18 +++++++++++++++---
 2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/97976782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index ac159b4..5a92ace 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -74,12 +74,12 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
     bufferedSize = 0L;
     Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
     for (CompletableFuture<?> future : table.batch(toSend)) {
+      CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
       future.whenComplete((r, e) -> {
-        CompletableFuture<Void> f = toCompleteIter.next();
         if (e != null) {
-          f.completeExceptionally(e);
+          toCompleteFuture.completeExceptionally(e);
         } else {
-          f.complete(null);
+          toCompleteFuture.complete(null);
         }
       });
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/97976782/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 5493772..6a5a00e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -51,6 +51,8 @@ public class TestAsyncBufferMutator {
 
   private static TableName TABLE_NAME = TableName.valueOf("async");
 
+  private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
+
   private static byte[] CF = Bytes.toBytes("cf");
 
   private static byte[] CQ = Bytes.toBytes("cq");
@@ -65,6 +67,7 @@ public class TestAsyncBufferMutator {
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, CF);
+    TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
     CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     ThreadLocalRandom.current().nextBytes(VALUE);
   }
@@ -76,10 +79,19 @@ public class TestAsyncBufferMutator {
   }
 
   @Test
-  public void test() throws InterruptedException {
+  public void testWithMultiRegionTable() throws InterruptedException {
+    test(MULTI_REGION_TABLE_NAME);
+  }
+
+  @Test
+  public void testWithSingleRegionTable() throws InterruptedException {
+    test(TABLE_NAME);
+  }
+
+  private void test(TableName tableName) throws InterruptedException {
     List<CompletableFuture<Void>> futures = new ArrayList<>();
     try (AsyncBufferedMutator mutator =
-        CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 1024).build()) {
+        CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
       List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
           .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
           .collect(Collectors.toList()));
@@ -96,7 +108,7 @@ public class TestAsyncBufferMutator {
     }
     // mutator.close will call mutator.flush automatically so all tasks should have been done.
     futures.forEach(f -> f.join());
-    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    AsyncTable<?> table = CONN.getTable(tableName);
     IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
         .forEach(r -> {
           assertArrayEquals(VALUE, r.getValue(CF, CQ));