Posted to commits@hbase.apache.org by ap...@apache.org on 2016/12/07 23:10:21 UTC

[1/2] hbase git commit: HBASE-16947 Some improvements for DumpReplicationQueues tool

Repository: hbase
Updated Branches:
  refs/heads/0.98 695812aab -> 52566bb03


HBASE-16947 Some improvements for DumpReplicationQueues tool

Signed-off-by: Michael Stack <st...@apache.org>
Amending-Author: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/52566bb0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/52566bb0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/52566bb0

Branch: refs/heads/0.98
Commit: 52566bb0325b9f9ac38450ae04c7a9e5892a493d
Parents: 20ee646
Author: Guanghao Zhang <zg...@gmail.com>
Authored: Fri Oct 28 13:20:50 2016 +0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 7 09:36:18 2016 -0800

----------------------------------------------------------------------
 .../ReplicationQueuesClientZKImpl.java          |   4 +-
 .../replication/ReplicationQueuesZKImpl.java    |   4 +-
 .../regionserver/DumpReplicationQueues.java     | 122 ++++++++++++++-----
 3 files changed, 98 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/52566bb0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 581327b..d28ca1e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -40,7 +40,9 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   @Override
   public void init() throws ReplicationException {
     try {
-      ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Internal error while initializing a queues client", e);
     }
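
For reference, ZKUtil.checkExists() returns the znode's version (>= 0) when the
node exists and -1 when it does not, so in this hunk and the analogous one below
creation is only attempted for a missing znode. A minimal sketch of the guard as
a standalone helper (the method name ensureZnode is hypothetical):

    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
    import org.apache.zookeeper.KeeperException;

    /** Create the znode (and any missing parents) only when it is absent. */
    static void ensureZnode(ZooKeeperWatcher zkw, String znode) throws KeeperException {
      // checkExists() returns the node's version if present, -1 if absent.
      if (ZKUtil.checkExists(zkw, znode) < 0) {
        ZKUtil.createWithParents(zkw, znode);
      }
    }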

http://git-wip-us.apache.org/repos/asf/hbase/blob/52566bb0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index e1ede20..1647d81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -80,7 +80,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   public void init(String serverName) throws ReplicationException {
     this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     try {
-      ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication queues.", e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/52566bb0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 8cb84f6..78591e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.util.concurrent.AtomicLongMap;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.*;
 
@@ -52,6 +55,20 @@ public class DumpReplicationQueues extends Configured implements Tool {
 
   private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName());
 
+  private List<String> deadRegionServers;
+  private List<String> deletedQueues;
+  private AtomicLongMap<String> peersQueueSize;
+  private long totalSizeOfWALs;
+  private long numWalsNotFound;
+
+  public DumpReplicationQueues() {
+    deadRegionServers = new ArrayList<String>();
+    deletedQueues = new ArrayList<String>();
+    peersQueueSize = AtomicLongMap.create();
+    totalSizeOfWALs = 0;
+    numWalsNotFound = 0;
+  }
+
   static class DumpOptions {
     boolean hdfs = false;
     boolean distributed = false;
@@ -152,13 +169,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
     if (message != null && message.length() > 0) {
       System.err.println(message);
     }
-    System.err.println("Usage: java " + className + " \\");
+    System.err.println("Usage: bin/hbase " + className + " \\");
     System.err.println("  <OPTIONS> [-D<property=value>]*");
     System.err.println();
     System.err.println("General Options:");
-    System.err.println(" distributed  Poll each RS and print its own replication queue. "
+    System.err.println(" -h|--h|--help  Show this help and exit.");
+    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
         + "Default only polls ZooKeeper");
-    System.err.println(" hdfs         Use HDFS to calculate usage of WALs by replication. It could be overestimated"
+    System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication. It could be overestimated"
         + " if replicating to multiple peers. --distributed flag is also needed.");
   }
 
@@ -173,7 +191,6 @@ public class DumpReplicationQueues extends Configured implements Tool {
     HBaseAdmin.checkHBaseAvailable(conf);
     ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
     HConnection connection = HConnectionManager.createConnection(conf);
-    long deleted = 0;
 
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
         new WarnOnlyAbortable(), true);
@@ -200,11 +217,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
 
       if (opts.isDistributed()) {
         LOG.info("Found [--distributed], will poll each RegionServer.");
-        System.out.println(dumpQueues(connection, zkw, opts.isHdfs(), deleted));
-        if (deleted > 0) {
-          LOG.warn("Found " + deleted +" deleted queues"
-              + ", run hbck -fixReplication in order to remove the deleted replication queues");
-        }
+        System.out.println(dumpQueues(connection, peerConfigs.keySet(), zkw, opts.isHdfs()));
+        System.out.println(dumpReplicationSummary());
       } else {
         // use ZK instead
         System.out.print("Dumping replication znodes via ZooKeeper:");
@@ -218,21 +232,51 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
   }
 
+  public String dumpReplicationSummary() {
+    StringBuilder sb = new StringBuilder();
+    if (!deletedQueues.isEmpty()) {
+      sb.append("Found " + deletedQueues.size() + " deleted queues"
+          + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
+      for (String deletedQueue : deletedQueues) {
+        sb.append("    " + deletedQueue + "\n");
+      }
+    }
+    if (!deadRegionServers.isEmpty()) {
+      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
+          + ", restart one regionserver to transfer the queues of dead regionservers\n");
+      for (String deadRs : deadRegionServers) {
+        sb.append("    " + deadRs + "\n");
+      }
+    }
+    if (!peersQueueSize.isEmpty()) {
+      sb.append("Dumping all peers's number of WALs in replication queue\n");
+      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
+        sb.append("    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
+      }
+    }
+    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanReadableInt(totalSizeOfWALs) + "\n");
+    if (numWalsNotFound > 0) {
+      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
+    }
+    return sb.toString();
+  }
+
   public String dumpPeersState(ReplicationAdmin replicationAdmin,
-                              Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
+      Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
     Map<String, String> currentConf;
     StringBuilder sb = new StringBuilder();
     for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) {
       try {
+        ReplicationPeerConfig peerConfig = peer.getValue();
         sb.append("Peer: " + peer.getKey() + "\n");
-        sb.append("    " + "State: " + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
-        sb.append("    " + "Cluster Name: " + peer.getValue() + "\n");
-        currentConf = peer.getValue().getConfiguration();
+        sb.append("    " + "State: "
+            + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
+        sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
+        currentConf = peerConfig.getConfiguration();
         // Only show when we have a custom configuration for the peer
         if (currentConf.size() > 1) {
           sb.append("    " + "Peer Configuration: " + currentConf + "\n");
         }
-        sb.append("    " + "Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()) + "\n");
       } catch (ReplicationException re) {
         sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n");
       }
@@ -240,33 +284,36 @@ public class DumpReplicationQueues extends Configured implements Tool {
     return sb.toString();
   }
 
-  public String dumpQueues(HConnection connection, ZooKeeperWatcher zkw, boolean hdfs, long deleted)
-      throws Exception {
+  public String dumpQueues(HConnection connection, Set<String> peerIds, ZooKeeperWatcher zkw,
+      boolean hdfs) throws Exception {
     ReplicationQueuesClient queuesClient;
     ReplicationPeers replicationPeers;
     ReplicationQueues replicationQueues;
-
+    ReplicationTracker replicationTracker;
     StringBuilder sb = new StringBuilder();
 
     queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, getConf(), connection);
     queuesClient.init();
     replicationQueues = ReplicationFactory.getReplicationQueues(zkw, getConf(), connection);
     replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), connection);
-    replicationPeers.init();
+    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
+      new WarnOnlyAbortable(), new WarnOnlyStoppable());
+    List<String> liveRegionServers = replicationTracker.getListOfRegionServers();
 
     // Loops each peer on each RS and dumps the queues
-
-    Set<String> peerIds = new HashSet<String>(replicationPeers.getAllPeerIds());
     try {
       List<String> regionservers = queuesClient.getListOfReplicators();
       for (String regionserver : regionservers) {
         List<String> queueIds = queuesClient.getAllQueues(regionserver);
         replicationQueues.init(regionserver);
+        if (!liveRegionServers.contains(regionserver)) {
+          deadRegionServers.add(regionserver);
+        }
         for (String queueId : queueIds) {
           ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
           List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
           if (!peerIds.contains(queueInfo.getPeerId())) {
-            deleted++;
+            deletedQueues.add(regionserver + "/" + queueId);
             sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
           } else {
             sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
@@ -279,14 +326,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
     return sb.toString();
   }
 
-  private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo,
-                           String queueId, List<String> wals, boolean isDeleted, boolean hdfs) throws Exception {
-
+  private String formatQueue(String regionserver, ReplicationQueues replicationQueues,
+      ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
+      boolean hdfs) throws Exception {
     StringBuilder sb = new StringBuilder();
-
     List<String> deadServers ;
 
     sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
+    sb.append("    Queue znode: " + queueId + "\n");
     sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
     sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
     deadServers = queueInfo.getDeadRegionServers();
@@ -297,6 +344,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
     sb.append("    Was deleted: " + isDeleted + "\n");
     sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
+    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
+
     for (String wal : wals) {
       long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
       sb.append("    Replication position for " + wal + ": " + (position > 0 ? position : "0"
@@ -321,11 +370,18 @@ public class DumpReplicationQueues extends Configured implements Tool {
       try {
         fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
       } catch (IOException e) {
-        LOG.warn("WAL " + wal + " couldn't be found, skipping");
-        break;
+        if (e instanceof FileNotFoundException) {
+          numWalsNotFound++;
+          LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
+        } else {
+          LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
+        }
+        continue;
       }
       size += fileStatus.getLen();
     }
+
+    totalSizeOfWALs += size;
     return size;
   }
 
@@ -344,9 +400,15 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
   }
 
-  private static void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      LOG.error(errorMsg);
+  private static class WarnOnlyStoppable implements Stoppable {
+    @Override
+    public void stop(String why) {
+      LOG.warn("DumpReplicationQueue received stop, ignoring.  Reason: " + why);
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
     }
   }
 }
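
For orientation, the summary emitted by the new dumpReplicationSummary() has
roughly this shape (server names, peer ids and sizes below are hypothetical;
the wording is taken verbatim from the appends above):

    Found 1 deleted queues, run hbck -fixReplication in order to remove the deleted replication queues
        rs2.example.com,60020,1480000000000/2
    Found 1 dead regionservers, restart one regionserver to transfer the queues of dead regionservers
        rs3.example.com,60020,1480000000000
    Dumping all peers' number of WALs in replication queue
        PeerId: 1 , sizeOfLogQueue: 4
    Total size of WALs on HDFS: 128.4m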


[2/2] hbase git commit: HBASE-16450 Shell tool to dump replication queues

Posted by ap...@apache.org.
HBASE-16450 Shell tool to dump replication queues

New tool to dump existing replication peers, configurations and
queues when using HBase Replication. The tool provides two flags:

 --distributed  This flag will poll each RS for information about
                the replication queues being processed on this RS.
                By default this is not enabled and the information
                about the replication queues and configuration will
                be obtained from ZooKeeper.
 --hdfs         When --distributed is used, this flag will attempt
                to calculate the total size of the WAL files used
                by the replication queues. Since it's possible that
                multiple peers can be configured, this value can be
                overestimated.
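
The tool is launched through the hbase script; a typical invocation exercising
both flags looks like this (see the class javadoc in the diff below):

    hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues --distributed --hdfs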

Signed-off-by: Matteo Bertozzi <ma...@cloudera.com>
Amending-Author: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/20ee6464
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/20ee6464
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/20ee6464

Branch: refs/heads/0.98
Commit: 20ee64642f858fe5fdb2bc32811752a85f502aca
Parents: 695812a
Author: Esteban Gutierrez <es...@apache.org>
Authored: Mon Aug 22 19:53:29 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 7 09:36:18 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |  11 +
 .../org/apache/hadoop/hbase/io/WALLink.java     |  69 ++++
 .../regionserver/DumpReplicationQueues.java     | 352 +++++++++++++++++++
 3 files changed, 432 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/20ee6464/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index c7bed68..d37adf4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1798,6 +1798,17 @@ public class ZKUtil {
     }
   }
 
+  /**
+   * Returns a string with the replication znodes and the position of each replication log.
+   * @param zkw reference to the ZooKeeper watcher to use
+   * @return a string of replication znodes and log positions
+   */
+  public static String getReplicationZnodesDump(ZooKeeperWatcher zkw) throws KeeperException {
+    StringBuilder sb = new StringBuilder();
+    getReplicationZnodesDump(zkw, sb);
+    return sb.toString();
+  }
+
   private static void appendRSZnodes(ZooKeeperWatcher zkw, String znode, StringBuilder sb)
       throws KeeperException {
     List<String> stack = new LinkedList<String>();
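
The dump tool below uses this new helper for its ZooKeeper-only mode; a minimal
sketch of a standalone caller (the watcher name is arbitrary, and
WarnOnlyAbortable stands in for any Abortable, as in the tool itself):

    Configuration conf = HBaseConfiguration.create();
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
        "replication-znodes-dump", new WarnOnlyAbortable(), true);
    try {
      System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
    } finally {
      zkw.close();
    }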

http://git-wip-us.apache.org/repos/asf/hbase/blob/20ee6464/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
new file mode 100644
index 0000000..344d496
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * WALLink describes a link to a WAL.
+ *
+ * A WAL can be in /hbase/.logs/&lt;server&gt;/&lt;wal&gt;
+ * or in /hbase/.oldlogs/&lt;wal&gt;.
+ *
+ * The link checks the original path first;
+ * if the file is not present there, it falls back to the archived path.
+ */
+@InterfaceAudience.Private
+public class WALLink extends FileLink {
+  /**
+   * @param conf {@link Configuration} from which to extract specific archive locations
+   * @param serverName Region Server owner of the log
+   * @param logName WAL file name
+   * @throws IOException on unexpected error.
+   */
+  public WALLink(final Configuration conf,
+      final String serverName, final String logName) throws IOException {
+    this(FSUtils.getRootDir(conf), serverName, logName);
+  }
+
+  /**
+   * @param rootDir Path to the root directory where hbase files are stored
+   * @param serverName Region Server owner of the log
+   * @param logName WAL file name
+   */
+  public WALLink(final Path rootDir, final String serverName, final String logName) {
+    final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
+    setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
+  }
+
+  /**
+   * @param originPath Path to the wal in the log directory
+   * @param archivePath Path to the wal in the archived log directory
+   */
+  public WALLink(final Path originPath, final Path archivePath) {
+    setLocations(originPath, archivePath);
+  }
+}
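
A sketch of how the dump tool below consumes this class (getFileStatus() is
inherited from FileLink; serverName and walName are placeholders, and an
IOException means the WAL exists in neither location):

    FileSystem fs = FileSystem.get(conf);
    // Checks /hbase/.logs/<server>/<wal> first, then /hbase/.oldlogs/<wal>.
    FileStatus status = new WALLink(conf, serverName, walName).getFileStatus(fs);
    long sizeInBytes = status.getLen();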

http://git-wip-us.apache.org/repos/asf/hbase/blob/20ee6464/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
new file mode 100644
index 0000000..8cb84f6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.io.WALLink;
+import org.apache.hadoop.hbase.replication.*;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Provides information about the existing states of replication, replication peers and queues.
+ *
+ * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
+ * Arguments: --distributed    Polls each RS to dump information about the queue
+ *            --hdfs           Reports HDFS usage by the replication queues (note: can be overestimated).
+ */
+
+public class DumpReplicationQueues extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName());
+
+  static class DumpOptions {
+    boolean hdfs = false;
+    boolean distributed = false;
+
+    public DumpOptions() {
+    }
+
+    public DumpOptions(DumpOptions that) {
+      this.hdfs = that.hdfs;
+      this.distributed = that.distributed;
+    }
+
+    boolean isHdfs () {
+      return hdfs;
+    }
+
+    boolean isDistributed() {
+      return distributed;
+    }
+
+    void setHdfs (boolean hdfs) {
+      this.hdfs = hdfs;
+    }
+
+    void setDistributed(boolean distributed) {
+      this.distributed = distributed;
+    }
+  }
+
+  static DumpOptions parseOpts(Queue<String> args) {
+    DumpOptions opts = new DumpOptions();
+
+    String cmd = null;
+    while ((cmd = args.poll()) != null) {
+      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
+        // place item back onto queue so that caller knows parsing was incomplete
+        args.add(cmd);
+        break;
+      }
+      final String hdfs = "--hdfs";
+      if (cmd.equals(hdfs)) {
+        opts.setHdfs(true);
+        continue;
+      }
+      final String distributed = "--distributed";
+      if (cmd.equals(distributed)) {
+        opts.setDistributed(true);
+        continue;
+      } else {
+        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+      }
+      // check that --distributed is present when --hdfs is in the arguments
+      if (!opts.isDistributed()  && opts.isHdfs()) {
+        printUsageAndExit("ERROR: --hdfs option can only be used with --distributed: " + cmd, -1);
+      }
+    }
+    return opts;
+  }
+
+  /**
+   * Main
+   *
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    int errCode = -1;
+    LinkedList<String> argv = new LinkedList<String>();
+    argv.addAll(Arrays.asList(args));
+    DumpOptions opts = parseOpts(argv);
+
+    // args remaining, print help and exit
+    if (!argv.isEmpty()) {
+      errCode = 0;
+      printUsage();
+      return errCode;
+    }
+    return dumpReplicationQueues(opts);
+  }
+
+  protected void printUsage() {
+    printUsage(this.getClass().getName(), null);
+  }
+
+  protected static void printUsage(final String message) {
+    printUsage(DumpReplicationQueues.class.getName(), message);
+  }
+
+  protected static void printUsage(final String className, final String message) {
+    if (message != null && message.length() > 0) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: java " + className + " \\");
+    System.err.println("  <OPTIONS> [-D<property=value>]*");
+    System.err.println();
+    System.err.println("General Options:");
+    System.err.println(" distributed  Poll each RS and print its own replication queue. "
+        + "Default only polls ZooKeeper");
+    System.err.println(" hdfs         Use HDFS to calculate usage of WALs by replication. It could be overestimated"
+        + " if replicating to multiple peers. --distributed flag is also needed.");
+  }
+
+  protected static void printUsageAndExit(final String message, final int exitCode) {
+    printUsage(message);
+    System.exit(exitCode);
+  }
+
+  private int dumpReplicationQueues(DumpOptions opts) throws Exception {
+
+    Configuration conf = getConf();
+    HBaseAdmin.checkHBaseAvailable(conf);
+    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
+    HConnection connection = HConnectionManager.createConnection(conf);
+    long deleted = 0;
+
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
+        new WarnOnlyAbortable(), true);
+
+    try {
+      // Our zk watcher
+      LOG.info("Our Quorum: " + zkw.getQuorum());
+      List<HashMap<String, String>> replicatedTables = replicationAdmin.listReplicated();
+      if (replicatedTables.isEmpty()) {
+        LOG.info("No tables with a configured replication peer were found.");
+        return(0);
+      } else {
+        LOG.info("Replicated Tables: " + replicatedTables);
+      }
+
+      Map<String, ReplicationPeerConfig> peerConfigs = replicationAdmin.listPeerConfigs();
+
+      if (peerConfigs.isEmpty()) {
+        LOG.info("Replication is enabled but no peer configuration was found.");
+      }
+
+      System.out.println("Dumping replication peers and configurations:");
+      System.out.println(dumpPeersState(replicationAdmin, peerConfigs));
+
+      if (opts.isDistributed()) {
+        LOG.info("Found [--distributed], will poll each RegionServer.");
+        System.out.println(dumpQueues(connection, zkw, opts.isHdfs(), deleted));
+        if (deleted > 0) {
+          LOG.warn("Found " + deleted +" deleted queues"
+              + ", run hbck -fixReplication in order to remove the deleted replication queues");
+        }
+      } else {
+        // use ZK instead
+        System.out.print("Dumping replication znodes via ZooKeeper:");
+        System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
+      }
+      return (0);
+    } catch (IOException e) {
+      return (-1);
+    } finally {
+      zkw.close();
+    }
+  }
+
+  public String dumpPeersState(ReplicationAdmin replicationAdmin,
+                              Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
+    Map<String, String> currentConf;
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) {
+      try {
+        sb.append("Peer: " + peer.getKey() + "\n");
+        sb.append("    " + "State: " + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
+        sb.append("    " + "Cluster Name: " + peer.getValue() + "\n");
+        currentConf = peer.getValue().getConfiguration();
+        // Only show when we have a custom configuration for the peer
+        if (currentConf.size() > 1) {
+          sb.append("    " + "Peer Configuration: " + currentConf + "\n");
+        }
+        sb.append("    " + "Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()) + "\n");
+      } catch (ReplicationException re) {
+        sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  public String dumpQueues(HConnection connection, ZooKeeperWatcher zkw, boolean hdfs, long deleted)
+      throws Exception {
+    ReplicationQueuesClient queuesClient;
+    ReplicationPeers replicationPeers;
+    ReplicationQueues replicationQueues;
+
+    StringBuilder sb = new StringBuilder();
+
+    queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, getConf(), connection);
+    queuesClient.init();
+    replicationQueues = ReplicationFactory.getReplicationQueues(zkw, getConf(), connection);
+    replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), connection);
+    replicationPeers.init();
+
+    // Loops each peer on each RS and dumps the queues
+
+    Set<String> peerIds = new HashSet<String>(replicationPeers.getAllPeerIds());
+    try {
+      List<String> regionservers = queuesClient.getListOfReplicators();
+      for (String regionserver : regionservers) {
+        List<String> queueIds = queuesClient.getAllQueues(regionserver);
+        replicationQueues.init(regionserver);
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            deleted++;
+            sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
+          } else {
+            sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException(ke);
+    }
+    return sb.toString();
+  }
+
+  private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo,
+                           String queueId, List<String> wals, boolean isDeleted, boolean hdfs) throws Exception {
+
+    StringBuilder sb = new StringBuilder();
+
+    List<String> deadServers ;
+
+    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
+    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
+    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
+    deadServers = queueInfo.getDeadRegionServers();
+    if (deadServers.isEmpty()) {
+      sb.append("    No dead RegionServers found in this queue." + "\n");
+    } else {
+      sb.append("    Dead RegionServers: " + deadServers + "\n");
+    }
+    sb.append("    Was deleted: " + isDeleted + "\n");
+    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
+    for (String wal : wals) {
+      long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
+      sb.append("    Replication position for " + wal + ": " + (position > 0 ? position : "0"
+          + " (not started or nothing to replicate)") + "\n");
+    }
+
+    if (hdfs) {
+      FileSystem fs = FileSystem.get(getConf());
+      sb.append("    Total size of WALs on HDFS for this queue: "
+          + StringUtils.humanReadableInt(getTotalWALSize(fs, wals, regionserver)) + "\n");
+    }
+    return sb.toString();
+  }
+  /**
+   * Returns the total size in bytes of the given list of WALs.
+   */
+  private long getTotalWALSize(FileSystem fs, List<String> wals, String server) throws IOException {
+    int size = 0;
+    FileStatus fileStatus;
+
+    for (String wal : wals) {
+      try {
+        fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
+      } catch (IOException e) {
+        LOG.warn("WAL " + wal + " couldn't be found, skipping");
+        break;
+      }
+      size += fileStatus.getLen();
+    }
+    return size;
+  }
+
+  private static class WarnOnlyAbortable implements Abortable {
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.warn("DumpReplicationQueue received abort, ignoring.  Reason: " + why);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(e);
+      }
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+  }
+
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      LOG.error(errorMsg);
+    }
+  }
+}