You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2016/07/23 01:14:54 UTC
hbase git commit: HBASE-16096 Properly remove the replication queue and peer znodes after calling ReplicationSourceManager.removePeer().
Repository: hbase
Updated Branches:
refs/heads/master 03fe257a6 -> 744248c13
HBASE-16096 Properly remove the replication queue and peer znodes after calling ReplicationSourceManager.removePeer().
Signed-off-by: Elliott Clark <ec...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/744248c1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/744248c1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/744248c1
Branch: refs/heads/master
Commit: 744248c131d344e5ddab3cfe032aad919dc0de0f
Parents: 03fe257
Author: Joseph Hwang <jz...@fb.com>
Authored: Thu Jun 30 15:18:33 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Fri Jul 22 18:11:41 2016 -0700
----------------------------------------------------------------------
.../regionserver/ReplicationSourceManager.java | 15 +++++---
.../TestReplicationSourceManager.java | 40 ++++++++++++++++++++
2 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/744248c1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index e2a232f..586aace 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -561,9 +561,10 @@ public class ReplicationSourceManager implements ReplicationListener {
srcToRemove.add(src);
}
}
- if (srcToRemove.size() == 0) {
- LOG.error("The queue we wanted to close is missing " + id);
- return;
+ if (srcToRemove.isEmpty()) {
+ LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
+ "This could mean that ReplicationSourceInterface initialization failed for this peer " +
+ "and that replication on this peer may not be caught up. peerId=" + id);
}
for (ReplicationSourceInterface toRemove : srcToRemove) {
toRemove.terminate(terminateMessage);
@@ -739,8 +740,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
-
-
/**
* Get the directory where wals are archived
* @return the directory where wals are archived
@@ -766,6 +765,12 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
+ * Get the ReplicationPeers used by this ReplicationSourceManager
+ * @return the ReplicationPeers used by this ReplicationSourceManager
+ */
+ public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
+
+ /**
* Get a string representation of all the sources' metrics
*/
public String getStats() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/744248c1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4442bbb..7696e95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
@@ -424,6 +425,45 @@ public abstract class TestReplicationSourceManager {
scopes.containsKey(f2));
}
+ /**
+ * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
+ * corresponding ReplicationSourceInterface correctly cleans up the corresponding
+ * replication queue and ReplicationPeer.
+ * See HBASE-16096.
+ * @throws Exception
+ */
+ @Test
+ public void testPeerRemovalCleanup() throws Exception{
+ String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
+ try {
+ DummyServer server = new DummyServer();
+ ReplicationQueues rq =
+ ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
+ server.getConfiguration(), server, server.getZooKeeper()));
+ rq.init(server.getServerName().toString());
+ // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
+ // initialization to throw an exception.
+ conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
+ ReplicationPeers rp = manager.getReplicationPeers();
+ // Set up the znode and ReplicationPeer for the fake peer
+ rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
+ rp.peerConnected("FakePeer");
+ // Have ReplicationSourceManager add the fake peer. It should fail to initialize a
+ // ReplicationSourceInterface.
+ List<String> fakePeers = new ArrayList<>();
+ fakePeers.add("FakePeer");
+ manager.peerListChanged(fakePeers);
+ // Create a replication queue for the fake peer
+ rq.addLog("FakePeer", "FakeFile");
+ // Removing the peer should remove both the replication queue and the ReplicationPeer
+ manager.removePeer("FakePeer");
+ assertFalse(rq.getAllQueues().contains("FakePeer"));
+ assertNull(rp.getConnectedPeer("FakePeer"));
+ } finally {
+ conf.set("replication.replicationsource.implementation", replicationSourceImplName);
+ }
+ }
+
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);