You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/12/21 19:57:39 UTC
[4/4] hbase git commit: HBASE-17341 Add a timeout during replication
endpoint termination (Vincent Poon)
HBASE-17341 Add a timeout during replication endpoint termination (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1999c15a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1999c15a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1999c15a
Branch: refs/heads/branch-1.1
Commit: 1999c15a9adf774c39478d181accd6a15bdf29ff
Parents: d3ffdf6
Author: tedyu <yu...@gmail.com>
Authored: Wed Dec 21 08:27:45 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 21 11:19:00 2016 -0800
----------------------------------------------------------------------
.../regionserver/ReplicationSource.java | 6 ++-
.../replication/TestReplicationSource.java | 54 ++++++++++++++++++++
2 files changed, 58 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1999c15a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f7dd446..10a615a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -869,9 +869,11 @@ public class ReplicationSource extends Thread
Threads.shutdown(this, this.sleepForRetries);
if (future != null) {
try {
- future.get();
+ future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (Exception e) {
- LOG.warn("Got exception:" + e);
+ LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+ + this.peerClusterZnode,
+ e);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1999c15a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 458819d..9bf0e93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -21,12 +21,18 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -37,12 +43,16 @@ import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.mock;
+
@Category(MediumTests.class)
public class TestReplicationSource {
@@ -111,5 +121,49 @@ public class TestReplicationSource {
reader.close();
}
+ /**
+ * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
+ */
+ @Test
+ public void testTerminateTimeout() throws Exception {
+ final ReplicationSource source = new ReplicationSource();
+ ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
+ @Override
+ protected void doStart() {
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ // not calling notifyStopped() here causes the caller of stop() to get a Future that never
+ // completes
+ }
+ };
+ replicationEndpoint.start();
+ ReplicationPeers mockPeers = mock(ReplicationPeers.class);
+ Configuration testConf = HBaseConfiguration.create();
+ testConf.setInt("replication.source.maxretriesmultiplier", 1);
+ source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, replicationEndpoint,
+ null);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ final Future<?> future = executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ source.terminate("testing source termination");
+ }
+ });
+ long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
+ Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return future.isDone();
+ }
+
+ });
+
+ }
+
}