You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/07/12 00:37:45 UTC
hbase git commit: HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint
Repository: hbase
Updated Branches:
refs/heads/master a396ae773 -> ccf293d7f
HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint
Signed-off-by: Mikhail Antonov <an...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ccf293d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ccf293d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ccf293d7
Branch: refs/heads/master
Commit: ccf293d7fbb2e73f454feb4d72d860ed46cc5115
Parents: a396ae7
Author: Joseph Hwang <jz...@fb.com>
Authored: Mon Jul 11 13:17:56 2016 -0700
Committer: Mikhail Antonov <an...@apache.org>
Committed: Mon Jul 11 17:34:12 2016 -0700
----------------------------------------------------------------------
.../hbase/replication/ReplicationEndpoint.java | 7 +++-
.../HBaseInterClusterReplicationEndpoint.java | 34 +++++++++++++++++---
.../regionserver/ReplicationSourceManager.java | 2 +-
3 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccf293d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index c92b53d..69db31c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +58,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
private final String peerId;
private final UUID clusterId;
private final MetricsSource metrics;
+ private final Abortable abortable;
@InterfaceAudience.Private
public Context(
@@ -66,7 +68,8 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
final UUID clusterId,
final ReplicationPeer replicationPeer,
final MetricsSource metrics,
- final TableDescriptors tableDescriptors) {
+ final TableDescriptors tableDescriptors,
+ final Abortable abortable) {
this.conf = conf;
this.fs = fs;
this.clusterId = clusterId;
@@ -74,6 +77,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
this.replicationPeer = replicationPeer;
this.metrics = metrics;
this.tableDescriptors = tableDescriptors;
+ this.abortable = abortable;
}
public Configuration getConfiguration() {
return conf;
@@ -99,6 +103,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
public TableDescriptors getTableDescriptors() {
return tableDescriptors;
}
+ public Abortable getAbortable() { return abortable; }
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccf293d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 28340b5..bf3fd1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -40,6 +40,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
@@ -71,17 +72,19 @@ import org.apache.hadoop.ipc.RemoteException;
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
- private ClusterConnection conn;
- private Configuration conf;
+ private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
+ private ClusterConnection conn;
+ private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
-
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
+ // Amount of time for shutdown to wait for all tasks to complete
+ private long maxTerminationWait;
//Metrics for this source
private MetricsSource metrics;
// Handles connecting to peer region servers
@@ -93,6 +96,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private Path baseNamespaceDir;
private Path hfileArchiveDir;
private boolean replicationBulkLoadDataEnabled;
+ private Abortable abortable;
@Override
public void init(Context context) throws IOException {
@@ -102,6 +106,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier);
+ // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
+ // tasks to terminate when doStop() is called.
+ long maxTerminationWaitMultiplier = this.conf.getLong(
+ "replication.source.maxterminationmultiplier",
+ DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
+ this.maxTerminationWait = maxTerminationWaitMultiplier *
+ this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
@@ -117,6 +128,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.exec.allowCoreThreadTimeOut(true);
+ this.abortable = ctx.getAbortable();
this.replicationBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -211,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
}
}
- while (this.isRunning()) {
+ while (this.isRunning() && !exec.isShutdown()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
@@ -321,7 +333,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Failed to close the connection");
}
}
- exec.shutdownNow();
+ // Allow currently running replication tasks to finish
+ exec.shutdown();
+ try {
+ exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+ // Abort if the tasks did not terminate in time
+ if (!exec.isTerminated()) {
+ String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
+ "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
+ "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
+ abortable.abort(errMsg, new IOException(errMsg));
+ }
notifyStopped();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccf293d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 143d6e2..e2a232f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -489,7 +489,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
- fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
+ fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
return src;
}