You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/07/12 00:37:45 UTC

hbase git commit: HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint

Repository: hbase
Updated Branches:
  refs/heads/master a396ae773 -> ccf293d7f


HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint

Signed-off-by: Mikhail Antonov <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ccf293d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ccf293d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ccf293d7

Branch: refs/heads/master
Commit: ccf293d7fbb2e73f454feb4d72d860ed46cc5115
Parents: a396ae7
Author: Joseph Hwang <jz...@fb.com>
Authored: Mon Jul 11 13:17:56 2016 -0700
Committer: Mikhail Antonov <an...@apache.org>
Committed: Mon Jul 11 17:34:12 2016 -0700

----------------------------------------------------------------------
 .../hbase/replication/ReplicationEndpoint.java  |  7 +++-
 .../HBaseInterClusterReplicationEndpoint.java   | 34 +++++++++++++++++---
 .../regionserver/ReplicationSourceManager.java  |  2 +-
 3 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ccf293d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index c92b53d..69db31c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +58,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
     private final String peerId;
     private final UUID clusterId;
     private final MetricsSource metrics;
+    private final Abortable abortable;
 
     @InterfaceAudience.Private
     public Context(
@@ -66,7 +68,8 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
         final UUID clusterId,
         final ReplicationPeer replicationPeer,
         final MetricsSource metrics,
-        final TableDescriptors tableDescriptors) {
+        final TableDescriptors tableDescriptors,
+        final Abortable abortable) {
       this.conf = conf;
       this.fs = fs;
       this.clusterId = clusterId;
@@ -74,6 +77,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
       this.replicationPeer = replicationPeer;
       this.metrics = metrics;
       this.tableDescriptors = tableDescriptors;
+      this.abortable = abortable;
     }
     public Configuration getConfiguration() {
       return conf;
@@ -99,6 +103,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
     public TableDescriptors getTableDescriptors() {
       return tableDescriptors;
     }
+    public Abortable getAbortable() { return abortable; }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccf293d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 28340b5..bf3fd1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -40,6 +40,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -71,17 +72,19 @@ import org.apache.hadoop.ipc.RemoteException;
 public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
 
   private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
-  private ClusterConnection conn;
 
-  private Configuration conf;
+  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
+  private ClusterConnection conn;
+  private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
-
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
+  // Amount of time for shutdown to wait for all tasks to complete
+  private long maxTerminationWait;
   //Metrics for this source
   private MetricsSource metrics;
   // Handles connecting to peer region servers
@@ -93,6 +96,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
+  private Abortable abortable;
 
   @Override
   public void init(Context context) throws IOException {
@@ -102,6 +106,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
         maxRetriesMultiplier);
+    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
+    // tasks to terminate when doStop() is called.
+    long maxTerminationWaitMultiplier = this.conf.getLong(
+        "replication.source.maxterminationmultiplier",
+        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
+    this.maxTerminationWait = maxTerminationWaitMultiplier *
+        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
@@ -117,6 +128,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>());
     this.exec.allowCoreThreadTimeOut(true);
+    this.abortable = ctx.getAbortable();
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -211,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
       }
     }
-    while (this.isRunning()) {
+    while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -321,7 +333,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         LOG.warn("Failed to close the connection");
       }
     }
-    exec.shutdownNow();
+    // Allow currently running replication tasks to finish
+    exec.shutdown();
+    try {
+      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+    }
+    // Abort if the tasks did not terminate in time
+    if (!exec.isTerminated()) {
+      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
+          "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
+          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
+      abortable.abort(errMsg, new IOException(errMsg));
+    }
     notifyStopped();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccf293d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 143d6e2..e2a232f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -489,7 +489,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     // init replication endpoint
     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
-      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
+      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
 
     return src;
   }