You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/08 02:03:45 UTC

hbase git commit: HBASE-21682 Support getting from specific replica

Repository: hbase
Updated Branches:
  refs/heads/master 5aaa73434 -> 4f0514e39


HBASE-21682 Support getting from specific replica


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4f0514e3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4f0514e3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4f0514e3

Branch: refs/heads/master
Commit: 4f0514e39aeed5aa12c0399faedbed7298a975c7
Parents: 5aaa734
Author: zhangduo <zh...@apache.org>
Authored: Mon Jan 7 20:34:01 2019 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Tue Jan 8 09:49:12 2019 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/RawAsyncTableImpl.java  | 13 +++--
 .../client/TestAsyncTableRegionReplicasGet.java | 60 ++++++++++++++------
 2 files changed, 51 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4f0514e3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 28db7e8..2ab9f6a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -261,12 +261,17 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public CompletableFuture<Result> get(Get get) {
-    CompletableFuture<Result> primaryFuture =
-      get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
     if (get.getConsistency() == Consistency.STRONG) {
-      return primaryFuture;
+      return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
+    }
+    // user specifies a replica id explicitly, just send request to the specific replica
+    if (get.getReplicaId() >= 0) {
+      return get(get, get.getReplicaId(), readRpcTimeoutNs);
     }
-    // Timeline consistent read, where we will send requests to other region replicas
+
+    // Timeline consistent read, where we may send requests to other region replicas
+    CompletableFuture<Result> primaryFuture =
+      get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
     CompletableFuture<Result> future = new CompletableFuture<>();
     connect(primaryFuture, future);
     long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f0514e3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
index 0445a0e..2117116 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -75,6 +77,8 @@ public class TestAsyncTableRegionReplicasGet {
 
   private static byte[] VALUE = Bytes.toBytes("value");
 
+  private static int REPLICA_COUNT = 3;
+
   private static AsyncConnection ASYNC_CONN;
 
   @Rule
@@ -99,9 +103,8 @@ public class TestAsyncTableRegionReplicasGet {
 
   private static volatile boolean FAIL_PRIMARY_GET = false;
 
-  private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
-
-  private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
+  private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
+    new ConcurrentHashMap<>();
 
   public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
 
@@ -117,13 +120,10 @@ public class TestAsyncTableRegionReplicasGet {
       if (!region.getTable().equals(TABLE_NAME)) {
         return;
       }
-      if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
-        SECONDARY_GET_COUNT.incrementAndGet();
-      } else {
-        PRIMARY_GET_COUNT.incrementAndGet();
-        if (FAIL_PRIMARY_GET) {
-          throw new IOException("Inject error");
-        }
+      REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
+        .incrementAndGet();
+      if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
+        throw new IOException("Inject error");
       }
     }
   }
@@ -152,10 +152,9 @@ public class TestAsyncTableRegionReplicasGet {
     // infinite retry
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
     TEST_UTIL.startMiniCluster(3);
-    TEST_UTIL.getAdmin()
-      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
-        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
-        .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
+    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
+      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
     TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
@@ -172,10 +171,21 @@ public class TestAsyncTableRegionReplicasGet {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  private static int getSecondaryGetCount() {
+    return REPLICA_ID_TO_COUNT.entrySet().stream()
+      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
+      .mapToInt(e -> e.getValue().get()).sum();
+  }
+
+  private static int getPrimaryGetCount() {
+    AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    return primaryGetCount != null ? primaryGetCount.get() : 0;
+  }
+
   @Test
   public void testNoReplicaRead() throws Exception {
     FAIL_PRIMARY_GET = false;
-    SECONDARY_GET_COUNT.set(0);
+    REPLICA_ID_TO_COUNT.clear();
     AsyncTable<?> table = getTable.get();
     Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
     for (int i = 0; i < 1000; i++) {
@@ -184,21 +194,35 @@ public class TestAsyncTableRegionReplicasGet {
     // the primary region is fine and the primary timeout is 1 second which is long enough, so we
     // should not send any requests to secondary replicas even if the consistency is timeline.
     Thread.sleep(5000);
-    assertEquals(0, SECONDARY_GET_COUNT.get());
+    assertEquals(0, getSecondaryGetCount());
   }
 
   @Test
   public void testReplicaRead() throws Exception {
     // fail the primary get request
     FAIL_PRIMARY_GET = true;
+    REPLICA_ID_TO_COUNT.clear();
     Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
     // make sure that we could still get the value from secondary replicas
     AsyncTable<?> table = getTable.get();
     assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
     // make sure that the primary request has been canceled
     Thread.sleep(5000);
-    int count = PRIMARY_GET_COUNT.get();
+    int count = getPrimaryGetCount();
     Thread.sleep(10000);
-    assertEquals(count, PRIMARY_GET_COUNT.get());
+    assertEquals(count, getPrimaryGetCount());
+  }
+
+  @Test
+  public void testReadSpecificReplica() throws Exception {
+    FAIL_PRIMARY_GET = false;
+    REPLICA_ID_TO_COUNT.clear();
+    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+    AsyncTable<?> table = getTable.get();
+    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
+      get.setReplicaId(replicaId);
+      assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+      assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
+    }
   }
 }