You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/10/06 23:36:11 UTC
hbase git commit: HBASE-16681: Flaky TestReplicationSourceManagerZkImpl
Repository: hbase
Updated Branches:
refs/heads/master 97c133383 -> 2c7211ec4
HBASE-16681: Flaky TestReplicationSourceManagerZkImpl
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2c7211ec
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2c7211ec
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2c7211ec
Branch: refs/heads/master
Commit: 2c7211ec4bd6d83e498ddc82e60d70f411140ee0
Parents: 97c1333
Author: Ashu Pachauri <as...@gmail.com>
Authored: Fri Sep 23 16:04:08 2016 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Thu Oct 6 16:26:38 2016 -0700
----------------------------------------------------------------------
.../TestReplicationSourceManager.java | 59 +++++++++++++++-----
1 file changed, 46 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c7211ec/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 7174d5f..c074048 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -56,7 +57,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -64,11 +67,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -437,28 +442,45 @@ public abstract class TestReplicationSourceManager {
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
try {
DummyServer server = new DummyServer();
- ReplicationQueues rq =
+ final ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
server.getConfiguration(), server, server.getZooKeeper()));
rq.init(server.getServerName().toString());
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
// initialization to throw an exception.
- conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
- ReplicationPeers rp = manager.getReplicationPeers();
+ conf.set("replication.replicationsource.implementation",
+ FailInitializeDummyReplicationSource.class.getName());
+ final ReplicationPeers rp = manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer
rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
- rp.peerConnected("FakePeer");
- // Have ReplicationSourceManager add the fake peer. It should fail to initialize a
- // ReplicationSourceInterface.
- List<String> fakePeers = new ArrayList<>();
- fakePeers.add("FakePeer");
- manager.peerListChanged(fakePeers);
+ // Wait for the peer to get created and connected
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (rp.getConnectedPeer("FakePeer") != null);
+ }
+ });
+
+ // Make sure that the replication source was not initialized
+ List<ReplicationSourceInterface> sources = manager.getSources();
+ for (ReplicationSourceInterface source : sources) {
+ assertNotEquals("FakePeer", source.getPeerClusterId());
+ }
+
// Create a replication queue for the fake peer
rq.addLog("FakePeer", "FakeFile");
- // Removing the peer should remove both the replication queue and the ReplicationPeer
- manager.removePeer("FakePeer");
- assertFalse(rq.getAllQueues().contains("FakePeer"));
- assertNull(rp.getConnectedPeer("FakePeer"));
+ // Unregister peer, this should remove the peer and clear all queues associated with it
+ // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
+ rp.unregisterPeer("FakePeer");
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ List<String> peers = rp.getAllPeerIds();
+ return (!rq.getAllQueues().contains("FakePeer"))
+ && (rp.getConnectedPeer("FakePeer") == null)
+ && (!peers.contains("FakePeer"));
+ }
+ });
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
}
@@ -553,6 +575,17 @@ public abstract class TestReplicationSourceManager {
}
}
+ static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
+
+ @Override
+ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
+ UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
+ throws IOException {
+ throw new IOException("Failing deliberately");
+ }
+ }
+
static class DummyServer implements Server {
String hostname;