You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/15 12:34:46 UTC
[hbase] branch branch-2 updated: HBASE-21910 The nonce
implementation is wrong for AsyncTable
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 48f2ef4 HBASE-21910 The nonce implementation is wrong for AsyncTable
48f2ef4 is described below
commit 48f2ef432bb171dba4267b23b0e6f61fcb187abd
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Feb 15 17:07:21 2019 +0800
HBASE-21910 The nonce implementation is wrong for AsyncTable
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../hadoop/hbase/client/RawAsyncTableImpl.java | 20 +++--
.../hbase/client/TestAsyncTableNoncedRetry.java | 86 +++++++++++++---------
2 files changed, 63 insertions(+), 43 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index be94ca4..7562e6f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -197,12 +197,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
}
- private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
- HRegionLocation loc, ClientService.Interface stub, REQ req,
+ private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
+ HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
- long nonceGroup = conn.getNonceGenerator().getNonceGroup();
- long nonce = conn.getNonceGenerator().newNonce();
return mutate(controller, loc, stub, req,
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}
@@ -254,18 +252,24 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(append, rpcTimeoutNs)
- .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
- append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+ .action(
+ (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
+ loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(increment, rpcTimeoutNs)
- .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
- stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+ .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
+ controller, loc, stub, increment, RequestConverter::buildMutateRequest,
+ RawAsyncTableImpl::toResult))
.call();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 3008561..82a57f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -21,15 +21,22 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -44,7 +51,7 @@ public class TestAsyncTableNoncedRetry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class);
+ HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -58,40 +65,50 @@ public class TestAsyncTableNoncedRetry {
private static AsyncConnection ASYNC_CONN;
- private static long NONCE = 1L;
+ @Rule
+ public TestName testName = new TestName();
+
+ private byte[] row;
+
+ private static AtomicInteger CALLED = new AtomicInteger();
- private static NonceGenerator NONCE_GENERATOR = new NonceGenerator() {
+ private static long SLEEP_TIME = 2000;
+
+ public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
@Override
- public long newNonce() {
- return NONCE;
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
}
@Override
- public long getNonceGroup() {
- return 1L;
+ public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
+ Result result) throws IOException {
+ if (CALLED.getAndIncrement() == 0) {
+ Threads.sleepWithoutInterrupt(SLEEP_TIME);
+ }
+ return RegionObserver.super.postAppend(c, append, result);
}
- };
-
- @Rule
- public TestName testName = new TestName();
- private byte[] row;
+ @Override
+ public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
+ Increment increment, Result result) throws IOException {
+ if (CALLED.getAndIncrement() == 0) {
+ Threads.sleepWithoutInterrupt(SLEEP_TIME);
+ }
+ return RegionObserver.super.postIncrement(c, increment, result);
+ }
+ }
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
- TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.getAdmin()
+ .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+ .setCoprocessor(SleepOnceCP.class.getName()).build());
TEST_UTIL.waitTableAvailable(TABLE_NAME);
- AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
- ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent()) {
-
- @Override
- public NonceGenerator getNonceGenerator() {
- return NONCE_GENERATOR;
- }
- };
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass
@@ -103,28 +120,27 @@ public class TestAsyncTableNoncedRetry {
@Before
public void setUp() throws IOException, InterruptedException {
row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
- NONCE++;
+ CALLED.set(0);
}
@Test
public void testAppend() throws InterruptedException, ExecutionException {
- AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
- assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
- result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
- // the second call should have no effect as we always generate the same nonce.
- assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
- result = table.get(new Get(row)).get();
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
}
@Test
public void testIncrement() throws InterruptedException, ExecutionException {
- AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
- assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
- // the second call should have no effect as we always generate the same nonce.
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
- Result result = table.get(new Get(row)).get();
- assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
}
}