Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/21 10:20:34 UTC

[hbase] branch HBASE-21512 updated (cb91089 -> b283aa0)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git.


 discard cb91089  HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection
 discard 2b1fa8e  HBASE-21537 Rewrite ServerManager.closeRegionSilentlyAndWait to use AsyncClusterConnection
 discard 0f65f5a  HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection
 discard e8a273b  HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection
 discard 1f08bdc  HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
 discard 040bd20  HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
 discard 2321225  HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager
 discard 39e09e8  HBASE-21515 Also initialize an AsyncClusterConnection in HRegionServer
     add a2f6768  HBASE-21746 Fix two corner cases in RegionMover
     add 35df614  HBASE-21738 Remove all the CLSM#size operations in our memstore because they are quite time consuming.
     new 5dc19f0  HBASE-21515 Also initialize an AsyncClusterConnection in HRegionServer
     new 3d9a34e  HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager
     new 00beca4  HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
     new c24dc98  HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
     new 3d6ffb3  HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection
     new 5ac2e5c  HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection
     new 5f26b73  HBASE-21537 Rewrite ServerManager.closeRegionSilentlyAndWait to use AsyncClusterConnection
     new b283aa0  HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (cb91089)
            \
             N -- N -- N   refs/heads/HBASE-21512 (b283aa0)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hbase/regionserver/AbstractMemStore.java       |   4 +-
 .../hbase/regionserver/CSLMImmutableSegment.java   |   4 +-
 .../regionserver/CellArrayImmutableSegment.java    |  15 +-
 .../regionserver/CellChunkImmutableSegment.java    |  28 +--
 .../apache/hadoop/hbase/regionserver/CellSet.java  |   3 +
 .../hbase/regionserver/CompactingMemStore.java     |  13 +-
 .../hbase/regionserver/CompactionPipeline.java     |  46 ++--
 .../regionserver/CompositeImmutableSegment.java    |   2 +-
 .../hadoop/hbase/regionserver/DefaultMemStore.java |   4 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  24 ++-
 .../hadoop/hbase/regionserver/MemStoreSize.java    |  26 ++-
 .../hadoop/hbase/regionserver/MemStoreSizing.java  |  28 ++-
 .../hadoop/hbase/regionserver/MutableSegment.java  |   8 +-
 .../regionserver/NonThreadSafeMemStoreSizing.java  |  26 ++-
 .../regionserver/RegionServicesForStores.java      |   5 +-
 .../apache/hadoop/hbase/regionserver/Segment.java  |  33 ++-
 .../regionserver/ThreadSafeMemStoreSizing.java     |  27 ++-
 .../org/apache/hadoop/hbase/util/RegionMover.java  |  30 +--
 .../hbase/regionserver/TestCellSkipListSet.java    |   8 +-
 .../hbase/regionserver/TestCompactingMemStore.java |  13 +-
 .../TestCompactingToCellFlatMapMemStore.java       |   6 +-
 .../hadoop/hbase/regionserver/TestHStore.java      |   8 +-
 .../regionserver/TestRegionServerAccounting.java   |  31 +--
 .../apache/hadoop/hbase/util/TestRegionMover.java  | 235 ++++++++++-----------
 24 files changed, 341 insertions(+), 286 deletions(-)


[hbase] 02/08: HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 3d9a34e2c2030688c1eb710459bf51a7cf7d9457
Author: zhangduo <zh...@apache.org>
AuthorDate: Sat Dec 1 21:15:48 2018 +0800

    HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager
---
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |  5 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  7 ++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  2 +-
 .../hbase/regionserver/SecureBulkLoadManager.java  | 24 ++++-----
 .../hadoop/hbase/security/token/TokenUtil.java     | 57 +++++++++++++++++-----
 .../hadoop/hbase/security/token/TestTokenUtil.java | 42 ++++++++++++----
 6 files changed, 96 insertions(+), 41 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index a3d49b5..d9e620b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -261,13 +261,12 @@ public final class ProtobufUtil {
    * just {@link ServiceException}. Prefer this method to
    * {@link #getRemoteException(ServiceException)} because trying to
    * contain direct protobuf references.
-   * @param e
    */
-  public static IOException handleRemoteException(Exception e) {
+  public static IOException handleRemoteException(Throwable e) {
     return makeIOExceptionOfException(e);
   }
 
-  private static IOException makeIOExceptionOfException(Exception e) {
+  private static IOException makeIOExceptionOfException(Throwable e) {
     Throwable t = e;
     if (e instanceof ServiceException ||
         e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) {
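
The widening from Exception to Throwable is what makes this helper usable from CompletableFuture callbacks, which deliver failures as a raw Throwable. A hedged sketch of the intended call shape (the convertError helper and its names are hypothetical, not part of this patch):

  // Hypothetical helper (illustration only): adapt an async failure into
  // an IOException via the widened handleRemoteException.
  static <T> CompletableFuture<T> convertError(CompletableFuture<T> future) {
    CompletableFuture<T> result = new CompletableFuture<>();
    future.whenComplete((resp, error) -> {
      if (error != null) {
        result.completeExceptionally(ProtobufUtil.handleRemoteException(error));
      } else {
        result.complete(resp);
      }
    });
    return result;
  }
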
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index fea81f1..de2fb7d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
@@ -123,6 +122,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -343,13 +343,12 @@ public final class ProtobufUtil {
    * just {@link ServiceException}. Prefer this method to
    * {@link #getRemoteException(ServiceException)} because trying to
    * contain direct protobuf references.
-   * @param e
    */
-  public static IOException handleRemoteException(Exception e) {
+  public static IOException handleRemoteException(Throwable e) {
     return makeIOExceptionOfException(e);
   }
 
-  private static IOException makeIOExceptionOfException(Exception e) {
+  private static IOException makeIOExceptionOfException(Throwable e) {
     Throwable t = e;
     if (e instanceof ServiceException) {
       t = e.getCause();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index dbc5e77..2e54907 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1937,7 +1937,7 @@ public class HRegionServer extends HasThread implements
     if (!isStopped() && !isAborted()) {
       initializeThreads();
     }
-    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
+    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
     this.secureBulkLoadManager.start();
 
     // Health checker thread.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 566a6b6..add6519 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,11 +37,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -56,7 +56,9 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -111,9 +113,9 @@ public class SecureBulkLoadManager {
 
   private UserProvider userProvider;
   private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
-  private Connection conn;
+  private AsyncConnection conn;
 
-  SecureBulkLoadManager(Configuration conf, Connection conn) {
+  SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
     this.conf = conf;
     this.conn = conn;
   }
@@ -212,23 +214,23 @@ public class SecureBulkLoadManager {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
     }
 
-    Token userToken = null;
+    Token<AuthenticationTokenIdentifier> userToken = null;
     if (userProvider.isHadoopSecurityEnabled()) {
-      userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
-              .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
-              request.getFsToken().getService()));
+      userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(),
+        request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()),
+        new Text(request.getFsToken().getService()));
     }
     final String bulkToken = request.getBulkToken();
     User user = getActiveUser();
     final UserGroupInformation ugi = user.getUGI();
     if (userProvider.isHadoopSecurityEnabled()) {
       try {
-        Token tok = TokenUtil.obtainToken(conn);
+        Token<AuthenticationTokenIdentifier> tok = TokenUtil.obtainToken(conn).get();
         if (tok != null) {
           boolean b = ugi.addToken(tok);
           LOG.debug("token added " + tok + " for user " + ugi + " return=" + b);
         }
-      } catch (IOException ioe) {
+      } catch (Exception ioe) {
         LOG.warn("unable to add token", ioe);
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
index c54d905..28efb84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,27 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.security.token;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.AuthenticationService;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -45,6 +47,8 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
 /**
  * Utility methods for obtaining authentication tokens.
  */
@@ -64,12 +68,39 @@ public class TokenUtil {
 
   /**
    * Obtain and return an authentication token for the current user.
+   * @param conn The async HBase cluster connection
+   * @return the authentication token instance, wrapped by a {@link CompletableFuture}.
+   */
+  public static CompletableFuture<Token<AuthenticationTokenIdentifier>> obtainToken(
+      AsyncConnection conn) {
+    CompletableFuture<Token<AuthenticationTokenIdentifier>> future = new CompletableFuture<>();
+    if (injectedException != null) {
+      future.completeExceptionally(injectedException);
+      return future;
+    }
+    AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME);
+    table.<AuthenticationService.Interface, GetAuthenticationTokenResponse> coprocessorService(
+      AuthenticationProtos.AuthenticationService::newStub,
+      (s, c, r) -> s.getAuthenticationToken(c,
+        AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance(), r),
+      HConstants.EMPTY_START_ROW).whenComplete((resp, error) -> {
+        if (error != null) {
+          future.completeExceptionally(ProtobufUtil.handleRemoteException(error));
+        } else {
+          future.complete(toToken(resp.getToken()));
+        }
+      });
+    return future;
+  }
+
+  /**
+   * Obtain and return an authentication token for the current user.
    * @param conn The HBase cluster connection
    * @throws IOException if a remote error or serialization problem occurs.
    * @return the authentication token instance
    */
-  public static Token<AuthenticationTokenIdentifier> obtainToken(
-      Connection conn) throws IOException {
+  public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)
+      throws IOException {
     Table meta = null;
     try {
       injectFault();
@@ -77,9 +108,9 @@ public class TokenUtil {
       meta = conn.getTable(TableName.META_TABLE_NAME);
       CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
       AuthenticationProtos.AuthenticationService.BlockingInterface service =
-          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
-      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
-          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
+        AuthenticationService.newBlockingStub(rpcChannel);
+      GetAuthenticationTokenResponse response =
+        service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
 
       return toToken(response.getToken());
     } catch (ServiceException se) {
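
For callers that still need a synchronous token, the future-based overload above can be wrapped along these lines (a minimal sketch, not part of the patch; the unwrapping matches what a later commit in this series extracts into FutureUtils.get):

  // Minimal sketch (illustration only): block on the async overload and
  // convert failures back to the blocking API's IOException contract.
  static Token<AuthenticationTokenIdentifier> obtainTokenSync(AsyncConnection conn)
      throws IOException {
    try {
      return TokenUtil.obtainToken(conn).get();
    } catch (InterruptedException e) {
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }
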
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
index 32fcddb..585a3ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
@@ -18,35 +18,53 @@
 package org.apache.hadoop.hbase.security.token;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.net.URLClassLoader;
-
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 @Category(SmallTests.class)
 public class TestTokenUtil {
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestTokenUtil.class);
+    HBaseClassTestRule.forClass(TestTokenUtil.class);
 
-  @Test
-  public void testObtainToken() throws Exception {
+  private URLClassLoader cl;
+
+  @Before
+  public void setUp() {
     URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation();
     URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation();
+    cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+  }
 
-    ClassLoader cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(cl, true);
+  }
 
+  @Test
+  public void testObtainToken() throws Exception {
     Throwable injected = new com.google.protobuf.ServiceException("injected");
 
     Class<?> tokenUtil = cl.loadClass(TokenUtil.class.getCanonicalName());
@@ -55,8 +73,7 @@ public class TestTokenUtil {
     shouldInjectFault.set(null, injected);
 
     try {
-      tokenUtil.getMethod("obtainToken", Connection.class)
-          .invoke(null, new Object[] { null });
+      tokenUtil.getMethod("obtainToken", Connection.class).invoke(null, new Object[] { null });
       fail("Should have injected exception.");
     } catch (InvocationTargetException e) {
       Throwable t = e;
@@ -72,9 +89,16 @@ public class TestTokenUtil {
       }
     }
 
+    CompletableFuture<?> future = (CompletableFuture<?>) tokenUtil
+      .getMethod("obtainToken", AsyncConnection.class).invoke(null, new Object[] { null });
+    try {
+      future.get();
+      fail("Should have injected exception.");
+    } catch (ExecutionException e) {
+      assertSame(injected, e.getCause());
+    }
     Boolean loaded = (Boolean) cl.loadClass(ProtobufUtil.class.getCanonicalName())
-        .getDeclaredMethod("isClassLoaderLoaded")
-        .invoke(null);
+      .getDeclaredMethod("isClassLoaderLoaded").invoke(null);
     assertFalse("Should not have loaded DynamicClassLoader", loaded);
   }
 }
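
The new assertions boil down to one reusable pattern: a failure injected into the async path never throws directly, and must surface only as the cause of the ExecutionException thrown by Future.get(). A generic helper capturing that pattern might look like this (hypothetical, not part of the patch):

  // Hypothetical helper (illustration only): assert that a future failed
  // with exactly the expected cause.
  static void assertFailsWith(CompletableFuture<?> future, Throwable expected)
      throws InterruptedException {
    try {
      future.get();
      fail("Should have completed exceptionally.");
    } catch (ExecutionException e) {
      assertSame(expected, e.getCause());
    }
  }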


[hbase] 08/08: HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit b283aa06da7228f67bae453dba61e36399a5132e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Jan 15 11:43:41 2019 +0800

    HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection
    
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../hbase/master/RegionPlacementMaintainer.java    | 225 +++++++++++----------
 1 file changed, 113 insertions(+), 112 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
index faf5e4a..fda0a9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.master;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
@@ -39,29 +38,30 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
 import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.MunkresAssignment;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
 
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 
@@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
  */
 @InterfaceAudience.Private
 // TODO: Remove? Unused. Partially implemented only.
-public class RegionPlacementMaintainer {
+public class RegionPlacementMaintainer implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
       .getName());
   //The cost of a placement that should never be assigned.
@@ -96,9 +96,9 @@ public class RegionPlacementMaintainer {
   private final boolean enforceMinAssignmentMove;
   private RackManager rackManager;
   private Set<TableName> targetTableSet;
-  private final Connection connection;
+  private AsyncClusterConnection connection;
 
-  public RegionPlacementMaintainer(Configuration conf) {
+  public RegionPlacementMaintainer(Configuration conf) throws IOException {
     this(conf, true, true);
   }
 
@@ -109,11 +109,6 @@ public class RegionPlacementMaintainer {
     this.enforceMinAssignmentMove = enforceMinAssignmentMove;
     this.targetTableSet = new HashSet<>();
     this.rackManager = new RackManager(conf);
-    try {
-      this.connection = ConnectionFactory.createConnection(this.conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
   }
 
   private static void printHelp(Options opt) {
@@ -124,6 +119,14 @@ public class RegionPlacementMaintainer {
         " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
   }
 
+  private AsyncClusterConnection getConnection() throws IOException {
+    if (connection == null) {
+      connection =
+        ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
+    }
+    return connection;
+  }
+
   public void setTargetTableName(String[] tableNames) {
     if (tableNames != null) {
       for (String table : tableNames)
@@ -133,10 +136,8 @@ public class RegionPlacementMaintainer {
 
   /**
    * @return the new RegionAssignmentSnapshot
-   * @throws IOException
    */
-  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
-  throws IOException {
+  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
     SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
       new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
     currentAssignmentShapshot.initialize();
@@ -145,9 +146,6 @@ public class RegionPlacementMaintainer {
 
   /**
    * Verify the region placement is consistent with the assignment plan
-   * @param isDetailMode
-   * @return reports
-   * @throws IOException
    */
   public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
       throws IOException {
@@ -206,10 +204,9 @@ public class RegionPlacementMaintainer {
 
     // Get the all the region servers
     List<ServerName> servers = new ArrayList<>();
-    try (Admin admin = this.connection.getAdmin()) {
-      servers.addAll(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
+    servers.addAll(
+      FutureUtils.get(getConnection().getAdmin().getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)))
         .getLiveServerMetrics().keySet());
-    }
 
     LOG.info("Start to generate assignment plan for " + numRegions +
         " regions from table " + tableName + " with " +
@@ -492,6 +489,11 @@ public class RegionPlacementMaintainer {
     return plan;
   }
 
+  @Override
+  public void close() throws IOException {
+    Closeables.close(connection, true);
+  }
+
   /**
    * Some algorithms for solving the assignment problem may traverse workers or
    * jobs in linear order which may result in skewing the assignments of the
@@ -690,19 +692,17 @@ public class RegionPlacementMaintainer {
         }
         if (singleServerPlan != null) {
           // Update the current region server with its updated favored nodes
-          BlockingInterface currentRegionServer =
-            ((ClusterConnection)this.connection).getAdmin(entry.getKey());
+          AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
           UpdateFavoredNodesRequest request =
-              RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
-
+            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
           UpdateFavoredNodesResponse updateFavoredNodesResponse =
-              currentRegionServer.updateFavoredNodes(null, request);
+            FutureUtils.get(rsAdmin.updateFavoredNodes(request));
           LOG.info("Region server " +
-              ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() +
-              " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
-              singleServerPlan.getAssignmentMap().size() +
-              " regions with the assignment plan");
-          succeededNum ++;
+            FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
+              .getServerInfo() +
+            " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
+            singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan");
+          succeededNum++;
         }
       } catch (Exception e) {
         failedUpdateMap.put(entry.getKey(), e);
@@ -719,7 +719,7 @@ public class RegionPlacementMaintainer {
           " region servers with its corresponding favored nodes");
       for (Map.Entry<ServerName, Exception> entry :
         failedUpdateMap.entrySet() ) {
-        LOG.error("Failed to update " + entry.getKey().getHostAndPort() +
+        LOG.error("Failed to update " + entry.getKey().getAddress() +
             " because of " + entry.getValue().getMessage());
       }
     }
@@ -1019,93 +1019,94 @@ public class RegionPlacementMaintainer {
       }
 
       // Create the region placement obj
-      RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality,
-          enforceMinAssignmentMove);
+      try (RegionPlacementMaintainer rp =
+        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
 
-      if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
-        verificationDetails = true;
-      }
-
-      if (cmd.hasOption("tables")) {
-        String tableNameListStr = cmd.getOptionValue("tables");
-        String[] tableNames = StringUtils.split(tableNameListStr, ",");
-        rp.setTargetTableName(tableNames);
-      }
+        if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
+          verificationDetails = true;
+        }
 
-      if (cmd.hasOption("munkres")) {
-        USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
-      }
+        if (cmd.hasOption("tables")) {
+          String tableNameListStr = cmd.getOptionValue("tables");
+          String[] tableNames = StringUtils.split(tableNameListStr, ",");
+          rp.setTargetTableName(tableNames);
+        }
 
-      // Read all the modes
-      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
-        // Verify the region placement.
-        rp.verifyRegionPlacement(verificationDetails);
-      } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
-        // Generate the assignment plan only without updating the hbase:meta and RS
-        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
-        printAssignmentPlan(plan);
-      } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
-        // Generate the new assignment plan
-        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
-        // Print the new assignment plan
-        printAssignmentPlan(plan);
-        // Write the new assignment plan to META
-        rp.updateAssignmentPlanToMeta(plan);
-      } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
-        // Generate the new assignment plan
-        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
-        // Print the new assignment plan
-        printAssignmentPlan(plan);
-        // Update the assignment to hbase:meta and Region Servers
-        rp.updateAssignmentPlan(plan);
-      } else if (cmd.hasOption("diff")) {
-        FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
-        Map<String, Map<String, Float>> locality = FSUtils
-            .getRegionDegreeLocalityMappingFromFS(conf);
-        Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
-        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
-        System.out.println("Do you want to update the assignment plan? [y/n]");
-        Scanner s = new Scanner(System.in);
-        String input = s.nextLine().trim();
-        if (input.equals("y")) {
-          System.out.println("Updating assignment plan...");
-          rp.updateAssignmentPlan(newPlan);
+        if (cmd.hasOption("munkres")) {
+          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
         }
-        s.close();
-      } else if (cmd.hasOption("ld")) {
-        Map<String, Map<String, Float>> locality = FSUtils
-            .getRegionDegreeLocalityMappingFromFS(conf);
-        rp.printLocalityAndDispersionForCurrentPlan(locality);
-      } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
-        FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
-        printAssignmentPlan(plan);
-      } else if (cmd.hasOption("overwrite")) {
-        if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
-          throw new IllegalArgumentException("Please specify: " +
+
+        // Read all the modes
+        if (cmd.hasOption("v") || cmd.hasOption("verify")) {
+          // Verify the region placement.
+          rp.verifyRegionPlacement(verificationDetails);
+        } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
+          // Generate the assignment plan only without updating the hbase:meta and RS
+          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+          printAssignmentPlan(plan);
+        } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
+          // Generate the new assignment plan
+          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+          // Print the new assignment plan
+          printAssignmentPlan(plan);
+          // Write the new assignment plan to META
+          rp.updateAssignmentPlanToMeta(plan);
+        } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
+          // Generate the new assignment plan
+          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+          // Print the new assignment plan
+          printAssignmentPlan(plan);
+          // Update the assignment to hbase:meta and Region Servers
+          rp.updateAssignmentPlan(plan);
+        } else if (cmd.hasOption("diff")) {
+          FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
+          Map<String, Map<String, Float>> locality =
+            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+          Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
+          rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+          System.out.println("Do you want to update the assignment plan? [y/n]");
+          Scanner s = new Scanner(System.in);
+          String input = s.nextLine().trim();
+          if (input.equals("y")) {
+            System.out.println("Updating assignment plan...");
+            rp.updateAssignmentPlan(newPlan);
+          }
+          s.close();
+        } else if (cmd.hasOption("ld")) {
+          Map<String, Map<String, Float>> locality =
+            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+          rp.printLocalityAndDispersionForCurrentPlan(locality);
+        } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
+          FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
+          printAssignmentPlan(plan);
+        } else if (cmd.hasOption("overwrite")) {
+          if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
+            throw new IllegalArgumentException("Please specify: " +
               " -update -r regionName -f server1:port,server2:port,server3:port");
-        }
+          }
 
-        String regionName = cmd.getOptionValue("r");
-        String favoredNodesStr = cmd.getOptionValue("f");
-        LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
+          String regionName = cmd.getOptionValue("r");
+          String favoredNodesStr = cmd.getOptionValue("f");
+          LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
             favoredNodesStr);
-        List<ServerName> favoredNodes = null;
-        RegionInfo regionInfo =
+          List<ServerName> favoredNodes = null;
+          RegionInfo regionInfo =
             rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
-        if (regionInfo == null) {
-          LOG.error("Cannot find the region " + regionName + " from the META");
-        } else {
-          try {
-            favoredNodes = getFavoredNodeList(favoredNodesStr);
-          } catch (IllegalArgumentException e) {
-            LOG.error("Cannot parse the invalid favored nodes because " + e);
+          if (regionInfo == null) {
+            LOG.error("Cannot find the region " + regionName + " from the META");
+          } else {
+            try {
+              favoredNodes = getFavoredNodeList(favoredNodesStr);
+            } catch (IllegalArgumentException e) {
+              LOG.error("Cannot parse the invalid favored nodes because " + e);
+            }
+            FavoredNodesPlan newPlan = new FavoredNodesPlan();
+            newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
+            rp.updateAssignmentPlan(newPlan);
           }
-          FavoredNodesPlan newPlan = new FavoredNodesPlan();
-          newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
-          rp.updateAssignmentPlan(newPlan);
+        } else {
+          printHelp(opt);
         }
-      } else {
-        printHelp(opt);
       }
     } catch (ParseException e) {
       printHelp(opt);
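
Since RegionPlacementMaintainer now implements Closeable and creates its AsyncClusterConnection lazily in getConnection(), callers can scope the tool with try-with-resources, as the rewritten main path above does. A minimal sketch (assuming a Configuration named conf):

  // Minimal sketch (illustration only): the connection is created on first
  // use and released by close() via Closeables.close(connection, true).
  try (RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf)) {
    printAssignmentPlan(rp.getNewAssignmentPlan());
  }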


[hbase] 05/08: HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 3d6ffb3762eb37dbda56daaef50adee404620e48
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Dec 12 09:33:33 2018 +0800

    HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection
---
 .../hbase/client/AsyncClusterConnection.java       |   8 ++
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   8 ++
 .../hbase/client/ClusterConnectionFactory.java     |  16 +--
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  36 ++++---
 .../org/apache/hadoop/hbase/util/FutureUtils.java  |  22 +++++
 .../master/procedure/RSProcedureDispatcher.java    |  34 +------
 .../hbase/protobuf/ReplicationProtbufUtil.java     |  15 +--
 .../hadoop/hbase/regionserver/HRegionServer.java   |   3 +-
 .../handler/RegionReplicaFlushHandler.java         | 110 ++++++++++++---------
 9 files changed, 132 insertions(+), 120 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 1327fd7..f1f64ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
 /**
  * The asynchronous connection for internal usage.
  */
@@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Get the rpc client we used to communicate with other servers.
    */
   RpcClient getRpcClient();
+
+  /**
+   * Flush a region and get the response.
+   */
+  CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
 }
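
Exposing the raw FlushRegionResponse lets internal callers distinguish a real flush from a WAL-marker-only write, which the RegionReplicaFlushHandler rewrite later in this same patch relies on. A hedged caller sketch (the conn and regionName names are assumed):

  // Illustration only (assumed names): non-blocking flush that may fall
  // back to writing just a WAL marker when there is nothing to flush.
  conn.flush(regionName, true).whenComplete((resp, err) -> {
    if (err != null) {
      LOG.warn("flush failed", err);
    } else if (resp.getFlushed()) {
      // memstore content was flushed to a store file
    } else if (resp.getWroteFlushWalMarker()) {
      // nothing to flush; a marker was appended to the WAL instead
    }
  });
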
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 9bead83..ce6bfac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -55,6 +55,7 @@ import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
@@ -363,4 +364,11 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
   public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
     return new AsyncRegionServerAdmin(serverName, this);
   }
+
+  @Override
+  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 68c0630..79484db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -18,15 +18,12 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.net.SocketAddress;
-import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-
 /**
  * The factory for creating {@link AsyncClusterConnection}.
  */
@@ -48,16 +45,7 @@ public final class ClusterConnectionFactory {
   public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
       SocketAddress localAddress, User user) throws IOException {
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
-    String clusterId;
-    try {
-      clusterId = registry.getClusterId().get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(cause);
-    }
+    String clusterId = FutureUtils.get(registry.getClusterId());
     return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 914dcc4..a5899d1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -844,7 +844,19 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
+    return flushRegionInternal(regionName, false).thenAccept(r -> {
+    });
+  }
+
+  /**
+   * This method is for internal use only, where we need the response of the flush.
+   * <p/>
+   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
+   * API.
+   */
+  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
     addListener(getRegionLocation(regionName), (location, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
@@ -856,7 +868,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
           .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
         return;
       }
-      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
         if (err2 != null) {
           future.completeExceptionally(err2);
         } else {
@@ -867,15 +879,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
-  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
-    return this.<Void> newAdminCaller()
-            .serverName(serverName)
-            .action(
-              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
-                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
-                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
-                resp -> null))
-            .call();
+  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
+      boolean writeFlushWALMarker) {
+    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
+      .action((controller, stub) -> this
+        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
+          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
+          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
+      .call();
   }
 
   @Override
@@ -888,7 +899,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       }
       List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
       if (hRegionInfos != null) {
-        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
+        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
+        })));
       }
       addListener(CompletableFuture.allOf(
         compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index fb42aa6..ff83b9d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -17,12 +17,18 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
 /**
  * Helper class for processing futures.
  */
@@ -76,4 +82,20 @@ public final class FutureUtils {
       }
     });
   }
+
+  /**
+   * A helper class for getting the result of a Future, and convert the error to an
+   * {@link IOException}.
+   */
+  public static <T> T get(Future<T> future) throws IOException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(cause);
+    }
+  }
 }
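
The helper centralizes the InterruptedException/ExecutionException unwrapping that was previously duplicated at every blocking call site; the RSProcedureDispatcher and ReplicationProtbufUtil hunks below each collapse to a single line of this shape (illustration, names as in the diff):

  // Blocking on an async RPC with IOException semantics: an
  // InterruptedException becomes InterruptedIOException, and an
  // ExecutionException is unwrapped to its IOException cause if possible.
  ExecuteProceduresResponse resp =
      FutureUtils.get(getRsAdmin().executeProcedures(request));
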
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index f3ab4b3..f772b68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
@@ -35,12 +33,12 @@ import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
@@ -341,15 +339,7 @@ public class RSProcedureDispatcher
 
     protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
         final ExecuteProceduresRequest request) throws IOException {
-      try {
-        return getRsAdmin().executeProcedures(request).get();
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        Throwables.propagateIfPossible(cause, IOException.class);
-        throw new IOException(cause);
-      }
+      return FutureUtils.get(getRsAdmin().executeProcedures(request));
     }
 
     protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@@ -408,15 +398,7 @@ public class RSProcedureDispatcher
 
     private OpenRegionResponse sendRequest(final ServerName serverName,
         final OpenRegionRequest request) throws IOException {
-      try {
-        return getRsAdmin().openRegion(request).get();
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        Throwables.propagateIfPossible(cause, IOException.class);
-        throw new IOException(cause);
-      }
+      return FutureUtils.get(getRsAdmin().openRegion(request));
     }
 
     private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@@ -458,15 +440,7 @@ public class RSProcedureDispatcher
 
     private CloseRegionResponse sendRequest(final ServerName serverName,
         final CloseRegionRequest request) throws IOException {
-      try {
-        return getRsAdmin().closeRegion(request).get();
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        Throwables.propagateIfPossible(cause, IOException.class);
-        throw new IOException(cause);
-      }
+      return FutureUtils.get(getRsAdmin().closeRegion(request));
     }
 
     private void remoteCallCompleted(final MasterProcedureEnv env,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 74fad26..9f41a76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -18,13 +18,10 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -32,12 +29,12 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@@ -60,15 +57,7 @@ public class ReplicationProtbufUtil {
       throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
       entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    try {
-      admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(e);
-    }
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2e54907..880525d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2390,8 +2390,7 @@ public class HRegionServer extends HasThread implements
 
     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
     if (this.executorService != null) {
-      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
-          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index 81b6d7e..0729203 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -20,26 +20,23 @@ package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.FlushRegionCallable;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 
 /**
  * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
@@ -56,20 +53,13 @@ public class RegionReplicaFlushHandler extends EventHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
 
-  private final ClusterConnection connection;
-  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
-  private final RpcControllerFactory rpcControllerFactory;
-  private final int operationTimeout;
+  private final AsyncClusterConnection connection;
+
   private final HRegion region;
 
-  public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
-      RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
-      int operationTimeout, HRegion region) {
+  public RegionReplicaFlushHandler(Server server, HRegion region) {
     super(server, EventType.RS_REGION_REPLICA_FLUSH);
-    this.connection = connection;
-    this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
-    this.rpcControllerFactory = rpcControllerFactory;
-    this.operationTimeout = operationTimeout;
+    this.connection = server.getAsyncClusterConnection();
     this.region = region;
   }
 
@@ -103,7 +93,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
     return numRetries;
   }
 
-  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
+  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
     long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
 
@@ -117,45 +107,59 @@ public class RegionReplicaFlushHandler extends EventHandler {
     }
     while (!region.isClosing() && !region.isClosed()
         && !server.isAborted() && !server.isStopped()) {
-      FlushRegionCallable flushCallable = new FlushRegionCallable(
-        connection, rpcControllerFactory,
-        RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
-
       // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
       // do not have to wait for the whole flush here, just initiate it.
-      FlushRegionResponse response = null;
+      FlushRegionResponse response;
       try {
-         response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
-          .callWithRetries(flushCallable, this.operationTimeout);
-      } catch (IOException ex) {
-        if (ex instanceof TableNotFoundException
-            || connection.isTableDisabled(region.getRegionInfo().getTable())) {
+        response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil
+          .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true));
+      } catch (IOException e) {
+        if (e instanceof TableNotFoundException || FutureUtils
+          .get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))) {
           return;
         }
-        throw ex;
+        if (!counter.shouldRetry()) {
+          throw e;
+        }
+        // The reason we need to retry here is that the retry logic for an asynchronous admin
+        // request is much simpler than for a normal operation: if we fail to locate the region
+        // once, we throw the exception out and do not try to relocate again. So here we need to
+        // add some retries ourselves to avoid shutting down the region server too
+        // frequently...
+        LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
+          ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+            .getRegionNameAsString(),
+          region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
+        try {
+          counter.sleepUntilNextRetry();
+        } catch (InterruptedException e1) {
+          throw new InterruptedIOException(e1.getMessage());
+        }
+        continue;
       }
 
       if (response.getFlushed()) {
         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
         // a complete flush cycle or replay a region open event
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully triggered a flush of primary region replica "
-              + ServerRegionReplicaUtil
-                .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                + " of region " + region.getRegionInfo().getEncodedName()
-                + " Now waiting and blocking reads until observing a full flush cycle");
+          LOG.debug("Successfully triggered a flush of primary region replica " +
+            ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+              .getRegionNameAsString() +
+            " of region " + region.getRegionInfo().getRegionNameAsString() +
+            " Now waiting and blocking reads until observing a full flush cycle");
         }
         region.setReadsEnabled(true);
         break;
       } else {
         if (response.hasWroteFlushWalMarker()) {
-          if(response.getWroteFlushWalMarker()) {
+          if (response.getWroteFlushWalMarker()) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
-                  + "region replica " + ServerRegionReplicaUtil
-                    .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                  + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
-                  + "blocking reads until observing a flush marker");
+              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " +
+                "region replica " +
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString() +
+                " of region " + region.getRegionInfo().getRegionNameAsString() +
+                " Now waiting and " + "blocking reads until observing a flush marker");
             }
             region.setReadsEnabled(true);
             break;
@@ -164,15 +168,23 @@ public class RegionReplicaFlushHandler extends EventHandler {
             // closing or already flushing. Retry flush again after some sleep.
             if (!counter.shouldRetry()) {
               throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
-                  "retries. Failing opening of this region replica "
-                  + region.getRegionInfo().getEncodedName());
+                counter.getAttemptTimes() + " retries. Failing opening of this region replica " +
+                region.getRegionInfo().getRegionNameAsString());
+            } else {
+              LOG.warn(
+                "Cannot cause primary replica {} to flush or drop a wal marker " +
+                  "for region replica {}, retry={}",
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString(),
+                region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
             }
           }
         } else {
           // nothing to do. Are we dealing with an old server?
-          LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
-              + "Continuing to open the secondary region replica: "
-              + region.getRegionInfo().getEncodedName());
+          LOG.warn(
+            "Was not able to trigger a flush from the primary region, possibly due to an old " +
+              "server version. Continuing to open the secondary region replica: " +
+              region.getRegionInfo().getRegionNameAsString());
           region.setReadsEnabled(true);
           break;
         }
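
The handler above now owns its retry policy instead of delegating to RpcRetryingCaller. A
condensed sketch of the sleep-and-retry shape it uses; maxAttempts, pauseMs and
flushPrimaryOnce() are illustrative placeholders, while RetryCounter and RetryCounterFactory are
the utilities referenced in the patch:

    RetryCounterFactory factory = new RetryCounterFactory(maxAttempts, pauseMs);
    RetryCounter counter = factory.create();
    while (true) {
      try {
        flushPrimaryOnce(); // hypothetical single flush attempt
        break;
      } catch (IOException e) {
        if (!counter.shouldRetry()) {
          throw e; // retry budget exhausted, give up
        }
        try {
          counter.sleepUntilNextRetry(); // back off before the next attempt
        } catch (InterruptedException ie) {
          throw (IOException) new InterruptedIOException().initCause(ie);
        }
      }
    }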


[hbase] 04/08: HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c24dc98a12ca8f859a5c101cb8ce7b116f93cd92
Author: zhangduo <zh...@apache.org>
AuthorDate: Tue Jan 1 21:27:14 2019 +0800

    HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
---
 .../hbase/client/AsyncRegionServerAdmin.java       | 14 +++++---
 .../hbase/protobuf/ReplicationProtbufUtil.java     | 35 ++++++++++---------
 .../HBaseInterClusterReplicationEndpoint.java      | 31 +++++++++--------
 .../regionserver/ReplicationSinkManager.java       | 40 ++++++++--------------
 .../hbase/replication/SyncReplicationTestBase.java | 12 +++----
 .../regionserver/TestReplicationSinkManager.java   | 21 +++++-------
 6 files changed, 74 insertions(+), 79 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index 9accd89..b9141a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -94,9 +95,9 @@ public class AsyncRegionServerAdmin {
     void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
   }
 
-  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
     CompletableFuture<RESP> future = new CompletableFuture<>();
-    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
     try {
       rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
 
@@ -115,6 +116,10 @@ public class AsyncRegionServerAdmin {
     return future;
   }
 
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    return call(rpcCall, null);
+  }
+
   public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
     return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
   }
@@ -154,8 +159,9 @@ public class AsyncRegionServerAdmin {
   }
 
   public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
-      ReplicateWALEntryRequest request) {
-    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+      ReplicateWALEntryRequest request, CellScanner cellScanner) {
+    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
+      cellScanner);
   }
 
   public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
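
The new overload matters because WAL cells are shipped outside the protobuf message: the
CellScanner is handed to the HBaseRpcController, so the payload travels in the RPC cell block
rather than being copied into the request. A sketch of the intended call shape, using
buildReplicateWALEntryRequest as it is invoked in the file below:

    // the request carries the metadata, the scanner carries the cells
    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
        ReplicationProtbufUtil.buildReplicateWALEntryRequest(
            entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
    admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();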
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index c1b3911..74fad26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -20,51 +20,54 @@ package org.apache.hadoop.hbase.protobuf;
 
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 @InterfaceAudience.Private
 public class ReplicationProtbufUtil {
+
   /**
-   * A helper to replicate a list of WAL entries using admin protocol.
-   * @param admin Admin service
+   * A helper to replicate a list of WAL entries using the region server admin.
+   * @param admin the region server admin
    * @param entries Array of WAL entries to be replicated
    * @param replicationClusterId Id which will uniquely identify source cluster FS client
    *          configurations in the replication configuration directory
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
-   * @throws java.io.IOException
    */
-  public static void replicateWALEntry(final AdminService.BlockingInterface admin,
-      final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
-      Path sourceHFileArchiveDir) throws IOException {
-    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-        buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
-          sourceHFileArchiveDir);
-    HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
+  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
+      throws IOException {
+    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
+      entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
     try {
-      admin.replicateWALEntry(controller, p.getFirst());
-    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
-      throw ProtobufUtil.getServiceException(e);
+      admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(e);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..0359096 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -39,7 +39,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -48,13 +47,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -65,8 +67,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
  * implementation for replicating to another HBase cluster.
@@ -85,8 +85,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
   private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
-  private ClusterConnection conn;
-  private Configuration localConf;
+  private AsyncClusterConnection conn;
   private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
@@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
-    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -132,12 +130,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
-    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+    this.conn =
+      ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -284,9 +283,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   }
 
   private void reconnectToPeerCluster() {
-    ClusterConnection connection = null;
+    AsyncClusterConnection connection = null;
     try {
-      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+      connection =
+        ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
     } catch (IOException ioe) {
       LOG.warn("Failed to create connection for peer cluster", ioe);
     }
@@ -367,7 +367,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         }
         continue;
       }
-      if (this.conn == null || this.conn.isClosed()) {
+      if (this.conn == null) {
         reconnectToPeerCluster();
       }
       try {
@@ -480,10 +480,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           entriesHashCode, entries.size(), size, replicationClusterId);
       }
       sinkPeer = replicationSinkMgr.getReplicationSink();
-      BlockingInterface rrs = sinkPeer.getRegionServer();
+      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
       try {
-        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
+          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+          hfileArchiveDir);
         LOG.trace("Completed replicating batch {}", entriesHashCode);
       } catch (IOException e) {
         LOG.trace("Failed replicating batch {}", entriesHashCode, e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 3cd7884..21b07ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -35,8 +35,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
 /**
  * Maintains a collection of peers to replicate to, and randomly selects a
  * single peer to replicate to per set of data to replicate. Also handles
@@ -61,9 +59,7 @@ public class ReplicationSinkManager {
   static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
 
 
-  private final Connection conn;
-
-  private final String peerClusterId;
+  private final AsyncClusterConnection conn;
 
   private final HBaseReplicationEndpoint endpoint;
 
@@ -77,8 +73,6 @@ public class ReplicationSinkManager {
   // replication sinks is refreshed
   private final int badSinkThreshold;
 
-  private final Random random;
-
   // A timestamp of the last time the list of replication peers changed
   private long lastUpdateToPeers;
 
@@ -88,26 +82,22 @@ public class ReplicationSinkManager {
   /**
    * Instantiate for a single replication peer cluster.
    * @param conn connection to the peer cluster
-   * @param peerClusterId identifier of the peer cluster
    * @param endpoint replication endpoint for inter cluster replication
    * @param conf HBase configuration, used for determining replication source ratio and bad peer
    *          threshold
    */
-  public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
-      HBaseReplicationEndpoint endpoint, Configuration conf) {
+  public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
+      Configuration conf) {
     this.conn = conn;
-    this.peerClusterId = peerClusterId;
     this.endpoint = endpoint;
     this.badReportCounts = Maps.newHashMap();
     this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
-    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
-                                        DEFAULT_BAD_SINK_THRESHOLD);
-    this.random = new Random();
+    this.badSinkThreshold =
+      conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
   }
 
   /**
    * Get a randomly-chosen replication sink to replicate to.
-   *
    * @return a replication sink to replicate to
    */
   public synchronized SinkPeer getReplicationSink() throws IOException {
@@ -119,8 +109,8 @@ public class ReplicationSinkManager {
     if (sinks.isEmpty()) {
       throw new IOException("No replication sinks are available");
     }
-    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
-    return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
+    ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
+    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
   }
 
   /**
@@ -160,7 +150,7 @@ public class ReplicationSinkManager {
    */
   public synchronized void chooseSinks() {
     List<ServerName> slaveAddresses = endpoint.getRegionServers();
-    Collections.shuffle(slaveAddresses, random);
+    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
     sinks = slaveAddresses.subList(0, numSinks);
     lastUpdateToPeers = System.currentTimeMillis();
@@ -182,9 +172,9 @@ public class ReplicationSinkManager {
    */
   public static class SinkPeer {
     private ServerName serverName;
-    private AdminService.BlockingInterface regionServer;
+    private AsyncRegionServerAdmin regionServer;
 
-    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
       this.serverName = serverName;
       this.regionServer = regionServer;
     }
@@ -193,10 +183,8 @@ public class ReplicationSinkManager {
       return serverName;
     }
 
-    public AdminService.BlockingInterface getRegionServer() {
+    public AsyncRegionServerAdmin getRegionServer() {
       return regionServer;
     }
-
   }
-
 }
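
Dropping the Random field is a small concurrency cleanup: ThreadLocalRandom hands each thread its
own generator, so the synchronized methods above no longer share or seed one instance. The idiom,
with sinks and slaveAddresses as in the class above:

    // pick one sink at random; no shared Random instance required
    ServerName pick = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
    // ThreadLocalRandom extends java.util.Random, so shuffle accepts it too
    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());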
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index f373590..e0d112d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -250,19 +250,19 @@ public class SyncReplicationTestBase {
   protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
       boolean expectedRejection) throws Exception {
     HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
-    ClusterConnection connection = regionServer.getClusterConnection();
+    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
     Entry[] entries = new Entry[10];
     for (int i = 0; i < entries.length; i++) {
       entries[i] =
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
-        entries, null, null, null);
+      ReplicationProtbufUtil.replicateWALEntry(
+        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
     } else {
       try {
-        ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
-          entries, null, null, null);
+        ReplicationProtbufUtil.replicateWALEntry(
+          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (DoNotRetryIOException e) {
         assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 39dabb4..60afd40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -25,7 +25,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -37,8 +38,6 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
 @Category({ReplicationTests.class, SmallTests.class})
 public class TestReplicationSinkManager {
 
@@ -46,16 +45,14 @@ public class TestReplicationSinkManager {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
 
-  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
-
   private HBaseReplicationEndpoint replicationEndpoint;
   private ReplicationSinkManager sinkManager;
 
   @Before
   public void setUp() {
     replicationEndpoint = mock(HBaseReplicationEndpoint.class);
-    sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
-                      PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
+    sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
+      replicationEndpoint, new Configuration());
   }
 
   @Test
@@ -100,7 +97,7 @@ public class TestReplicationSinkManager {
     // Sanity check
     assertEquals(1, sinkManager.getNumSinks());
 
-    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
 
     sinkManager.reportBadSink(sinkPeer);
 
@@ -131,7 +128,7 @@ public class TestReplicationSinkManager {
 
     ServerName serverName = sinkManager.getSinksForTesting().get(0);
 
-    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
 
     sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
@@ -147,7 +144,7 @@ public class TestReplicationSinkManager {
     //
     serverName = sinkManager.getSinksForTesting().get(0);
 
-    sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+    sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
       sinkManager.reportBadSink(sinkPeer);
     }
@@ -188,8 +185,8 @@ public class TestReplicationSinkManager {
     ServerName serverNameA = sinkList.get(0);
     ServerName serverNameB = sinkList.get(1);
 
-    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
-    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
 
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       sinkManager.reportBadSink(sinkPeerA);


[hbase] 01/08: HBASE-21515 Also initialize an AsyncClusterConnection in HRegionServer

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 5dc19f059537f16112e3164ac294375a8ad34eb2
Author: zhangduo <zh...@apache.org>
AuthorDate: Fri Nov 30 08:23:47 2018 +0800

    HBASE-21515 Also initialize an AsyncClusterConnection in HRegionServer
---
 .../hbase/client/AsyncClusterConnection.java       | 38 +++++++++++++
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 14 +++--
 .../hbase/client/ClusterConnectionFactory.java     | 63 ++++++++++++++++++++++
 .../hadoop/hbase/client/ConnectionFactory.java     |  5 +-
 .../apache/hadoop/hbase/util/ReflectionUtils.java  | 22 ++++----
 .../main/java/org/apache/hadoop/hbase/Server.java  | 20 +++++++
 .../org/apache/hadoop/hbase/master/HMaster.java    |  3 ++
 .../hadoop/hbase/regionserver/HRegionServer.java   | 56 +++++++++++++------
 .../regionserver/ReplicationSyncUp.java            |  6 +++
 .../hadoop/hbase/MockRegionServerServices.java     |  5 ++
 .../client/TestAsyncNonMetaRegionLocator.java      |  2 +-
 ...stAsyncNonMetaRegionLocatorConcurrenyLimit.java |  2 +-
 .../client/TestAsyncRegionLocatorTimeout.java      |  2 +-
 .../TestAsyncSingleRequestRpcRetryingCaller.java   |  4 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java    |  2 +-
 .../hbase/master/MockNoopMasterServices.java       |  6 +++
 .../hadoop/hbase/master/MockRegionServer.java      |  5 ++
 .../hbase/master/TestActiveMasterManager.java      |  6 +++
 .../hbase/master/cleaner/TestHFileCleaner.java     |  6 +++
 .../hbase/master/cleaner/TestHFileLinkCleaner.java |  6 +++
 .../hbase/master/cleaner/TestLogsCleaner.java      |  6 +++
 .../cleaner/TestReplicationHFileCleaner.java       |  6 +++
 .../hbase/regionserver/TestHeapMemoryManager.java  |  6 +++
 .../hbase/regionserver/TestSplitLogWorker.java     |  6 +++
 .../hadoop/hbase/regionserver/TestWALLockup.java   |  6 +++
 .../replication/TestReplicationTrackerZKImpl.java  |  6 +++
 .../regionserver/TestReplicationSourceManager.java |  6 +++
 .../security/token/TestTokenAuthentication.java    |  6 +++
 .../org/apache/hadoop/hbase/util/MockServer.java   |  6 +++
 29 files changed, 290 insertions(+), 37 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
new file mode 100644
index 0000000..c7dea25
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The asynchronous connection for internal usage.
+ */
+@InterfaceAudience.Private
+public interface AsyncClusterConnection extends AsyncConnection {
+
+  /**
+   * Get the nonce generator for this connection.
+   */
+  NonceGenerator getNonceGenerator();
+
+  /**
+   * Get the rpc client we used to communicate with other servers.
+   */
+  RpcClient getRpcClient();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 3cbd950..50e27c4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -63,7 +64,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
  * The implementation of AsyncConnection.
  */
 @InterfaceAudience.Private
-class AsyncConnectionImpl implements AsyncConnection {
+class AsyncConnectionImpl implements AsyncClusterConnection {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
 
@@ -106,7 +107,7 @@ class AsyncConnectionImpl implements AsyncConnection {
   private ChoreService authService;
 
   public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
-      User user) {
+      SocketAddress localAddress, User user) {
     this.conf = conf;
     this.user = user;
     if (user.isLoginFromKeytab()) {
@@ -114,7 +115,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     }
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.registry = registry;
-    this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
+    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, null);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
     this.rpcTimeout =
@@ -158,11 +159,16 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   // ditto
-  @VisibleForTesting
+  @Override
   public NonceGenerator getNonceGenerator() {
     return nonceGenerator;
   }
 
+  @Override
+  public RpcClient getRpcClient() {
+    return rpcClient;
+  }
+
   private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
     return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
new file mode 100644
index 0000000..68c0630
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+/**
+ * The factory for creating {@link AsyncClusterConnection}.
+ */
+@InterfaceAudience.Private
+public final class ClusterConnectionFactory {
+
+  private ClusterConnectionFactory() {
+  }
+
+  /**
+   * Create a new {@link AsyncClusterConnection} instance.
+   * <p/>
+   * Unlike what we have done in {@link ConnectionFactory}, here we just return an
+   * {@link AsyncClusterConnection} instead of a {@link java.util.concurrent.CompletableFuture},
+   * which means this method could block on fetching the cluster id. This keeps the implementation
+   * simple, as when starting new region servers we do not need to be event-driven. This can
+   * change later if we want a {@link java.util.concurrent.CompletableFuture} here.
+   */
+  public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
+      SocketAddress localAddress, User user) throws IOException {
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
+    String clusterId;
+    try {
+      clusterId = registry.getClusterId().get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(cause);
+    }
+    return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
+  }
+}
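
A usage sketch matching how the factory is called elsewhere in this series; a null local address
means the OS picks the outgoing interface, and the connection must be closed by whoever created
it:

    Configuration conf = HBaseConfiguration.create();
    AsyncClusterConnection conn =
        ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
    try {
      // internal callers reach cluster-private facilities from here,
      // e.g. conn.getNonceGenerator() or conn.getRpcClient()
    } finally {
      conn.close();
    }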
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index e24af74..2ba732a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -295,9 +295,8 @@ public class ConnectionFactory {
         AsyncConnectionImpl.class, AsyncConnection.class);
       try {
         future.complete(
-          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>)() ->
-            ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user))
-        );
+          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
+            .newInstance(clazz, conf, registry, clusterId, null, user)));
       } catch (Exception e) {
         future.completeExceptionally(e);
       }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index a136846..268249d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -83,15 +83,19 @@ public class ReflectionUtils {
 
       boolean match = true;
       for (int i = 0; i < ctorParamTypes.length && match; ++i) {
-        Class<?> paramType = paramTypes[i].getClass();
-        match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
-                  ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
-                   (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
-                   (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
-                   (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
-                   (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
-                   (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
-                   (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+        if (paramTypes[i] == null) {
+          match = !ctorParamTypes[i].isPrimitive();
+        } else {
+          Class<?> paramType = paramTypes[i].getClass();
+          match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType)
+            : ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
+              (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
+              (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
+              (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
+              (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
+              (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
+              (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+        }
       }
 
       if (match) {
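
The null branch is what makes the ConnectionFactory change below work: a null argument can now
match any non-primitive constructor parameter instead of throwing a NullPointerException on
paramTypes[i].getClass(). Illustrative call, mirroring the one in ConnectionFactory:

    // null stands in for the SocketAddress parameter and matches it by type
    AsyncConnection conn = ReflectionUtils.newInstance(
        AsyncConnectionImpl.class, conf, registry, clusterId, null, user);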
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
index fb898ea..c33d5af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -61,6 +63,24 @@ public interface Server extends Abortable, Stoppable {
   ClusterConnection getClusterConnection();
 
   /**
+   * Returns a reference to the server's async connection.
+   * <p/>
+   * Important note: this method returns a reference to a Connection which is managed by the
+   * Server itself, so callers must NOT attempt to close the connection obtained.
+   */
+  default AsyncConnection getAsyncConnection() {
+    return getAsyncClusterConnection();
+  }
+
+  /**
+   * Returns a reference to the server's async cluster connection.
+   * <p/>
+   * Important note: this method returns a reference to a Connection which is managed by the
+   * Server itself, so callers must NOT attempt to close the connection obtained.
+   */
+  AsyncClusterConnection getAsyncClusterConnection();
+
+  /**
    * @return The unique server name for this server.
    */
   ServerName getServerName();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9d2a743..7579fd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3033,6 +3033,9 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (this.clusterConnection != null) {
       this.clusterConnection.close();
     }
+    if (this.asyncClusterConnection != null) {
+      this.asyncClusterConnection.close();
+    }
   }
 
   public void stopMaster() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 34a6c13..dbc5e77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -82,7 +82,9 @@ import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.ZNodeClearer;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -109,7 +111,6 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
 import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -267,6 +268,11 @@ public class HRegionServer extends HasThread implements
   protected ClusterConnection clusterConnection;
 
   /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  /**
    * Go here to get table descriptors.
    */
   protected TableDescriptors tableDescriptors;
@@ -783,11 +789,7 @@ public class HRegionServer extends HasThread implements
     return true;
   }
 
-  /**
-   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
-   * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
-   */
-  private ClusterConnection createClusterConnection() throws IOException {
+  private Configuration unsetClientZookeeperQuorum() {
     Configuration conf = this.conf;
     if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
       // Use server ZK cluster for server-issued connections, so we clone
@@ -795,11 +797,20 @@ public class HRegionServer extends HasThread implements
       conf = new Configuration(this.conf);
       conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
     }
+    return conf;
+  }
+
+  /**
+   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
+   * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
+   */
+  private ClusterConnection createClusterConnection() throws IOException {
     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
     // local server if the request is to the local server bypassing RPC. Can be used for both local
     // and remote invocations.
-    ClusterConnection conn = ConnectionUtils.createShortCircuitConnection(conf, null,
-      userProvider.getCurrent(), serverName, rpcServices, rpcServices);
+    ClusterConnection conn =
+      ConnectionUtils.createShortCircuitConnection(unsetClientZookeeperQuorum(), null,
+        userProvider.getCurrent(), serverName, rpcServices, rpcServices);
     // This is used to initialize the batch thread pool inside the connection implementation.
     // When deploy a fresh cluster, we may first use the cluster connection in InitMetaProcedure,
     // which will be executed inside the PEWorker, and then the batch thread pool will inherit the
@@ -833,9 +844,12 @@ public class HRegionServer extends HasThread implements
   /**
    * Setup our cluster connection if not already initialized.
    */
-  protected synchronized void setupClusterConnection() throws IOException {
+  protected final synchronized void setupClusterConnection() throws IOException {
     if (clusterConnection == null) {
       clusterConnection = createClusterConnection();
+      asyncClusterConnection =
+        ClusterConnectionFactory.createAsyncClusterConnection(unsetClientZookeeperQuorum(),
+          new InetSocketAddress(this.rpcServices.isa.getAddress(), 0), userProvider.getCurrent());
     }
   }
 
@@ -849,8 +863,7 @@ public class HRegionServer extends HasThread implements
       initializeZooKeeper();
       setupClusterConnection();
       // Setup RPC client for master communication
-      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
-          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
+      this.rpcClient = asyncClusterConnection.getRpcClient();
     } catch (Throwable t) {
       // Call stop if error or process will stick around for ever since server
       // puts up non-daemon threads.
@@ -1114,7 +1127,15 @@ public class HRegionServer extends HasThread implements
         LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
       }
     }
-
+    if (this.asyncClusterConnection != null) {
+      try {
+        this.asyncClusterConnection.close();
+      } catch (IOException e) {
+        // Although the {@link Closeable} interface declares an {@link
+        // IOException}, in reality the implementation should never throw one.
+        LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
+      }
+    }
     // Closing the compactSplit thread before closing meta regions
     if (!this.killed && containsMetaTableRegions()) {
       if (!abortRequested || this.fsOk) {
@@ -3747,9 +3768,9 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public EntityLock regionLock(List<RegionInfo> regionInfos, String description,
-      Abortable abort) throws IOException {
-    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
+  public EntityLock regionLock(List<RegionInfo> regionInfos, String description, Abortable abort)
+      throws IOException {
+    return new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator())
       .regionLock(regionInfos, description, abort);
   }
 
@@ -3854,4 +3875,9 @@ public class HRegionServer extends HasThread implements
       System.exit(1);
     }
   }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return asyncClusterConnection;
+  }
 }
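
Taken together, the HRegionServer hunks above leave server-side code with two connections: the old short-circuit ClusterConnection and the new AsyncClusterConnection, both built from a configuration with the client ZK quorum unset. A minimal caller-side sketch, assuming only the accessors this patch series introduces (getAsyncClusterConnection, getRpcClient, getNonceGenerator); `rs` is an HRegionServer after setupClusterConnection() has run:

  // Sketch only, not part of the patch.
  AsyncClusterConnection conn = rs.getAsyncClusterConnection();
  RpcClient rpcClient = conn.getRpcClient();            // same client now reused for master RPC
  NonceGenerator nonceGenerator = conn.getNonceGenerator(); // backs regionLock(...) above
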
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index c7bccb3..7d1245c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -180,5 +181,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 0e4f241..5205960 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -368,4 +369,8 @@ public class MockRegionServerServices implements RegionServerServices {
   public Optional<MobFileCache> getMobFileCache() {
     return Optional.empty();
   }
+
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index eeaf99f..550a6f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -81,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 8cdb4a9..7e06218 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -125,7 +125,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
       .toArray(byte[][]::new);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index 758aa30..0e28f96 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -96,7 +96,7 @@ public class TestAsyncRegionLocatorTimeout {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = CONN.getLocator();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 7d8956b..29dcd56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
   }
 
   @AfterClass
@@ -164,7 +164,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
         }
       };
     try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
-      CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
+      CONN.registry, CONN.registry.getClusterId().get(), null, User.getCurrent()) {
 
       @Override
       AsyncRegionLocator getLocator() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 3008561..e1e55f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -85,7 +85,7 @@ public class TestAsyncTableNoncedRetry {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent()) {
+      registry.getClusterId().get(), null, User.getCurrent()) {
 
       @Override
       public NonceGenerator getNonceGenerator() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 9c55f57..3ebad66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
@@ -473,4 +474,9 @@ public class MockNoopMasterServices implements MasterServices {
   public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
     return null;
   }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index a930d7f..73d53c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -721,4 +722,8 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public Optional<MobFileCache> getMobFileCache() {
     return Optional.empty();
   }
+
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
index 2300f54..77667a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -349,5 +350,10 @@ public class TestActiveMasterManager {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 5c8db3e..c5fad32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -279,6 +280,11 @@ public class TestHFileCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
index 119194b..fd11ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -213,5 +214,10 @@ public class TestHFileLinkCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 247ed01..3286032 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -409,6 +410,11 @@ public class TestLogsCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class FaultyZooKeeperWatcher extends ZKWatcher {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index d162bf3..9791643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -303,6 +304,11 @@ public class TestReplicationHFileCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class FaultyZooKeeperWatcher extends ZKWatcher {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 8c9ce75..4a359e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -862,6 +863,11 @@ public class TestHeapMemoryManager {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class CustomHeapMemoryTuner implements HeapMemoryTuner {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 14dc619..43da846 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
@@ -160,6 +161,11 @@ public class TestSplitLogWorker {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 0e20252..9e9d1d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Durability;
@@ -523,6 +524,11 @@ public class TestWALLockup {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class DummyWALActionsListener implements WALActionsListener {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 863d558..62ab265 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -263,5 +264,10 @@ public class TestReplicationTrackerZKImpl {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 86bbb09..427f319 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
@@ -906,5 +907,10 @@ public abstract class TestReplicationSourceManager {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index e4780f1..92c8e54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -363,6 +364,11 @@ public class TestTokenAuthentication {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   @Parameters(name = "{index}: rpcServerImpl={0}")
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
index c25db01..13212d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
@@ -143,4 +144,9 @@ public class MockServer implements Server {
   public Connection createConnection(Configuration conf) throws IOException {
     return null;
   }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }


[hbase] 03/08: HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 00beca44bcfc2ba34bda56baaaae10ec7994843f
Author: zhangduo <zh...@apache.org>
AuthorDate: Thu Dec 6 21:25:34 2018 +0800

    HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
---
 .../hbase/client/AsyncClusterConnection.java       |   6 +
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   4 +
 .../hbase/client/AsyncRegionServerAdmin.java       | 210 +++++++++++++++++++++
 .../org/apache/hadoop/hbase/util/FutureUtils.java  |   2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  15 +-
 .../apache/hadoop/hbase/master/ServerManager.java  |  67 -------
 .../master/procedure/RSProcedureDispatcher.java    |  44 +++--
 7 files changed, 262 insertions(+), 86 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index c7dea25..1327fd7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -27,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface AsyncClusterConnection extends AsyncConnection {
 
   /**
+   * Get the admin service for the given region server.
+   */
+  AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
+
+  /**
    * Get the nonce generator for this connection.
    */
   NonceGenerator getNonceGenerator();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 50e27c4..9bead83 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -359,4 +359,8 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
     return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
       rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
   }
+
+  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+    return new AsyncRegionServerAdmin(serverName, this);
+  }
 }
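
Note that getRegionServerAdmin simply news up a thin, stateless wrapper on every call, which is what lets the ServerManager hunks below drop the old per-server rsAdmins cache entirely. A hedged caller-side sketch:

  // Sketch: the wrapper holds no per-connection state, so callers need not cache it.
  AsyncRegionServerAdmin admin = conn.getRegionServerAdmin(serverName);
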
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
new file mode 100644
index 0000000..9accd89
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+
+/**
+ * A simple wrapper of the {@link AdminService} for a region server, which returns a
+ * {@link CompletableFuture}. This is easier to use than the raw protobuf interface, where you
+ * need to fetch the result from the {@link RpcCallback} and, if there is an exception, retrieve
+ * it from the {@link RpcController} passed in.
+ * <p/>
+ * Notice that there is no retry, and this is intentional. We have different retry logic for
+ * different usages for now; if later we want to unify them, we can move the retry logic into
+ * this class.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionServerAdmin {
+
+  private final ServerName server;
+
+  private final AsyncConnectionImpl conn;
+
+  AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
+  }
+
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    try {
+      rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
+
+        @Override
+        public void run(RESP resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            future.complete(resp);
+          }
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
+    return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
+  }
+
+  public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) {
+    return call((stub, controller, done) -> stub.getStoreFile(controller, request, done));
+  }
+
+  public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
+      GetOnlineRegionRequest request) {
+    return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
+  }
+
+  public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
+    return call((stub, controller, done) -> stub.openRegion(controller, request, done));
+  }
+
+  public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) {
+    return call((stub, controller, done) -> stub.warmupRegion(controller, request, done));
+  }
+
+  public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) {
+    return call((stub, controller, done) -> stub.closeRegion(controller, request, done));
+  }
+
+  public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) {
+    return call((stub, controller, done) -> stub.flushRegion(controller, request, done));
+  }
+
+  public CompletableFuture<CompactionSwitchResponse> compactionSwitch(
+      CompactionSwitchRequest request) {
+    return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done));
+  }
+
+  public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) {
+    return call((stub, controller, done) -> stub.compactRegion(controller, request, done));
+  }
+
+  public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+      ReplicateWALEntryRequest request) {
+    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+  }
+
+  public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
+    return call((stub, controller, done) -> stub.replay(controller, request, done));
+  }
+
+  public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
+    return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done));
+  }
+
+  public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) {
+    return call((stub, controller, done) -> stub.getServerInfo(controller, request, done));
+  }
+
+  public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) {
+    return call((stub, controller, done) -> stub.stopServer(controller, request, done));
+  }
+
+  public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes(
+      UpdateFavoredNodesRequest request) {
+    return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done));
+  }
+
+  public CompletableFuture<UpdateConfigurationResponse> updateConfiguration(
+      UpdateConfigurationRequest request) {
+    return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done));
+  }
+
+  public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) {
+    return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done));
+  }
+
+  public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues(
+      ClearCompactionQueuesRequest request) {
+    return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done));
+  }
+
+  public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache(
+      ClearRegionBlockCacheRequest request) {
+    return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done));
+  }
+
+  public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots(
+      GetSpaceQuotaSnapshotsRequest request) {
+    return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done));
+  }
+
+  public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
+      ExecuteProceduresRequest request) {
+    return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
+  }
+}
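
To show the style this buys call sites, a hedged usage sketch; FutureUtils.addListener and the request builder are used the same way in the HMaster hunk below, while conn, serverName, regionName and LOG are assumed context:

  // Sketch: issue a GetRegionInfo RPC without touching RpcCallback/RpcController directly.
  AsyncRegionServerAdmin admin = conn.getRegionServerAdmin(serverName);
  FutureUtils.addListener(
    admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(regionName)),
    (resp, err) -> {
      if (err != null) {
        LOG.warn("getRegionInfo to {} failed", serverName, err);
      } else {
        LOG.info("Region state: {}", resp.getRegionInfo());
      }
    });
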
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 6c3e026..fb42aa6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -76,4 +76,4 @@ public final class FutureUtils {
       }
     });
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7579fd5..cf56c4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -195,6 +195,7 @@ import org.apache.hadoop.hbase.util.BloomFilterUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -227,6 +228,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
@@ -1955,6 +1957,15 @@ public class HMaster extends HRegionServer implements MasterServices {
     });
   }
 
+  private void warmUpRegion(ServerName server, RegionInfo region) {
+    FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
+      .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
+        if (e != null) {
+          LOG.warn("Failed to warm up region {} on server {}", region, server, e);
+        }
+      });
+  }
+
   // Public so can be accessed by tests. Blocks until move is done.
   // Replace with an async implementation from which you can get
   // a success/failure result.
@@ -2026,7 +2037,9 @@ public class HMaster extends HRegionServer implements MasterServices {
       // Warmup the region on the destination before initiating the move. this call
       // is synchronous and takes some time. doing it before the source region gets
       // closed
-      serverManager.sendRegionWarmup(rp.getDestination(), hri);
+      // A region server could reject the warmup request because it either does not
+      // have the specified region or the region is being split.
+      warmUpRegion(rp.getDestination(), hri);
 
       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
       Future<byte []> future = this.assignmentManager.moveAsync(rp);
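
One behavioral consequence of this hunk: the removed sendRegionWarmup blocked until the warmup RPC returned, while warmUpRegion only registers a listener, so the move no longer waits for the warmup to finish. If the old blocking semantics were ever wanted back, a hedged sketch using the FutureUtils.get helper that commit 07/08 below relies on:

  // Sketch: block on the warmup future instead of fire-and-forget.
  FutureUtils.get(asyncClusterConnection.getRegionServerAdmin(server)
    .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)));
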
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 86d72d1..c26ef6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -159,25 +155,16 @@ public class ServerManager {
   private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
     new ConcurrentSkipListMap<>();
 
-  /**
-   * Map of admin interfaces per registered regionserver; these interfaces we use to control
-   * regionservers out on the cluster
-   */
-  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();
-
   /** List of region servers that should not get any more new regions. */
   private final ArrayList<ServerName> drainingServers = new ArrayList<>();
 
   private final MasterServices master;
-  private final ClusterConnection connection;
 
   private final DeadServer deadservers = new DeadServer();
 
   private final long maxSkew;
   private final long warningSkew;
 
-  private final RpcControllerFactory rpcControllerFactory;
-
   /** Listeners that are called on server events. */
   private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
 
@@ -189,8 +176,6 @@ public class ServerManager {
     Configuration c = master.getConfiguration();
     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
-    this.connection = master.getClusterConnection();
-    this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
     persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
         PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
   }
@@ -438,7 +423,6 @@ public class ServerManager {
   void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
     LOG.info("Registering regionserver=" + serverName);
     this.onlineServers.put(serverName, sl);
-    this.rsAdmins.remove(serverName);
   }
 
   @VisibleForTesting
@@ -633,7 +617,6 @@ public class ServerManager {
       this.onlineServers.remove(sn);
       onlineServers.notifyAll();
     }
-    this.rsAdmins.remove(sn);
   }
 
   /*
@@ -676,34 +659,6 @@ public class ServerManager {
     return this.drainingServers.add(sn);
   }
 
-  // RPC methods to region servers
-
-  private HBaseRpcController newRpcController() {
-    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
-  }
-
-  /**
-   * Sends a WARMUP RPC to the specified server to warmup the specified region.
-   * <p>
-   * A region server could reject the close request because it either does not
-   * have the specified region or the region is being split.
-   * @param server server to warmup a region
-   * @param region region to  warmup
-   */
-  public void sendRegionWarmup(ServerName server,
-      RegionInfo region) {
-    if (server == null) return;
-    try {
-      AdminService.BlockingInterface admin = getRsAdmin(server);
-      HBaseRpcController controller = newRpcController();
-      ProtobufUtil.warmupRegion(controller, admin, region);
-    } catch (IOException e) {
-      LOG.error("Received exception in RPC for warmup server:" +
-        server + "region: " + region +
-        "exception: " + e);
-    }
-  }
-
   /**
    * Contacts a region server and waits up to timeout ms
    * to close the region.  This bypasses the active hmaster.
@@ -737,28 +692,6 @@ public class ServerManager {
   }
 
   /**
-   * @param sn
-   * @return Admin interface for the remote regionserver named <code>sn</code>
-   * @throws IOException
-   * @throws RetriesExhaustedException wrapping a ConnectException if failed
-   */
-  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
-  throws IOException {
-    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
-    if (admin == null) {
-      LOG.debug("New admin connection to " + sn.toString());
-      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
-        // A master is also a region server now, see HBASE-10569 for details
-        admin = ((HRegionServer)master).getRSRpcServices();
-      } else {
-        admin = this.connection.getAdmin(sn);
-      }
-      this.rsAdmins.put(sn, admin);
-    }
-    return admin;
-  }
-
-  /**
    * Calculate min necessary to start. This is not an absolute. It is just
    * a friction that will cause us hang around a bit longer waiting on
    * RegionServers to check-in.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 638f9d3..f3ab4b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -37,11 +40,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -159,13 +162,8 @@ public class RSProcedureDispatcher
       this.serverName = serverName;
     }
 
-    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
-      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
-      if (admin == null) {
-        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
-          " failed because no RPC connection found to this server");
-      }
-      return admin;
+    protected AsyncRegionServerAdmin getRsAdmin() throws IOException {
+      return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
     }
 
     protected ServerName getServerName() {
@@ -344,9 +342,13 @@ public class RSProcedureDispatcher
     protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
         final ExecuteProceduresRequest request) throws IOException {
       try {
-        return getRsAdmin().executeProcedures(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().executeProcedures(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 
@@ -407,9 +409,13 @@ public class RSProcedureDispatcher
     private OpenRegionResponse sendRequest(final ServerName serverName,
         final OpenRegionRequest request) throws IOException {
       try {
-        return getRsAdmin().openRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().openRegion(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 
@@ -453,9 +459,13 @@ public class RSProcedureDispatcher
     private CloseRegionResponse sendRequest(final ServerName serverName,
         final CloseRegionRequest request) throws IOException {
       try {
-        return getRsAdmin().closeRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().closeRegion(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 
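
The three sendRequest implementations above repeat the same block-and-unwrap boilerplate. Commit 07/08 below routes similar call sites through FutureUtils.get; a hedged sketch of what that shared idiom looks like factored out (helper name hypothetical):

  // Sketch: block on an admin future and rethrow failures as IOException.
  private static <T> T waitFor(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      Throwables.propagateIfPossible(cause, IOException.class);
      throw new IOException(cause);
    }
  }
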


[hbase] 07/08: HBASE-21537 Rewrite ServerManager.closeRegionSilentlyAndWait to use AsyncClusterConnection

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 5f26b73a1eac3142488acc0eb8d22f52ee0291d2
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Jan 10 11:47:41 2019 +0800

    HBASE-21537 Rewrite ServerManager.closeRegionSilentlyAndWait to use AsyncClusterConnection
    
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../hadoop/hbase/master/MasterMetaBootstrap.java   |  2 +-
 .../apache/hadoop/hbase/master/ServerManager.java  | 41 +++++++++++++---------
 .../apache/hadoop/hbase/util/HBaseFsckRepair.java  | 22 +++++++-----
 3 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
index e57817e..6e38bdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
@@ -101,7 +101,7 @@ class MasterMetaBootstrap {
           RegionState r = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
           LOG.info("Closing excess replica of meta region " + r.getRegion());
           // send a close and wait for a max of 30 seconds
-          ServerManager.closeRegionSilentlyAndWait(master.getClusterConnection(),
+          ServerManager.closeRegionSilentlyAndWait(master.getAsyncClusterConnection(),
               r.getServerName(), r.getRegion(), 30000);
           ZKUtil.deleteNode(zooKeeper, zooKeeper.getZNodePaths().getZNodeForReplica(replicaId));
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index c26ef6c..44a0770 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -48,13 +48,15 @@ import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -67,6 +69,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
@@ -660,35 +663,39 @@ public class ServerManager {
   }
 
   /**
-   * Contacts a region server and waits up to timeout ms
-   * to close the region.  This bypasses the active hmaster.
+   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
+   * active hmaster.
    */
-  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
-    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
-    AdminService.BlockingInterface rs = connection.getAdmin(server);
-    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
+  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
+      ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
+    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
     try {
-      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
+      FutureUtils.get(
+        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
     } catch (IOException e) {
       LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
     }
     long expiration = timeout + System.currentTimeMillis();
     while (System.currentTimeMillis() < expiration) {
-      controller.reset();
       try {
-        RegionInfo rsRegion =
-          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
-        if (rsRegion == null) return;
+        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
+          .get(
+            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
+          .getRegionInfo());
+        if (rsRegion == null) {
+          return;
+        }
       } catch (IOException ioe) {
-        if (ioe instanceof NotServingRegionException) // no need to retry again
+        if (ioe instanceof NotServingRegionException) {
+          // no need to retry again
           return;
-        LOG.warn("Exception when retrieving regioninfo from: "
-          + region.getRegionNameAsString(), ioe);
+        }
+        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
+          ioe);
       }
       Thread.sleep(1000);
     }
-    throw new IOException("Region " + region + " failed to close within"
-        + " timeout " + timeout);
+    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
   }
 
   /**
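
The rewrite above turns each blocking admin RPC into a CompletableFuture and then blocks
on it with FutureUtils.get. A minimal sketch of that bridging idiom in isolation (the class
below is illustrative only; it is not the actual org.apache.hadoop.hbase.util.FutureUtils):

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    final class BlockingBridge {

      // Block on an async result and unwrap the cause, so callers such as
      // closeRegionSilentlyAndWait can keep their IOException-based signature.
      static <T> T get(CompletableFuture<T> future) throws IOException {
        try {
          return future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw (IOException) new InterruptedIOException("Interrupted").initCause(e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
        }
      }
    }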
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
index ec7f717..121d06c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -143,16 +146,17 @@ public class HBaseFsckRepair {
   }
 
   /**
-   * Contacts a region server and waits up to hbase.hbck.close.timeout ms
-   * (default 120s) to close the region.  This bypasses the active hmaster.
+   * Contacts a region server and waits up to hbase.hbck.close.timeout ms (default 120s) to close
+   * the region. This bypasses the active hmaster.
    */
-  @SuppressWarnings("deprecation")
-  public static void closeRegionSilentlyAndWait(Connection connection,
-      ServerName server, RegionInfo region) throws IOException, InterruptedException {
-    long timeout = connection.getConfiguration()
-      .getLong("hbase.hbck.close.timeout", 120000);
-    ServerManager.closeRegionSilentlyAndWait((ClusterConnection)connection, server,
-         region, timeout);
+  public static void closeRegionSilentlyAndWait(Connection connection, ServerName server,
+      RegionInfo region) throws IOException, InterruptedException {
+    long timeout = connection.getConfiguration().getLong("hbase.hbck.close.timeout", 120000);
+    // this is a bit ugly but it is only used in the old hbck and tests, so I think it is fine.
+    try (AsyncClusterConnection asyncConn = ClusterConnectionFactory
+      .createAsyncClusterConnection(connection.getConfiguration(), null, User.getCurrent())) {
+      ServerManager.closeRegionSilentlyAndWait(asyncConn, server, region, timeout);
+    }
   }
 
   /**
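
Since HBaseFsckRepair now opens a connection per call, the pattern generalizes to a small
try-with-resources helper. A sketch, assuming ClusterConnectionFactory.createAsyncClusterConnection
is the sanctioned way to obtain an AsyncClusterConnection (the helper class and its Action
interface are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
    import org.apache.hadoop.hbase.security.User;

    final class ThrowawayClusterConnection {

      @FunctionalInterface
      interface Action<T> {
        T run(AsyncClusterConnection conn) throws IOException, InterruptedException;
      }

      // Open a short-lived AsyncClusterConnection, run the action, always close.
      static <T> T with(Configuration conf, Action<T> action)
          throws IOException, InterruptedException {
        try (AsyncClusterConnection conn = ClusterConnectionFactory
            .createAsyncClusterConnection(conf, null, User.getCurrent())) {
          return action.run(conn);
        }
      }
    }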


[hbase] 06/08: HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 5ac2e5ccd5d3c5d98a6549f9cd10e296b16d2f04
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Jan 11 16:22:24 2019 +0800

    HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  24 +-
 .../hbase/client/AsyncClusterConnection.java       |  17 +
 .../hbase/client/AsyncClusterConnectionImpl.java   |  80 +++
 .../AsyncRegionReplicaReplayRetryingCaller.java    | 146 ++++
 .../hbase/client/AsyncRegionServerAdmin.java       |   5 +-
 .../hbase/client/ClusterConnectionFactory.java     |   2 +-
 .../hbase/protobuf/ReplicationProtbufUtil.java     |  31 +-
 .../handler/RegionReplicaFlushHandler.java         |   3 +-
 .../hbase/replication/ReplicationEndpoint.java     |  35 +-
 .../RegionReplicaReplicationEndpoint.java          | 782 +++++++--------------
 .../regionserver/ReplicationSource.java            |   2 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java    |   2 +-
 .../TestRegionReplicaReplicationEndpoint.java      |  56 +-
 ...stRegionReplicaReplicationEndpointNoMaster.java |  99 ++-
 14 files changed, 627 insertions(+), 657 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index ce6bfac..f6a2149 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -55,7 +55,6 @@ import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
@@ -65,7 +64,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
  * The implementation of AsyncConnection.
  */
 @InterfaceAudience.Private
-class AsyncConnectionImpl implements AsyncClusterConnection {
+class AsyncConnectionImpl implements AsyncConnection {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
 
@@ -85,7 +84,7 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
 
   private final int rpcTimeout;
 
-  private final RpcClient rpcClient;
+  protected final RpcClient rpcClient;
 
   final RpcControllerFactory rpcControllerFactory;
 
@@ -160,16 +159,10 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
   }
 
   // ditto
-  @Override
-  public NonceGenerator getNonceGenerator() {
+  NonceGenerator getNonceGenerator() {
     return nonceGenerator;
   }
 
-  @Override
-  public RpcClient getRpcClient() {
-    return rpcClient;
-  }
-
   private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
     return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
@@ -360,15 +353,4 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
     return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
       rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
   }
-
-  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
-    return new AsyncRegionServerAdmin(serverName, this);
-  }
-
-  @Override
-  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
-      boolean writeFlushWALMarker) {
-    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
-    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
-  }
 }
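
Net effect of this hunk: AsyncConnectionImpl now implements only the public AsyncConnection,
while the cluster-internal accessors move into a subclass (added later in this commit).
Schematically, the split looks like this (hypothetical names, not the real classes):

    // Wider interface for server-internal callers.
    interface ClusterApi {
      Object rpcClient();
    }

    class Base {                                        // cf. AsyncConnectionImpl
      protected final Object rpcClient = new Object();  // was private before
      Object nonceGenerator() {                         // package-private, cf. getNonceGenerator()
        return new Object();
      }
    }

    class ClusterSide extends Base implements ClusterApi { // cf. AsyncClusterConnectionImpl
      @Override
      public Object rpcClient() {                       // widened to public
        return rpcClient;
      }
    }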
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
similarity index 72%
rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index f1f64ca..0ad77ba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
@@ -49,4 +53,17 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Flush a region and get the response.
    */
   CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
+
+  /**
+   * Replicate WAL edits to replica regions. The return value is the number of edits we skipped,
+   * as the original return value is useless.
+   */
+  CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
+      List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
+
+  /**
+   * Return all the replicas for a region. Used for region replica replication.
+   */
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+      boolean reload);
 }
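
A sketch of how a caller drives the new replay method (the variables and the 10-retry/30s
numbers below are illustrative; the real values come from the endpoint's configuration):

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.util.FutureUtils;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    final class ReplayUsage {

      // Replay one batch of WAL entries to replica #1 and observe the result
      // without blocking the caller.
      static void replayToReplicaOne(AsyncClusterConnection conn, TableName table,
          byte[] encodedRegionName, byte[] row, List<Entry> entries) {
        FutureUtils.addListener(
          conn.replay(table, encodedRegionName, row, entries, 1, 10,
            TimeUnit.SECONDS.toNanos(30)),
          (skippedEdits, error) -> {
            if (error != null) {
              // the endpoint treats this as a failed batch and retries it
            } else {
              // skippedEdits is the number of edits that were not replayed
            }
          });
      }
    }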
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
new file mode 100644
index 0000000..d61f01f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
+/**
+ * The implementation of AsyncClusterConnection.
+ */
+@InterfaceAudience.Private
+class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {
+
+  public AsyncClusterConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
+      SocketAddress localAddress, User user) {
+    super(conf, registry, clusterId, localAddress, user);
+  }
+
+  @Override
+  public NonceGenerator getNonceGenerator() {
+    return super.getNonceGenerator();
+  }
+
+  @Override
+  public RpcClient getRpcClient() {
+    return rpcClient;
+  }
+
+  @Override
+  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+    return new AsyncRegionServerAdmin(serverName, this);
+  }
+
+  @Override
+  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+  }
+
+  @Override
+  public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
+      List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
+    return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
+      ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
+      row, entries, replicaId).call();
+  }
+
+  @Override
+  public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+      boolean reload) {
+    return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
+  }
+}
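
The replay implementation converts a retry count into an attempt count via
ConnectionUtils.retries2Attempts. Its contract, restated as a sketch (illustrative; check
ConnectionUtils for the authoritative definition):

    // attempts = one initial try + `retries` retries, clamped to at least 1 and
    // guarded against overflow when retries is Integer.MAX_VALUE.
    static int retries2Attempts(int retries) {
      return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
    }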
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
new file mode 100644
index 0000000..3364f15
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+
+/**
+ * For replaying edits for region replica.
+ * <p/>
+ * The main difference here is that, every time after locating, we check whether the region name
+ * still matches; if not, we give up, as this usually means the region has been split or
+ * merged, and the new region(s) should already have all the data of the parent region(s).
+ * <p/>
+ * Notice that the return value is the number of edits we skipped, as the original response
+ * message is not used at the upper layer.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
+
+  private final TableName tableName;
+
+  private final byte[] encodedRegionName;
+
+  private final byte[] row;
+
+  private final Entry[] entries;
+
+  private final int replicaId;
+
+  public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
+      AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
+      TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
+      int replicaId) {
+    super(retryTimer, conn, conn.connConf.getPauseNs(), maxAttempts, operationTimeoutNs,
+      conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
+    this.tableName = tableName;
+    this.encodedRegionName = encodedRegionName;
+    this.row = row;
+    this.entries = entries.toArray(new Entry[0]);
+    this.replicaId = replicaId;
+  }
+
+  private void call(HRegionLocation loc) {
+    if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(
+          "Skipping {} entries in table {} because located region {} is different than" +
+            " the original region {} from WALEdit",
+          entries.length, tableName, loc.getRegion().getEncodedName(),
+          Bytes.toStringBinary(encodedRegionName));
+        for (Entry entry : entries) {
+          LOG.trace("Skipping : " + entry);
+        }
+      }
+      future.complete(Long.valueOf(entries.length));
+      return;
+    }
+
+    AdminService.Interface stub;
+    try {
+      stub = conn.getAdminStub(loc.getServerName());
+    } catch (IOException e) {
+      onError(e,
+        () -> "Get async admin stub to " + loc.getServerName() + " for '" +
+          Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
+          tableName + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      return;
+    }
+    Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil
+      .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
+    resetCallTimeout();
+    controller.setCellScanner(p.getSecond());
+    stub.replay(controller, p.getFirst(), r -> {
+      if (controller.failed()) {
+        onError(controller.getFailed(),
+          () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
+            loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      } else {
+        future.complete(0L);
+      }
+    });
+
+  }
+
+  @Override
+  protected void doCall() {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
+      RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
+        if (error != null) {
+          onError(error,
+            () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
+            });
+          return;
+        }
+        call(loc);
+      });
+  }
+}
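
The give-up check that distinguishes this caller from a plain retrying caller can be read as
a standalone predicate; a sketch (the helper below is hypothetical, the real check is inlined
in call() above):

    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.util.Bytes;

    final class ReplaySkipCheck {
      // Replay only if the freshly located region is still the region named in
      // the WALEdit; a mismatch means a split/merge happened and the edits can
      // be skipped safely.
      static boolean shouldReplay(byte[] walEncodedRegionName, RegionInfo located) {
        return Bytes.equals(walEncodedRegionName, located.getEncodedNameAsBytes());
      }
    }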
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
similarity index 99%
rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index b9141a9..d491890 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -164,8 +164,9 @@ public class AsyncRegionServerAdmin {
       cellScanner);
   }
 
-  public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
-    return call((stub, controller, done) -> stub.replay(controller, request, done));
+  public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request,
+      CellScanner cellScanner) {
+    return call((stub, controller, done) -> stub.replay(controller, request, done), cellScanner);
   }
 
   public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
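
With the extra CellScanner parameter, the request metadata and the cell payload now travel
together through the replay RPC; a sketch of the pairing at a call site (variables are
illustrative):

    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
    import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;

    final class ReplayRpcSketch {
      // Build the protobuf request plus the out-of-band CellScanner and hand
      // both to the async replay RPC.
      static CompletableFuture<ReplicateWALEntryResponse> replay(
          AsyncRegionServerAdmin admin, Entry[] entries, byte[] encodedRegionName) {
        Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil
          .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
        return admin.replay(p.getFirst(), p.getSecond());
      }
    }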
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
similarity index 95%
rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 79484db..2670420 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -46,6 +46,6 @@ public final class ClusterConnectionFactory {
       SocketAddress localAddress, User user) throws IOException {
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
     String clusterId = FutureUtils.get(registry.getClusterId());
-    return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
+    return new AsyncClusterConnectionImpl(conf, registry, clusterId, localAddress, user);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 9f41a76..c39c86c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -37,7 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 @InterfaceAudience.Private
@@ -55,20 +56,18 @@ public class ReplicationProtbufUtil {
   public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
       String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
       throws IOException {
-    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
-      entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
+    Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
+      replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
     FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
   }
 
   /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
-   *
    * @param entries the WAL entries to be replicated
-   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
-   * found.
+   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
    */
-  public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
-      buildReplicateWALEntryRequest(final Entry[] entries) throws IOException {
+  public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
+      final Entry[] entries) {
     return buildReplicateWALEntryRequest(entries, null, null, null, null);
   }
 
@@ -82,16 +81,14 @@ public class ReplicationProtbufUtil {
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
    * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
    */
-  public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
-      buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
-          String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
-          throws IOException {
+  public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
+      final Entry[] entries, byte[] encodedRegionName, String replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
     // Accumulate all the Cells seen in here.
     List<List<? extends Cell>> allCells = new ArrayList<>(entries.length);
     int size = 0;
-    AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
-    AdminProtos.ReplicateWALEntryRequest.Builder builder =
-      AdminProtos.ReplicateWALEntryRequest.newBuilder();
+    WALEntry.Builder entryBuilder = WALEntry.newBuilder();
+    ReplicateWALEntryRequest.Builder builder = ReplicateWALEntryRequest.newBuilder();
 
     for (Entry entry: entries) {
       entryBuilder.clear();
@@ -99,8 +96,8 @@ public class ReplicationProtbufUtil {
       try {
         keyBuilder = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
       } catch (IOException e) {
-        throw new IOException(
-            "There should not throw exception since NoneCompressor do not throw any exceptions", e);
+        throw new AssertionError(
+          "There should not throw exception since NoneCompressor do not throw any exceptions", e);
       }
       if(encodedRegionName != null){
         keyBuilder.setEncodedRegionName(
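
Replacing the rethrown IOException with an AssertionError is what allows
buildReplicateWALEntryRequest to drop its throws IOException clause. The same idiom in
isolation (a generic sketch, not the patched method):

    import java.io.UnsupportedEncodingException;

    final class UnreachableCheckedException {
      // When a checked exception is provably unreachable, convert it into an
      // unchecked AssertionError instead of widening the method signature.
      static byte[] utf8(String s) {
        try {
          return s.getBytes("UTF-8"); // UTF-8 support is mandatory on the JVM
        } catch (UnsupportedEncodingException e) {
          throw new AssertionError("UTF-8 is always available", e);
        }
      }
    }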
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index 0729203..cc798cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -185,7 +185,6 @@ public class RegionReplicaFlushHandler extends EventHandler {
             "Was not able to trigger a flush from primary region due to old server version? " +
               "Continuing to open the secondary region replica: " +
               region.getRegionInfo().getRegionNameAsString());
-          region.setReadsEnabled(true);
           break;
         }
       }
@@ -195,6 +194,6 @@ public class RegionReplicaFlushHandler extends EventHandler {
         throw new InterruptedIOException(e.getMessage());
       }
     }
+    region.setReadsEnabled(true);
   }
-
 }
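
Moving region.setReadsEnabled(true) below the retry loop makes the enable-reads step run on
every successful exit from the loop, not only on the old-server-version branch. The resulting
control-flow shape, reduced to a sketch (names are illustrative):

    import java.util.function.BooleanSupplier;

    final class FlushThenEnable {
      // Shape of the fixed control flow: the retry loop no longer enables reads
      // itself; every successful exit falls through to a single enabling call.
      static void flushThenEnable(Runnable triggerFlush, BooleanSupplier done,
          Runnable enableReads) {
        while (!done.getAsBoolean()) {
          triggerFlush.run(); // retried until the primary confirms a flush
        }
        enableReads.run();    // exactly once, after the loop
      }
    }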
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index f4c37b1..ca73663 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
@@ -53,6 +54,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
+    private final Server server;
     private final Configuration localConf;
     private final Configuration conf;
     private final FileSystem fs;
@@ -64,16 +66,11 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
     private final Abortable abortable;
 
     @InterfaceAudience.Private
-    public Context(
-        final Configuration localConf,
-        final Configuration conf,
-        final FileSystem fs,
-        final String peerId,
-        final UUID clusterId,
-        final ReplicationPeer replicationPeer,
-        final MetricsSource metrics,
-        final TableDescriptors tableDescriptors,
-        final Abortable abortable) {
+    public Context(final Server server, final Configuration localConf, final Configuration conf,
+        final FileSystem fs, final String peerId, final UUID clusterId,
+        final ReplicationPeer replicationPeer, final MetricsSource metrics,
+        final TableDescriptors tableDescriptors, final Abortable abortable) {
+      this.server = server;
       this.localConf = localConf;
       this.conf = conf;
       this.fs = fs;
@@ -84,34 +81,50 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
       this.tableDescriptors = tableDescriptors;
       this.abortable = abortable;
     }
+
+    public Server getServer() {
+      return server;
+    }
+
     public Configuration getConfiguration() {
       return conf;
     }
+
     public Configuration getLocalConfiguration() {
       return localConf;
     }
+
     public FileSystem getFilesystem() {
       return fs;
     }
+
     public UUID getClusterId() {
       return clusterId;
     }
+
     public String getPeerId() {
       return peerId;
     }
+
     public ReplicationPeerConfig getPeerConfig() {
       return replicationPeer.getPeerConfig();
     }
+
     public ReplicationPeer getReplicationPeer() {
       return replicationPeer;
     }
+
     public MetricsSource getMetrics() {
       return metrics;
     }
+
     public TableDescriptors getTableDescriptors() {
       return tableDescriptors;
     }
-    public Abortable getAbortable() { return abortable; }
+
+    public Abortable getAbortable() {
+      return abortable;
+    }
   }
 
   /**
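
With the Server handle on the Context, an endpoint no longer builds its own connection; it
borrows the region server's shared one at init time, exactly as the
RegionReplicaReplicationEndpoint change below does. A sketch of the shape of such an endpoint
(the class itself is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;

    // Hypothetical endpoint: grab the shared AsyncClusterConnection once.
    abstract class SharedConnectionEndpoint extends BaseReplicationEndpoint {

      private AsyncClusterConnection conn;

      @Override
      public void init(Context context) throws IOException {
        super.init(context);
        // No ConnectionFactory.createConnection(conf) and no doStop() cleanup:
        // the connection is owned and closed by the hosting server.
        this.conn = context.getServer().getAsyncClusterConnection();
      }
    }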
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index f7721e0..65cf9a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -19,67 +19,47 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RetryingCallable;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
 
 /**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
- * which receives the WAL edits from the WAL, and sends the edits to replicas
- * of regions.
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL
+ * edits from the WAL, and sends the edits to replicas of regions.
  */
 @InterfaceAudience.Private
 public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
@@ -87,32 +67,55 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
 
   // Can be configured differently than hbase.client.retries.number
-  private static String CLIENT_RETRIES_NUMBER
-    = "hbase.region.replica.replication.client.retries.number";
+  private static String CLIENT_RETRIES_NUMBER =
+    "hbase.region.replica.replication.client.retries.number";
 
   private Configuration conf;
-  private ClusterConnection connection;
+  private AsyncClusterConnection connection;
   private TableDescriptors tableDescriptors;
 
-  // Reuse WALSplitter constructs as a WAL pipe
-  private PipelineController controller;
-  private RegionReplicaOutputSink outputSink;
-  private EntryBuffers entryBuffers;
+  private int numRetries;
+
+  private long operationTimeoutNs;
 
-  // Number of writer threads
-  private int numWriterThreads;
+  private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
 
-  private int operationTimeout;
+  private Cache<TableName, TableName> disabledTableCache;
 
-  private ExecutorService pool;
+  private final RetryCounterFactory retryCounterFactory =
+    new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
 
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
-
-    this.conf = HBaseConfiguration.create(context.getConfiguration());
+    this.conf = context.getConfiguration();
     this.tableDescriptors = context.getTableDescriptors();
-
+    int memstoreReplicationEnabledCacheExpiryMs = conf
+      .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
+    // A cache for the table "memstore replication enabled" flag.
+    // It has a default expiry of 5 sec. This means that if the table is altered
+    // with a different flag value, we might fail to replicate for that amount of
+    // time. But this cache avoids the slow lookup and parsing of the TableDescriptor.
+    tableDescriptorCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
+      .initialCapacity(10).maximumSize(1000)
+      .build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
+
+        @Override
+        public Optional<TableDescriptor> load(TableName tableName) throws Exception {
+          // check if the table requires memstore replication
+          // some unit tests drop the table, so we should do a bypass check and always replicate.
+          return Optional.ofNullable(tableDescriptors.get(tableName));
+        }
+      });
+    int nonExistentTableCacheExpiryMs =
+      conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
+    // A cache for non-existent tables, with a default expiry of 5 sec. This means that if the
+    // table is created again with the same name, we might fail to replicate for that amount of
+    // time. But this cache prevents overloading meta requests for every edit from a deleted file.
+    disabledTableCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
+      .maximumSize(1000).build();
     // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
     // We are resetting it here because we want default number of retries (35) rather than 10 times
     // that which makes very long retries for disabled tables etc.
@@ -123,516 +126,261 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
       defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
     }
-
-    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-    int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
-
-    this.numWriterThreads = this.conf.getInt(
-      "hbase.region.replica.replication.writer.threads", 3);
-    controller = new PipelineController();
-    entryBuffers = new EntryBuffers(controller,
-        this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024));
-
+    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
     // use the regular RPC timeout for replica replication RPC's
-    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-  }
-
-  @Override
-  protected void doStart() {
-    try {
-      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
-      this.pool = getDefaultThreadPool(conf);
-      outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
-        connection, pool, numWriterThreads, operationTimeout);
-      outputSink.startWriterThreads();
-      super.doStart();
-    } catch (IOException ex) {
-      LOG.warn("Received exception while creating connection :" + ex);
-      notifyFailed(ex);
-    }
-  }
-
-  @Override
-  protected void doStop() {
-    if (outputSink != null) {
-      try {
-        outputSink.finishWritingAndClose();
-      } catch (IOException ex) {
-        LOG.warn("Got exception while trying to close OutputSink", ex);
-      }
-    }
-    if (this.pool != null) {
-      this.pool.shutdownNow();
-      try {
-        // wait for 10 sec
-        boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
-        if (!shutdown) {
-          LOG.warn("Failed to shutdown the thread pool after 10 seconds");
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
-      }
-    }
-    if (connection != null) {
-      try {
-        connection.close();
-      } catch (IOException ex) {
-        LOG.warn("Got exception closing connection :" + ex);
-      }
-    }
-    super.doStop();
+    this.operationTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.connection = context.getServer().getAsyncClusterConnection();
   }
 
   /**
-   * Returns a Thread pool for the RPC's to region replicas. Similar to
-   * Connection's thread pool.
+   * Returns true if the specified entry must be replicated. We should always replicate meta
+   * operations (e.g. flush) and use the user HTD flag to decide whether or not to replicate the
+   * memstore.
    */
-  private ExecutorService getDefaultThreadPool(Configuration conf) {
-    int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
-    if (maxThreads == 0) {
-      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+  private boolean requiresReplication(Optional<TableDescriptor> tableDesc, Entry entry) {
+    // empty edit does not need to be replicated
+    if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
+      return false;
     }
-    long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
-    LinkedBlockingQueue<Runnable> workQueue =
-        new LinkedBlockingQueue<>(maxThreads *
-            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
-      maxThreads,
-      maxThreads,
-      keepAliveTime,
-      TimeUnit.SECONDS,
-      workQueue,
-      Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
-    tpe.allowCoreThreadTimeOut(true);
-    return tpe;
+    // meta edits (e.g. flush) must be always replicated
+    return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
   }
 
-  @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
-     *
-     * RRRE relies on batching from two different mechanisms. The first is the batching from
-     * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
-     * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most
-     * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
-     * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
-     * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to
-     * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
-     * based on regions.
-     *
-     * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
-     * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
-     * The SinkWriter in this case will send the wal edits to all secondary region replicas in
-     * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
-     * being written to the sink, another buffer for the same region will not be made available to
-     * writers ensuring regions edits are not replayed out of order.
-     *
-     * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
-     * that the replication can assume all edits are persisted. We may be able to do a better
-     * pipelining between the replication thread and output sinks later if it becomes a bottleneck.
-     */
-
-    while (this.isRunning()) {
-      try {
-        for (Entry entry: replicateContext.getEntries()) {
-          entryBuffers.appendEntry(entry);
+  private void getRegionLocations(CompletableFuture<RegionLocations> future,
+      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
+    FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
+      (r, e) -> {
+        if (e != null) {
+          future.completeExceptionally(e);
+          return;
         }
-        outputSink.flush(); // make sure everything is flushed
-        ctx.getMetrics().incrLogEditsFiltered(
-          outputSink.getSkippedEditsCounter().getAndSet(0));
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      } catch (IOException e) {
-        LOG.warn("Received IOException while trying to replicate"
-            + StringUtils.stringifyException(e));
-      }
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean canReplicateToSameCluster() {
-    return true;
-  }
-
-  @Override
-  protected WALEntryFilter getScopeWALEntryFilter() {
-    // we do not care about scope. We replicate everything.
-    return null;
+        // if we are not loading from cache, just return
+        if (reload) {
+          future.complete(r);
+          return;
+        }
+        // check if the number of region replicas is correct, and also the primary region name
+        // matches
+        if (r.size() == tableDesc.getRegionReplication() && Bytes.equals(
+          r.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), encodedRegionName)) {
+          future.complete(r);
+        } else {
+          // reload again as the information in the cache may be stale
+          getRegionLocations(future, tableDesc, encodedRegionName, row, true);
+        }
+      });
   }
 
-  static class RegionReplicaOutputSink extends OutputSink {
-    private final RegionReplicaSinkWriter sinkWriter;
-    private final TableDescriptors tableDescriptors;
-    private final Cache<TableName, Boolean> memstoreReplicationEnabled;
-
-    public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
-        EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
-        int numWriters, int operationTimeout) {
-      super(controller, entryBuffers, numWriters);
-      this.sinkWriter =
-          new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
-      this.tableDescriptors = tableDescriptors;
-
-      // A cache for the table "memstore replication enabled" flag.
-      // It has a default expiry of 5 sec. This means that if the table is altered
-      // with a different flag value, we might miss to replicate for that amount of
-      // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
-      int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
-        .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
-      this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
-        .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
-        .initialCapacity(10)
-        .maximumSize(1000)
-        .build();
+  private void replicate(CompletableFuture<Long> future, RegionLocations locs,
+      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
+    if (locs.size() == 1) {
+      // Could this happen?
+      future.complete(Long.valueOf(entries.size()));
+      return;
     }
-
-    @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
-      List<Entry> entries = buffer.getEntryBuffer();
-
-      if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
-        return;
-      }
-
-      // meta edits (e.g. flush) are always replicated.
-      // data edits (e.g. put) are replicated if the table requires them.
-      if (!requiresReplication(buffer.getTableName(), entries)) {
-        return;
+    if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
+      encodedRegionName)) {
+      // the region name is not equal, this usually means the region has been split or merged, so
+      // give up replicating as the new region(s) should already have all the data of the parent
+      // region(s).
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(
+          "Skipping {} entries in table {} because located region {} is different than" +
+            " the original region {} from WALEdit",
+          entries.size(), tableDesc.getTableName(), locs.getDefaultRegionLocation().getRegion().getEncodedName(),
+          Bytes.toStringBinary(encodedRegionName));
       }
-
-      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
-        CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
-    }
-
-    @Override
-    public boolean flush() throws IOException {
-      // nothing much to do for now. Wait for the Writer threads to finish up
-      // append()'ing the data.
-      entryBuffers.waitUntilDrained();
-      return super.flush();
-    }
-
-    @Override
-    public boolean keepRegionEvent(Entry entry) {
-      return true;
-    }
-
-    @Override
-    public List<Path> finishWritingAndClose() throws IOException {
-      finishWriting(true);
-      return null;
-    }
-
-    @Override
-    public Map<byte[], Long> getOutputCounts() {
-      return null; // only used in tests
-    }
-
-    @Override
-    public int getNumberOfRecoveredRegions() {
-      return 0;
-    }
-
-    AtomicLong getSkippedEditsCounter() {
-      return skippedEdits;
+      future.complete(Long.valueOf(entries.size()));
+      return;
     }
-
-    /**
-     * returns true if the specified entry must be replicated.
-     * We should always replicate meta operations (e.g. flush)
-     * and use the user HTD flag to decide whether or not replicate the memstore.
-     */
-    private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
-        throws IOException {
-      // unit-tests may not the TableDescriptors, bypass the check and always replicate
-      if (tableDescriptors == null) return true;
-
-      Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
-      if (requiresReplication == null) {
-        // check if the table requires memstore replication
-        // some unit-test drop the table, so we should do a bypass check and always replicate.
-        TableDescriptor htd = tableDescriptors.get(tableName);
-        requiresReplication = htd == null || htd.hasRegionMemStoreReplication();
-        memstoreReplicationEnabled.put(tableName, requiresReplication);
-      }
-
-      // if memstore replication is not required, check the entries.
-      // meta edits (e.g. flush) must be always replicated.
-      if (!requiresReplication) {
-        int skipEdits = 0;
-        java.util.Iterator<Entry> it = entries.iterator();
-        while (it.hasNext()) {
-          Entry entry = it.next();
-          if (entry.getEdit().isMetaEdit()) {
-            requiresReplication = true;
+    AtomicReference<Throwable> error = new AtomicReference<>();
+    AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
+    AtomicLong skippedEdits = new AtomicLong(0);
+
+    for (int i = 1, n = locs.size(); i < n; i++) {
+      final int replicaId = i;
+      FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
+        locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries,
+        replicaId, numRetries, operationTimeoutNs), (r, e) -> {
+          if (e != null) {
+            LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e);
+            error.compareAndSet(null, e);
           } else {
-            it.remove();
-            skipEdits++;
+            AtomicUtils.updateMax(skippedEdits, r.longValue());
           }
-        }
-        skippedEdits.addAndGet(skipEdits);
-      }
-      return requiresReplication;
+          if (remainingTasks.decrementAndGet() == 0) {
+            if (error.get() != null) {
+              future.completeExceptionally(error.get());
+            } else {
+              future.complete(skippedEdits.get());
+            }
+          }
+        });
     }
   }
 
-  static class RegionReplicaSinkWriter extends SinkWriter {
-    RegionReplicaOutputSink sink;
-    ClusterConnection connection;
-    RpcControllerFactory rpcControllerFactory;
-    RpcRetryingCallerFactory rpcRetryingCallerFactory;
-    int operationTimeout;
-    ExecutorService pool;
-    Cache<TableName, Boolean> disabledAndDroppedTables;
-    TableDescriptors tableDescriptors;
-
-    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
-        ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
-      this.sink = sink;
-      this.connection = connection;
-      this.operationTimeout = operationTimeout;
-      this.rpcRetryingCallerFactory
-        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
-      this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
-      this.pool = pool;
-      this.tableDescriptors = tableDescriptors;
-
-      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
-        .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
-      // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
-      // table is created again with the same name, we might miss to replicate for that amount of
-      // time. But this cache prevents overloading meta requests for every edit from a deleted file.
-      disabledAndDroppedTables = CacheBuilder.newBuilder()
-        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
-        .initialCapacity(10)
-        .maximumSize(1000)
-        .build();
+  private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
+      for (Entry entry : entries) {
+        LOG.trace("Skipping : {}", entry);
+      }
     }
+  }
 
-    public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
-        List<Entry> entries) throws IOException {
-
-      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
-            + " is cached as a disabled or dropped table");
-          for (Entry entry : entries) {
-            LOG.trace("Skipping : " + entry);
-          }
-        }
-        sink.getSkippedEditsCounter().addAndGet(entries.size());
-        return;
+  private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
+      List<Entry> entries) {
+    if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
+      logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
+      return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
+    }
+    byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
+    CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
+    getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    FutureUtils.addListener(locateFuture, (locs, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+      } else {
+        replicate(future, locs, tableDesc, encodedRegionName, row, entries);
       }
+    });
+    return future;
+  }
 
-      // If the table is disabled or dropped, we should not replay the entries, and we can skip
-      // replaying them. However, we might not know whether the table is disabled until we
-      // invalidate the cache and check from meta
-      RegionLocations locations = null;
-      boolean useCache = true;
-      while (true) {
-        // get the replicas of the primary region
+  @Override
+  public boolean replicate(ReplicateContext replicateContext) {
+    Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
+      new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    long skippedEdits = 0;
+    RetryCounter retryCounter = retryCounterFactory.create();
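+    // Phase one: group the edits by encoded region name, skipping edits for tables which do
+    // not need replication. Loading a table descriptor can fail, so retry until it succeeds or
+    // the retries are exhausted.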
+    outer: while (isRunning()) {
+      encodedRegionName2Entries.clear();
+      skippedEdits = 0;
+      for (Entry entry : replicateContext.getEntries()) {
+        Optional<TableDescriptor> tableDesc;
         try {
-          locations = RegionReplicaReplayCallable
-              .getRegionLocations(connection, tableName, row, useCache, 0);
-
-          if (locations == null) {
-            throw new HBaseIOException("Cannot locate locations for "
-                + tableName + ", row:" + Bytes.toStringBinary(row));
+          tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
+        } catch (ExecutionException e) {
+          LOG.warn("Failed to load table descriptor for {}, attempts={}",
+            entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
+          if (!retryCounter.shouldRetry()) {
+            return false;
           }
-        } catch (TableNotFoundException e) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
-              + " is dropped. Adding table to cache.");
-            for (Entry entry : entries) {
-              LOG.trace("Skipping : " + entry);
-            }
+          try {
+            retryCounter.sleepUntilNextRetry();
+          } catch (InterruptedException e1) {
+            // restore the interrupted state
+            Thread.currentThread().interrupt();
+            return false;
           }
-          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
-          // skip this entry
-          sink.getSkippedEditsCounter().addAndGet(entries.size());
-          return;
+          continue outer;
         }
-
-        // check whether we should still replay this entry. If the regions are changed, or the
-        // entry is not coming from the primary region, filter it out.
-        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
-        if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
-          encodedRegionName)) {
-          if (useCache) {
-            useCache = false;
-            continue; // this will retry location lookup
-          }
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-              + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
-              + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
-              + " from WALEdit");
-            for (Entry entry : entries) {
-              LOG.trace("Skipping : " + entry);
-            }
-          }
-          sink.getSkippedEditsCounter().addAndGet(entries.size());
-          return;
+        if (!requiresReplication(tableDesc, entry)) {
+          skippedEdits++;
+          continue;
         }
-        break;
+        byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
+        encodedRegionName2Entries
+          .computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
+          .getSecond().add(entry);
       }
-
-      if (locations.size() == 1) {
-        return;
-      }
-
-      ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1);
-
-      // All passed entries should belong to one region because it is coming from the EntryBuffers
-      // split per region. But the regions might split and merge (unlike log recovery case).
-      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
-        HRegionLocation location = locations.getRegionLocation(replicaId);
-        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
-          RegionInfo regionInfo = location == null
-              ? RegionReplicaUtil.getRegionInfoForReplica(
-                locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
-              : location.getRegionInfo();
-          RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
-            rpcControllerFactory, tableName, location, regionInfo, row, entries,
-            sink.getSkippedEditsCounter());
-           Future<ReplicateWALEntryResponse> task = pool.submit(
-             new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout));
-           tasks.add(task);
-        }
+      break;
+    }
+    // Phase two: send the grouped edits to each region's secondary replicas.
+    retryCounter = retryCounterFactory.create();
+    while (isRunning()) {
+      List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
+        new ArrayList<>();
+      for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
+        .entrySet()) {
+        CompletableFuture<Long> future =
+          replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
+        futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
       }
-
-      boolean tasksCancelled = false;
-      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
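+      // Harvest the results: regions that were replicated (or skipped) successfully are removed
+      // from the map, so anything still present afterwards is retried in the next round.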
+      for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
+        byte[] encodedRegionName = pair.getSecond();
         try {
-          tasks.get(replicaId).get();
+          skippedEdits += pair.getFirst().get().longValue();
+          encodedRegionName2Entries.remove(encodedRegionName);
         } catch (InterruptedException e) {
-          throw new InterruptedIOException(e.getMessage());
+          // restore the interrupted state
+          Thread.currentThread().interrupt();
+          return false;
         } catch (ExecutionException e) {
+          Pair<TableDescriptor, List<Entry>> tableAndEntries =
+            encodedRegionName2Entries.get(encodedRegionName);
+          TableName tableName = tableAndEntries.getFirst().getTableName();
+          List<Entry> entries = tableAndEntries.getSecond();
           Throwable cause = e.getCause();
-          boolean canBeSkipped = false;
-          if (cause instanceof IOException) {
-            // The table can be disabled or dropped at this time. For disabled tables, we have no
-            // cheap mechanism to detect this case because meta does not contain this information.
-            // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
-            // RPC. So instead we start the replay RPC with retries and check whether the table is
-            // dropped or disabled which might cause SocketTimeoutException, or
-            // RetriesExhaustedException or similar if we get IOE.
-            if (cause instanceof TableNotFoundException
-                || connection.isTableDisabled(tableName)) {
-              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
-              canBeSkipped = true;
-            } else if (tableDescriptors != null) {
-              TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
-              if (tableDescriptor != null
-                  //(replicaId + 1) as no task is added for primary replica for replication
-                  && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
-                canBeSkipped = true;
-              }
-            }
-            if (canBeSkipped) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-                    + " because received exception for dropped or disabled table",
-                  cause);
-                for (Entry entry : entries) {
-                  LOG.trace("Skipping : " + entry);
-                }
-              }
-              if (!tasksCancelled) {
-                sink.getSkippedEditsCounter().addAndGet(entries.size());
-                tasksCancelled = true; // so that we do not add to skipped counter again
-              }
-              continue;
-            }
-
-            // otherwise rethrow
-            throw (IOException)cause;
+          // The table can be disabled or dropped at this time. For disabled tables, we have no
+          // cheap mechanism to detect this case because meta does not contain this information.
+          // Checking zookeeper for every replay RPC would be too expensive, so instead we
+          // attempt the replay RPC with retries and, when it fails with an IOE such as
+          // SocketTimeoutException or RetriesExhaustedException, check whether the table has
+          // been dropped or disabled.
+          if (cause instanceof TableNotFoundException) {
+            // cache the fact that the table does not exist
+            tableDescriptorCache.put(tableName, Optional.empty());
+            logSkipped(tableName, entries, "dropped");
+            skippedEdits += entries.size();
+            encodedRegionName2Entries.remove(encodedRegionName);
+            continue;
+          }
+          boolean disabled = false;
+          try {
+            disabled = connection.getAdmin().isTableDisabled(tableName).get();
+          } catch (InterruptedException e1) {
+            // restore the interrupted state
+            Thread.currentThread().interrupt();
+            return false;
+          } catch (ExecutionException e1) {
+            LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
+              e1.getCause());
+          }
+          if (disabled) {
+            disabledTableCache.put(tableName, tableName);
+            logSkipped(tableName, entries, "disabled");
+            skippedEdits += entries.size();
+            encodedRegionName2Entries.remove(encodedRegionName);
+            continue;
           }
-          // unexpected exception
-          throw new IOException(cause);
+          LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
+            Bytes.toStringBinary(encodedRegionName), tableName);
+        }
+      }
+      // we are done
+      if (encodedRegionName2Entries.isEmpty()) {
+        ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
+        return true;
+      } else {
+        LOG.warn("Failed to replicate all entris, retry={}", retryCounter.getAttemptTimes());
+        if (!retryCounter.shouldRetry()) {
+          return false;
+        }
+        try {
+          retryCounter.sleepUntilNextRetry();
+        } catch (InterruptedException e) {
+          // restore the interrupted state
+          Thread.currentThread().interrupt();
+          return false;
         }
       }
     }
-  }
 
-  static class RetryingRpcCallable<V> implements Callable<V> {
-    RpcRetryingCallerFactory factory;
-    RetryingCallable<V> callable;
-    int timeout;
-    public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
-        int timeout) {
-      this.factory = factory;
-      this.callable = callable;
-      this.timeout = timeout;
-    }
-    @Override
-    public V call() throws Exception {
-      return factory.<V>newCaller().callWithRetries(callable, timeout);
-    }
+    return false;
   }
 
-  /**
-   * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
-   * the entry if the region boundaries have changed or the region is gone.
-   */
-  static class RegionReplicaReplayCallable extends
-      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
-    private final List<Entry> entries;
-    private final byte[] initialEncodedRegionName;
-    private final AtomicLong skippedEntries;
-
-    public RegionReplicaReplayCallable(ClusterConnection connection,
-        RpcControllerFactory rpcControllerFactory, TableName tableName,
-        HRegionLocation location, RegionInfo regionInfo, byte[] row,List<Entry> entries,
-        AtomicLong skippedEntries) {
-      super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
-      this.entries = entries;
-      this.skippedEntries = skippedEntries;
-      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
-    }
-
-    @Override
-    public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
-      // Check whether we should still replay this entry. If the regions are changed, or the
-      // entry is not coming form the primary region, filter it out because we do not need it.
-      // Regions can change because of (1) region split (2) region merge (3) table recreated
-      boolean skip = false;
-      if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
-          initialEncodedRegionName)) {
-        skip = true;
-      }
-      if (!this.entries.isEmpty() && !skip) {
-        Entry[] entriesArray = new Entry[this.entries.size()];
-        entriesArray = this.entries.toArray(entriesArray);
-
-        // set the region name for the target region replica
-        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
-                .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
-        controller.setCellScanner(p.getSecond());
-        return stub.replay(controller, p.getFirst());
-      }
+  @Override
+  public boolean canReplicateToSameCluster() {
+    return true;
+  }
 
-      if (skip) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-            + " because located region " + location.getRegionInfo().getEncodedName()
-            + " is different than the original region "
-            + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
-          for (Entry entry : entries) {
-            LOG.trace("Skipping : " + entry);
-          }
-        }
-        skippedEntries.addAndGet(entries.size());
-      }
-      return ReplicateWALEntryResponse.newBuilder().build();
-    }
+  @Override
+  protected WALEntryFilter getScopeWALEntryFilter() {
+    // we do not care about scope. We replicate everything.
+    return null;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 10fa50f..86f40ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -282,7 +282,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
     replicationEndpoint
-      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs,
+      .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
         replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
     replicationEndpoint.start();
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index e1e55f5..9af60c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -88,7 +88,7 @@ public class TestAsyncTableNoncedRetry {
       registry.getClusterId().get(), null, User.getCurrent()) {
 
       @Override
-      public NonceGenerator getNonceGenerator() {
+      NonceGenerator getNonceGenerator() {
         return NONCE_GENERATOR;
       }
     };
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 04db81a..017d7c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -20,16 +20,17 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -51,12 +51,12 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -383,9 +383,8 @@ public class TestRegionReplicaReplicationEndpoint {
     testRegionReplicaReplicationIgnores(false, true);
   }
 
-  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
+  private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
       throws Exception {
-
     // tests having edits from a disabled or dropped table is handled correctly by skipping those
     // entries and further edits after the edits from dropped/disabled table can be replicated
     // without problems.
@@ -405,8 +404,7 @@ public class TestRegionReplicaReplicationEndpoint {
     HTU.getAdmin().createTable(htd);
 
     // both tables are created, now pause replication
-    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
-    admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
 
     // now that the replication is disabled, write to the table to be dropped, then drop the table.
 
@@ -416,19 +414,9 @@ public class TestRegionReplicaReplicationEndpoint {
 
     HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
 
-    AtomicLong skippedEdits = new AtomicLong();
-    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
-        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
-    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
-    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
-        FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
-    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
-        new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
-            (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
-            fstd);
     RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
     HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
-    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
+    byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
 
     Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
         .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
@@ -436,7 +424,6 @@ public class TestRegionReplicaReplicationEndpoint {
       new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
         new WALEdit()
             .add(cell));
-
     HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
     if (dropTable) {
       HTU.getAdmin().deleteTable(toBeDisabledTable);
@@ -445,11 +432,23 @@ public class TestRegionReplicaReplicationEndpoint {
       HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
       HTU.getAdmin().enableTable(toBeDisabledTable);
     }
-    sinkWriter.append(toBeDisabledTable, encodedRegionName,
-      HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
-
-    assertEquals(2, skippedEdits.get());
 
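+    // Replay the edits through a real RegionReplicaReplicationEndpoint wired to the running
+    // region server; both edits should end up counted as filtered.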
+    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
+    MetricsSource metrics = mock(MetricsSource.class);
+    ReplicationEndpoint.Context ctx =
+      new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
+        HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
+        UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
+          .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
+        metrics, rs.getTableDescriptors(), rs);
+    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
+    rrpe.init(ctx);
+    rrpe.start();
+    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
+    repCtx.setEntries(Lists.newArrayList(entry, entry));
+    assertTrue(rrpe.replicate(repCtx));
+    verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
+    rrpe.stop();
     if (disableReplication) {
       // enable replication again so that we can verify replication
       HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
@@ -460,17 +459,14 @@ public class TestRegionReplicaReplicationEndpoint {
 
     try {
       // load some data to the to-be-dropped table
-
       // load the data to the table
       HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
 
       // now enable the replication
-      admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
 
       verifyReplication(tableName, regionReplication, 0, 1000);
-
     } finally {
-      admin.close();
       table.close();
       rl.close();
       tableToBeDisabled.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index ab67d94..de0f151 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -19,15 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -37,24 +38,22 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.WALObserver;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -73,8 +72,6 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-
 /**
  * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
  * class contains lower level tests using callables.
@@ -178,39 +175,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
   public void testReplayCallable() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly
     openRegion(HTU, rs0, hriSecondary);
-    ClusterConnection connection =
-        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
 
-    //load some data to primary
+    // load some data to primary
     HTU.loadNumericRows(table, f, 0, 1000);
 
     Assert.assertEquals(1000, entries.size());
-    // replay the edits to the secondary using replay callable
-    replicateUsingCallable(connection, entries);
+    try (AsyncClusterConnection conn = ClusterConnectionFactory
+      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
+      // replay the edits to the secondary using replay callable
+      replicateUsingCallable(conn, entries);
+    }
 
     Region region = rs0.getRegion(hriSecondary.getEncodedName());
     HTU.verifyNumericRows(region, f, 0, 1000);
 
     HTU.deleteNumericRows(table, f, 0, 1000);
     closeRegion(HTU, rs0, hriSecondary);
-    connection.close();
   }
 
-  private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
-      throws IOException, RuntimeException {
+  private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
+      throws IOException, ExecutionException, InterruptedException {
     Entry entry;
     while ((entry = entries.poll()) != null) {
       byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
-      RegionLocations locations = connection.locateRegion(tableName, row, true, true);
-      RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
-        RpcControllerFactory.instantiate(connection.getConfiguration()),
-        table.getName(), locations.getRegionLocation(1),
-        locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
-        new AtomicLong());
-
-      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
-        connection.getConfiguration());
-      factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
+      RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
+      connection
+        .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
+          Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
+        .get();
     }
   }
 
@@ -218,49 +210,49 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
   public void testReplayCallableWithRegionMove() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly while
     // the region is moved to another location.It tests handling of RME.
-    openRegion(HTU, rs0, hriSecondary);
-    ClusterConnection connection =
-        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
-    //load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
+    try (AsyncClusterConnection conn = ClusterConnectionFactory
+      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
+      openRegion(HTU, rs0, hriSecondary);
+      // load some data to primary
+      HTU.loadNumericRows(table, f, 0, 1000);
 
-    Assert.assertEquals(1000, entries.size());
-    // replay the edits to the secondary using replay callable
-    replicateUsingCallable(connection, entries);
+      Assert.assertEquals(1000, entries.size());
 
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
+      // replay the edits to the secondary using replay callable
+      replicateUsingCallable(conn, entries);
 
-    HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
+      Region region = rs0.getRegion(hriSecondary.getEncodedName());
+      HTU.verifyNumericRows(region, f, 0, 1000);
 
-    // move the secondary region from RS0 to RS1
-    closeRegion(HTU, rs0, hriSecondary);
-    openRegion(HTU, rs1, hriSecondary);
+      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
 
-    // replicate the new data
-    replicateUsingCallable(connection, entries);
+      // move the secondary region from RS0 to RS1
+      closeRegion(HTU, rs0, hriSecondary);
+      openRegion(HTU, rs1, hriSecondary);
 
-    region = rs1.getRegion(hriSecondary.getEncodedName());
-    // verify the new data. old data may or may not be there
-    HTU.verifyNumericRows(region, f, 1000, 2000);
+      // replicate the new data
+      replicateUsingCallable(conn, entries);
 
-    HTU.deleteNumericRows(table, f, 0, 2000);
-    closeRegion(HTU, rs1, hriSecondary);
-    connection.close();
+      region = rs1.getRegion(hriSecondary.getEncodedName());
+      // verify the new data. old data may or may not be there
+      HTU.verifyNumericRows(region, f, 1000, 2000);
+
+      HTU.deleteNumericRows(table, f, 0, 2000);
+      closeRegion(HTU, rs1, hriSecondary);
+    }
   }
 
   @Test
   public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
     // tests replaying the edits to a secondary region replica using the RRRE.replicate()
     openRegion(HTU, rs0, hriSecondary);
-    ClusterConnection connection =
-        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
     RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
 
     ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
     when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
     when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
-
+    when(context.getServer()).thenReturn(rs0);
+    when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
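+    // The rewritten endpoint pulls its cluster connection and table descriptors from the
+    // server, so the mocked context hands back the live region server.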
     replicator.init(context);
     replicator.startAsync();
 
@@ -272,12 +264,11 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     final String fakeWalGroupId = "fakeWALGroup";
     replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
         .setWalGroupId(fakeWalGroupId));
-
+    replicator.stop();
     Region region = rs0.getRegion(hriSecondary.getEncodedName());
     HTU.verifyNumericRows(region, f, 0, 1000);
 
     HTU.deleteNumericRows(table, f, 0, 1000);
     closeRegion(HTU, rs0, hriSecondary);
-    connection.close();
   }
 }