You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/11 03:52:37 UTC

[hbase] branch master updated: HBASE-21580 Support getting Hbck instance from AsyncConnection

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bebdff  HBASE-21580 Support getting Hbck instance from AsyncConnection
7bebdff is described below

commit 7bebdff6a254b1ee8d58ef2a1d5a8200f69446de
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Jan 9 17:11:17 2019 +0800

    HBASE-21580 Support getting Hbck instance from AsyncConnection
---
 .../hadoop/hbase/client/AsyncConnection.java       | 33 ++++++++++++++
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 53 ++++++++++++++++------
 .../hbase/client/ConnectionImplementation.java     | 16 +++----
 .../org/apache/hadoop/hbase/client/HBaseHbck.java  |  6 +--
 .../org/apache/hadoop/hbase/client/TestHbck.java   | 42 +++++++++++++----
 5 files changed, 117 insertions(+), 33 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index eda2394..5640eb3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -181,4 +185,33 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    */
   AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
+
+  /**
+   * Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
+   * be thread-safe. A new instance should be created by each thread. This is a lightweight
+   * operation. Pooling or caching of the returned Hbck instance is not recommended.
+   * <p/>
+   * The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
+   * <p/>
+   * This will be used mostly by hbck tool.
+   * @return a future of the Hbck instance for active master, which is fetched from zookeeper.
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
+  CompletableFuture<Hbck> getHbck();
+
+  /**
+   * Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
+   * be thread-safe. A new instance should be created by each thread. This is a lightweight
+   * operation. Pooling or caching of the returned Hbck instance is not recommended.
+   * <p/>
+   * The caller is responsible for calling {@link Hbck#close()} on the returned Hbck instance.
+   * <p/>
+   * This will be used mostly by hbck tool. This may only be used to bypass getting registered
+   * master from ZK. In situations where ZK is not available or active master is not registered with
+   * ZK and user can get master address by other means, master can be explicitly specified.
+   * @param masterServer explicit {@link ServerName} for master server
+   * @return an Hbck instance for a specified master server
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
+  Hbck getHbck(ServerName masterServer) throws IOException;
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 361d5b2..1b99f84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -20,13 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
-
-import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -35,28 +29,35 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.util.CollectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * The implementation of AsyncConnection.
@@ -325,4 +326,30 @@ class AsyncConnectionImpl implements AsyncConnection {
     return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
       RETRY_TIMER);
   }
+
+  @Override
+  public CompletableFuture<Hbck> getHbck() {
+    CompletableFuture<Hbck> future = new CompletableFuture<>();
+    addListener(registry.getMasterAddress(), (sn, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+      } else {
+        try {
+          future.complete(getHbck(sn));
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public Hbck getHbck(ServerName masterServer) throws IOException {
+    // Creating a new protobuf stub does not create a new connection, and hbck is not performance
+    // sensitive, so for simplicity we create a new stub every time instead of caching the stub
+    // here.
+    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
+      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 992a95c..4e3543f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -438,14 +438,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       throw new RegionServerStoppedException(masterServer + " is dead.");
     }
     String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
-        masterServer, this.hostnamesCanChange);
-
-    return new HBaseHbck(this,
-        (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
-          BlockingRpcChannel channel =
-              this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
-          return MasterProtos.HbckService.newBlockingStub(channel);
-        }));
+      masterServer, this.hostnamesCanChange);
+
+    return new HBaseHbck(
+      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
+        BlockingRpcChannel channel =
+          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
+        return MasterProtos.HbckService.newBlockingStub(channel);
+      }), rpcControllerFactory);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
index eb39a2d..a276017 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -67,9 +67,9 @@ public class HBaseHbck implements Hbck {
 
   private RpcControllerFactory rpcControllerFactory;
 
-  HBaseHbck(ClusterConnection connection, BlockingInterface hbck) throws IOException {
+  HBaseHbck(BlockingInterface hbck, RpcControllerFactory rpcControllerFactory) {
     this.hbck = hbck;
-    this.rpcControllerFactory = connection.getRpcControllerFactory();
+    this.rpcControllerFactory = rpcControllerFactory;
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
index 8d9380f..2951600 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java
@@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.client;
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
@@ -49,9 +47,15 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
  * Spins up the minicluster once at test start and then takes it down afterward.
  * Add any testing of HBaseHbck functionality here.
  */
+@RunWith(Parameterized.class)
 @Category({LargeTests.class, ClientTests.class})
 public class TestHbck {
   @ClassRule
@@ -71,19 +76,39 @@ public class TestHbck {
   @Rule
   public TestName name = new TestName();
 
+  @Parameter
+  public boolean async;
+
   private static final TableName TABLE_NAME = TableName.valueOf(TestHbck.class.getSimpleName());
 
   private static ProcedureExecutor<MasterProcedureEnv> procExec;
 
+  private static AsyncConnection ASYNC_CONN;
+
+  @Parameters(name = "{index}: async={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { false }, new Object[] { true });
+  }
+
+  private Hbck getHbck() throws Exception {
+    if (async) {
+      return ASYNC_CONN.getHbck().get();
+    } else {
+      return TEST_UTIL.getHbck();
+    }
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("family1"), 5);
     procExec = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    Closeables.close(ASYNC_CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -120,16 +145,15 @@ public class TestHbck {
 
     //bypass the procedure
     List<Long> pids = Arrays.<Long>asList(procId);
-    List<Boolean> results =
-        TEST_UTIL.getHbck().bypassProcedure(pids, 30000, false, false);
+    List<Boolean> results = getHbck().bypassProcedure(pids, 30000, false, false);
     assertTrue("Failed to by pass procedure!", results.get(0));
     TEST_UTIL.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
     LOG.info("{} finished", proc);
   }
 
   @Test
-  public void testSetTableStateInMeta() throws IOException {
-    Hbck hbck = TEST_UTIL.getHbck();
+  public void testSetTableStateInMeta() throws Exception {
+    Hbck hbck = getHbck();
     // set table state to DISABLED
     hbck.setTableStateInMeta(new TableState(TABLE_NAME, TableState.State.DISABLED));
     // Method {@link Hbck#setTableStateInMeta()} returns previous state, which in this case
@@ -141,8 +165,8 @@ public class TestHbck {
   }
 
   @Test
-  public void testAssigns() throws IOException {
-    Hbck hbck = TEST_UTIL.getHbck();
+  public void testAssigns() throws Exception {
+    Hbck hbck = getHbck();
     try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
       List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
       for (RegionInfo ri: regions) {
@@ -183,7 +207,7 @@ public class TestHbck {
     TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"),
       true);
     ServerName serverName = testRs.getServerName();
-    Hbck hbck = TEST_UTIL.getHbck();
+    Hbck hbck = getHbck();
     List<Long> pids =
         hbck.scheduleServerCrashProcedure(Arrays.asList(ProtobufUtil.toServerName(serverName)));
     assertTrue(pids.get(0) > 0);