You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/18 12:46:48 UTC
[16/26] hbase git commit: HBASE-19522 The complete order may be wrong in AsyncBufferedMutatorImpl
HBASE-19522 The complete order may be wrong in AsyncBufferedMutatorImpl
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97976782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97976782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97976782
Branch: refs/heads/HBASE-19397
Commit: 979767824d37df0e05002fa76402ff2b9e534d50
Parents: a651ab9
Author: Guanghao Zhang <zg...@apache.org>
Authored: Fri Dec 15 19:39:16 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Sat Dec 16 14:00:59 2017 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncBufferedMutatorImpl.java | 6 +++---
.../hbase/client/TestAsyncBufferMutator.java | 18 +++++++++++++++---
2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/97976782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index ac159b4..5a92ace 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -74,12 +74,12 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
bufferedSize = 0L;
Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
for (CompletableFuture<?> future : table.batch(toSend)) {
+ CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
future.whenComplete((r, e) -> {
- CompletableFuture<Void> f = toCompleteIter.next();
if (e != null) {
- f.completeExceptionally(e);
+ toCompleteFuture.completeExceptionally(e);
} else {
- f.complete(null);
+ toCompleteFuture.complete(null);
}
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97976782/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 5493772..6a5a00e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -51,6 +51,8 @@ public class TestAsyncBufferMutator {
private static TableName TABLE_NAME = TableName.valueOf("async");
+ private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
+
private static byte[] CF = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
@@ -65,6 +67,7 @@ public class TestAsyncBufferMutator {
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, CF);
+ TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
ThreadLocalRandom.current().nextBytes(VALUE);
}
@@ -76,10 +79,19 @@ public class TestAsyncBufferMutator {
}
@Test
- public void test() throws InterruptedException {
+ public void testWithMultiRegionTable() throws InterruptedException {
+ test(MULTI_REGION_TABLE_NAME);
+ }
+
+ @Test
+ public void testWithSingleRegionTable() throws InterruptedException {
+ test(TABLE_NAME);
+ }
+
+ private void test(TableName tableName) throws InterruptedException {
List<CompletableFuture<Void>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator =
- CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 1024).build()) {
+ CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
.collect(Collectors.toList()));
@@ -96,7 +108,7 @@ public class TestAsyncBufferMutator {
}
// mutator.close will call mutator.flush automatically so all tasks should have been done.
futures.forEach(f -> f.join());
- AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ AsyncTable<?> table = CONN.getTable(tableName);
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
.forEach(r -> {
assertArrayEquals(VALUE, r.getValue(CF, CQ));