You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/08 02:03:45 UTC
hbase git commit: HBASE-21682 Support getting from specific replica
Repository: hbase
Updated Branches:
refs/heads/master 5aaa73434 -> 4f0514e39
HBASE-21682 Support getting from specific replica
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4f0514e3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4f0514e3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4f0514e3
Branch: refs/heads/master
Commit: 4f0514e39aeed5aa12c0399faedbed7298a975c7
Parents: 5aaa734
Author: zhangduo <zh...@apache.org>
Authored: Mon Jan 7 20:34:01 2019 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Tue Jan 8 09:49:12 2019 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/RawAsyncTableImpl.java | 13 +++--
.../client/TestAsyncTableRegionReplicasGet.java | 60 ++++++++++++++------
2 files changed, 51 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4f0514e3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 28db7e8..2ab9f6a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -261,12 +261,17 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> get(Get get) {
- CompletableFuture<Result> primaryFuture =
- get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
if (get.getConsistency() == Consistency.STRONG) {
- return primaryFuture;
+ return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
+ }
+ // user specifies a replica id explicitly, just send request to the specific replica
+ if (get.getReplicaId() >= 0) {
+ return get(get, get.getReplicaId(), readRpcTimeoutNs);
}
- // Timeline consistent read, where we will send requests to other region replicas
+
+ // Timeline consistent read, where we may send requests to other region replicas
+ CompletableFuture<Result> primaryFuture =
+ get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
CompletableFuture<Result> future = new CompletableFuture<>();
connect(primaryFuture, future);
long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
http://git-wip-us.apache.org/repos/asf/hbase/blob/4f0514e3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
index 0445a0e..2117116 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -75,6 +77,8 @@ public class TestAsyncTableRegionReplicasGet {
private static byte[] VALUE = Bytes.toBytes("value");
+ private static int REPLICA_COUNT = 3;
+
private static AsyncConnection ASYNC_CONN;
@Rule
@@ -99,9 +103,8 @@ public class TestAsyncTableRegionReplicasGet {
private static volatile boolean FAIL_PRIMARY_GET = false;
- private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
-
- private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
+ private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
+ new ConcurrentHashMap<>();
public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
@@ -117,13 +120,10 @@ public class TestAsyncTableRegionReplicasGet {
if (!region.getTable().equals(TABLE_NAME)) {
return;
}
- if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
- SECONDARY_GET_COUNT.incrementAndGet();
- } else {
- PRIMARY_GET_COUNT.incrementAndGet();
- if (FAIL_PRIMARY_GET) {
- throw new IOException("Inject error");
- }
+ REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
+ .incrementAndGet();
+ if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
+ throw new IOException("Inject error");
}
}
}
@@ -152,10 +152,9 @@ public class TestAsyncTableRegionReplicasGet {
// infinite retry
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
TEST_UTIL.startMiniCluster(3);
- TEST_UTIL.getAdmin()
- .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
- .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
+ TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
+ .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
@@ -172,10 +171,21 @@ public class TestAsyncTableRegionReplicasGet {
TEST_UTIL.shutdownMiniCluster();
}
+ private static int getSecondaryGetCount() {
+ return REPLICA_ID_TO_COUNT.entrySet().stream()
+ .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
+ .mapToInt(e -> e.getValue().get()).sum();
+ }
+
+ private static int getPrimaryGetCount() {
+ AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
+ return primaryGetCount != null ? primaryGetCount.get() : 0;
+ }
+
@Test
public void testNoReplicaRead() throws Exception {
FAIL_PRIMARY_GET = false;
- SECONDARY_GET_COUNT.set(0);
+ REPLICA_ID_TO_COUNT.clear();
AsyncTable<?> table = getTable.get();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
for (int i = 0; i < 1000; i++) {
@@ -184,21 +194,35 @@ public class TestAsyncTableRegionReplicasGet {
// the primary region is fine and the primary timeout is 1 second which is long enough, so we
// should not send any requests to secondary replicas even if the consistency is timeline.
Thread.sleep(5000);
- assertEquals(0, SECONDARY_GET_COUNT.get());
+ assertEquals(0, getSecondaryGetCount());
}
@Test
public void testReplicaRead() throws Exception {
// fail the primary get request
FAIL_PRIMARY_GET = true;
+ REPLICA_ID_TO_COUNT.clear();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
// make sure that we could still get the value from secondary replicas
AsyncTable<?> table = getTable.get();
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
// make sure that the primary request has been canceled
Thread.sleep(5000);
- int count = PRIMARY_GET_COUNT.get();
+ int count = getPrimaryGetCount();
Thread.sleep(10000);
- assertEquals(count, PRIMARY_GET_COUNT.get());
+ assertEquals(count, getPrimaryGetCount());
+ }
+
+ @Test
+ public void testReadSpecificReplica() throws Exception {
+ FAIL_PRIMARY_GET = false;
+ REPLICA_ID_TO_COUNT.clear();
+ Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+ AsyncTable<?> table = getTable.get();
+ for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
+ get.setReplicaId(replicaId);
+ assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+ assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
+ }
}
}