You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to commits@hbase.apache.org by la...@apache.org on 2016/11/01 19:50:12 UTC

[48/50] [abbrv] hbase git commit: HBASE-16947 Some improvements for DumpReplicationQueues tool

HBASE-16947 Some improvements for DumpReplicationQueues tool

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/722be3a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/722be3a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/722be3a3

Branch: refs/heads/branch-1
Commit: 722be3a3d8e8bde9b557c62fb1cbb83d21c0a38b
Parents: a969f8d
Author: Guanghao Zhang <zg...@gmail.com>
Authored: Fri Oct 28 13:20:50 2016 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Oct 31 21:00:45 2016 -0700

----------------------------------------------------------------------
 .../ReplicationQueuesClientZKImpl.java          |   4 +-
 .../replication/ReplicationQueuesZKImpl.java    |   8 +-
 .../regionserver/DumpReplicationQueues.java     | 123 ++++++++++++++-----
 3 files changed, 102 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/722be3a3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index cc407e3..9078e40 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -40,7 +40,9 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   @Override
   public void init() throws ReplicationException {
     try {
-      ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Internal error while initializing a queues client", e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/722be3a3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index c7af78e..9beaba7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -82,14 +82,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   public void init(String serverName) throws ReplicationException {
     this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     try {
-      ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication queues.", e);
     }
     if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
       try {
-        ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+        if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
+          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+        }
       } catch (KeeperException e) {
         throw new ReplicationException("Could not initialize hfile references replication queue.",
             e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/722be3a3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index bf38d6f..0772f89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -40,6 +40,9 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 import org.mortbay.util.IO;
 
+import com.google.common.util.concurrent.AtomicLongMap;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.*;
 
@@ -55,6 +58,20 @@ public class DumpReplicationQueues extends Configured implements Tool {
 
   private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName());
 
+  private List<String> deadRegionServers;
+  private List<String> deletedQueues;
+  private AtomicLongMap<String> peersQueueSize;
+  private long totalSizeOfWALs;
+  private long numWalsNotFound;
+
+  public DumpReplicationQueues() {
+    deadRegionServers = new ArrayList<String>();
+    deletedQueues = new ArrayList<String>();
+    peersQueueSize = AtomicLongMap.create();
+    totalSizeOfWALs = 0;
+    numWalsNotFound = 0;
+  }
+
   static class DumpOptions {
     boolean hdfs = false;
     boolean distributed = false;
@@ -155,13 +172,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
     if (message != null && message.length() > 0) {
       System.err.println(message);
     }
-    System.err.println("Usage: java " + className + " \\");
+    System.err.println("Usage: bin/hbase " + className + " \\");
     System.err.println("  <OPTIONS> [-D<property=value>]*");
     System.err.println();
     System.err.println("General Options:");
-    System.err.println(" distributed  Poll each RS and print its own replication queue. "
+    System.err.println(" -h|--h|--help  Show this help and exit.");
+    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
         + "Default only polls ZooKeeper");
-    System.err.println(" hdfs         Use HDFS to calculate usage of WALs by replication. It could be overestimated"
+    System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication. It could be overestimated"
         + " if replicating to multiple peers. --distributed flag is also needed.");
   }
 
@@ -176,7 +194,6 @@ public class DumpReplicationQueues extends Configured implements Tool {
     HBaseAdmin.checkHBaseAvailable(conf);
     ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
     ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
-    long deleted = 0;
 
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
         new WarnOnlyAbortable(), true);
@@ -203,11 +220,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
 
       if (opts.isDistributed()) {
         LOG.info("Found [--distributed], will poll each RegionServer.");
-        System.out.println(dumpQueues(connection, zkw, opts.isHdfs(), deleted));
-        if (deleted > 0) {
-          LOG.warn("Found " + deleted +" deleted queues"
-              + ", run hbck -fixReplication in order to remove the deleted replication queues");
-        }
+        System.out.println(dumpQueues(connection, peerConfigs.keySet(), zkw, opts.isHdfs()));
+        System.out.println(dumpReplicationSummary());
       } else {
         // use ZK instead
         System.out.print("Dumping replication znodes via ZooKeeper:");
@@ -221,21 +235,52 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
   }
 
+  public String dumpReplicationSummary() {
+    StringBuilder sb = new StringBuilder();
+    if (!deletedQueues.isEmpty()) {
+      sb.append("Found " + deletedQueues.size() + " deleted queues"
+          + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
+      for (String deletedQueue : deletedQueues) {
+        sb.append("    " + deletedQueue + "\n");
+      }
+    }
+    if (!deadRegionServers.isEmpty()) {
+      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
+          + ", restart one regionserver to transfer the queues of dead regionservers\n");
+      for (String deadRs : deadRegionServers) {
+        sb.append("    " + deadRs + "\n");
+      }
+    }
+    if (!peersQueueSize.isEmpty()) {
+      sb.append("Dumping all peers's number of WALs in replication queue\n");
+      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
+        sb.append("    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
+      }
+    }
+    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
+    if (numWalsNotFound > 0) {
+      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
+    }
+    return sb.toString();
+  }
+
   public String dumpPeersState(ReplicationAdmin replicationAdmin,
-                              Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
+      Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
     Map<String, String> currentConf;
     StringBuilder sb = new StringBuilder();
     for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) {
       try {
+        ReplicationPeerConfig peerConfig = peer.getValue();
         sb.append("Peer: " + peer.getKey() + "\n");
-        sb.append("    " + "State: " + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
-        sb.append("    " + "Cluster Name: " + peer.getValue() + "\n");
-        currentConf = peer.getValue().getConfiguration();
+        sb.append("    " + "State: "
+            + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
+        sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
+        currentConf = peerConfig.getConfiguration();
         // Only show when we have a custom configuration for the peer
         if (currentConf.size() > 1) {
           sb.append("    " + "Peer Configuration: " + currentConf + "\n");
         }
-        sb.append("    " + "Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()) + "\n");
+        sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
       } catch (ReplicationException re) {
         sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n");
       }
@@ -243,33 +288,36 @@ public class DumpReplicationQueues extends Configured implements Tool {
     return sb.toString();
   }
 
-  public String dumpQueues(ClusterConnection connection, ZooKeeperWatcher zkw, boolean hdfs, long deleted)
-      throws Exception {
+  public String dumpQueues(ClusterConnection connection, Set<String> peerIds, ZooKeeperWatcher zkw,
+      boolean hdfs) throws Exception {
     ReplicationQueuesClient queuesClient;
     ReplicationPeers replicationPeers;
     ReplicationQueues replicationQueues;
-
+    ReplicationTracker replicationTracker;
     StringBuilder sb = new StringBuilder();
 
     queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, getConf(), connection);
     queuesClient.init();
     replicationQueues = ReplicationFactory.getReplicationQueues(zkw, getConf(), connection);
     replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection);
-    replicationPeers.init();
+    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
+      new WarnOnlyAbortable(), new WarnOnlyStoppable());
+    List<String> liveRegionServers = replicationTracker.getListOfRegionServers();
 
     // Loops each peer on each RS and dumps the queues
-
-    Set<String> peerIds = new HashSet<String>(replicationPeers.getAllPeerIds());
     try {
       List<String> regionservers = queuesClient.getListOfReplicators();
       for (String regionserver : regionservers) {
         List<String> queueIds = queuesClient.getAllQueues(regionserver);
         replicationQueues.init(regionserver);
+        if (!liveRegionServers.contains(regionserver)) {
+          deadRegionServers.add(regionserver);
+        }
         for (String queueId : queueIds) {
           ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
           List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
           if (!peerIds.contains(queueInfo.getPeerId())) {
-            deleted++;
+            deletedQueues.add(regionserver + "/" + queueId);
             sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
           } else {
             sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
@@ -282,14 +330,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
     return sb.toString();
   }
 
-  private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo,
-                           String queueId, List<String> wals, boolean isDeleted, boolean hdfs) throws Exception {
-
+  private String formatQueue(String regionserver, ReplicationQueues replicationQueues,
+      ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
+      boolean hdfs) throws Exception {
     StringBuilder sb = new StringBuilder();
-
     List<String> deadServers ;
 
     sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
+    sb.append("    Queue znode: " + queueId + "\n");
     sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
     sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
     deadServers = queueInfo.getDeadRegionServers();
@@ -300,6 +348,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
     sb.append("    Was deleted: " + isDeleted + "\n");
     sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
+    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
+
     for (String wal : wals) {
       long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
       sb.append("    Replication position for " + wal + ": " + (position > 0 ? position : "0"
@@ -324,11 +374,18 @@ public class DumpReplicationQueues extends Configured implements Tool {
       try {
         fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
       } catch (IOException e) {
-        LOG.warn("WAL " + wal + " couldn't be found, skipping");
-        break;
+        if (e instanceof FileNotFoundException) {
+          numWalsNotFound++;
+          LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
+        } else {
+          LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
+        }
+        continue;
       }
       size += fileStatus.getLen();
     }
+
+    totalSizeOfWALs += size;
     return size;
   }
 
@@ -347,9 +404,15 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
   }
 
-  private static void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      LOG.error(errorMsg);
+  private static class WarnOnlyStoppable implements Stoppable {
+    @Override
+    public void stop(String why) {
+      LOG.warn("DumpReplicationQueue received stop, ignoring.  Reason: " + why);
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
     }
   }
 }