Posted to commits@hbase.apache.org by te...@apache.org on 2015/10/28 03:40:39 UTC

hbase git commit: HBASE-12769 Replication fails to delete all corresponding zk nodes when peer is removed (Jianwei Cui)

Repository: hbase
Updated Branches:
  refs/heads/master e24d03b10 -> 210c3dd93


HBASE-12769 Replication fails to delete all corresponding zk nodes when peer is removed (Jianwei Cui)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/210c3dd9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/210c3dd9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/210c3dd9

Branch: refs/heads/master
Commit: 210c3dd93748b5de65301f2cca2342f36e169b78
Parents: e24d03b
Author: tedyu <yu...@gmail.com>
Authored: Tue Oct 27 19:40:40 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Oct 27 19:40:40 2015 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |   7 +-
 .../hbase/replication/ReplicationFactory.java   |   7 +-
 .../replication/ReplicationPeersZKImpl.java     |  25 +++-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java |  39 +++++-
 .../hbase/util/hbck/ReplicationChecker.java     | 134 +++++++++++++++++++
 .../replication/TestReplicationAdmin.java       |  34 +++++
 .../hadoop/hbase/util/BaseTestHBaseFsck.java    |   3 +-
 .../hadoop/hbase/util/TestHBaseFsckOneRS.java   |  59 +++++++-
 .../hadoop/hbase/util/hbck/HbckTestingUtil.java |  11 +-
 9 files changed, 303 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
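
In short: ReplicationAdmin#addPeer now refuses to reuse a peer id while replication
queue znodes for that id still exist, and hbck gains a -fixReplication option (plus a
new UNDELETED_REPLICATION_QUEUE error code) that reports and, when fixing is enabled,
deletes the orphaned queues of removed peers. Below is a minimal sketch, not part of
this commit, of driving the new fix mode programmatically; the class name is made up,
it assumes the existing HBaseFsck connect()/onlineHbck() entry points, and the command
line equivalent is simply "hbase hbck -fixReplication".

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.util.HBaseFsck;

  public class FixReplicationSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (HBaseFsck fsck = new HBaseFsck(conf)) {   // Closeable, so try-with-resources
        fsck.connect();                              // connect to the cluster
        fsck.setFixReplication(true);                // setter added by this commit
        fsck.onlineHbck();                           // reports UNDELETED_REPLICATION_QUEUE
                                                     // and deletes the stale queues
      }
    }
  }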


http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index b33e64d..8bd1267 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -125,11 +125,12 @@ public class ReplicationAdmin implements Closeable {
     try {
       zkw = createZooKeeperWatcher();
       try {
-        this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
-        this.replicationPeers.init();
         this.replicationQueuesClient =
             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
         this.replicationQueuesClient.init();
+        this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
+          this.replicationQueuesClient, this.connection);
+        this.replicationPeers.init();
       } catch (Exception exception) {
         if (zkw != null) {
           zkw.close();
@@ -187,7 +188,7 @@ public class ReplicationAdmin implements Closeable {
     this.replicationPeers.addPeer(id,
       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
   }
-
+  
   /**
    * Add a new remote slave cluster for replication.
    * @param id a short name that identifies the cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index f115a39..91e77ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -42,7 +42,12 @@ public class ReplicationFactory {
 
   public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
       Abortable abortable) {
-    return new ReplicationPeersZKImpl(zk, conf, abortable);
+    return getReplicationPeers(zk, conf, null, abortable);
+  }
+  
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
   }
 
   public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index a223531..1884469 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -81,14 +81,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   // Map of peer clusters keyed by their id
   private Map<String, ReplicationPeerZKImpl> peerClusters;
   private final String tableCFsNodeName;
+  private final ReplicationQueuesClient queuesClient;
 
   private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
 
   public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
-      Abortable abortable) {
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
     super(zk, conf, abortable);
     this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
     this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
+    this.queuesClient = queuesClient;
   }
 
   @Override
@@ -116,6 +118,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
         throw new IllegalArgumentException("Found invalid peer name:" + id);
       }
 
+      checkQueuesDeleted(id);
+      
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
@@ -561,5 +565,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     return ProtobufUtil.prependPBMagic(bytes);
   }
 
-
+  private void checkQueuesDeleted(String peerId) throws ReplicationException {
+    if (queuesClient == null) return;
+    try {
+      List<String> replicators = queuesClient.getListOfReplicators();
+      for (String replicator : replicators) {
+        List<String> queueIds = queuesClient.getAllQueues(replicator);
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (queueInfo.getPeerId().equals(peerId)) {
+            throw new ReplicationException("undeleted queue for peerId: " + peerId
+                + ", replicator: " + replicator + ", queueId: " + queueId);
+          }
+        }
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
+    }
+  }
 }
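
The net effect of the ReplicationPeersZKImpl change above: addPeer() now calls
checkQueuesDeleted(id) before creating the peer znode and throws a ReplicationException
if any replicator still owns a queue (normal or recovered) for that peer id. A small
sketch, not part of this commit, of what a caller sees; the peer id and cluster key
are illustrative only.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
  import org.apache.hadoop.hbase.replication.ReplicationException;

  public class AddPeerSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      ReplicationAdmin admin = new ReplicationAdmin(conf);
      try {
        admin.addPeer("1", "zk1.example.org:2181:/hbase");
      } catch (ReplicationException e) {
        // A queue for peer "1" (normal, e.g. "1", or recovered, e.g. "1-<dead server>")
        // still exists under some region server's replication queues znode; remove it,
        // for example with "hbase hbck -fixReplication", before re-adding the peer.
      } finally {
        admin.close();
      }
    }
  }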

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index e55b53f..88c5427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
+import org.apache.hadoop.hbase.util.hbck.ReplicationChecker;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
@@ -246,6 +247,7 @@ public class HBaseFsck extends Configured implements Closeable {
   private boolean fixReferenceFiles = false; // fix lingering reference store file
   private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
   private boolean fixTableLocks = false; // fix table locks which are expired
+  private boolean fixReplication = false; // fix undeleted replication queues for removed peer
   private boolean fixAny = false; // Set to true if any of the fix is required.
 
   // limit checking/fixes to listed tables, if empty attempt to check/fix all
@@ -702,6 +704,8 @@ public class HBaseFsck extends Configured implements Closeable {
 
     checkAndFixTableLocks();
 
+    checkAndFixReplication();
+    
     // Remove the hbck lock
     unlockHbck();
 
@@ -3257,12 +3261,29 @@ public class HBaseFsck extends Configured implements Closeable {
   }
 
   private void checkAndFixTableLocks() throws IOException {
-    TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors);
+    ZooKeeperWatcher zkw = createZooKeeperWatcher();
+    TableLockChecker checker = new TableLockChecker(zkw, errors);
     checker.checkTableLocks();
 
     if (this.fixTableLocks) {
       checker.fixExpiredTableLocks();
     }
+    zkw.close();
+  }
+  
+  private void checkAndFixReplication() throws IOException {
+    ZooKeeperWatcher zkw = createZooKeeperWatcher();
+    try {
+      ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors);
+      checker.checkUnDeletedQueues();
+
+      if (checker.hasUnDeletedQueues() && this.fixReplication) {
+        checker.fixUnDeletedQueues();
+        setShouldRerun();
+      }
+    } finally {
+      zkw.close();
+    }
   }
 
   /**
@@ -3801,7 +3822,7 @@ public class HBaseFsck extends Configured implements Closeable {
       HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
       ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
       WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, ORPHAN_TABLE_STATE,
-      NO_TABLE_STATE
+      NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE
     }
     void clear();
     void report(String message);
@@ -4202,6 +4223,14 @@ public class HBaseFsck extends Configured implements Closeable {
     fixTableLocks = shouldFix;
     fixAny |= shouldFix;
   }
+  
+  /**
+   * Set replication fix mode.
+   */
+  public void setFixReplication(boolean shouldFix) {
+    fixReplication = shouldFix;
+    fixAny |= shouldFix;
+  }
 
   /**
    * Check if we should rerun fsck again. This checks if we've tried to
@@ -4462,6 +4491,10 @@ public class HBaseFsck extends Configured implements Closeable {
     out.println("  Table lock options");
     out.println("   -fixTableLocks    Deletes table locks held for a long time (hbase.table.lock.expire.ms, 10min by default)");
 
+    out.println("");
+    out.println(" Replication options");
+    out.println("   -fixReplication   Deletes replication queues for removed peers");
+    
     out.flush();
     errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
 
@@ -4647,6 +4680,8 @@ public class HBaseFsck extends Configured implements Closeable {
         setRegionBoundariesCheck();
       } else if (cmd.equals("-fixTableLocks")) {
         setFixTableLocks(true);
+      } else if (cmd.equals("-fixReplication")) {
+        setFixReplication(true);
       } else if (cmd.startsWith("-")) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
         return printUsageAndExit();

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
new file mode 100644
index 0000000..bf44a50
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.hbck;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.util.HBaseFsck;
+import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * Check and fix undeleted replication queues for removed peerId.
+ */
+@InterfaceAudience.Private
+public class ReplicationChecker {
+  private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
+  private ErrorReporter errorReporter;
+  private ReplicationQueuesClient queuesClient;
+  private ReplicationPeers replicationPeers;
+  private ReplicationQueueDeletor queueDeletor;
+  // replicator with its queueIds for removed peers
+  private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
+  
+  public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
+      ErrorReporter errorReporter) throws IOException {
+    try {
+      this.errorReporter = errorReporter;
+      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
+      this.queuesClient.init();
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
+        connection);
+      this.replicationPeers.init();
+      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
+    } catch (ReplicationException e) {
+      throw new IOException("failed to construct ReplicationChecker", e);
+    }
+  }
+
+  public boolean hasUnDeletedQueues() {
+    return errorReporter.getErrorList()
+        .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+  }
+
+  public void checkUnDeletedQueues() throws IOException {
+    Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds());
+    try {
+      List<String> replicators = this.queuesClient.getListOfReplicators();
+      for (String replicator : replicators) {
+        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            if (!undeletedQueueIds.containsKey(replicator)) {
+              undeletedQueueIds.put(replicator, new ArrayList<String>());
+            }
+            undeletedQueueIds.get(replicator).add(queueId);
+
+            String msg = "Undeleted replication queue for removed peer found: "
+                + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+                  queueInfo.getPeerId(), replicator, queueId);
+            errorReporter.reportError(
+              HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException(ke);
+    }
+  }
+  
+  private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
+    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
+      super(zk, conf, abortable);
+    }
+    
+    public void removeQueue(String replicator, String queueId) throws IOException {
+      String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
+        queueId);
+      try {
+        ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
+        LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
+      } catch (KeeperException e) {
+        throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
+            + queueId, e);
+      }
+    }
+  }
+  
+  public void fixUnDeletedQueues() throws IOException {
+    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+      String replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        queueDeletor.removeQueue(replicator, queueId);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index e126205..e187b9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -26,8 +26,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -114,6 +117,37 @@ public class TestReplicationAdmin {
     admin.removePeer(ID_SECOND);
     assertEquals(0, admin.getPeersCount());
   }
+  
+  @Test
+  public void testAddPeerWithUnDeletedQueues() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null);
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(zkw, conf, null);
+    repQueues.init("server1");
+
+    // add queue for ID_ONE
+    repQueues.addLog(ID_ONE, "file1");
+    try {
+      admin.addPeer(ID_ONE, KEY_ONE);
+      fail();
+    } catch (ReplicationException e) {
+      // OK!
+    }
+    repQueues.removeQueue(ID_ONE);
+    assertEquals(0, repQueues.getAllQueues().size());
+    
+    // add recovered queue for ID_ONE
+    repQueues.addLog(ID_ONE + "-server2", "file1");
+    try {
+      admin.addPeer(ID_ONE, KEY_ONE);
+      fail();
+    } catch (ReplicationException e) {
+      // OK!
+    }
+    repQueues.removeAllQueues();
+    zkw.close();
+  }
 
   /**
    * basic checks that when we add a peer that it is enabled, and that we can disable

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
index 7459a7d..8e8bb41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
@@ -498,7 +498,8 @@ public class BaseTestHBaseFsck {
 
       // fix hole
       assertErrors(
-          doFsck(conf, false, true, false, false, false, false, false, false, false, false, null),
+          doFsck(conf, false, true, false, false, false, false, false, false, false, false, false,
+            null),
           new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
               ERROR_CODE.NOT_IN_META_OR_DEPLOYED });
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
index a44ccd1..df3c69c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.io.hfile.TestHFile;
 import org.apache.hadoop.hbase.master.AssignmentManager;
@@ -50,10 +51,13 @@ import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
 import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -862,7 +866,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
         // for some time until children references are deleted. HBCK erroneously sees this as
         // overlapping regions
         HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false,
-            false, null);
+            false, false, null);
         assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported
 
         // assert that the split hbase:meta entry is still there.
@@ -941,7 +945,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
 
         // now fix it. The fix should not revert the region split, but add daughters to META
         hbck = doFsck(conf, true, true, false, false, false, false, false, false, false,
-            false, null);
+            false, false, null);
         assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {
             HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
             HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
@@ -1474,4 +1478,55 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
     writeLock.release(); // release for clean state
     tableLockManager.tableDeleted(tableName);
   }
+
+  @Test(timeout=180000)
+  public void testCheckReplication() throws Exception {
+    // check no errors
+    HBaseFsck hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+    
+    // create peer
+    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
+    Assert.assertEquals(0, replicationAdmin.getPeersCount());
+    int zkPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+      HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    replicationAdmin.addPeer("1", "127.0.0.1:" + zkPort + ":/hbase");
+    replicationAdmin.getPeersCount();
+    Assert.assertEquals(1, replicationAdmin.getPeersCount());
+    
+    // create replicator
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection);
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(zkw, conf, connection);
+    repQueues.init("server1");
+    // queues for current peer, no errors
+    repQueues.addLog("1", "file1");
+    repQueues.addLog("1-server2", "file1");
+    Assert.assertEquals(2, repQueues.getAllQueues().size());
+    hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+    
+    // queues for removed peer
+    repQueues.addLog("2", "file1");
+    repQueues.addLog("2-server2", "file1");
+    Assert.assertEquals(4, repQueues.getAllQueues().size());
+    hbck = doFsck(conf, false);
+    assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {
+        HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+        HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE });
+    
+    // fix the case
+    hbck = doFsck(conf, true);
+    hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+    // ensure only the queues for the removed peer "2" were deleted
+    Assert.assertEquals(2, repQueues.getAllQueues().size());
+    Assert.assertNull(repQueues.getLogsInQueue("2"));
+    Assert.assertNull(repQueues.getLogsInQueue("2-server2"));
+    
+    replicationAdmin.removePeer("1");
+    repQueues.removeAllQueues();
+    zkw.close();
+    replicationAdmin.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index a28378e..d1e774e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -40,13 +40,13 @@ public class HbckTestingUtil {
 
   public static HBaseFsck doFsck(
       Configuration conf, boolean fix, TableName table) throws Exception {
-    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
+    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
   }
 
-  public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments,
-      boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps,
-      boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile,
-      boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
+  public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
+      boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
+      boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
+      boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication,
       TableName table) throws Exception {
     HBaseFsck fsck = new HBaseFsck(conf, exec);
     try {
@@ -62,6 +62,7 @@ public class HbckTestingUtil {
       fsck.setFixReferenceFiles(fixReferenceFiles);
       fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
       fsck.setFixTableLocks(fixTableLocks);
+      fsck.setFixReplication(fixReplication);
       if (table != null) {
         fsck.includeTable(table);
       }