You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/12/10 12:33:51 UTC

hbase git commit: HBASE-16336 Removing peers seems to be leaving spare queues (Guanghao Zhang)

Repository: hbase
Updated Branches:
  refs/heads/master b554e0541 -> 67420fe21


HBASE-16336 Removing peers seems to be leaving spare queues (Guanghao Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67420fe2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67420fe2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67420fe2

Branch: refs/heads/master
Commit: 67420fe21dfd104f23fd74f83f955dea2f971e71
Parents: b554e05
Author: tedyu <yu...@gmail.com>
Authored: Sat Dec 10 04:33:40 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Sat Dec 10 04:33:40 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/master/HMaster.java |  17 ++
 .../cleaner/ReplicationZKNodeCleaner.java       | 202 +++++++++++++++++++
 .../cleaner/ReplicationZKNodeCleanerChore.java  |  55 +++++
 .../hbase/util/hbck/ReplicationChecker.java     | 143 +++----------
 .../cleaner/TestReplicationZKNodeCleaner.java   | 115 +++++++++++
 5 files changed, 417 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 970744d..167a029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -96,6 +96,8 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@@ -135,6 +137,8 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -320,6 +324,7 @@ public class HMaster extends HRegionServer implements MasterServices {
 
   CatalogJanitor catalogJanitorChore;
   private ReplicationMetaCleaner replicationMetaCleaner;
+  private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1008,6 +1013,17 @@ public class HMaster extends HRegionServer implements MasterServices {
       LOG.trace("Started service threads");
     }
 
+    // Start replication zk node cleaner
+    if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
+      ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) {
+      try {
+        replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
+            new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
+        getChoreService().scheduleChore(replicationZKNodeCleanerChore);
+      } catch (Exception e) {
+        LOG.error("start replicationZKNodeCleanerChore failed", e);
+      }
+    }
     replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
     getChoreService().scheduleChore(replicationMetaCleaner);
   }
@@ -1043,6 +1059,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Clean up and close up shop
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
+    if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
     if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();

http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
new file mode 100644
index 0000000..c0a1b75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Used to clean the replication queues belonging to the peer which does not exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleaner {
+  private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleaner.class);
+  private final ZooKeeperWatcher zkw;
+  private final ReplicationQueuesClient queuesClient;
+  private final ReplicationPeers replicationPeers;
+  private final ReplicationQueueDeletor queueDeletor;
+
+  /**
+   * Builds a cleaner backed by a replication queues client, the replication peers tracker and a
+   * ZooKeeper-based queue deletor, all created from the given arguments.
+   * @param conf configuration handed to the replication factory and to the queue deletor
+   * @param zkw ZooKeeper watcher used for all znode reads and deletes
+   * @param abortable passed through to the replication components created here
+   * @throws IOException wrapping any exception raised while creating or initializing the
+   *           queues client or the replication peers
+   */
+  public ReplicationZKNodeCleaner(Configuration conf, ZooKeeperWatcher zkw, Abortable abortable)
+      throws IOException {
+    try {
+      this.zkw = zkw;
+      this.queuesClient = ReplicationFactory
+          .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
+      this.queuesClient.init();
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
+        abortable);
+      this.replicationPeers.init();
+      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
+    } catch (Exception e) {
+      throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
+    }
+  }
+
+  /**
+   * Scans the queues of every replicator and collects those whose peer id is not among the
+   * currently known peers.
+   * @return map from replicator (region server) to the ids of its queues that belong to removed
+   *         peers; empty, never null, when no such queue is found
+   * @throws IOException if the replication queues cannot be read from ZooKeeper
+   */
+  public Map<String, List<String>> getUnDeletedQueues() throws IOException {
+    Map<String, List<String>> undeletedQueues = new HashMap<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    try {
+      List<String> replicators = this.queuesClient.getListOfReplicators();
+      if (replicators == null || replicators.isEmpty()) {
+        // No replicator is registered, so there cannot be any stale queue. The null check
+        // guards against a client that reports a missing znode as null instead of an empty list.
+        return undeletedQueues;
+      }
+      for (String replicator : replicators) {
+        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
+        if (queueIds == null) {
+          // NOTE(review): presumably the replicator znode disappeared between the two reads;
+          // there is nothing to inspect for it in that case.
+          continue;
+        }
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<String>()).add(
+              queueId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Undeleted replication queue for removed peer found: "
+                  + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+                    queueInfo.getPeerId(), replicator, queueId));
+            }
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException("Failed to get the replication queues of all replicators", ke);
+    }
+    return undeletedQueues;
+  }
+
+  /**
+   * Collects the peer ids that still have a node under the hfile-refs znode although the peer
+   * itself is not among the currently known peers.
+   * @return ids of the hfile-refs queues that belong to removed peers; empty, never null, when
+   *         the hfile-refs znode does not exist or holds no such queue
+   * @throws IOException if the hfile-refs znode cannot be read from ZooKeeper
+   */
+  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
+    Set<String> undeletedHFileRefsQueue = new HashSet<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
+    try {
+      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
+        // Return an empty set rather than null so that callers can iterate the result directly.
+        return undeletedHFileRefsQueue;
+      }
+      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
+      if (listOfPeers != null) {
+        // Guard against the znode being removed between the existence check and the listing.
+        undeletedHFileRefsQueue.addAll(listOfPeers);
+        undeletedHFileRefsQueue.removeAll(peerIds);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed to get list of all peers from hfile-refs znode "
+          + hfileRefsZNode, e);
+    }
+    return undeletedHFileRefsQueue;
+  }
+
+  /**
+   * Deletes replication znodes. It is an inner (non-static) class because it consults the
+   * enclosing cleaner's {@code replicationPeers} right before every delete.
+   */
+  private class ReplicationQueueDeletor extends ReplicationStateZKBase {
+
+    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
+      super(zk, conf, abortable);
+    }
+
+    /**
+     * Recursively deletes the znode of one replication queue, unless the queue's peer is among
+     * the currently known peers at the time of the call.
+     * @param replicator The regionserver which has undeleted queue
+     * @param queueId The undeleted queue id
+     * @throws IOException if the znode cannot be deleted; the ZooKeeper error is kept as cause
+     */
+    public void removeQueue(final String replicator, final String queueId) throws IOException {
+      String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
+        queueId);
+      try {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        // Re-check the peer list: the queue is left alone if its peer is (again) present.
+        if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
+          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
+          LOG.info("Successfully removed replication queue, replicator: " + replicator
+              + ", queueId: " + queueId);
+        }
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
+            + queueId, e);
+      }
+    }
+
+    /**
+     * Recursively deletes the hfile-refs znode of one peer, unless that peer is among the
+     * currently known peers at the time of the call.
+     * @param hfileRefsQueueId The undeleted hfile-refs queue id
+     * @throws IOException if the znode cannot be deleted; the ZooKeeper error is kept as cause
+     */
+    public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
+      String node = ZKUtil.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
+      try {
+        if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
+          ZKUtil.deleteNodeRecursively(this.zookeeper, node);
+          LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
+              + hfileRefsZNode);
+        }
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+            + " from path " + hfileRefsZNode, e);
+      }
+    }
+
+    /** @return the hfile-refs znode path held by this object */
+    String getHfileRefsZNode() {
+      return this.hfileRefsZNode;
+    }
+  }
+
+  /**
+   * Deletes the zk nodes of the given replication queues, which are expected to belong to
+   * removed peers.
+   * @param undeletedQueues replicator with its queueIds for removed peers
+   * @throws IOException if deleting one of the queue znodes fails
+   */
+  public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
+    for (Entry<String, List<String>> entry : undeletedQueues.entrySet()) {
+      final String replicator = entry.getKey();
+      final List<String> staleQueueIds = entry.getValue();
+      for (String staleQueueId : staleQueueIds) {
+        queueDeletor.removeQueue(replicator, staleQueueId);
+      }
+    }
+  }
+
+  /**
+   * Remove the undeleted hfile-refs queue's zk node for removed peers.
+   * @param undeletedHFileRefsQueues ids of the hfile-refs queues that belong to removed peers;
+   *          a null value is treated like an empty set
+   * @throws IOException if deleting one of the hfile-refs znodes fails
+   */
+  public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
+    if (undeletedHFileRefsQueues == null) {
+      // getUnDeletedHFileRefsQueues() may hand back null when the hfile-refs znode is missing;
+      // tolerate that here so the result can be passed straight through.
+      return;
+    }
+    for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
+      queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
new file mode 100644
index 0000000..4bc1244
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Chore that will clean the replication queues, and the hfile-refs queues, belonging to the peer
+ * which does not exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleanerChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleanerChore.class);
+  private final ReplicationZKNodeCleaner cleaner;
+
+  /**
+   * @param stopper passed to {@link ScheduledChore} together with the chore name
+   * @param period passed to {@link ScheduledChore} as the chore period
+   * @param cleaner performs the actual lookup and removal of stale replication znodes
+   */
+  public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
+      ReplicationZKNodeCleaner cleaner) {
+    super("ReplicationZKNodeCleanerChore", stopper, period);
+    this.cleaner = cleaner;
+  }
+
+  /**
+   * Looks up and removes the queues of removed peers, then does the same for their hfile-refs
+   * queues. A failure is logged and the remaining work is left for the next run.
+   */
+  @Override
+  protected void chore() {
+    try {
+      Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+      cleaner.removeQueues(undeletedQueues);
+      // Removed peers can also leave nodes behind under the hfile-refs znode.
+      Set<String> undeletedHFileRefsQueues = cleaner.getUnDeletedHFileRefsQueues();
+      if (undeletedHFileRefsQueues != null && !undeletedHFileRefsQueues.isEmpty()) {
+        cleaner.removeHFileRefsQueues(undeletedHFileRefsQueues);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to clean replication zk node", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 4a430ec..4815f63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.util.hbck;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -27,161 +26,75 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
 
 /*
  * Check and fix undeleted replication queues for removed peerId.
  */
 @InterfaceAudience.Private
 public class ReplicationChecker {
-  private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
-  private final ZooKeeperWatcher zkw;
   private final ErrorReporter errorReporter;
-  private final ReplicationQueuesClient queuesClient;
-  private final ReplicationPeers replicationPeers;
-  private final ReplicationQueueDeletor queueDeletor;
   // replicator with its queueIds for removed peers
-  private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
+  private Map<String, List<String>> undeletedQueueIds = new HashMap<>();
   // replicator with its undeleted queueIds for removed peers in hfile-refs queue
   private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
-  private final String hfileRefsZNode;
+  private final ReplicationZKNodeCleaner cleaner;
 
   public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
       ErrorReporter errorReporter) throws IOException {
-    try {
-      this.zkw = zkw;
-      this.errorReporter = errorReporter;
-      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(
-          new ReplicationQueuesClientArguments(conf, connection, zkw));
-      this.queuesClient.init();
-      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
-        connection);
-      this.replicationPeers.init();
-      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
-    } catch (Exception e) {
-      throw new IOException("failed to construct ReplicationChecker", e);
-    }
-
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
-    String replicationZNode = ZKUtil.joinZNode(this.zkw.znodePaths.baseZNode, replicationZNodeName);
-    String hfileRefsZNodeName =
-        conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
-          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
-    hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
+    this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
+    this.errorReporter = errorReporter;
   }
 
   public boolean hasUnDeletedQueues() {
-    return errorReporter.getErrorList()
-        .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+    return errorReporter.getErrorList().contains(
+      HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
   }
 
   public void checkUnDeletedQueues() throws IOException {
-    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
-    try {
-      List<String> replicators = this.queuesClient.getListOfReplicators();
-      for (String replicator : replicators) {
-        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          if (!peerIds.contains(queueInfo.getPeerId())) {
-            if (!undeletedQueueIds.containsKey(replicator)) {
-              undeletedQueueIds.put(replicator, new ArrayList<String>());
-            }
-            undeletedQueueIds.get(replicator).add(queueId);
-
-            String msg = "Undeleted replication queue for removed peer found: "
-                + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
-                  queueInfo.getPeerId(), replicator, queueId);
-            errorReporter.reportError(
-              HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
-          }
-        }
-      }
-    } catch (KeeperException ke) {
-      throw new IOException(ke);
-    }
-
-    checkUnDeletedHFileRefsQueues(peerIds);
-  }
-
-  private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
-    try {
-      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
-        return;
-      }
-      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
-      Set<String> peers = new HashSet<>(listOfPeers);
-      peers.removeAll(peerIds);
-      if (!peers.isEmpty()) {
-        undeletedHFileRefsQueueIds.addAll(peers);
-        String msg =
-            "Undeleted replication hfile-refs queue for removed peer found: "
-                + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
+    undeletedQueueIds = cleaner.getUnDeletedQueues();
+    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+      String replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        String msg = "Undeleted replication queue for removed peer found: "
+            + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
+              replicator, queueId);
         errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
           msg);
       }
-    } catch (KeeperException e) {
-      throw new IOException("Failed to get list of all peers from hfile-refs znode "
-          + hfileRefsZNode, e);
     }
-  }
 
-  private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
-    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
-      super(zk, conf, abortable);
-    }
+    checkUnDeletedHFileRefsQueues();
+  }
 
-    public void removeQueue(String replicator, String queueId) throws IOException {
-      String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
-        queueId);
-      try {
-        ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
-        LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
-      } catch (KeeperException e) {
-        throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
-            + queueId);
-      }
+  private void checkUnDeletedHFileRefsQueues() throws IOException {
+    undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
+    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
+      String msg = "Undeleted replication hfile-refs queue for removed peer found: "
+          + undeletedHFileRefsQueueIds + " under hfile-refs node";
+      errorReporter
+          .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
     }
   }
 
   public void fixUnDeletedQueues() throws IOException {
-    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
-      String replicator = replicatorAndQueueIds.getKey();
-      for (String queueId : replicatorAndQueueIds.getValue()) {
-        queueDeletor.removeQueue(replicator, queueId);
-      }
+    if (!undeletedQueueIds.isEmpty()) {
+      cleaner.removeQueues(undeletedQueueIds);
     }
     fixUnDeletedHFileRefsQueue();
   }
 
   private void fixUnDeletedHFileRefsQueue() throws IOException {
-    for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
-      String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
-      try {
-        ZKUtil.deleteNodeRecursively(this.zkw, node);
-        LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
-            + hfileRefsZNode);
-      } catch (KeeperException e) {
-        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
-            + " from path " + hfileRefsZNode);
-      }
+    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
+      cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
new file mode 100644
index 0000000..e11143d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests that {@link ReplicationZKNodeCleaner}, used directly and through the master's chore,
+ * finds and removes replication queues whose peer does not exist.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationZKNodeCleaner {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final String ID_ONE = "1";
+  private final String SERVER_ONE = "server1";
+  private final String ID_TWO = "2";
+  private final String SERVER_TWO = "server2";
+
+  private final Configuration conf;
+  private final ZooKeeperWatcher zkw;
+  private final ReplicationQueues repQueues;
+
+  public TestReplicationZKNodeCleaner() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    zkw = new ZooKeeperWatcher(conf, "TestReplicationZKNodeCleaner", null);
+    repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
+        zkw));
+    assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReplicationZKNodeCleaner() throws Exception {
+    repQueues.init(SERVER_ONE);
+    // add a queue for ID_ONE, a peer which doesn't exist
+    repQueues.addLog(ID_ONE, "file1");
+
+    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
+    Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(1, undeletedQueues.size());
+    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+    assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+
+    // add a recovery queue for ID_TWO, a peer which doesn't exist
+    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+    undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(1, undeletedQueues.size());
+    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+    assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
+
+    cleaner.removeQueues(undeletedQueues);
+    undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(0, undeletedQueues.size());
+  }
+
+  @Test
+  public void testReplicationZKNodeCleanerChore() throws Exception {
+    repQueues.init(SERVER_ONE);
+    // add a queue for ID_ONE, a peer which doesn't exist
+    repQueues.addLog(ID_ONE, "file1");
+    // add a recovery queue for ID_TWO, a peer which doesn't exist
+    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
+
+    // Wait for the master's cleaner chore to run. Poll with an upper bound instead of sleeping
+    // for a fixed time, so the test finishes as soon as the queues are gone.
+    long deadline = System.currentTimeMillis() + 30000;
+    while (!cleaner.getUnDeletedQueues().isEmpty() && System.currentTimeMillis() < deadline) {
+      Thread.sleep(500);
+    }
+
+    assertEquals(0, cleaner.getUnDeletedQueues().size());
+  }
+}