You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/11 03:52:37 UTC
[hbase] branch master updated: HBASE-21580 Support getting Hbck instance from AsyncConnection
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 7bebdff HBASE-21580 Support getting Hbck instance from AsyncConnection
7bebdff is described below
commit 7bebdff6a254b1ee8d58ef2a1d5a8200f69446de
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Jan 9 17:11:17 2019 +0800
HBASE-21580 Support getting Hbck instance from AsyncConnection
---
.../hadoop/hbase/client/AsyncConnection.java | 33 ++++++++++++++
.../hadoop/hbase/client/AsyncConnectionImpl.java | 53 ++++++++++++++++------
.../hbase/client/ConnectionImplementation.java | 16 +++----
.../org/apache/hadoop/hbase/client/HBaseHbck.java | 6 +--
.../org/apache/hadoop/hbase/client/TestHbck.java | 42 +++++++++++++----
5 files changed, 117 insertions(+), 33 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index eda2394..5640eb3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -181,4 +185,33 @@ public interface AsyncConnection extends Closeable {
* @param pool the thread pool to use for executing callback
*/
AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
+
+ /**
+ * Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
+ * be thread-safe. A new instance should be created by each thread. This is a lightweight
+ * operation. Pooling or caching of the returned Hbck instance is not recommended.
+ * <p/>
+ * The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
+ * <p/>
+ * This will be used mostly by hbck tool.
+ * @return a future completing with an Hbck instance for the active master, fetched from ZooKeeper.
+ */
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
+ CompletableFuture<Hbck> getHbck();
+
+ /**
+ * Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
+ * be thread-safe. A new instance should be created by each thread. This is a lightweight
+ * operation. Pooling or caching of the returned Hbck instance is not recommended.
+ * <p/>
+ * The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
+ * <p/>
+ * This will be used mostly by the hbck tool. This may only be used to bypass getting the registered
+ * master from ZK. In situations where ZK is not available, or the active master is not registered
+ * with ZK, and the user can get the master address by other means, it can be explicitly specified.
+ * @param masterServer explicit {@link ServerName} for master server
+ * @return an Hbck instance for a specified master server
+ */
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
+ Hbck getHbck(ServerName masterServer) throws IOException;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 361d5b2..1b99f84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -20,13 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
-
-import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -35,28 +29,35 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.util.CollectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
/**
* The implementation of AsyncConnection.
@@ -325,4 +326,30 @@ class AsyncConnectionImpl implements AsyncConnection {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER);
}
+
+ @Override
+ public CompletableFuture<Hbck> getHbck() {
+ CompletableFuture<Hbck> future = new CompletableFuture<>();
+ addListener(registry.getMasterAddress(), (sn, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else {
+ try {
+ future.complete(getHbck(sn));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public Hbck getHbck(ServerName masterServer) throws IOException {
+ // Creating a new protobuf stub does not create a new connection, and performance is not a
+ // concern for hbck, so for simplicity we create a new stub every time instead of caching the
+ // stub here.
+ return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
+ rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 992a95c..4e3543f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -438,14 +438,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
throw new RegionServerStoppedException(masterServer + " is dead.");
}
String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
- masterServer, this.hostnamesCanChange);
-
- return new HBaseHbck(this,
- (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
- BlockingRpcChannel channel =
- this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
- return MasterProtos.HbckService.newBlockingStub(channel);
- }));
+ masterServer, this.hostnamesCanChange);
+
+ return new HBaseHbck(
+ (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
+ BlockingRpcChannel channel =
+ this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
+ return MasterProtos.HbckService.newBlockingStub(channel);
+ }), rpcControllerFactory);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
index eb39a2d..a276017 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -67,9 +67,9 @@ public class HBaseHbck implements Hbck {
private RpcControllerFactory rpcControllerFactory;
- HBaseHbck(ClusterConnection connection, BlockingInterface hbck) throws IOException {
+ HBaseHbck(BlockingInterface hbck, RpcControllerFactory rpcControllerFactory) {
this.hbck = hbck;
- this.rpcControllerFactory = connection.getRpcControllerFactory();
+ this.rpcControllerFactory = rpcControllerFactory;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
index 8d9380f..2951600 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
@@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.client;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
-import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
-
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
@@ -49,9 +47,15 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* Spins up the minicluster once at test start and then takes it down afterward.
* Add any testing of HBaseHbck functionality here.
*/
+@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class})
public class TestHbck {
@ClassRule
@@ -71,19 +76,39 @@ public class TestHbck {
@Rule
public TestName name = new TestName();
+ @Parameter
+ public boolean async;
+
private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
private static ProcedureExecutor<MasterProcedureEnv> procExec;
+ private static AsyncConnection ASYNC_CONN;
+
+ @Parameters(name = "{index}: async={0}")
+ public static List<Object[]> params() {
+ return Arrays.asList(new Object[] { false }, new Object[] { true });
+ }
+
+ private Hbck getHbck() throws Exception {
+ if (async) {
+ return ASYNC_CONN.getHbck().get();
+ } else {
+ return TEST_UTIL.getHbck();
+ }
+ }
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ Closeables.close(ASYNC_CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -120,16 +145,15 @@ public class TestHbck {
//bypass the procedure
List<Long> pids = Arrays.<Long>asList(procId);
- List<Boolean> results =
- TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false);
+ List<Boolean> results = getHbck().bypassProcedure(pids, 30000, false, false);
assertTrue("Failed to by pass procedure!", results.get(0));
TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
LOG.info("{} finished", proc);
}
@Test
- public void testSetTableStateInMeta() throws IOException {
- Hbck hbck = TEST_UTIL.getHbck();
+ public void testSetTableStateInMeta() throws Exception {
+ Hbck hbck = getHbck();
// set table state to DISABLED
hbck.setTableStateInMeta(new TableState(TABLE_NAME, TableState.State.DISABLED));
// Method {@link Hbck#setTableStateInMeta()} returns previous state, which in this case
@@ -141,8 +165,8 @@ public class TestHbck {
}
@Test
- public void testAssigns() throws IOException {
- Hbck hbck = TEST_UTIL.getHbck();
+ public void testAssigns() throws Exception {
+ Hbck hbck = getHbck();
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
for (RegionInfo ri: regions) {
@@ -183,7 +207,7 @@ public class TestHbck {
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"),
true);
ServerName serverName = testRs.getServerName();
- Hbck hbck = TEST_UTIL.getHbck();
+ Hbck hbck = getHbck();
List<Long> pids =
hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
assertTrue(pids.get(0) > 0);