You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/10/06 23:36:11 UTC

hbase git commit: HBASE-16681: Flaky TestReplicationSourceManagerZkImpl

Repository: hbase
Updated Branches:
  refs/heads/master 97c133383 -> 2c7211ec4


HBASE-16681: Flaky TestReplicationSourceManagerZkImpl


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2c7211ec
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2c7211ec
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2c7211ec

Branch: refs/heads/master
Commit: 2c7211ec4bd6d83e498ddc82e60d70f411140ee0
Parents: 97c1333
Author: Ashu Pachauri <as...@gmail.com>
Authored: Fri Sep 23 16:04:08 2016 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Thu Oct 6 16:26:38 2016 -0700

----------------------------------------------------------------------
 .../TestReplicationSourceManager.java           | 59 +++++++++++++++-----
 1 file changed, 46 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2c7211ec/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 7174d5f..c074048 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -56,7 +57,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -64,11 +67,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -437,28 +442,45 @@ public abstract class TestReplicationSourceManager {
     String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
     try {
       DummyServer server = new DummyServer();
-      ReplicationQueues rq =
+      final ReplicationQueues rq =
           ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
               server.getConfiguration(), server, server.getZooKeeper()));
       rq.init(server.getServerName().toString());
       // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
       // initialization to throw an exception.
-      conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
-      ReplicationPeers rp = manager.getReplicationPeers();
+      conf.set("replication.replicationsource.implementation",
+          FailInitializeDummyReplicationSource.class.getName());
+      final ReplicationPeers rp = manager.getReplicationPeers();
       // Set up the znode and ReplicationPeer for the fake peer
       rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
-      rp.peerConnected("FakePeer");
-      // Have ReplicationSourceManager add the fake peer. It should fail to initialize a
-      // ReplicationSourceInterface.
-      List<String> fakePeers = new ArrayList<>();
-      fakePeers.add("FakePeer");
-      manager.peerListChanged(fakePeers);
+      // Wait for the peer to get created and connected
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (rp.getConnectedPeer("FakePeer") != null);
+        }
+      });
+
+      // Make sure that the replication source was not initialized
+      List<ReplicationSourceInterface> sources = manager.getSources();
+      for (ReplicationSourceInterface source : sources) {
+        assertNotEquals("FakePeer", source.getPeerClusterId());
+      }
+
       // Create a replication queue for the fake peer
       rq.addLog("FakePeer", "FakeFile");
-      // Removing the peer should remove both the replication queue and the ReplicationPeer
-      manager.removePeer("FakePeer");
-      assertFalse(rq.getAllQueues().contains("FakePeer"));
-      assertNull(rp.getConnectedPeer("FakePeer"));
+      // Unregister peer, this should remove the peer and clear all queues associated with it
+      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
+      rp.unregisterPeer("FakePeer");
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          List<String> peers = rp.getAllPeerIds();
+          return (!rq.getAllQueues().contains("FakePeer"))
+              && (rp.getConnectedPeer("FakePeer") == null)
+              && (!peers.contains("FakePeer"));
+          }
+      });
     } finally {
       conf.set("replication.replicationsource.implementation", replicationSourceImplName);
     }
@@ -553,6 +575,17 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
+  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
+
+    @Override
+    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+        ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
+        UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
+        throws IOException {
+      throw new IOException("Failing deliberately");
+    }
+  }
+
   static class DummyServer implements Server {
     String hostname;