You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/12/21 16:26:30 UTC

hbase git commit: HBASE-17341 Add a timeout during replication endpoint termination (Vincent Poon)

Repository: hbase
Updated Branches:
  refs/heads/master e1f4aaeac -> cac0904c1


HBASE-17341 Add a timeout during replication endpoint termination (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cac0904c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cac0904c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cac0904c

Branch: refs/heads/master
Commit: cac0904c16dde9eb7bdbb57e4a33224dd4edb77f
Parents: e1f4aae
Author: tedyu <yu...@gmail.com>
Authored: Wed Dec 21 08:26:22 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Wed Dec 21 08:26:22 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  6 ++-
 .../replication/TestReplicationSource.java      | 54 ++++++++++++++++++++
 2 files changed, 58 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cac0904c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 97368e6..3fb5f94 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -430,9 +430,11 @@ public class ReplicationSource extends Thread
       }
       if (future != null) {
         try {
-          future.get();
+          future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
-          LOG.warn("Got exception:" + e);
+          LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+              + this.peerClusterZnode,
+            e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cac0904c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 375a866..abdd68a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -21,12 +21,18 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -36,6 +42,8 @@ import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -43,6 +51,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.mock;
+
 @Category({ReplicationTests.class, MediumTests.class})
 public class TestReplicationSource {
 
@@ -111,5 +121,49 @@ public class TestReplicationSource {
     reader.close();
   }
 
+  /**
+   * Tests that {@link ReplicationSource#terminate(String)} times out properly
+   */
+  @Test
+  public void testTerminateTimeout() throws Exception {
+    ReplicationSource source = new ReplicationSource();
+    ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
+      @Override
+      protected void doStart() {
+        notifyStarted();
+      }
+
+      @Override
+      protected void doStop() {
+        // not calling notifyStopped() here causes the caller of stop() to get a Future that never
+        // completes
+      }
+    };
+    replicationEndpoint.start();
+    ReplicationPeers mockPeers = mock(ReplicationPeers.class);
+    Configuration testConf = HBaseConfiguration.create();
+    testConf.setInt("replication.source.maxretriesmultiplier", 1);
+    source.init(testConf, null, null, null, mockPeers, null, "testPeer", null,
+      replicationEndpoint, null);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<?> future = executor.submit(new Runnable() {
+
+      @Override
+      public void run() {
+        source.terminate("testing source termination");
+      }
+    });
+    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
+    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return future.isDone();
+      }
+
+    });
+
+  }
+
 }