You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/05 06:27:23 UTC

[1/6] hbase git commit: Add 1.3.3 to download page [Forced Update!]

Repository: hbase
Updated Branches:
  refs/heads/HBASE-21512 04e6909ad -> df35a12ce (forced update)


Add 1.3.3 to download page


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/94093e86
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/94093e86
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/94093e86

Branch: refs/heads/HBASE-21512
Commit: 94093e869aa006e4443dcf6ff923843c9c61f192
Parents: 3fbdd5b
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Jan 4 12:32:00 2019 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jan 4 12:32:00 2019 -0800

----------------------------------------------------------------------
 src/site/xdoc/downloads.xml | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/94093e86/src/site/xdoc/downloads.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/downloads.xml b/src/site/xdoc/downloads.xml
index 2859779..fb827aa 100644
--- a/src/site/xdoc/downloads.xml
+++ b/src/site/xdoc/downloads.xml
@@ -108,6 +108,27 @@ under the License.
     </tr>
     <tr>
       <td style="test-align: left">
+        1.3.3
+      </td>
+      <td style="test-align: left">
+        2018/12/21
+      </td>
+      <td style="test-align: left">
+        <a href="https://apache.org/dist/hbase/1.3.3/compat-check-report.html">1.3.2 vs 1.3.3</a>
+      </td>
+      <td style="test-align: left">
+        <a href="https://github.com/apache/hbase/blob/rel/1.3.3/CHANGES.txt">Changes</a>
+      </td>
+      <td style="test-align: left">
+        <a href="https://s.apache.org/hbase-1.3.3-jira-release-notes">Release Notes</a>
+      </td>
+      <td style="test-align: left">
+        <a href="https://www.apache.org/dyn/closer.lua/hbase/1.3.3/hbase-1.3.3-src.tar.gz">src</a> (<a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-src.tar.gz.sha512">sha512</a> <a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-src.tar.gz.asc">asc</a>) <br />
+        <a href="https://www.apache.org/dyn/closer.lua/hbase/1.3.3/hbase-1.3.3-bin.tar.gz">bin</a> (<a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-bin.tar.gz.sha512">sha512</a> <a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-bin.tar.gz.asc">asc</a>)
+      </td>
+    </tr>
+    <tr>
+      <td style="test-align: left">
         1.2.9
       </td>
       <td style="test-align: left">


[5/6] hbase git commit: HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager

Posted by zh...@apache.org.
HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7556dd5b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7556dd5b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7556dd5b

Branch: refs/heads/HBASE-21512
Commit: 7556dd5b45051265aa4ae570566f3e1f1a94c7e3
Parents: 3c7636d
Author: zhangduo <zh...@apache.org>
Authored: Sat Dec 1 21:15:48 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  5 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java     |  7 ++-
 .../hbase/regionserver/HRegionServer.java       |  2 +-
 .../regionserver/SecureBulkLoadManager.java     | 24 +++++----
 .../hadoop/hbase/security/token/TokenUtil.java  | 57 +++++++++++++++-----
 .../hbase/security/token/TestTokenUtil.java     | 42 +++++++++++----
 6 files changed, 96 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index a3d49b5..d9e620b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -261,13 +261,12 @@ public final class ProtobufUtil {
    * just {@link ServiceException}. Prefer this method to
    * {@link #getRemoteException(ServiceException)} because trying to
    * contain direct protobuf references.
-   * @param e
    */
-  public static IOException handleRemoteException(Exception e) {
+  public static IOException handleRemoteException(Throwable e) {
     return makeIOExceptionOfException(e);
   }
 
-  private static IOException makeIOExceptionOfException(Exception e) {
+  private static IOException makeIOExceptionOfException(Throwable e) {
     Throwable t = e;
     if (e instanceof ServiceException ||
         e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index fea81f1..de2fb7d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
@@ -123,6 +122,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -343,13 +343,12 @@ public final class ProtobufUtil {
    * just {@link ServiceException}. Prefer this method to
    * {@link #getRemoteException(ServiceException)} because trying to
    * contain direct protobuf references.
-   * @param e
    */
-  public static IOException handleRemoteException(Exception e) {
+  public static IOException handleRemoteException(Throwable e) {
     return makeIOExceptionOfException(e);
   }
 
-  private static IOException makeIOExceptionOfException(Exception e) {
+  private static IOException makeIOExceptionOfException(Throwable e) {
     Throwable t = e;
     if (e instanceof ServiceException) {
       t = e.getCause();

http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 375b3f8..6e5fc9f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1930,7 +1930,7 @@ public class HRegionServer extends HasThread implements
     if (!isStopped() && !isAborted()) {
       initializeThreads();
     }
-    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
+    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
     this.secureBulkLoadManager.start();
 
     // Health checker thread.

http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 566a6b6..add6519 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,11 +37,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -56,7 +56,9 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -111,9 +113,9 @@ public class SecureBulkLoadManager {
 
   private UserProvider userProvider;
   private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
-  private Connection conn;
+  private AsyncConnection conn;
 
-  SecureBulkLoadManager(Configuration conf, Connection conn) {
+  SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
     this.conf = conf;
     this.conn = conn;
   }
@@ -212,23 +214,23 @@ public class SecureBulkLoadManager {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
     }
 
-    Token userToken = null;
+    Token<AuthenticationTokenIdentifier> userToken = null;
     if (userProvider.isHadoopSecurityEnabled()) {
-      userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
-              .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
-              request.getFsToken().getService()));
+      userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(),
+        request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()),
+        new Text(request.getFsToken().getService()));
     }
     final String bulkToken = request.getBulkToken();
     User user = getActiveUser();
     final UserGroupInformation ugi = user.getUGI();
     if (userProvider.isHadoopSecurityEnabled()) {
       try {
-        Token tok = TokenUtil.obtainToken(conn);
+        Token<AuthenticationTokenIdentifier> tok = TokenUtil.obtainToken(conn).get();
         if (tok != null) {
           boolean b = ugi.addToken(tok);
           LOG.debug("token added " + tok + " for user " + ugi + " return=" + b);
         }
-      } catch (IOException ioe) {
+      } catch (Exception ioe) {
         LOG.warn("unable to add token", ioe);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
index c54d905..28efb84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,27 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.security.token;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.AuthenticationService;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -45,6 +47,8 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
 /**
  * Utility methods for obtaining authentication tokens.
  */
@@ -64,12 +68,39 @@ public class TokenUtil {
 
   /**
    * Obtain and return an authentication token for the current user.
+   * @param conn The async HBase cluster connection
+   * @return the authentication token instance, wrapped by a {@link CompletableFuture}.
+   */
+  public static CompletableFuture<Token<AuthenticationTokenIdentifier>> obtainToken(
+      AsyncConnection conn) {
+    CompletableFuture<Token<AuthenticationTokenIdentifier>> future = new CompletableFuture<>();
+    if (injectedException != null) {
+      future.completeExceptionally(injectedException);
+      return future;
+    }
+    AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME);
+    table.<AuthenticationService.Interface, GetAuthenticationTokenResponse> coprocessorService(
+      AuthenticationProtos.AuthenticationService::newStub,
+      (s, c, r) -> s.getAuthenticationToken(c,
+        AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance(), r),
+      HConstants.EMPTY_START_ROW).whenComplete((resp, error) -> {
+        if (error != null) {
+          future.completeExceptionally(ProtobufUtil.handleRemoteException(error));
+        } else {
+          future.complete(toToken(resp.getToken()));
+        }
+      });
+    return future;
+  }
+
+  /**
+   * Obtain and return an authentication token for the current user.
    * @param conn The HBase cluster connection
    * @throws IOException if a remote error or serialization problem occurs.
    * @return the authentication token instance
    */
-  public static Token<AuthenticationTokenIdentifier> obtainToken(
-      Connection conn) throws IOException {
+  public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)
+      throws IOException {
     Table meta = null;
     try {
       injectFault();
@@ -77,9 +108,9 @@ public class TokenUtil {
       meta = conn.getTable(TableName.META_TABLE_NAME);
       CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
       AuthenticationProtos.AuthenticationService.BlockingInterface service =
-          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
-      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
-          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
+        AuthenticationService.newBlockingStub(rpcChannel);
+      GetAuthenticationTokenResponse response =
+        service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
 
       return toToken(response.getToken());
     } catch (ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
index 32fcddb..585a3ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
@@ -18,35 +18,53 @@
 package org.apache.hadoop.hbase.security.token;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.net.URLClassLoader;
-
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 @Category(SmallTests.class)
 public class TestTokenUtil {
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestTokenUtil.class);
+    HBaseClassTestRule.forClass(TestTokenUtil.class);
 
-  @Test
-  public void testObtainToken() throws Exception {
+  private URLClassLoader cl;
+
+  @Before
+  public void setUp() {
     URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation();
     URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation();
+    cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+  }
 
-    ClassLoader cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(cl, true);
+  }
 
+  @Test
+  public void testObtainToken() throws Exception {
     Throwable injected = new com.google.protobuf.ServiceException("injected");
 
     Class<?> tokenUtil = cl.loadClass(TokenUtil.class.getCanonicalName());
@@ -55,8 +73,7 @@ public class TestTokenUtil {
     shouldInjectFault.set(null, injected);
 
     try {
-      tokenUtil.getMethod("obtainToken", Connection.class)
-          .invoke(null, new Object[] { null });
+      tokenUtil.getMethod("obtainToken", Connection.class).invoke(null, new Object[] { null });
       fail("Should have injected exception.");
     } catch (InvocationTargetException e) {
       Throwable t = e;
@@ -72,9 +89,16 @@ public class TestTokenUtil {
       }
     }
 
+    CompletableFuture<?> future = (CompletableFuture<?>) tokenUtil
+      .getMethod("obtainToken", AsyncConnection.class).invoke(null, new Object[] { null });
+    try {
+      future.get();
+      fail("Should have injected exception.");
+    } catch (ExecutionException e) {
+      assertSame(injected, e.getCause());
+    }
     Boolean loaded = (Boolean) cl.loadClass(ProtobufUtil.class.getCanonicalName())
-        .getDeclaredMethod("isClassLoaderLoaded")
-        .invoke(null);
+      .getDeclaredMethod("isClassLoaderLoaded").invoke(null);
     assertFalse("Should not have loaded DynamicClassLoader", loaded);
   }
 }


[6/6] hbase git commit: HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection

Posted by zh...@apache.org.
HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df35a12c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df35a12c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df35a12c

Branch: refs/heads/HBASE-21512
Commit: df35a12ce93ef6ec20a31c50978fe69b441a6da1
Parents: 8cd0e99
Author: Duo Zhang <zh...@apache.org>
Authored: Wed Dec 12 09:33:33 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:25:41 2019 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncClusterConnection.java    |   8 ++
 .../hbase/client/AsyncConnectionImpl.java       |   8 ++
 .../hbase/client/ClusterConnectionFactory.java  |  16 +--
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java |  36 ++++--
 .../apache/hadoop/hbase/util/FutureUtils.java   |  22 ++++
 .../master/procedure/RSProcedureDispatcher.java |  34 +-----
 .../hbase/protobuf/ReplicationProtbufUtil.java  |  15 +--
 .../hbase/regionserver/HRegionServer.java       |   3 +-
 .../handler/RegionReplicaFlushHandler.java      | 110 ++++++++++---------
 9 files changed, 132 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 1327fd7..f1f64ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
 /**
  * The asynchronous connection for internal usage.
  */
@@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Get the rpc client we used to communicate with other servers.
    */
   RpcClient getRpcClient();
+
+  /**
+   * Flush a region and get the response.
+   */
+  CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 4e7f421..d883809 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -54,6 +54,7 @@ import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -335,4 +336,11 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
   public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
     return new AsyncRegionServerAdmin(serverName, this);
   }
+
+  @Override
+  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 68c0630..79484db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -18,15 +18,12 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.net.SocketAddress;
-import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-
 /**
  * The factory for creating {@link AsyncClusterConnection}.
  */
@@ -48,16 +45,7 @@ public final class ClusterConnectionFactory {
   public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
       SocketAddress localAddress, User user) throws IOException {
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
-    String clusterId;
-    try {
-      clusterId = registry.getClusterId().get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(cause);
-    }
+    String clusterId = FutureUtils.get(registry.getClusterId());
     return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 869a630..4e9e64c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -818,7 +818,19 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
+    return flushRegionInternal(regionName, false).thenAccept(r -> {
+    });
+  }
+
+  /**
+   * This method is for internal use only, where we need the response of the flush.
+   * <p/>
+   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
+   * API.
+   */
+  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
     addListener(getRegionLocation(regionName), (location, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
@@ -830,7 +842,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
           .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
         return;
       }
-      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
         if (err2 != null) {
           future.completeExceptionally(err2);
         } else {
@@ -841,15 +853,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
-  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
-    return this.<Void> newAdminCaller()
-            .serverName(serverName)
-            .action(
-              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
-                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
-                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
-                resp -> null))
-            .call();
+  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
+      boolean writeFlushWALMarker) {
+    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
+      .action((controller, stub) -> this
+        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
+          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
+          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
+      .call();
   }
 
   @Override
@@ -862,7 +873,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       }
       List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
       if (hRegionInfos != null) {
-        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
+        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
+        })));
       }
       addListener(CompletableFuture.allOf(
         compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index f4a7332..f509f87 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -17,12 +17,18 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
 /**
  * Helper class for processing futures.
  */
@@ -57,4 +63,20 @@ public final class FutureUtils {
       }
     });
   }
+
+  /**
+   * A helper class for getting the result of a Future, and convert the error to an
+   * {@link IOException}.
+   */
+  public static <T> T get(Future<T> future) throws IOException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(cause);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index f3ab4b3..f772b68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
@@ -35,12 +33,12 @@ import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
@@ -341,15 +339,7 @@ public class RSProcedureDispatcher
 
     protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
         final ExecuteProceduresRequest request) throws IOException {
-      try {
-        return getRsAdmin().executeProcedures(request).get();
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        Throwables.propagateIfPossible(cause, IOException.class);
-        throw new IOException(cause);
-      }
+      return FutureUtils.get(getRsAdmin().executeProcedures(request));
     }
 
     protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@@ -408,15 +398,7 @@ public class RSProcedureDispatcher
 
     private OpenRegionResponse sendRequest(final ServerName serverName,
         final OpenRegionRequest request) throws IOException {
-      try {
-        return getRsAdmin().openRegion(request).get();
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        Throwables.propagateIfPossible(cause, IOException.class);
-        throw new IOException(cause);
-      }
+      return FutureUtils.get(getRsAdmin().openRegion(request));
     }
 
     private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@@ -458,15 +440,7 @@ public class RSProcedureDispatcher
 
     private CloseRegionResponse sendRequest(final ServerName serverName,
         final CloseRegionRequest request) throws IOException {
-      try {
-        return getRsAdmin().closeRegion(request).get();
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        Throwables.propagateIfPossible(cause, IOException.class);
-        throw new IOException(cause);
-      }
+      return FutureUtils.get(getRsAdmin().closeRegion(request));
     }
 
     private void remoteCallCompleted(final MasterProcedureEnv env,

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 74fad26..9f41a76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -18,13 +18,10 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -32,12 +29,12 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@@ -60,15 +57,7 @@ public class ReplicationProtbufUtil {
       throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
       entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    try {
-      admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(e);
-    }
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6e5fc9f..27972fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2380,8 +2380,7 @@ public class HRegionServer extends HasThread implements
 
     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
     if (this.executorService != null) {
-      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
-          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index b917379..921a69c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -20,26 +20,23 @@ package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.FlushRegionCallable;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 
 /**
  * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
@@ -56,20 +53,13 @@ public class RegionReplicaFlushHandler extends EventHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
 
-  private final ClusterConnection connection;
-  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
-  private final RpcControllerFactory rpcControllerFactory;
-  private final int operationTimeout;
+  private final AsyncClusterConnection connection;
+
   private final HRegion region;
 
-  public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
-      RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
-      int operationTimeout, HRegion region) {
+  public RegionReplicaFlushHandler(Server server, HRegion region) {
     super(server, EventType.RS_REGION_REPLICA_FLUSH);
-    this.connection = connection;
-    this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
-    this.rpcControllerFactory = rpcControllerFactory;
-    this.operationTimeout = operationTimeout;
+    this.connection = server.getAsyncClusterConnection();
     this.region = region;
   }
 
@@ -103,7 +93,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
     return numRetries;
   }
 
-  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
+  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
     long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
 
@@ -117,44 +107,58 @@ public class RegionReplicaFlushHandler extends EventHandler {
     }
     while (!region.isClosing() && !region.isClosed()
         && !server.isAborted() && !server.isStopped()) {
-      FlushRegionCallable flushCallable = new FlushRegionCallable(
-        connection, rpcControllerFactory,
-        RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
-
       // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
       // do not have to wait for the whole flush here, just initiate it.
-      FlushRegionResponse response = null;
+      FlushRegionResponse response;
       try {
-         response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
-          .callWithRetries(flushCallable, this.operationTimeout);
-      } catch (IOException ex) {
-        if (ex instanceof TableNotFoundException
-            || connection.isTableDisabled(region.getRegionInfo().getTable())) {
+        response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil
+          .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true));
+      } catch (IOException e) {
+        if (e instanceof TableNotFoundException || FutureUtils
+          .get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))) {
           return;
         }
-        throw ex;
+        if (!counter.shouldRetry()) {
+          throw e;
+        }
+        // The reason that why we need to retry here is that, the retry for asynchronous admin
+        // request is much simpler than the normal operation, if we failed to locate the region once
+        // then we will throw the exception out and will not try to relocate again. So here we need
+        // to add some retries by ourselves to prevent shutting down the region server too
+        // frequent...
+        LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
+          ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+            .getRegionNameAsString(),
+          region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
+        try {
+          counter.sleepUntilNextRetry();
+        } catch (InterruptedException e1) {
+          throw new InterruptedIOException(e1.getMessage());
+        }
+        continue;
       }
 
       if (response.getFlushed()) {
         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
         // a complete flush cycle or replay a region open event
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully triggered a flush of primary region replica "
-              + ServerRegionReplicaUtil
-                .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                + " of region " + region.getRegionInfo().getEncodedName()
-                + " Now waiting and blocking reads until observing a full flush cycle");
+          LOG.debug("Successfully triggered a flush of primary region replica " +
+            ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+              .getRegionNameAsString() +
+            " of region " + region.getRegionInfo().getRegionNameAsString() +
+            " Now waiting and blocking reads until observing a full flush cycle");
         }
         break;
       } else {
         if (response.hasWroteFlushWalMarker()) {
-          if(response.getWroteFlushWalMarker()) {
+          if (response.getWroteFlushWalMarker()) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
-                  + "region replica " + ServerRegionReplicaUtil
-                    .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                  + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
-                  + "blocking reads until observing a flush marker");
+              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " +
+                "region replica " +
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString() +
+                " of region " + region.getRegionInfo().getRegionNameAsString() +
+                " Now waiting and " + "blocking reads until observing a flush marker");
             }
             break;
           } else {
@@ -162,15 +166,23 @@ public class RegionReplicaFlushHandler extends EventHandler {
             // closing or already flushing. Retry flush again after some sleep.
             if (!counter.shouldRetry()) {
               throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
-                  "retries. Failing opening of this region replica "
-                  + region.getRegionInfo().getEncodedName());
+                counter.getAttemptTimes() + " retries. Failing opening of this region replica " +
+                region.getRegionInfo().getRegionNameAsString());
+            } else {
+              LOG.warn(
+                "Cannot cause primary replica {} to flush or drop a wal marker " +
+                  "for region replica {}, retry={}",
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString(),
+                region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
             }
           }
         } else {
           // nothing to do. Are we dealing with an old server?
-          LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
-              + "Continuing to open the secondary region replica: "
-              + region.getRegionInfo().getEncodedName());
+          LOG.warn(
+            "Was not able to trigger a flush from primary region due to old server version? " +
+              "Continuing to open the secondary region replica: " +
+              region.getRegionInfo().getRegionNameAsString());
           region.setReadsEnabled(true);
           break;
         }


[4/6] hbase git commit: HBASE-21515 Also initialize an AsyncClusterConnection in HRegionServer

Posted by zh...@apache.org.
HBASE-21515 Also initialize an AsyncClusterConnection in HRegionServer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3c7636da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3c7636da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3c7636da

Branch: refs/heads/HBASE-21512
Commit: 3c7636da1174d5b9f85b1d7403773fc9e2d9cc3f
Parents: 94093e8
Author: zhangduo <zh...@apache.org>
Authored: Fri Nov 30 08:23:47 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncClusterConnection.java    | 38 ++++++++++++
 .../hbase/client/AsyncConnectionImpl.java       | 39 ++++++------
 .../hbase/client/ClusterConnectionFactory.java  | 63 ++++++++++++++++++++
 .../hadoop/hbase/client/ConnectionFactory.java  |  5 +-
 .../hadoop/hbase/util/ReflectionUtils.java      | 22 ++++---
 .../java/org/apache/hadoop/hbase/Server.java    | 20 +++++++
 .../org/apache/hadoop/hbase/master/HMaster.java |  3 +
 .../hbase/regionserver/HRegionServer.java       | 56 ++++++++++++-----
 .../regionserver/ReplicationSyncUp.java         |  6 ++
 .../hadoop/hbase/MockRegionServerServices.java  |  5 ++
 .../client/TestAsyncNonMetaRegionLocator.java   |  2 +-
 ...syncNonMetaRegionLocatorConcurrenyLimit.java |  2 +-
 .../client/TestAsyncRegionLocatorTimeout.java   |  2 +-
 ...TestAsyncSingleRequestRpcRetryingCaller.java |  4 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java |  2 +-
 .../hbase/master/MockNoopMasterServices.java    |  6 ++
 .../hadoop/hbase/master/MockRegionServer.java   |  5 ++
 .../hbase/master/TestActiveMasterManager.java   |  6 ++
 .../hbase/master/cleaner/TestHFileCleaner.java  |  6 ++
 .../master/cleaner/TestHFileLinkCleaner.java    |  6 ++
 .../hbase/master/cleaner/TestLogsCleaner.java   |  6 ++
 .../cleaner/TestReplicationHFileCleaner.java    |  6 ++
 .../regionserver/TestHeapMemoryManager.java     |  6 ++
 .../hbase/regionserver/TestSplitLogWorker.java  |  6 ++
 .../hbase/regionserver/TestWALLockup.java       |  6 ++
 .../TestReplicationTrackerZKImpl.java           |  6 ++
 .../TestReplicationSourceManager.java           |  6 ++
 .../security/token/TestTokenAuthentication.java |  6 ++
 .../apache/hadoop/hbase/util/MockServer.java    |  6 ++
 29 files changed, 302 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
new file mode 100644
index 0000000..c7dea25
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The asynchronous connection for internal usage.
+ */
+@InterfaceAudience.Private
+public interface AsyncClusterConnection extends AsyncConnection {
+
+  /**
+   * Get the nonce generator for this connection.
+   */
+  NonceGenerator getNonceGenerator();
+
+  /**
+   * Get the rpc client we used to communicate with other servers.
+   */
+  RpcClient getRpcClient();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 361d5b2..188e830 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -21,48 +21,48 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
 
-import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.util.CollectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * The implementation of AsyncConnection.
  */
 @InterfaceAudience.Private
-class AsyncConnectionImpl implements AsyncConnection {
+class AsyncConnectionImpl implements AsyncClusterConnection {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
 
@@ -105,7 +105,7 @@ class AsyncConnectionImpl implements AsyncConnection {
   private ChoreService authService;
 
   public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
-      User user) {
+      SocketAddress localAddress, User user) {
     this.conf = conf;
     this.user = user;
     if (user.isLoginFromKeytab()) {
@@ -113,7 +113,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     }
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.registry = registry;
-    this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
+    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, null);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
     this.rpcTimeout =
@@ -157,11 +157,16 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   // ditto
-  @VisibleForTesting
+  @Override
   public NonceGenerator getNonceGenerator() {
     return nonceGenerator;
   }
 
+  @Override
+  public RpcClient getRpcClient() {
+    return rpcClient;
+  }
+
   private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
     return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
new file mode 100644
index 0000000..68c0630
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+/**
+ * The factory for creating {@link AsyncClusterConnection}.
+ */
+@InterfaceAudience.Private
+public final class ClusterConnectionFactory {
+
+  private ClusterConnectionFactory() {
+  }
+
+  /**
+   * Create a new {@link AsyncClusterConnection} instance.
+   * <p/>
+   * Unlike what we have done in {@link ConnectionFactory}, here we just return an
+   * {@link AsyncClusterConnection} instead of a {@link java.util.concurrent.CompletableFuture},
+   * which means this method could block on fetching the cluster id. This is just used to simplify
+   * the implementation, as when starting new region servers, we do not need to be event-driven. Can
+   * change later if we want a {@link java.util.concurrent.CompletableFuture} here.
+   */
+  public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
+      SocketAddress localAddress, User user) throws IOException {
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
+    String clusterId;
+    try {
+      clusterId = registry.getClusterId().get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(cause);
+    }
+    return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index e24af74..2ba732a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -295,9 +295,8 @@ public class ConnectionFactory {
         AsyncConnectionImpl.class, AsyncConnection.class);
       try {
         future.complete(
-          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>)() ->
-            ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user))
-        );
+          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
+            .newInstance(clazz, conf, registry, clusterId, null, user)));
       } catch (Exception e) {
         future.completeExceptionally(e);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index a136846..268249d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -83,15 +83,19 @@ public class ReflectionUtils {
 
       boolean match = true;
       for (int i = 0; i < ctorParamTypes.length && match; ++i) {
-        Class<?> paramType = paramTypes[i].getClass();
-        match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
-                  ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
-                   (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
-                   (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
-                   (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
-                   (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
-                   (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
-                   (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+        if (paramTypes[i] == null) {
+          match = !ctorParamTypes[i].isPrimitive();
+        } else {
+          Class<?> paramType = paramTypes[i].getClass();
+          match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType)
+            : ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
+              (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
+              (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
+              (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
+              (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
+              (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
+              (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+        }
       }
 
       if (match) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
index fb898ea..c33d5af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -61,6 +63,24 @@ public interface Server extends Abortable, Stoppable {
   ClusterConnection getClusterConnection();
 
   /**
+   * Returns a reference to the servers' async connection.
+   * <p/>
+   * Important note: this method returns a reference to Connection which is managed by Server
+   * itself, so callers must NOT attempt to close connection obtained.
+   */
+  default AsyncConnection getAsyncConnection() {
+    return getAsyncClusterConnection();
+  }
+
+  /**
+   * Returns a reference to the servers' async cluster connection.
+   * <p/>
+   * Important note: this method returns a reference to Connection which is managed by Server
+   * itself, so callers must NOT attempt to close connection obtained.
+   */
+  AsyncClusterConnection getAsyncClusterConnection();
+
+  /**
    * @return The unique server name for this server.
    */
   ServerName getServerName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0bcef59..52005d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3008,6 +3008,9 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (this.clusterConnection != null) {
       this.clusterConnection.close();
     }
+    if (this.asyncClusterConnection != null) {
+      this.asyncClusterConnection.close();
+    }
   }
 
   public void stopMaster() throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 13f277b..375b3f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -77,7 +77,9 @@ import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.ZNodeClearer;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -105,7 +107,6 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
 import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -263,6 +264,11 @@ public class HRegionServer extends HasThread implements
   protected ClusterConnection clusterConnection;
 
   /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  /**
    * Go here to get table descriptors.
    */
   protected TableDescriptors tableDescriptors;
@@ -776,11 +782,7 @@ public class HRegionServer extends HasThread implements
     return true;
   }
 
-  /**
-   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
-   * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
-   */
-  private ClusterConnection createClusterConnection() throws IOException {
+  private Configuration unsetClientZookeeperQuorum() {
     Configuration conf = this.conf;
     if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
       // Use server ZK cluster for server-issued connections, so we clone
@@ -788,11 +790,20 @@ public class HRegionServer extends HasThread implements
       conf = new Configuration(this.conf);
       conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
     }
+    return conf;
+  }
+
+  /**
+   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
+   * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
+   */
+  private ClusterConnection createClusterConnection() throws IOException {
     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
     // local server if the request is to the local server bypassing RPC. Can be used for both local
     // and remote invocations.
-    ClusterConnection conn = ConnectionUtils.createShortCircuitConnection(conf, null,
-      userProvider.getCurrent(), serverName, rpcServices, rpcServices);
+    ClusterConnection conn =
+      ConnectionUtils.createShortCircuitConnection(unsetClientZookeeperQuorum(), null,
+        userProvider.getCurrent(), serverName, rpcServices, rpcServices);
     // This is used to initialize the batch thread pool inside the connection implementation.
     // When deploy a fresh cluster, we may first use the cluster connection in InitMetaProcedure,
     // which will be executed inside the PEWorker, and then the batch thread pool will inherit the
@@ -826,9 +837,12 @@ public class HRegionServer extends HasThread implements
   /**
    * Setup our cluster connection if not already initialized.
    */
-  protected synchronized void setupClusterConnection() throws IOException {
+  protected final synchronized void setupClusterConnection() throws IOException {
     if (clusterConnection == null) {
       clusterConnection = createClusterConnection();
+      asyncClusterConnection =
+        ClusterConnectionFactory.createAsyncClusterConnection(unsetClientZookeeperQuorum(),
+          new InetSocketAddress(this.rpcServices.isa.getAddress(), 0), userProvider.getCurrent());
     }
   }
 
@@ -842,8 +856,7 @@ public class HRegionServer extends HasThread implements
       initializeZooKeeper();
       setupClusterConnection();
       // Setup RPC client for master communication
-      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
-          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
+      this.rpcClient = asyncClusterConnection.getRpcClient();
     } catch (Throwable t) {
       // Call stop if error or process will stick around for ever since server
       // puts up non-daemon threads.
@@ -1107,7 +1120,15 @@ public class HRegionServer extends HasThread implements
         LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
       }
     }
-
+    if (this.asyncClusterConnection != null) {
+      try {
+        this.asyncClusterConnection.close();
+      } catch (IOException e) {
+        // Although the {@link Closeable} interface throws an {@link
+        // IOException}, in reality, the implementation would never do that.
+        LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
+      }
+    }
     // Closing the compactSplit thread before closing meta regions
     if (!this.killed && containsMetaTableRegions()) {
       if (!abortRequested || this.fsOk) {
@@ -3737,9 +3758,9 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public EntityLock regionLock(List<RegionInfo> regionInfos, String description,
-      Abortable abort) throws IOException {
-    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
+  public EntityLock regionLock(List<RegionInfo> regionInfos, String description, Abortable abort)
+      throws IOException {
+    return new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator())
       .regionLock(regionInfos, description, abort);
   }
 
@@ -3843,4 +3864,9 @@ public class HRegionServer extends HasThread implements
       System.exit(1);
     }
   }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return asyncClusterConnection;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index c7bccb3..7d1245c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -180,5 +181,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 0e4f241..5205960 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -368,4 +369,8 @@ public class MockRegionServerServices implements RegionServerServices {
   public Optional<MobFileCache> getMobFileCache() {
     return Optional.empty();
   }
+
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index eeaf99f..550a6f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -81,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 8cdb4a9..7e06218 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -125,7 +125,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
       .toArray(byte[][]::new);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index 758aa30..0e28f96 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -96,7 +96,7 @@ public class TestAsyncRegionLocatorTimeout {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = CONN.getLocator();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 7d8956b..29dcd56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), null, User.getCurrent());
   }
 
   @AfterClass
@@ -164,7 +164,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
         }
       };
     try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
-      CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
+      CONN.registry, CONN.registry.getClusterId().get(), null, User.getCurrent()) {
 
       @Override
       AsyncRegionLocator getLocator() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 3008561..e1e55f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -85,7 +85,7 @@ public class TestAsyncTableNoncedRetry {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent()) {
+      registry.getClusterId().get(), null, User.getCurrent()) {
 
       @Override
       public NonceGenerator getNonceGenerator() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 9c55f57..3ebad66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
@@ -473,4 +474,9 @@ public class MockNoopMasterServices implements MasterServices {
   public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
     return null;
   }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index a930d7f..73d53c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -721,4 +722,8 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public Optional<MobFileCache> getMobFileCache() {
     return Optional.empty();
   }
+
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
index 2300f54..77667a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -349,5 +350,10 @@ public class TestActiveMasterManager {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 5c8db3e..c5fad32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -279,6 +280,11 @@ public class TestHFileCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
index 119194b..fd11ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -213,5 +214,10 @@ public class TestHFileLinkCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 247ed01..3286032 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -409,6 +410,11 @@ public class TestLogsCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class FaultyZooKeeperWatcher extends ZKWatcher {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index d162bf3..9791643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -303,6 +304,11 @@ public class TestReplicationHFileCleaner {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class FaultyZooKeeperWatcher extends ZKWatcher {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 8c9ce75..4a359e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -862,6 +863,11 @@ public class TestHeapMemoryManager {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class CustomHeapMemoryTuner implements HeapMemoryTuner {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index cbf932c..5481ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
@@ -159,6 +160,11 @@ public class TestSplitLogWorker {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 0e20252..9e9d1d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Durability;
@@ -523,6 +524,11 @@ public class TestWALLockup {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   static class DummyWALActionsListener implements WALActionsListener {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 863d558..62ab265 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -263,5 +264,10 @@ public class TestReplicationTrackerZKImpl {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 86bbb09..427f319 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
@@ -906,5 +907,10 @@ public abstract class TestReplicationSourceManager {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index e4780f1..92c8e54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -363,6 +364,11 @@ public class TestTokenAuthentication {
     public Connection createConnection(Configuration conf) throws IOException {
       return null;
     }
+
+    @Override
+    public AsyncClusterConnection getAsyncClusterConnection() {
+      return null;
+    }
   }
 
   @Parameters(name = "{index}: rpcServerImpl={0}")

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
index c25db01..13212d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
@@ -143,4 +144,9 @@ public class MockServer implements Server {
   public Connection createConnection(Configuration conf) throws IOException {
     return null;
   }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return null;
+  }
 }


[3/6] hbase git commit: HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin

Posted by zh...@apache.org.
HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eaad39b2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eaad39b2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eaad39b2

Branch: refs/heads/HBASE-21512
Commit: eaad39b297cc5e59e6f363c96e844255e113fc38
Parents: 7556dd5
Author: zhangduo <zh...@apache.org>
Authored: Thu Dec 6 21:25:34 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncClusterConnection.java    |   6 +
 .../hbase/client/AsyncConnectionImpl.java       |   5 +
 .../hbase/client/AsyncRegionServerAdmin.java    | 210 +++++++++++++++++++
 .../apache/hadoop/hbase/util/FutureUtils.java   |   2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  15 +-
 .../hadoop/hbase/master/ServerManager.java      |  67 ------
 .../master/procedure/RSProcedureDispatcher.java |  44 ++--
 7 files changed, 263 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index c7dea25..1327fd7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -27,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface AsyncClusterConnection extends AsyncConnection {
 
   /**
+   * Get the admin service for the given region server.
+   */
+  AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
+
+  /**
    * Get the nonce generator for this connection.
    */
   NonceGenerator getNonceGenerator();

http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 188e830..4e7f421 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -330,4 +330,9 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
     return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
       RETRY_TIMER);
   }
+
+  @Override
+  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+    return new AsyncRegionServerAdmin(serverName, this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
new file mode 100644
index 0000000..9accd89
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+
+/**
+ * A simple wrapper of the {@link AdminService} for a region server, which returns a
+ * {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
+ * need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
+ * get it from the {@link RpcController} passed in.
+ * <p/>
+ * Notice that there is no retry, and this is intentional. We have different retry for different
+ * usage for now, if later we want to unify them, we can move the retry logic into this class.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionServerAdmin {
+
+  private final ServerName server;
+
+  private final AsyncConnectionImpl conn;
+
+  AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
+  }
+
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    try {
+      rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
+
+        @Override
+        public void run(RESP resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            future.complete(resp);
+          }
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
+    return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
+  }
+
+  public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) {
+    return call((stub, controller, done) -> stub.getStoreFile(controller, request, done));
+  }
+
+  public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
+      GetOnlineRegionRequest request) {
+    return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
+  }
+
+  public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
+    return call((stub, controller, done) -> stub.openRegion(controller, request, done));
+  }
+
+  public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) {
+    return call((stub, controller, done) -> stub.warmupRegion(controller, request, done));
+  }
+
+  public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) {
+    return call((stub, controller, done) -> stub.closeRegion(controller, request, done));
+  }
+
+  public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) {
+    return call((stub, controller, done) -> stub.flushRegion(controller, request, done));
+  }
+
+  public CompletableFuture<CompactionSwitchResponse> compactionSwitch(
+      CompactionSwitchRequest request) {
+    return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done));
+  }
+
+  public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) {
+    return call((stub, controller, done) -> stub.compactRegion(controller, request, done));
+  }
+
+  public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+      ReplicateWALEntryRequest request) {
+    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+  }
+
+  public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
+    return call((stub, controller, done) -> stub.replay(controller, request, done));
+  }
+
+  public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
+    return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done));
+  }
+
+  public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) {
+    return call((stub, controller, done) -> stub.getServerInfo(controller, request, done));
+  }
+
+  public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) {
+    return call((stub, controller, done) -> stub.stopServer(controller, request, done));
+  }
+
+  public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes(
+      UpdateFavoredNodesRequest request) {
+    return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done));
+  }
+
+  public CompletableFuture<UpdateConfigurationResponse> updateConfiguration(
+      UpdateConfigurationRequest request) {
+    return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done));
+  }
+
+  public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) {
+    return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done));
+  }
+
+  public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues(
+      ClearCompactionQueuesRequest request) {
+    return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done));
+  }
+
+  public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache(
+      ClearRegionBlockCacheRequest request) {
+    return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done));
+  }
+
+  public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots(
+      GetSpaceQuotaSnapshotsRequest request) {
+    return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done));
+  }
+
+  public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
+      ExecuteProceduresRequest request) {
+    return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 067e66b..f4a7332 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -57,4 +57,4 @@ public final class FutureUtils {
       }
     });
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 52005d6..4fd3a37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -193,6 +193,7 @@ import org.apache.hadoop.hbase.util.BloomFilterUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -225,6 +226,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
@@ -1937,6 +1939,15 @@ public class HMaster extends HRegionServer implements MasterServices {
     });
   }
 
+  private void warmUpRegion(ServerName server, RegionInfo region) {
+    FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
+      .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
+        if (e != null) {
+          LOG.warn("Failed to warm up region {} on server {}", region, server, e);
+        }
+      });
+  }
+
   // Public so can be accessed by tests. Blocks until move is done.
   // Replace with an async implementation from which you can get
   // a success/failure result.
@@ -2008,7 +2019,9 @@ public class HMaster extends HRegionServer implements MasterServices {
       // Warmup the region on the destination before initiating the move. this call
       // is synchronous and takes some time. doing it before the source region gets
       // closed
-      serverManager.sendRegionWarmup(rp.getDestination(), hri);
+      // A region server could reject the close request because it either does not
+      // have the specified region or the region is being split.
+      warmUpRegion(rp.getDestination(), hri);
 
       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
       Future<byte []> future = this.assignmentManager.moveAsync(rp);

http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 86d72d1..c26ef6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -159,25 +155,16 @@ public class ServerManager {
   private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
     new ConcurrentSkipListMap<>();
 
-  /**
-   * Map of admin interfaces per registered regionserver; these interfaces we use to control
-   * regionservers out on the cluster
-   */
-  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();
-
   /** List of region servers that should not get any more new regions. */
   private final ArrayList<ServerName> drainingServers = new ArrayList<>();
 
   private final MasterServices master;
-  private final ClusterConnection connection;
 
   private final DeadServer deadservers = new DeadServer();
 
   private final long maxSkew;
   private final long warningSkew;
 
-  private final RpcControllerFactory rpcControllerFactory;
-
   /** Listeners that are called on server events. */
   private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
 
@@ -189,8 +176,6 @@ public class ServerManager {
     Configuration c = master.getConfiguration();
     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
-    this.connection = master.getClusterConnection();
-    this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
     persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
         PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
   }
@@ -438,7 +423,6 @@ public class ServerManager {
   void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
     LOG.info("Registering regionserver=" + serverName);
     this.onlineServers.put(serverName, sl);
-    this.rsAdmins.remove(serverName);
   }
 
   @VisibleForTesting
@@ -633,7 +617,6 @@ public class ServerManager {
       this.onlineServers.remove(sn);
       onlineServers.notifyAll();
     }
-    this.rsAdmins.remove(sn);
   }
 
   /*
@@ -676,34 +659,6 @@ public class ServerManager {
     return this.drainingServers.add(sn);
   }
 
-  // RPC methods to region servers
-
-  private HBaseRpcController newRpcController() {
-    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
-  }
-
-  /**
-   * Sends a WARMUP RPC to the specified server to warmup the specified region.
-   * <p>
-   * A region server could reject the close request because it either does not
-   * have the specified region or the region is being split.
-   * @param server server to warmup a region
-   * @param region region to  warmup
-   */
-  public void sendRegionWarmup(ServerName server,
-      RegionInfo region) {
-    if (server == null) return;
-    try {
-      AdminService.BlockingInterface admin = getRsAdmin(server);
-      HBaseRpcController controller = newRpcController();
-      ProtobufUtil.warmupRegion(controller, admin, region);
-    } catch (IOException e) {
-      LOG.error("Received exception in RPC for warmup server:" +
-        server + "region: " + region +
-        "exception: " + e);
-    }
-  }
-
   /**
    * Contacts a region server and waits up to timeout ms
    * to close the region.  This bypasses the active hmaster.
@@ -737,28 +692,6 @@ public class ServerManager {
   }
 
   /**
-   * @param sn
-   * @return Admin interface for the remote regionserver named <code>sn</code>
-   * @throws IOException
-   * @throws RetriesExhaustedException wrapping a ConnectException if failed
-   */
-  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
-  throws IOException {
-    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
-    if (admin == null) {
-      LOG.debug("New admin connection to " + sn.toString());
-      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
-        // A master is also a region server now, see HBASE-10569 for details
-        admin = ((HRegionServer)master).getRSRpcServices();
-      } else {
-        admin = this.connection.getAdmin(sn);
-      }
-      this.rsAdmins.put(sn, admin);
-    }
-    return admin;
-  }
-
-  /**
    * Calculate min necessary to start. This is not an absolute. It is just
    * a friction that will cause us hang around a bit longer waiting on
    * RegionServers to check-in.

http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 638f9d3..f3ab4b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -37,11 +40,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -159,13 +162,8 @@ public class RSProcedureDispatcher
       this.serverName = serverName;
     }
 
-    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
-      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
-      if (admin == null) {
-        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
-          " failed because no RPC connection found to this server");
-      }
-      return admin;
+    protected AsyncRegionServerAdmin getRsAdmin() throws IOException {
+      return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
     }
 
     protected ServerName getServerName() {
@@ -344,9 +342,13 @@ public class RSProcedureDispatcher
     protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
         final ExecuteProceduresRequest request) throws IOException {
       try {
-        return getRsAdmin().executeProcedures(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().executeProcedures(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 
@@ -407,9 +409,13 @@ public class RSProcedureDispatcher
     private OpenRegionResponse sendRequest(final ServerName serverName,
         final OpenRegionRequest request) throws IOException {
       try {
-        return getRsAdmin().openRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().openRegion(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 
@@ -453,9 +459,13 @@ public class RSProcedureDispatcher
     private CloseRegionResponse sendRequest(final ServerName serverName,
         final CloseRegionRequest request) throws IOException {
       try {
-        return getRsAdmin().closeRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().closeRegion(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 


[2/6] hbase git commit: HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint

Posted by zh...@apache.org.
HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8cd0e994
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8cd0e994
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8cd0e994

Branch: refs/heads/HBASE-21512
Commit: 8cd0e994e44d921873d9ac16934000c09feca35f
Parents: eaad39b
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 1 21:27:14 2019 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncRegionServerAdmin.java    | 14 +++++--
 .../hbase/protobuf/ReplicationProtbufUtil.java  | 35 +++++++++--------
 .../HBaseInterClusterReplicationEndpoint.java   | 31 +++++++--------
 .../regionserver/ReplicationSinkManager.java    | 40 +++++++-------------
 .../replication/SyncReplicationTestBase.java    | 12 +++---
 .../TestReplicationSinkManager.java             | 21 +++++-----
 6 files changed, 74 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index 9accd89..b9141a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -94,9 +95,9 @@ public class AsyncRegionServerAdmin {
     void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
   }
 
-  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
     CompletableFuture<RESP> future = new CompletableFuture<>();
-    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
     try {
       rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
 
@@ -115,6 +116,10 @@ public class AsyncRegionServerAdmin {
     return future;
   }
 
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    return call(rpcCall, null);
+  }
+
   public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
     return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
   }
@@ -154,8 +159,9 @@ public class AsyncRegionServerAdmin {
   }
 
   public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
-      ReplicateWALEntryRequest request) {
-    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+      ReplicateWALEntryRequest request, CellScanner cellScanner) {
+    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
+      cellScanner);
   }
 
   public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index c1b3911..74fad26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -20,51 +20,54 @@ package org.apache.hadoop.hbase.protobuf;
 
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 @InterfaceAudience.Private
 public class ReplicationProtbufUtil {
+
   /**
-   * A helper to replicate a list of WAL entries using admin protocol.
-   * @param admin Admin service
+   * A helper to replicate a list of WAL entries using region server admin
+   * @param admin the region server admin
    * @param entries Array of WAL entries to be replicated
    * @param replicationClusterId Id which will uniquely identify source cluster FS client
    *          configurations in the replication configuration directory
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
-   * @throws java.io.IOException
    */
-  public static void replicateWALEntry(final AdminService.BlockingInterface admin,
-      final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
-      Path sourceHFileArchiveDir) throws IOException {
-    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-        buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
-          sourceHFileArchiveDir);
-    HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
+  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
+      throws IOException {
+    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
+      entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
     try {
-      admin.replicateWALEntry(controller, p.getFirst());
-    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
-      throw ProtobufUtil.getServiceException(e);
+      admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..0359096 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -39,7 +39,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -48,13 +47,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -65,8 +67,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
  * implementation for replicating to another HBase cluster.
@@ -85,8 +85,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
   private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
-  private ClusterConnection conn;
-  private Configuration localConf;
+  private AsyncClusterConnection conn;
   private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
@@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
-    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -132,12 +130,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
-    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+    this.conn =
+      ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -284,9 +283,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   }
 
   private void reconnectToPeerCluster() {
-    ClusterConnection connection = null;
+    AsyncClusterConnection connection = null;
     try {
-      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+      connection =
+        ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
     } catch (IOException ioe) {
       LOG.warn("Failed to create connection for peer cluster", ioe);
     }
@@ -367,7 +367,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         }
         continue;
       }
-      if (this.conn == null || this.conn.isClosed()) {
+      if (this.conn == null) {
         reconnectToPeerCluster();
       }
       try {
@@ -480,10 +480,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           entriesHashCode, entries.size(), size, replicationClusterId);
       }
       sinkPeer = replicationSinkMgr.getReplicationSink();
-      BlockingInterface rrs = sinkPeer.getRegionServer();
+      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
       try {
-        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
+          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+          hfileArchiveDir);
         LOG.trace("Completed replicating batch {}", entriesHashCode);
       } catch (IOException e) {
         LOG.trace("Failed replicating batch {}", entriesHashCode, e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 3cd7884..21b07ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -35,8 +35,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
 /**
  * Maintains a collection of peers to replicate to, and randomly selects a
  * single peer to replicate to per set of data to replicate. Also handles
@@ -61,9 +59,7 @@ public class ReplicationSinkManager {
   static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
 
 
-  private final Connection conn;
-
-  private final String peerClusterId;
+  private final AsyncClusterConnection conn;
 
   private final HBaseReplicationEndpoint endpoint;
 
@@ -77,8 +73,6 @@ public class ReplicationSinkManager {
   // replication sinks is refreshed
   private final int badSinkThreshold;
 
-  private final Random random;
-
   // A timestamp of the last time the list of replication peers changed
   private long lastUpdateToPeers;
 
@@ -88,26 +82,22 @@ public class ReplicationSinkManager {
   /**
    * Instantiate for a single replication peer cluster.
    * @param conn connection to the peer cluster
-   * @param peerClusterId identifier of the peer cluster
    * @param endpoint replication endpoint for inter cluster replication
    * @param conf HBase configuration, used for determining replication source ratio and bad peer
    *          threshold
    */
-  public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
-      HBaseReplicationEndpoint endpoint, Configuration conf) {
+  public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
+      Configuration conf) {
     this.conn = conn;
-    this.peerClusterId = peerClusterId;
     this.endpoint = endpoint;
     this.badReportCounts = Maps.newHashMap();
     this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
-    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
-                                        DEFAULT_BAD_SINK_THRESHOLD);
-    this.random = new Random();
+    this.badSinkThreshold =
+      conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
   }
 
   /**
    * Get a randomly-chosen replication sink to replicate to.
-   *
    * @return a replication sink to replicate to
    */
   public synchronized SinkPeer getReplicationSink() throws IOException {
@@ -119,8 +109,8 @@ public class ReplicationSinkManager {
     if (sinks.isEmpty()) {
       throw new IOException("No replication sinks are available");
     }
-    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
-    return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
+    ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
+    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
   }
 
   /**
@@ -160,7 +150,7 @@ public class ReplicationSinkManager {
    */
   public synchronized void chooseSinks() {
     List<ServerName> slaveAddresses = endpoint.getRegionServers();
-    Collections.shuffle(slaveAddresses, random);
+    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
     sinks = slaveAddresses.subList(0, numSinks);
     lastUpdateToPeers = System.currentTimeMillis();
@@ -182,9 +172,9 @@ public class ReplicationSinkManager {
    */
   public static class SinkPeer {
     private ServerName serverName;
-    private AdminService.BlockingInterface regionServer;
+    private AsyncRegionServerAdmin regionServer;
 
-    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
       this.serverName = serverName;
       this.regionServer = regionServer;
     }
@@ -193,10 +183,8 @@ public class ReplicationSinkManager {
       return serverName;
     }
 
-    public AdminService.BlockingInterface getRegionServer() {
+    public AsyncRegionServerAdmin getRegionServer() {
       return regionServer;
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index f373590..e0d112d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -250,19 +250,19 @@ public class SyncReplicationTestBase {
   protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
       boolean expectedRejection) throws Exception {
     HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
-    ClusterConnection connection = regionServer.getClusterConnection();
+    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
     Entry[] entries = new Entry[10];
     for (int i = 0; i < entries.length; i++) {
       entries[i] =
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
-        entries, null, null, null);
+      ReplicationProtbufUtil.replicateWALEntry(
+        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
     } else {
       try {
-        ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
-          entries, null, null, null);
+        ReplicationProtbufUtil.replicateWALEntry(
+          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (DoNotRetryIOException e) {
         assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 39dabb4..60afd40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -25,7 +25,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -37,8 +38,6 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
 @Category({ReplicationTests.class, SmallTests.class})
 public class TestReplicationSinkManager {
 
@@ -46,16 +45,14 @@ public class TestReplicationSinkManager {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
 
-  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
-
   private HBaseReplicationEndpoint replicationEndpoint;
   private ReplicationSinkManager sinkManager;
 
   @Before
   public void setUp() {
     replicationEndpoint = mock(HBaseReplicationEndpoint.class);
-    sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
-                      PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
+    sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
+      replicationEndpoint, new Configuration());
   }
 
   @Test
@@ -100,7 +97,7 @@ public class TestReplicationSinkManager {
     // Sanity check
     assertEquals(1, sinkManager.getNumSinks());
 
-    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
 
     sinkManager.reportBadSink(sinkPeer);
 
@@ -131,7 +128,7 @@ public class TestReplicationSinkManager {
 
     ServerName serverName = sinkManager.getSinksForTesting().get(0);
 
-    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
 
     sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
@@ -147,7 +144,7 @@ public class TestReplicationSinkManager {
     //
     serverName = sinkManager.getSinksForTesting().get(0);
 
-    sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+    sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
       sinkManager.reportBadSink(sinkPeer);
     }
@@ -188,8 +185,8 @@ public class TestReplicationSinkManager {
     ServerName serverNameA = sinkList.get(0);
     ServerName serverNameB = sinkList.get(1);
 
-    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
-    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
 
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       sinkManager.reportBadSink(sinkPeerA);