You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/05 06:27:23 UTC
[1/6] hbase git commit: Add 1.3.3 to download page [Forced Update!]
Repository: hbase
Updated Branches:
refs/heads/HBASE-21512 04e6909ad -> df35a12ce (forced update)
Add 1.3.3 to download page
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/94093e86
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/94093e86
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/94093e86
Branch: refs/heads/HBASE-21512
Commit: 94093e869aa006e4443dcf6ff923843c9c61f192
Parents: 3fbdd5b
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Jan 4 12:32:00 2019 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jan 4 12:32:00 2019 -0800
----------------------------------------------------------------------
src/site/xdoc/downloads.xml | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/94093e86/src/site/xdoc/downloads.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/downloads.xml b/src/site/xdoc/downloads.xml
index 2859779..fb827aa 100644
--- a/src/site/xdoc/downloads.xml
+++ b/src/site/xdoc/downloads.xml
@@ -108,6 +108,27 @@ under the License.
</tr>
<tr>
<td style="test-align: left">
+ 1.3.3
+ </td>
+ <td style="test-align: left">
+ 2018/12/21
+ </td>
+ <td style="test-align: left">
+ <a href="https://apache.org/dist/hbase/1.3.3/compat-check-report.html">1.3.2 vs 1.3.3</a>
+ </td>
+ <td style="test-align: left">
+ <a href="https://github.com/apache/hbase/blob/rel/1.3.3/CHANGES.txt">Changes</a>
+ </td>
+ <td style="test-align: left">
+ <a href="https://s.apache.org/hbase-1.3.3-jira-release-notes">Release Notes</a>
+ </td>
+ <td style="test-align: left">
+ <a href="https://www.apache.org/dyn/closer.lua/hbase/1.3.3/hbase-1.3.3-src.tar.gz">src</a> (<a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-src.tar.gz.sha512">sha512</a> <a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-src.tar.gz.asc">asc</a>) <br />
+ <a href="https://www.apache.org/dyn/closer.lua/hbase/1.3.3/hbase-1.3.3-bin.tar.gz">bin</a> (<a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-bin.tar.gz.sha512">sha512</a> <a href="https://apache.org/dist/hbase/1.3.3/hbase-1.3.3-bin.tar.gz.asc">asc</a>)
+ </td>
+ </tr>
+ <tr>
+ <td style="test-align: left">
1.2.9
</td>
<td style="test-align: left">
[5/6] hbase git commit: HBASE-21516 Use AsyncConnection instead of
Connection in SecureBulkLoadManager
Posted by zh...@apache.org.
HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7556dd5b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7556dd5b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7556dd5b
Branch: refs/heads/HBASE-21512
Commit: 7556dd5b45051265aa4ae570566f3e1f1a94c7e3
Parents: 3c7636d
Author: zhangduo <zh...@apache.org>
Authored: Sat Dec 1 21:15:48 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800
----------------------------------------------------------------------
.../hadoop/hbase/protobuf/ProtobufUtil.java | 5 +-
.../hbase/shaded/protobuf/ProtobufUtil.java | 7 ++-
.../hbase/regionserver/HRegionServer.java | 2 +-
.../regionserver/SecureBulkLoadManager.java | 24 +++++----
.../hadoop/hbase/security/token/TokenUtil.java | 57 +++++++++++++++-----
.../hbase/security/token/TestTokenUtil.java | 42 +++++++++++----
6 files changed, 96 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index a3d49b5..d9e620b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -261,13 +261,12 @@ public final class ProtobufUtil {
* just {@link ServiceException}. Prefer this method to
* {@link #getRemoteException(ServiceException)} because trying to
* contain direct protobuf references.
- * @param e
*/
- public static IOException handleRemoteException(Exception e) {
+ public static IOException handleRemoteException(Throwable e) {
return makeIOExceptionOfException(e);
}
- private static IOException makeIOExceptionOfException(Exception e) {
+ private static IOException makeIOExceptionOfException(Throwable e) {
Throwable t = e;
if (e instanceof ServiceException ||
e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index fea81f1..de2fb7d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
@@ -123,6 +122,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -343,13 +343,12 @@ public final class ProtobufUtil {
* just {@link ServiceException}. Prefer this method to
* {@link #getRemoteException(ServiceException)} because trying to
* contain direct protobuf references.
- * @param e
*/
- public static IOException handleRemoteException(Exception e) {
+ public static IOException handleRemoteException(Throwable e) {
return makeIOExceptionOfException(e);
}
- private static IOException makeIOExceptionOfException(Exception e) {
+ private static IOException makeIOExceptionOfException(Throwable e) {
Throwable t = e;
if (e instanceof ServiceException) {
t = e.getCause();
http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 375b3f8..6e5fc9f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1930,7 +1930,7 @@ public class HRegionServer extends HasThread implements
if (!isStopped() && !isAborted()) {
initializeThreads();
}
- this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
+ this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
this.secureBulkLoadManager.start();
// Health checker thread.
http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 566a6b6..add6519 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -38,11 +37,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -56,7 +56,9 @@ import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -111,9 +113,9 @@ public class SecureBulkLoadManager {
private UserProvider userProvider;
private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
- private Connection conn;
+ private AsyncConnection conn;
- SecureBulkLoadManager(Configuration conf, Connection conn) {
+ SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
this.conf = conf;
this.conn = conn;
}
@@ -212,23 +214,23 @@ public class SecureBulkLoadManager {
familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
}
- Token userToken = null;
+ Token<AuthenticationTokenIdentifier> userToken = null;
if (userProvider.isHadoopSecurityEnabled()) {
- userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
- .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
- request.getFsToken().getService()));
+ userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(),
+ request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()),
+ new Text(request.getFsToken().getService()));
}
final String bulkToken = request.getBulkToken();
User user = getActiveUser();
final UserGroupInformation ugi = user.getUGI();
if (userProvider.isHadoopSecurityEnabled()) {
try {
- Token tok = TokenUtil.obtainToken(conn);
+ Token<AuthenticationTokenIdentifier> tok = TokenUtil.obtainToken(conn).get();
if (tok != null) {
boolean b = ugi.addToken(tok);
LOG.debug("token added " + tok + " for user " + ugi + " return=" + b);
}
- } catch (IOException ioe) {
+ } catch (Exception ioe) {
LOG.warn("unable to add token", ioe);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
index c54d905..28efb84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,27 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.security.token;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.AuthenticationService;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -45,6 +47,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
/**
* Utility methods for obtaining authentication tokens.
*/
@@ -64,12 +68,39 @@ public class TokenUtil {
/**
* Obtain and return an authentication token for the current user.
+ * @param conn The async HBase cluster connection
+ * @return the authentication token instance, wrapped by a {@link CompletableFuture}.
+ */
+ public static CompletableFuture<Token<AuthenticationTokenIdentifier>> obtainToken(
+ AsyncConnection conn) {
+ CompletableFuture<Token<AuthenticationTokenIdentifier>> future = new CompletableFuture<>();
+ if (injectedException != null) {
+ future.completeExceptionally(injectedException);
+ return future;
+ }
+ AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME);
+ table.<AuthenticationService.Interface, GetAuthenticationTokenResponse> coprocessorService(
+ AuthenticationProtos.AuthenticationService::newStub,
+ (s, c, r) -> s.getAuthenticationToken(c,
+ AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance(), r),
+ HConstants.EMPTY_START_ROW).whenComplete((resp, error) -> {
+ if (error != null) {
+ future.completeExceptionally(ProtobufUtil.handleRemoteException(error));
+ } else {
+ future.complete(toToken(resp.getToken()));
+ }
+ });
+ return future;
+ }
+
+ /**
+ * Obtain and return an authentication token for the current user.
* @param conn The HBase cluster connection
* @throws IOException if a remote error or serialization problem occurs.
* @return the authentication token instance
*/
- public static Token<AuthenticationTokenIdentifier> obtainToken(
- Connection conn) throws IOException {
+ public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)
+ throws IOException {
Table meta = null;
try {
injectFault();
@@ -77,9 +108,9 @@ public class TokenUtil {
meta = conn.getTable(TableName.META_TABLE_NAME);
CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
AuthenticationProtos.AuthenticationService.BlockingInterface service =
- AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
- AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
- AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
+ AuthenticationService.newBlockingStub(rpcChannel);
+ GetAuthenticationTokenResponse response =
+ service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
return toToken(response.getToken());
} catch (ServiceException se) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7556dd5b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
index 32fcddb..585a3ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java
@@ -18,35 +18,53 @@
package org.apache.hadoop.hbase.security.token;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
-
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@Category(SmallTests.class)
public class TestTokenUtil {
+
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestTokenUtil.class);
+ HBaseClassTestRule.forClass(TestTokenUtil.class);
- @Test
- public void testObtainToken() throws Exception {
+ private URLClassLoader cl;
+
+ @Before
+ public void setUp() {
URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation();
URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation();
+ cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+ }
- ClassLoader cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(cl, true);
+ }
+ @Test
+ public void testObtainToken() throws Exception {
Throwable injected = new com.google.protobuf.ServiceException("injected");
Class<?> tokenUtil = cl.loadClass(TokenUtil.class.getCanonicalName());
@@ -55,8 +73,7 @@ public class TestTokenUtil {
shouldInjectFault.set(null, injected);
try {
- tokenUtil.getMethod("obtainToken", Connection.class)
- .invoke(null, new Object[] { null });
+ tokenUtil.getMethod("obtainToken", Connection.class).invoke(null, new Object[] { null });
fail("Should have injected exception.");
} catch (InvocationTargetException e) {
Throwable t = e;
@@ -72,9 +89,16 @@ public class TestTokenUtil {
}
}
+ CompletableFuture<?> future = (CompletableFuture<?>) tokenUtil
+ .getMethod("obtainToken", AsyncConnection.class).invoke(null, new Object[] { null });
+ try {
+ future.get();
+ fail("Should have injected exception.");
+ } catch (ExecutionException e) {
+ assertSame(injected, e.getCause());
+ }
Boolean loaded = (Boolean) cl.loadClass(ProtobufUtil.class.getCanonicalName())
- .getDeclaredMethod("isClassLoaderLoaded")
- .invoke(null);
+ .getDeclaredMethod("isClassLoaderLoaded").invoke(null);
assertFalse("Should not have loaded DynamicClassLoader", loaded);
}
}
[6/6] hbase git commit: HBASE-21538 Rewrite RegionReplicaFlushHandler
to use AsyncClusterConnection
Posted by zh...@apache.org.
HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df35a12c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df35a12c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df35a12c
Branch: refs/heads/HBASE-21512
Commit: df35a12ce93ef6ec20a31c50978fe69b441a6da1
Parents: 8cd0e99
Author: Duo Zhang <zh...@apache.org>
Authored: Wed Dec 12 09:33:33 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:25:41 2019 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncClusterConnection.java | 8 ++
.../hbase/client/AsyncConnectionImpl.java | 8 ++
.../hbase/client/ClusterConnectionFactory.java | 16 +--
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 36 ++++--
.../apache/hadoop/hbase/util/FutureUtils.java | 22 ++++
.../master/procedure/RSProcedureDispatcher.java | 34 +-----
.../hbase/protobuf/ReplicationProtbufUtil.java | 15 +--
.../hbase/regionserver/HRegionServer.java | 3 +-
.../handler/RegionReplicaFlushHandler.java | 110 ++++++++++---------
9 files changed, 132 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 1327fd7..f1f64ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
/**
* The asynchronous connection for internal usage.
*/
@@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Get the rpc client we used to communicate with other servers.
*/
RpcClient getRpcClient();
+
+ /**
+ * Flush a region and get the response.
+ */
+ CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 4e7f421..d883809 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -54,6 +54,7 @@ import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -335,4 +336,11 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this);
}
+
+ @Override
+ public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+ boolean writeFlushWALMarker) {
+ RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+ return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 68c0630..79484db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -18,15 +18,12 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.net.SocketAddress;
-import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-
/**
* The factory for creating {@link AsyncClusterConnection}.
*/
@@ -48,16 +45,7 @@ public final class ClusterConnectionFactory {
public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
SocketAddress localAddress, User user) throws IOException {
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
- String clusterId;
- try {
- clusterId = registry.getClusterId().get();
- } catch (InterruptedException e) {
- throw (IOException) new InterruptedIOException().initCause(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- Throwables.propagateIfPossible(cause, IOException.class);
- throw new IOException(cause);
- }
+ String clusterId = FutureUtils.get(registry.getClusterId());
return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 869a630..4e9e64c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -818,7 +818,19 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ return flushRegionInternal(regionName, false).thenAccept(r -> {
+ });
+ }
+
+ /**
+ * This method is for internal use only, where we need the response of the flush.
+ * <p/>
+ * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
+ * API.
+ */
+ CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
+ boolean writeFlushWALMarker) {
+ CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
addListener(getRegionLocation(regionName), (location, err) -> {
if (err != null) {
future.completeExceptionally(err);
@@ -830,7 +842,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return;
}
- addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+ addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@@ -841,15 +853,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
- private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
- return this.<Void> newAdminCaller()
- .serverName(serverName)
- .action(
- (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
- controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
- resp -> null))
- .call();
+ private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
+ boolean writeFlushWALMarker) {
+ return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
+ .action((controller, stub) -> this
+ .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
+ RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
+ (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
+ .call();
}
@Override
@@ -862,7 +873,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
if (hRegionInfos != null) {
- hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
+ hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
+ })));
}
addListener(CompletableFuture.allOf(
compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index f4a7332..f509f87 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -17,12 +17,18 @@
*/
package org.apache.hadoop.hbase.util;
+import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
/**
* Helper class for processing futures.
*/
@@ -57,4 +63,20 @@ public final class FutureUtils {
}
});
}
+
+ /**
+ * A helper class for getting the result of a Future, and convert the error to an
+ * {@link IOException}.
+ */
+ public static <T> T get(Future<T> future) throws IOException {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index f3ab4b3..f772b68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
@@ -35,12 +33,12 @@ import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
@@ -341,15 +339,7 @@ public class RSProcedureDispatcher
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {
- try {
- return getRsAdmin().executeProcedures(request).get();
- } catch (InterruptedException e) {
- throw (IOException) new InterruptedIOException().initCause(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- Throwables.propagateIfPossible(cause, IOException.class);
- throw new IOException(cause);
- }
+ return FutureUtils.get(getRsAdmin().executeProcedures(request));
}
protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@@ -408,15 +398,7 @@ public class RSProcedureDispatcher
private OpenRegionResponse sendRequest(final ServerName serverName,
final OpenRegionRequest request) throws IOException {
- try {
- return getRsAdmin().openRegion(request).get();
- } catch (InterruptedException e) {
- throw (IOException) new InterruptedIOException().initCause(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- Throwables.propagateIfPossible(cause, IOException.class);
- throw new IOException(cause);
- }
+ return FutureUtils.get(getRsAdmin().openRegion(request));
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@@ -458,15 +440,7 @@ public class RSProcedureDispatcher
private CloseRegionResponse sendRequest(final ServerName serverName,
final CloseRegionRequest request) throws IOException {
- try {
- return getRsAdmin().closeRegion(request).get();
- } catch (InterruptedException e) {
- throw (IOException) new InterruptedIOException().initCause(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- Throwables.propagateIfPossible(cause, IOException.class);
- throw new IOException(cause);
- }
+ return FutureUtils.get(getRsAdmin().closeRegion(request));
}
private void remoteCallCompleted(final MasterProcedureEnv env,
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 74fad26..9f41a76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -18,13 +18,10 @@
*/
package org.apache.hadoop.hbase.protobuf;
-
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -32,12 +29,12 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@@ -60,15 +57,7 @@ public class ReplicationProtbufUtil {
throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
- try {
- admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
- } catch (InterruptedException e) {
- throw (IOException) new InterruptedIOException().initCause(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- Throwables.propagateIfPossible(cause, IOException.class);
- throw new IOException(e);
- }
+ FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6e5fc9f..27972fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2380,8 +2380,7 @@ public class HRegionServer extends HasThread implements
// submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
if (this.executorService != null) {
- this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
- rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+ this.executorService.submit(new RegionReplicaFlushHandler(this, region));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/df35a12c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index b917379..921a69c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -20,26 +20,23 @@ package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.FlushRegionCallable;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
/**
* HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
@@ -56,20 +53,13 @@ public class RegionReplicaFlushHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
- private final ClusterConnection connection;
- private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
- private final RpcControllerFactory rpcControllerFactory;
- private final int operationTimeout;
+ private final AsyncClusterConnection connection;
+
private final HRegion region;
- public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
- RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
- int operationTimeout, HRegion region) {
+ public RegionReplicaFlushHandler(Server server, HRegion region) {
super(server, EventType.RS_REGION_REPLICA_FLUSH);
- this.connection = connection;
- this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
- this.rpcControllerFactory = rpcControllerFactory;
- this.operationTimeout = operationTimeout;
+ this.connection = server.getAsyncClusterConnection();
this.region = region;
}
@@ -103,7 +93,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
return numRetries;
}
- void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
+ void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
@@ -117,44 +107,58 @@ public class RegionReplicaFlushHandler extends EventHandler {
}
while (!region.isClosing() && !region.isClosed()
&& !server.isAborted() && !server.isStopped()) {
- FlushRegionCallable flushCallable = new FlushRegionCallable(
- connection, rpcControllerFactory,
- RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
-
// TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
// do not have to wait for the whole flush here, just initiate it.
- FlushRegionResponse response = null;
+ FlushRegionResponse response;
try {
- response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
- .callWithRetries(flushCallable, this.operationTimeout);
- } catch (IOException ex) {
- if (ex instanceof TableNotFoundException
- || connection.isTableDisabled(region.getRegionInfo().getTable())) {
+ response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil
+ .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true));
+ } catch (IOException e) {
+ if (e instanceof TableNotFoundException || FutureUtils
+ .get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))) {
return;
}
- throw ex;
+ if (!counter.shouldRetry()) {
+ throw e;
+ }
+ // The reason that why we need to retry here is that, the retry for asynchronous admin
+ // request is much simpler than the normal operation, if we failed to locate the region once
+ // then we will throw the exception out and will not try to relocate again. So here we need
+ // to add some retries by ourselves to prevent shutting down the region server too
+ // frequent...
+ LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
+ ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+ .getRegionNameAsString(),
+ region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
+ try {
+ counter.sleepUntilNextRetry();
+ } catch (InterruptedException e1) {
+ throw new InterruptedIOException(e1.getMessage());
+ }
+ continue;
}
if (response.getFlushed()) {
// then we have to wait for seeing the flush entry. All reads will be rejected until we see
// a complete flush cycle or replay a region open event
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully triggered a flush of primary region replica "
- + ServerRegionReplicaUtil
- .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
- + " of region " + region.getRegionInfo().getEncodedName()
- + " Now waiting and blocking reads until observing a full flush cycle");
+ LOG.debug("Successfully triggered a flush of primary region replica " +
+ ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+ .getRegionNameAsString() +
+ " of region " + region.getRegionInfo().getRegionNameAsString() +
+ " Now waiting and blocking reads until observing a full flush cycle");
}
break;
} else {
if (response.hasWroteFlushWalMarker()) {
- if(response.getWroteFlushWalMarker()) {
+ if (response.getWroteFlushWalMarker()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
- + "region replica " + ServerRegionReplicaUtil
- .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
- + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
- + "blocking reads until observing a flush marker");
+ LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " +
+ "region replica " +
+ ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+ .getRegionNameAsString() +
+ " of region " + region.getRegionInfo().getRegionNameAsString() +
+ " Now waiting and " + "blocking reads until observing a flush marker");
}
break;
} else {
@@ -162,15 +166,23 @@ public class RegionReplicaFlushHandler extends EventHandler {
// closing or already flushing. Retry flush again after some sleep.
if (!counter.shouldRetry()) {
throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
- "retries. Failing opening of this region replica "
- + region.getRegionInfo().getEncodedName());
+ counter.getAttemptTimes() + " retries. Failing opening of this region replica " +
+ region.getRegionInfo().getRegionNameAsString());
+ } else {
+ LOG.warn(
+ "Cannot cause primary replica {} to flush or drop a wal marker " +
+ "for region replica {}, retry={}",
+ ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+ .getRegionNameAsString(),
+ region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
}
}
} else {
// nothing to do. Are we dealing with an old server?
- LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
- + "Continuing to open the secondary region replica: "
- + region.getRegionInfo().getEncodedName());
+ LOG.warn(
+ "Was not able to trigger a flush from primary region due to old server version? " +
+ "Continuing to open the secondary region replica: " +
+ region.getRegionInfo().getRegionNameAsString());
region.setReadsEnabled(true);
break;
}
[4/6] hbase git commit: HBASE-21515 Also initialize an
AsyncClusterConnection in HRegionServer
Posted by zh...@apache.org.
HBASE-21515 Also initialize an AsyncClusterConnection in HRegionServer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3c7636da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3c7636da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3c7636da
Branch: refs/heads/HBASE-21512
Commit: 3c7636da1174d5b9f85b1d7403773fc9e2d9cc3f
Parents: 94093e8
Author: zhangduo <zh...@apache.org>
Authored: Fri Nov 30 08:23:47 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncClusterConnection.java | 38 ++++++++++++
.../hbase/client/AsyncConnectionImpl.java | 39 ++++++------
.../hbase/client/ClusterConnectionFactory.java | 63 ++++++++++++++++++++
.../hadoop/hbase/client/ConnectionFactory.java | 5 +-
.../hadoop/hbase/util/ReflectionUtils.java | 22 ++++---
.../java/org/apache/hadoop/hbase/Server.java | 20 +++++++
.../org/apache/hadoop/hbase/master/HMaster.java | 3 +
.../hbase/regionserver/HRegionServer.java | 56 ++++++++++++-----
.../regionserver/ReplicationSyncUp.java | 6 ++
.../hadoop/hbase/MockRegionServerServices.java | 5 ++
.../client/TestAsyncNonMetaRegionLocator.java | 2 +-
...syncNonMetaRegionLocatorConcurrenyLimit.java | 2 +-
.../client/TestAsyncRegionLocatorTimeout.java | 2 +-
...TestAsyncSingleRequestRpcRetryingCaller.java | 4 +-
.../hbase/client/TestAsyncTableNoncedRetry.java | 2 +-
.../hbase/master/MockNoopMasterServices.java | 6 ++
.../hadoop/hbase/master/MockRegionServer.java | 5 ++
.../hbase/master/TestActiveMasterManager.java | 6 ++
.../hbase/master/cleaner/TestHFileCleaner.java | 6 ++
.../master/cleaner/TestHFileLinkCleaner.java | 6 ++
.../hbase/master/cleaner/TestLogsCleaner.java | 6 ++
.../cleaner/TestReplicationHFileCleaner.java | 6 ++
.../regionserver/TestHeapMemoryManager.java | 6 ++
.../hbase/regionserver/TestSplitLogWorker.java | 6 ++
.../hbase/regionserver/TestWALLockup.java | 6 ++
.../TestReplicationTrackerZKImpl.java | 6 ++
.../TestReplicationSourceManager.java | 6 ++
.../security/token/TestTokenAuthentication.java | 6 ++
.../apache/hadoop/hbase/util/MockServer.java | 6 ++
29 files changed, 302 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
new file mode 100644
index 0000000..c7dea25
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The asynchronous connection for internal usage.
+ */
+@InterfaceAudience.Private
+public interface AsyncClusterConnection extends AsyncConnection {
+
+ /**
+ * Get the nonce generator for this connection.
+ */
+ NonceGenerator getNonceGenerator();
+
+ /**
+ * Get the rpc client we used to communicate with other servers.
+ */
+ RpcClient getRpcClient();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 361d5b2..188e830 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -21,48 +21,48 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
-import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
import java.io.IOException;
+import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.util.CollectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
/**
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
-class AsyncConnectionImpl implements AsyncConnection {
+class AsyncConnectionImpl implements AsyncClusterConnection {
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
@@ -105,7 +105,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private ChoreService authService;
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
- User user) {
+ SocketAddress localAddress, User user) {
this.conf = conf;
this.user = user;
if (user.isLoginFromKeytab()) {
@@ -113,7 +113,7 @@ class AsyncConnectionImpl implements AsyncConnection {
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
- this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
+ this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, null);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout =
@@ -157,11 +157,16 @@ class AsyncConnectionImpl implements AsyncConnection {
}
// ditto
- @VisibleForTesting
+ @Override
public NonceGenerator getNonceGenerator() {
return nonceGenerator;
}
+ @Override
+ public RpcClient getRpcClient() {
+ return rpcClient;
+ }
+
private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
new file mode 100644
index 0000000..68c0630
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+/**
+ * The factory for creating {@link AsyncClusterConnection}.
+ */
+@InterfaceAudience.Private
+public final class ClusterConnectionFactory {
+
+ private ClusterConnectionFactory() {
+ }
+
+ /**
+ * Create a new {@link AsyncClusterConnection} instance.
+ * <p/>
+ * Unlike what we have done in {@link ConnectionFactory}, here we just return an
+ * {@link AsyncClusterConnection} instead of a {@link java.util.concurrent.CompletableFuture},
+ * which means this method could block on fetching the cluster id. This is just used to simplify
+ * the implementation, as when starting new region servers, we do not need to be event-driven. Can
+ * change later if we want a {@link java.util.concurrent.CompletableFuture} here.
+ */
+ public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
+ SocketAddress localAddress, User user) throws IOException {
+ AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
+ String clusterId;
+ try {
+ clusterId = registry.getClusterId().get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
+ }
+ return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index e24af74..2ba732a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -295,9 +295,8 @@ public class ConnectionFactory {
AsyncConnectionImpl.class, AsyncConnection.class);
try {
future.complete(
- user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>)() ->
- ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user))
- );
+ user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
+ .newInstance(clazz, conf, registry, clusterId, null, user)));
} catch (Exception e) {
future.completeExceptionally(e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index a136846..268249d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -83,15 +83,19 @@ public class ReflectionUtils {
boolean match = true;
for (int i = 0; i < ctorParamTypes.length && match; ++i) {
- Class<?> paramType = paramTypes[i].getClass();
- match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
- ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
- (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
- (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
- (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
- (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
- (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
- (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+ if (paramTypes[i] == null) {
+ match = !ctorParamTypes[i].isPrimitive();
+ } else {
+ Class<?> paramType = paramTypes[i].getClass();
+ match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType)
+ : ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
+ (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
+ (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
+ (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
+ (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
+ (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
+ (byte.class.equals(ctorParamTypes[i]) && Byte.class.equals(paramType)));
+ }
}
if (match) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
index fb898ea..c33d5af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -61,6 +63,24 @@ public interface Server extends Abortable, Stoppable {
ClusterConnection getClusterConnection();
/**
+ * Returns a reference to the servers' async connection.
+ * <p/>
+ * Important note: this method returns a reference to Connection which is managed by Server
+ * itself, so callers must NOT attempt to close connection obtained.
+ */
+ default AsyncConnection getAsyncConnection() {
+ return getAsyncClusterConnection();
+ }
+
+ /**
+ * Returns a reference to the servers' async cluster connection.
+ * <p/>
+ * Important note: this method returns a reference to Connection which is managed by Server
+ * itself, so callers must NOT attempt to close connection obtained.
+ */
+ AsyncClusterConnection getAsyncClusterConnection();
+
+ /**
* @return The unique server name for this server.
*/
ServerName getServerName();
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0bcef59..52005d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3008,6 +3008,9 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.clusterConnection != null) {
this.clusterConnection.close();
}
+ if (this.asyncClusterConnection != null) {
+ this.asyncClusterConnection.close();
+ }
}
public void stopMaster() throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 13f277b..375b3f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -77,7 +77,9 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -105,7 +107,6 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -263,6 +264,11 @@ public class HRegionServer extends HasThread implements
protected ClusterConnection clusterConnection;
/**
+ * The asynchronous cluster connection to be shared by services.
+ */
+ protected AsyncClusterConnection asyncClusterConnection;
+
+ /**
* Go here to get table descriptors.
*/
protected TableDescriptors tableDescriptors;
@@ -776,11 +782,7 @@ public class HRegionServer extends HasThread implements
return true;
}
- /**
- * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
- * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
- */
- private ClusterConnection createClusterConnection() throws IOException {
+ private Configuration unsetClientZookeeperQuorum() {
Configuration conf = this.conf;
if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
// Use server ZK cluster for server-issued connections, so we clone
@@ -788,11 +790,20 @@ public class HRegionServer extends HasThread implements
conf = new Configuration(this.conf);
conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
}
+ return conf;
+ }
+
+ /**
+ * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
+ * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
+ */
+ private ClusterConnection createClusterConnection() throws IOException {
// Create a cluster connection that when appropriate, can short-circuit and go directly to the
// local server if the request is to the local server bypassing RPC. Can be used for both local
// and remote invocations.
- ClusterConnection conn = ConnectionUtils.createShortCircuitConnection(conf, null,
- userProvider.getCurrent(), serverName, rpcServices, rpcServices);
+ ClusterConnection conn =
+ ConnectionUtils.createShortCircuitConnection(unsetClientZookeeperQuorum(), null,
+ userProvider.getCurrent(), serverName, rpcServices, rpcServices);
// This is used to initialize the batch thread pool inside the connection implementation.
// When deploy a fresh cluster, we may first use the cluster connection in InitMetaProcedure,
// which will be executed inside the PEWorker, and then the batch thread pool will inherit the
@@ -826,9 +837,12 @@ public class HRegionServer extends HasThread implements
/**
* Setup our cluster connection if not already initialized.
*/
- protected synchronized void setupClusterConnection() throws IOException {
+ protected final synchronized void setupClusterConnection() throws IOException {
if (clusterConnection == null) {
clusterConnection = createClusterConnection();
+ asyncClusterConnection =
+ ClusterConnectionFactory.createAsyncClusterConnection(unsetClientZookeeperQuorum(),
+ new InetSocketAddress(this.rpcServices.isa.getAddress(), 0), userProvider.getCurrent());
}
}
@@ -842,8 +856,7 @@ public class HRegionServer extends HasThread implements
initializeZooKeeper();
setupClusterConnection();
// Setup RPC client for master communication
- this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
- this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
+ this.rpcClient = asyncClusterConnection.getRpcClient();
} catch (Throwable t) {
// Call stop if error or process will stick around for ever since server
// puts up non-daemon threads.
@@ -1107,7 +1120,15 @@ public class HRegionServer extends HasThread implements
LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
}
}
-
+ if (this.asyncClusterConnection != null) {
+ try {
+ this.asyncClusterConnection.close();
+ } catch (IOException e) {
+ // Although the {@link Closeable} interface throws an {@link
+ // IOException}, in reality, the implementation would never do that.
+ LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
+ }
+ }
// Closing the compactSplit thread before closing meta regions
if (!this.killed && containsMetaTableRegions()) {
if (!abortRequested || this.fsOk) {
@@ -3737,9 +3758,9 @@ public class HRegionServer extends HasThread implements
}
@Override
- public EntityLock regionLock(List<RegionInfo> regionInfos, String description,
- Abortable abort) throws IOException {
- return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
+ public EntityLock regionLock(List<RegionInfo> regionInfos, String description, Abortable abort)
+ throws IOException {
+ return new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator())
.regionLock(regionInfos, description, abort);
}
@@ -3843,4 +3864,9 @@ public class HRegionServer extends HasThread implements
System.exit(1);
}
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return asyncClusterConnection;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index c7bccb3..7d1245c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -180,5 +181,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 0e4f241..5205960 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -368,4 +369,8 @@ public class MockRegionServerServices implements RegionServerServices {
public Optional<MobFileCache> getMobFileCache() {
return Optional.empty();
}
+
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index eeaf99f..550a6f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -81,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 8cdb4a9..7e06218 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -125,7 +125,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.toArray(byte[][]::new);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index 758aa30..0e28f96 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -96,7 +96,7 @@ public class TestAsyncRegionLocatorTimeout {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = CONN.getLocator();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 7d8956b..29dcd56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), null, User.getCurrent());
}
@AfterClass
@@ -164,7 +164,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
}
};
try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
- CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
+ CONN.registry, CONN.registry.getClusterId().get(), null, User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 3008561..e1e55f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -85,7 +85,7 @@ public class TestAsyncTableNoncedRetry {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent()) {
+ registry.getClusterId().get(), null, User.getCurrent()) {
@Override
public NonceGenerator getNonceGenerator() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 9c55f57..3ebad66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
@@ -473,4 +474,9 @@ public class MockNoopMasterServices implements MasterServices {
public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index a930d7f..73d53c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -721,4 +722,8 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
public Optional<MobFileCache> getMobFileCache() {
return Optional.empty();
}
+
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
index 2300f54..77667a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -349,5 +350,10 @@ public class TestActiveMasterManager {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 5c8db3e..c5fad32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -279,6 +280,11 @@ public class TestHFileCleaner {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
index 119194b..fd11ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.HFileLink;
@@ -213,5 +214,10 @@ public class TestHFileLinkCleaner {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 247ed01..3286032 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
@@ -409,6 +410,11 @@ public class TestLogsCleaner {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
static class FaultyZooKeeperWatcher extends ZKWatcher {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index d162bf3..9791643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
@@ -303,6 +304,11 @@ public class TestReplicationHFileCleaner {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
static class FaultyZooKeeperWatcher extends ZKWatcher {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 8c9ce75..4a359e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -862,6 +863,11 @@ public class TestHeapMemoryManager {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
static class CustomHeapMemoryTuner implements HeapMemoryTuner {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index cbf932c..5481ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
@@ -159,6 +160,11 @@ public class TestSplitLogWorker {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 0e20252..9e9d1d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
@@ -523,6 +524,11 @@ public class TestWALLockup {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
static class DummyWALActionsListener implements WALActionsListener {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 863d558..62ab265 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -263,5 +264,10 @@ public class TestReplicationTrackerZKImpl {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 86bbb09..427f319 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
@@ -906,5 +907,10 @@ public abstract class TestReplicationSourceManager {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index e4780f1..92c8e54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -363,6 +364,11 @@ public class TestTokenAuthentication {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
@Parameters(name = "{index}: rpcServerImpl={0}")
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c7636da/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
index c25db01..13212d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockServer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.log.HBaseMarkers;
@@ -143,4 +144,9 @@ public class MockServer implements Server {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ return null;
+ }
}
[3/6] hbase git commit: HBASE-21526 Use AsyncClusterConnection in
ServerManager for getRsAdmin
Posted by zh...@apache.org.
HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eaad39b2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eaad39b2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eaad39b2
Branch: refs/heads/HBASE-21512
Commit: eaad39b297cc5e59e6f363c96e844255e113fc38
Parents: 7556dd5
Author: zhangduo <zh...@apache.org>
Authored: Thu Dec 6 21:25:34 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncClusterConnection.java | 6 +
.../hbase/client/AsyncConnectionImpl.java | 5 +
.../hbase/client/AsyncRegionServerAdmin.java | 210 +++++++++++++++++++
.../apache/hadoop/hbase/util/FutureUtils.java | 2 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 15 +-
.../hadoop/hbase/master/ServerManager.java | 67 ------
.../master/procedure/RSProcedureDispatcher.java | 44 ++--
7 files changed, 263 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index c7dea25..1327fd7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.yetus.audience.InterfaceAudience;
@@ -27,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface AsyncClusterConnection extends AsyncConnection {
/**
+ * Get the admin service for the given region server.
+ */
+ AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
+
+ /**
* Get the nonce generator for this connection.
*/
NonceGenerator getNonceGenerator();
http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 188e830..4e7f421 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -330,4 +330,9 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER);
}
+
+ @Override
+ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+ return new AsyncRegionServerAdmin(serverName, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
new file mode 100644
index 0000000..9accd89
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+
+/**
+ * A simple wrapper of the {@link AdminService} for a region server, which returns a
+ * {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
+ * need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
+ * get it from the {@link RpcController} passed in.
+ * <p/>
+ * Notice that there is no retry, and this is intentional. We have different retry for different
+ * usage for now, if later we want to unify them, we can move the retry logic into this class.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionServerAdmin {
+
+ private final ServerName server;
+
+ private final AsyncConnectionImpl conn;
+
+ AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+ this.server = server;
+ this.conn = conn;
+ }
+
+ @FunctionalInterface
+ private interface RpcCall<RESP> {
+ void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
+ }
+
+ private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ HBaseRpcController controller = conn.rpcControllerFactory.newController();
+ try {
+ rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
+
+ @Override
+ public void run(RESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ }
+ });
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
+ return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
+ }
+
+ public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) {
+ return call((stub, controller, done) -> stub.getStoreFile(controller, request, done));
+ }
+
+ public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
+ GetOnlineRegionRequest request) {
+ return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
+ }
+
+ public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
+ return call((stub, controller, done) -> stub.openRegion(controller, request, done));
+ }
+
+ public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) {
+ return call((stub, controller, done) -> stub.warmupRegion(controller, request, done));
+ }
+
+ public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) {
+ return call((stub, controller, done) -> stub.closeRegion(controller, request, done));
+ }
+
+ public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) {
+ return call((stub, controller, done) -> stub.flushRegion(controller, request, done));
+ }
+
+ public CompletableFuture<CompactionSwitchResponse> compactionSwitch(
+ CompactionSwitchRequest request) {
+ return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done));
+ }
+
+ public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) {
+ return call((stub, controller, done) -> stub.compactRegion(controller, request, done));
+ }
+
+ public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+ ReplicateWALEntryRequest request) {
+ return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+ }
+
+ public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
+ return call((stub, controller, done) -> stub.replay(controller, request, done));
+ }
+
+ public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
+ return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done));
+ }
+
+ public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) {
+ return call((stub, controller, done) -> stub.getServerInfo(controller, request, done));
+ }
+
+ public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) {
+ return call((stub, controller, done) -> stub.stopServer(controller, request, done));
+ }
+
+ public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes(
+ UpdateFavoredNodesRequest request) {
+ return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done));
+ }
+
+ public CompletableFuture<UpdateConfigurationResponse> updateConfiguration(
+ UpdateConfigurationRequest request) {
+ return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done));
+ }
+
+ public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) {
+ return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done));
+ }
+
+ public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues(
+ ClearCompactionQueuesRequest request) {
+ return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done));
+ }
+
+ public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache(
+ ClearRegionBlockCacheRequest request) {
+ return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done));
+ }
+
+ public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots(
+ GetSpaceQuotaSnapshotsRequest request) {
+ return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done));
+ }
+
+ public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
+ ExecuteProceduresRequest request) {
+ return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 067e66b..f4a7332 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -57,4 +57,4 @@ public final class FutureUtils {
}
});
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 52005d6..4fd3a37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -193,6 +193,7 @@ import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
@@ -225,6 +226,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
@@ -1937,6 +1939,15 @@ public class HMaster extends HRegionServer implements MasterServices {
});
}
+ private void warmUpRegion(ServerName server, RegionInfo region) {
+ FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
+ .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
+ if (e != null) {
+ LOG.warn("Failed to warm up region {} on server {}", region, server, e);
+ }
+ });
+ }
+
// Public so can be accessed by tests. Blocks until move is done.
// Replace with an async implementation from which you can get
// a success/failure result.
@@ -2008,7 +2019,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// Warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets
// closed
- serverManager.sendRegionWarmup(rp.getDestination(), hri);
+ // A region server could reject the close request because it either does not
+ // have the specified region or the region is being split.
+ warmUpRegion(rp.getDestination(), hri);
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
Future<byte []> future = this.assignmentManager.moveAsync(rp);
http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 86d72d1..c26ef6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -159,25 +155,16 @@ public class ServerManager {
private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
new ConcurrentSkipListMap<>();
- /**
- * Map of admin interfaces per registered regionserver; these interfaces we use to control
- * regionservers out on the cluster
- */
- private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();
-
/** List of region servers that should not get any more new regions. */
private final ArrayList<ServerName> drainingServers = new ArrayList<>();
private final MasterServices master;
- private final ClusterConnection connection;
private final DeadServer deadservers = new DeadServer();
private final long maxSkew;
private final long warningSkew;
- private final RpcControllerFactory rpcControllerFactory;
-
/** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
@@ -189,8 +176,6 @@ public class ServerManager {
Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
- this.connection = master.getClusterConnection();
- this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
}
@@ -438,7 +423,6 @@ public class ServerManager {
void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
LOG.info("Registering regionserver=" + serverName);
this.onlineServers.put(serverName, sl);
- this.rsAdmins.remove(serverName);
}
@VisibleForTesting
@@ -633,7 +617,6 @@ public class ServerManager {
this.onlineServers.remove(sn);
onlineServers.notifyAll();
}
- this.rsAdmins.remove(sn);
}
/*
@@ -676,34 +659,6 @@ public class ServerManager {
return this.drainingServers.add(sn);
}
- // RPC methods to region servers
-
- private HBaseRpcController newRpcController() {
- return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
- }
-
- /**
- * Sends a WARMUP RPC to the specified server to warmup the specified region.
- * <p>
- * A region server could reject the close request because it either does not
- * have the specified region or the region is being split.
- * @param server server to warmup a region
- * @param region region to warmup
- */
- public void sendRegionWarmup(ServerName server,
- RegionInfo region) {
- if (server == null) return;
- try {
- AdminService.BlockingInterface admin = getRsAdmin(server);
- HBaseRpcController controller = newRpcController();
- ProtobufUtil.warmupRegion(controller, admin, region);
- } catch (IOException e) {
- LOG.error("Received exception in RPC for warmup server:" +
- server + "region: " + region +
- "exception: " + e);
- }
- }
-
/**
* Contacts a region server and waits up to timeout ms
* to close the region. This bypasses the active hmaster.
@@ -737,28 +692,6 @@ public class ServerManager {
}
/**
- * @param sn
- * @return Admin interface for the remote regionserver named <code>sn</code>
- * @throws IOException
- * @throws RetriesExhaustedException wrapping a ConnectException if failed
- */
- public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
- throws IOException {
- AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
- if (admin == null) {
- LOG.debug("New admin connection to " + sn.toString());
- if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
- // A master is also a region server now, see HBASE-10569 for details
- admin = ((HRegionServer)master).getRSRpcServices();
- } else {
- admin = this.connection.getAdmin(sn);
- }
- this.rsAdmins.put(sn, admin);
- }
- return admin;
- }
-
- /**
* Calculate min necessary to start. This is not an absolute. It is just
* a friction that will cause us hang around a bit longer waiting on
* RegionServers to check-in.
http://git-wip-us.apache.org/repos/asf/hbase/blob/eaad39b2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 638f9d3..f3ab4b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -37,11 +40,11 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -159,13 +162,8 @@ public class RSProcedureDispatcher
this.serverName = serverName;
}
- protected AdminService.BlockingInterface getRsAdmin() throws IOException {
- final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
- if (admin == null) {
- throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
- " failed because no RPC connection found to this server");
- }
- return admin;
+ protected AsyncRegionServerAdmin getRsAdmin() throws IOException {
+ return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
}
protected ServerName getServerName() {
@@ -344,9 +342,13 @@ public class RSProcedureDispatcher
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {
try {
- return getRsAdmin().executeProcedures(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ return getRsAdmin().executeProcedures(request).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
}
}
@@ -407,9 +409,13 @@ public class RSProcedureDispatcher
private OpenRegionResponse sendRequest(final ServerName serverName,
final OpenRegionRequest request) throws IOException {
try {
- return getRsAdmin().openRegion(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ return getRsAdmin().openRegion(request).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
}
}
@@ -453,9 +459,13 @@ public class RSProcedureDispatcher
private CloseRegionResponse sendRequest(final ServerName serverName,
final CloseRegionRequest request) throws IOException {
try {
- return getRsAdmin().closeRegion(null, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ return getRsAdmin().closeRegion(request).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
}
}
[2/6] hbase git commit: HBASE-21579 Use AsyncClusterConnection for
HBaseInterClusterReplicationEndpoint
Posted by zh...@apache.org.
HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8cd0e994
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8cd0e994
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8cd0e994
Branch: refs/heads/HBASE-21512
Commit: 8cd0e994e44d921873d9ac16934000c09feca35f
Parents: eaad39b
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 1 21:27:14 2019 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 5 14:24:35 2019 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncRegionServerAdmin.java | 14 +++++--
.../hbase/protobuf/ReplicationProtbufUtil.java | 35 +++++++++--------
.../HBaseInterClusterReplicationEndpoint.java | 31 +++++++--------
.../regionserver/ReplicationSinkManager.java | 40 +++++++-------------
.../replication/SyncReplicationTestBase.java | 12 +++---
.../TestReplicationSinkManager.java | 21 +++++-----
6 files changed, 74 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index 9accd89..b9141a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
@@ -94,9 +95,9 @@ public class AsyncRegionServerAdmin {
void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
}
- private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+ private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
CompletableFuture<RESP> future = new CompletableFuture<>();
- HBaseRpcController controller = conn.rpcControllerFactory.newController();
+ HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
try {
rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
@@ -115,6 +116,10 @@ public class AsyncRegionServerAdmin {
return future;
}
+ private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+ return call(rpcCall, null);
+ }
+
public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
}
@@ -154,8 +159,9 @@ public class AsyncRegionServerAdmin {
}
public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
- ReplicateWALEntryRequest request) {
- return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+ ReplicateWALEntryRequest request, CellScanner cellScanner) {
+ return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
+ cellScanner);
}
public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index c1b3911..74fad26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -20,51 +20,54 @@ package org.apache.hadoop.hbase.protobuf;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@InterfaceAudience.Private
public class ReplicationProtbufUtil {
+
/**
- * A helper to replicate a list of WAL entries using admin protocol.
- * @param admin Admin service
+ * A helper to replicate a list of WAL entries using region server admin
+ * @param admin the region server admin
* @param entries Array of WAL entries to be replicated
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
- * @throws java.io.IOException
*/
- public static void replicateWALEntry(final AdminService.BlockingInterface admin,
- final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
- Path sourceHFileArchiveDir) throws IOException {
- Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
- sourceHFileArchiveDir);
- HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
+ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+ String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
+ throws IOException {
+ Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
+ entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
try {
- admin.replicateWALEntry(controller, p.getFirst());
- } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
- throw ProtobufUtil.getServiceException(e);
+ admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(e);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..0359096 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -39,7 +39,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -48,13 +47,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -65,8 +67,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
* implementation for replicating to another HBase cluster.
@@ -85,8 +85,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
- private ClusterConnection conn;
- private Configuration localConf;
+ private AsyncClusterConnection conn;
private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
@@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
- this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
decorateConf();
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -132,12 +130,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
- this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+ this.conn =
+ ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
- this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+ this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -284,9 +283,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
private void reconnectToPeerCluster() {
- ClusterConnection connection = null;
+ AsyncClusterConnection connection = null;
try {
- connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+ connection =
+ ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
} catch (IOException ioe) {
LOG.warn("Failed to create connection for peer cluster", ioe);
}
@@ -367,7 +367,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
continue;
}
- if (this.conn == null || this.conn.isClosed()) {
+ if (this.conn == null) {
reconnectToPeerCluster();
}
try {
@@ -480,10 +480,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
entriesHashCode, entries.size(), size, replicationClusterId);
}
sinkPeer = replicationSinkMgr.getReplicationSink();
- BlockingInterface rrs = sinkPeer.getRegionServer();
+ AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
try {
- ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
- replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
+ entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+ hfileArchiveDir);
LOG.trace("Completed replicating batch {}", entriesHashCode);
} catch (IOException e) {
LOG.trace("Failed replicating batch {}", entriesHashCode, e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 3cd7884..21b07ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -35,8 +35,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
/**
* Maintains a collection of peers to replicate to, and randomly selects a
* single peer to replicate to per set of data to replicate. Also handles
@@ -61,9 +59,7 @@ public class ReplicationSinkManager {
static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
- private final Connection conn;
-
- private final String peerClusterId;
+ private final AsyncClusterConnection conn;
private final HBaseReplicationEndpoint endpoint;
@@ -77,8 +73,6 @@ public class ReplicationSinkManager {
// replication sinks is refreshed
private final int badSinkThreshold;
- private final Random random;
-
// A timestamp of the last time the list of replication peers changed
private long lastUpdateToPeers;
@@ -88,26 +82,22 @@ public class ReplicationSinkManager {
/**
* Instantiate for a single replication peer cluster.
* @param conn connection to the peer cluster
- * @param peerClusterId identifier of the peer cluster
* @param endpoint replication endpoint for inter cluster replication
* @param conf HBase configuration, used for determining replication source ratio and bad peer
* threshold
*/
- public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
- HBaseReplicationEndpoint endpoint, Configuration conf) {
+ public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
+ Configuration conf) {
this.conn = conn;
- this.peerClusterId = peerClusterId;
this.endpoint = endpoint;
this.badReportCounts = Maps.newHashMap();
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
- this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
- DEFAULT_BAD_SINK_THRESHOLD);
- this.random = new Random();
+ this.badSinkThreshold =
+ conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
}
/**
* Get a randomly-chosen replication sink to replicate to.
- *
* @return a replication sink to replicate to
*/
public synchronized SinkPeer getReplicationSink() throws IOException {
@@ -119,8 +109,8 @@ public class ReplicationSinkManager {
if (sinks.isEmpty()) {
throw new IOException("No replication sinks are available");
}
- ServerName serverName = sinks.get(random.nextInt(sinks.size()));
- return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
+ ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
+ return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
/**
@@ -160,7 +150,7 @@ public class ReplicationSinkManager {
*/
public synchronized void chooseSinks() {
List<ServerName> slaveAddresses = endpoint.getRegionServers();
- Collections.shuffle(slaveAddresses, random);
+ Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
lastUpdateToPeers = System.currentTimeMillis();
@@ -182,9 +172,9 @@ public class ReplicationSinkManager {
*/
public static class SinkPeer {
private ServerName serverName;
- private AdminService.BlockingInterface regionServer;
+ private AsyncRegionServerAdmin regionServer;
- public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+ public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
this.serverName = serverName;
this.regionServer = regionServer;
}
@@ -193,10 +183,8 @@ public class ReplicationSinkManager {
return serverName;
}
- public AdminService.BlockingInterface getRegionServer() {
+ public AsyncRegionServerAdmin getRegionServer() {
return regionServer;
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index f373590..e0d112d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -250,19 +250,19 @@ public class SyncReplicationTestBase {
protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
boolean expectedRejection) throws Exception {
HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
- ClusterConnection connection = regionServer.getClusterConnection();
+ AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
Entry[] entries = new Entry[10];
for (int i = 0; i < entries.length; i++) {
entries[i] =
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
}
if (!expectedRejection) {
- ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
- entries, null, null, null);
+ ReplicationProtbufUtil.replicateWALEntry(
+ connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
} else {
try {
- ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
- entries, null, null, null);
+ ReplicationProtbufUtil.replicateWALEntry(
+ connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
fail("Should throw IOException when sync-replication state is in A or DA");
} catch (DoNotRetryIOException e) {
assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd0e994/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 39dabb4..60afd40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -25,7 +25,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -37,8 +38,6 @@ import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
@Category({ReplicationTests.class, SmallTests.class})
public class TestReplicationSinkManager {
@@ -46,16 +45,14 @@ public class TestReplicationSinkManager {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
- private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
-
private HBaseReplicationEndpoint replicationEndpoint;
private ReplicationSinkManager sinkManager;
@Before
public void setUp() {
replicationEndpoint = mock(HBaseReplicationEndpoint.class);
- sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
- PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
+ sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
+ replicationEndpoint, new Configuration());
}
@Test
@@ -100,7 +97,7 @@ public class TestReplicationSinkManager {
// Sanity check
assertEquals(1, sinkManager.getNumSinks());
- SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
+ SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
sinkManager.reportBadSink(sinkPeer);
@@ -131,7 +128,7 @@ public class TestReplicationSinkManager {
ServerName serverName = sinkManager.getSinksForTesting().get(0);
- SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+ SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
@@ -147,7 +144,7 @@ public class TestReplicationSinkManager {
//
serverName = sinkManager.getSinksForTesting().get(0);
- sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+ sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
sinkManager.reportBadSink(sinkPeer);
}
@@ -188,8 +185,8 @@ public class TestReplicationSinkManager {
ServerName serverNameA = sinkList.get(0);
ServerName serverNameB = sinkList.get(1);
- SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
- SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
+ SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+ SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
sinkManager.reportBadSink(sinkPeerA);