You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/22 02:50:33 UTC

[hbase] 05/26: HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 3ec40aef1172d12f42caa460c1373b40b1e9be7e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Dec 12 09:33:33 2018 +0800

    HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection
---
 .../hbase/client/AsyncClusterConnection.java       |   8 ++
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   8 ++
 .../hbase/client/ClusterConnectionFactory.java     |  16 +--
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  36 ++++---
 .../hbase/protobuf/ReplicationProtbufUtil.java     |  15 +--
 .../hadoop/hbase/regionserver/HRegionServer.java   |   3 +-
 .../handler/RegionReplicaFlushHandler.java         | 110 ++++++++++++---------
 7 files changed, 106 insertions(+), 90 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 1327fd7..f1f64ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
 /**
  * The asynchronous connection for internal usage.
  */
@@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Get the rpc client we used to communicate with other servers.
    */
   RpcClient getRpcClient();
+
+  /**
+   * Flush a region and get the response.
+   */
+  CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 62b9d8b..c17cca9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -60,6 +60,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -384,4 +385,11 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
   public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
     return new AsyncRegionServerAdmin(serverName, this);
   }
+
+  @Override
+  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 68c0630..79484db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -18,15 +18,12 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.net.SocketAddress;
-import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-
 /**
  * The factory for creating {@link AsyncClusterConnection}.
  */
@@ -48,16 +45,7 @@ public final class ClusterConnectionFactory {
   public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
       SocketAddress localAddress, User user) throws IOException {
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
-    String clusterId;
-    try {
-      clusterId = registry.getClusterId().get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(cause);
-    }
+    String clusterId = FutureUtils.get(registry.getClusterId());
     return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 3303fd3..b3d3468 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -910,7 +910,19 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
+    return flushRegionInternal(regionName, false).thenAccept(r -> {
+    });
+  }
+
+  /**
+   * This method is for internal use only, where we need the response of the flush.
+   * <p/>
+   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
+   * API.
+   */
+  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
     addListener(getRegionLocation(regionName), (location, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
@@ -922,7 +934,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
           .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
         return;
       }
-      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
         if (err2 != null) {
           future.completeExceptionally(err2);
         } else {
@@ -933,15 +945,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
-  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
-    return this.<Void> newAdminCaller()
-            .serverName(serverName)
-            .action(
-              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
-                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
-                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
-                resp -> null))
-            .call();
+  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
+      boolean writeFlushWALMarker) {
+    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
+      .action((controller, stub) -> this
+        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
+          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
+          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
+      .call();
   }
 
   @Override
@@ -954,7 +965,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       }
       List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
       if (hRegionInfos != null) {
-        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
+        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
+        })));
       }
       addListener(CompletableFuture.allOf(
         compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 74fad26..9f41a76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -18,13 +18,10 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -32,12 +29,12 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@@ -60,15 +57,7 @@ public class ReplicationProtbufUtil {
       throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
       entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    try {
-      admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(e);
-    }
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9dce52a..7c9141e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2409,8 +2409,7 @@ public class HRegionServer extends HasThread implements
 
     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
     if (this.executorService != null) {
-      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
-          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index 81b6d7e..0729203 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -20,26 +20,23 @@ package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.FlushRegionCallable;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 
 /**
  * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
@@ -56,20 +53,13 @@ public class RegionReplicaFlushHandler extends EventHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
 
-  private final ClusterConnection connection;
-  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
-  private final RpcControllerFactory rpcControllerFactory;
-  private final int operationTimeout;
+  private final AsyncClusterConnection connection;
+
   private final HRegion region;
 
-  public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
-      RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
-      int operationTimeout, HRegion region) {
+  public RegionReplicaFlushHandler(Server server, HRegion region) {
     super(server, EventType.RS_REGION_REPLICA_FLUSH);
-    this.connection = connection;
-    this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
-    this.rpcControllerFactory = rpcControllerFactory;
-    this.operationTimeout = operationTimeout;
+    this.connection = server.getAsyncClusterConnection();
     this.region = region;
   }
 
@@ -103,7 +93,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
     return numRetries;
   }
 
-  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
+  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
     long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
 
@@ -117,45 +107,59 @@ public class RegionReplicaFlushHandler extends EventHandler {
     }
     while (!region.isClosing() && !region.isClosed()
         && !server.isAborted() && !server.isStopped()) {
-      FlushRegionCallable flushCallable = new FlushRegionCallable(
-        connection, rpcControllerFactory,
-        RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
-
       // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
       // do not have to wait for the whole flush here, just initiate it.
-      FlushRegionResponse response = null;
+      FlushRegionResponse response;
       try {
-         response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
-          .callWithRetries(flushCallable, this.operationTimeout);
-      } catch (IOException ex) {
-        if (ex instanceof TableNotFoundException
-            || connection.isTableDisabled(region.getRegionInfo().getTable())) {
+        response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil
+          .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true));
+      } catch (IOException e) {
+        if (e instanceof TableNotFoundException || FutureUtils
+          .get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))) {
           return;
         }
-        throw ex;
+        if (!counter.shouldRetry()) {
+          throw e;
+        }
+        // We need to retry here because the retry logic for asynchronous admin requests is
+        // much simpler than for normal operations: if we fail to locate the region once, the
+        // exception is thrown out and no attempt is made to relocate the region. So we add
+        // some retries of our own here to avoid shutting down the region server too
+        // frequently.
+        LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
+          ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+            .getRegionNameAsString(),
+          region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
+        try {
+          counter.sleepUntilNextRetry();
+        } catch (InterruptedException e1) {
+          throw new InterruptedIOException(e1.getMessage());
+        }
+        continue;
       }
 
       if (response.getFlushed()) {
         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
         // a complete flush cycle or replay a region open event
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully triggered a flush of primary region replica "
-              + ServerRegionReplicaUtil
-                .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                + " of region " + region.getRegionInfo().getEncodedName()
-                + " Now waiting and blocking reads until observing a full flush cycle");
+          LOG.debug("Successfully triggered a flush of primary region replica " +
+            ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+              .getRegionNameAsString() +
+            " of region " + region.getRegionInfo().getRegionNameAsString() +
+            " Now waiting and blocking reads until observing a full flush cycle");
         }
         region.setReadsEnabled(true);
         break;
       } else {
         if (response.hasWroteFlushWalMarker()) {
-          if(response.getWroteFlushWalMarker()) {
+          if (response.getWroteFlushWalMarker()) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
-                  + "region replica " + ServerRegionReplicaUtil
-                    .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                  + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
-                  + "blocking reads until observing a flush marker");
+              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " +
+                "region replica " +
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString() +
+                " of region " + region.getRegionInfo().getRegionNameAsString() +
+                " Now waiting and " + "blocking reads until observing a flush marker");
             }
             region.setReadsEnabled(true);
             break;
@@ -164,15 +168,23 @@ public class RegionReplicaFlushHandler extends EventHandler {
             // closing or already flushing. Retry flush again after some sleep.
             if (!counter.shouldRetry()) {
               throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
-                  "retries. Failing opening of this region replica "
-                  + region.getRegionInfo().getEncodedName());
+                counter.getAttemptTimes() + " retries. Failing opening of this region replica " +
+                region.getRegionInfo().getRegionNameAsString());
+            } else {
+              LOG.warn(
+                "Cannot cause primary replica {} to flush or drop a wal marker " +
+                  "for region replica {}, retry={}",
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString(),
+                region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
             }
           }
         } else {
           // nothing to do. Are we dealing with an old server?
-          LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
-              + "Continuing to open the secondary region replica: "
-              + region.getRegionInfo().getEncodedName());
+          LOG.warn(
+            "Was not able to trigger a flush from primary region due to old server version? " +
+              "Continuing to open the secondary region replica: " +
+              region.getRegionInfo().getRegionNameAsString());
           region.setReadsEnabled(true);
           break;
         }