You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2016/07/23 01:14:54 UTC

hbase git commit: HBASE-16096 Properly remove the replication queue and peer znodes after calling ReplicationSourceManager.removePeer().

Repository: hbase
Updated Branches:
  refs/heads/master 03fe257a6 -> 744248c13


HBASE-16096 Properly remove the replication queue and peer znodes after calling ReplicationSourceManager.removePeer().

Signed-off-by: Elliott Clark <ec...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/744248c1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/744248c1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/744248c1

Branch: refs/heads/master
Commit: 744248c131d344e5ddab3cfe032aad919dc0de0f
Parents: 03fe257
Author: Joseph Hwang <jz...@fb.com>
Authored: Thu Jun 30 15:18:33 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Fri Jul 22 18:11:41 2016 -0700

----------------------------------------------------------------------
 .../regionserver/ReplicationSourceManager.java  | 15 +++++---
 .../TestReplicationSourceManager.java           | 40 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/744248c1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index e2a232f..586aace 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -561,9 +561,10 @@ public class ReplicationSourceManager implements ReplicationListener {
           srcToRemove.add(src);
         }
       }
-      if (srcToRemove.size() == 0) {
-        LOG.error("The queue we wanted to close is missing " + id);
-        return;
+      if (srcToRemove.isEmpty()) {
+        LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
+            "This could mean that ReplicationSourceInterface initialization failed for this peer " +
+            "and that replication on this peer may not be caught up. peerId=" + id);
       }
       for (ReplicationSourceInterface toRemove : srcToRemove) {
         toRemove.terminate(terminateMessage);
@@ -739,8 +740,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-
-
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
@@ -766,6 +765,12 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Get the ReplicationPeers used by this ReplicationSourceManager
+   * @return the ReplicationPeers used by this ReplicationSourceManager
+   */
+  public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
+
+  /**
    * Get a string representation of all the sources' metrics
    */
   public String getStats() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/744248c1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4442bbb..7696e95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
@@ -424,6 +425,45 @@ public abstract class TestReplicationSourceManager {
       scopes.containsKey(f2));
   }
 
+  /**
+   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
+   * corresponding ReplicationSourceInterface correctly cleans up the corresponding
+   * replication queue and ReplicationPeer.
+   * See HBASE-16096.
+   * @throws Exception
+   */
+  @Test
+  public void testPeerRemovalCleanup() throws Exception{
+    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
+    try {
+      DummyServer server = new DummyServer();
+      ReplicationQueues rq =
+          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
+              server.getConfiguration(), server, server.getZooKeeper()));
+      rq.init(server.getServerName().toString());
+      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
+      // initialization to throw an exception.
+      conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
+      ReplicationPeers rp = manager.getReplicationPeers();
+      // Set up the znode and ReplicationPeer for the fake peer
+      rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
+      rp.peerConnected("FakePeer");
+      // Have ReplicationSourceManager add the fake peer. It should fail to initialize a
+      // ReplicationSourceInterface.
+      List<String> fakePeers = new ArrayList<>();
+      fakePeers.add("FakePeer");
+      manager.peerListChanged(fakePeers);
+      // Create a replication queue for the fake peer
+      rq.addLog("FakePeer", "FakeFile");
+      // Removing the peer should remove both the replication queue and the ReplicationPeer
+      manager.removePeer("FakePeer");
+      assertFalse(rq.getAllQueues().contains("FakePeer"));
+      assertNull(rp.getConnectedPeer("FakePeer"));
+    } finally {
+      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
+    }
+  }
+
   private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);