You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2016/11/01 19:50:12 UTC
[48/50] [abbrv] hbase git commit: HBASE-16947 Some improvements for DumpReplicationQueues tool
HBASE-16947 Some improvements for DumpReplicationQueues tool
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/722be3a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/722be3a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/722be3a3
Branch: refs/heads/branch-1
Commit: 722be3a3d8e8bde9b557c62fb1cbb83d21c0a38b
Parents: a969f8d
Author: Guanghao Zhang <zg...@gmail.com>
Authored: Fri Oct 28 13:20:50 2016 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Oct 31 21:00:45 2016 -0700
----------------------------------------------------------------------
.../ReplicationQueuesClientZKImpl.java | 4 +-
.../replication/ReplicationQueuesZKImpl.java | 8 +-
.../regionserver/DumpReplicationQueues.java | 123 ++++++++++++++-----
3 files changed, 102 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/722be3a3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index cc407e3..9078e40 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -40,7 +40,9 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
@Override
public void init() throws ReplicationException {
try {
- ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+ if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
+ ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+ }
} catch (KeeperException e) {
throw new ReplicationException("Internal error while initializing a queues client", e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/722be3a3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index c7af78e..9beaba7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -82,14 +82,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
public void init(String serverName) throws ReplicationException {
this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
try {
- ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+ if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
+ ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+ }
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication queues.", e);
}
if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
try {
- ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+ if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
+ ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+ }
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize hfile references replication queue.",
e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/722be3a3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index bf38d6f..0772f89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -40,6 +40,9 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.mortbay.util.IO;
+import com.google.common.util.concurrent.AtomicLongMap;
+
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.*;
@@ -55,6 +58,20 @@ public class DumpReplicationQueues extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName());
+ private List<String> deadRegionServers;
+ private List<String> deletedQueues;
+ private AtomicLongMap<String> peersQueueSize;
+ private long totalSizeOfWALs;
+ private long numWalsNotFound;
+
+ public DumpReplicationQueues() {
+ deadRegionServers = new ArrayList<String>();
+ deletedQueues = new ArrayList<String>();
+ peersQueueSize = AtomicLongMap.create();
+ totalSizeOfWALs = 0;
+ numWalsNotFound = 0;
+ }
+
static class DumpOptions {
boolean hdfs = false;
boolean distributed = false;
@@ -155,13 +172,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
if (message != null && message.length() > 0) {
System.err.println(message);
}
- System.err.println("Usage: java " + className + " \\");
+ System.err.println("Usage: bin/hbase " + className + " \\");
System.err.println(" <OPTIONS> [-D<property=value>]*");
System.err.println();
System.err.println("General Options:");
- System.err.println(" distributed Poll each RS and print its own replication queue. "
+ System.err.println(" -h|--h|--help Show this help and exit.");
+ System.err.println(" --distributed Poll each RS and print its own replication queue. "
+ "Default only polls ZooKeeper");
- System.err.println(" hdfs Use HDFS to calculate usage of WALs by replication. It could be overestimated"
+ System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication. It could be overestimated"
+ " if replicating to multiple peers. --distributed flag is also needed.");
}
@@ -176,7 +194,6 @@ public class DumpReplicationQueues extends Configured implements Tool {
HBaseAdmin.checkHBaseAvailable(conf);
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
- long deleted = 0;
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
new WarnOnlyAbortable(), true);
@@ -203,11 +220,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
if (opts.isDistributed()) {
LOG.info("Found [--distributed], will poll each RegionServer.");
- System.out.println(dumpQueues(connection, zkw, opts.isHdfs(), deleted));
- if (deleted > 0) {
- LOG.warn("Found " + deleted +" deleted queues"
- + ", run hbck -fixReplication in order to remove the deleted replication queues");
- }
+ System.out.println(dumpQueues(connection, peerConfigs.keySet(), zkw, opts.isHdfs()));
+ System.out.println(dumpReplicationSummary());
} else {
// use ZK instead
System.out.print("Dumping replication znodes via ZooKeeper:");
@@ -221,21 +235,52 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
}
+ public String dumpReplicationSummary() {
+ StringBuilder sb = new StringBuilder();
+ if (!deletedQueues.isEmpty()) {
+ sb.append("Found " + deletedQueues.size() + " deleted queues"
+ + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
+ for (String deletedQueue : deletedQueues) {
+ sb.append(" " + deletedQueue + "\n");
+ }
+ }
+ if (!deadRegionServers.isEmpty()) {
+ sb.append("Found " + deadRegionServers.size() + " dead regionservers"
+ + ", restart one regionserver to transfer the queues of dead regionservers\n");
+ for (String deadRs : deadRegionServers) {
+ sb.append(" " + deadRs + "\n");
+ }
+ }
+ if (!peersQueueSize.isEmpty()) {
+ sb.append("Dumping all peers's number of WALs in replication queue\n");
+ for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
+ sb.append(" PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
+ }
+ }
+ sb.append(" Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
+ if (numWalsNotFound > 0) {
+ sb.append(" ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
+ }
+ return sb.toString();
+ }
+
public String dumpPeersState(ReplicationAdmin replicationAdmin,
- Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
+ Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
Map<String, String> currentConf;
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) {
try {
+ ReplicationPeerConfig peerConfig = peer.getValue();
sb.append("Peer: " + peer.getKey() + "\n");
- sb.append(" " + "State: " + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
- sb.append(" " + "Cluster Name: " + peer.getValue() + "\n");
- currentConf = peer.getValue().getConfiguration();
+ sb.append(" " + "State: "
+ + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
+ sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
+ currentConf = peerConfig.getConfiguration();
// Only show when we have a custom configuration for the peer
if (currentConf.size() > 1) {
sb.append(" " + "Peer Configuration: " + currentConf + "\n");
}
- sb.append(" " + "Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()) + "\n");
+ sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
} catch (ReplicationException re) {
sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n");
}
@@ -243,33 +288,36 @@ public class DumpReplicationQueues extends Configured implements Tool {
return sb.toString();
}
- public String dumpQueues(ClusterConnection connection, ZooKeeperWatcher zkw, boolean hdfs, long deleted)
- throws Exception {
+ public String dumpQueues(ClusterConnection connection, Set<String> peerIds, ZooKeeperWatcher zkw,
+ boolean hdfs) throws Exception {
ReplicationQueuesClient queuesClient;
ReplicationPeers replicationPeers;
ReplicationQueues replicationQueues;
-
+ ReplicationTracker replicationTracker;
StringBuilder sb = new StringBuilder();
queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, getConf(), connection);
queuesClient.init();
replicationQueues = ReplicationFactory.getReplicationQueues(zkw, getConf(), connection);
replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection);
- replicationPeers.init();
+ replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
+ new WarnOnlyAbortable(), new WarnOnlyStoppable());
+ List<String> liveRegionServers = replicationTracker.getListOfRegionServers();
// Loops each peer on each RS and dumps the queues
-
- Set<String> peerIds = new HashSet<String>(replicationPeers.getAllPeerIds());
try {
List<String> regionservers = queuesClient.getListOfReplicators();
for (String regionserver : regionservers) {
List<String> queueIds = queuesClient.getAllQueues(regionserver);
replicationQueues.init(regionserver);
+ if (!liveRegionServers.contains(regionserver)) {
+ deadRegionServers.add(regionserver);
+ }
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
- deleted++;
+ deletedQueues.add(regionserver + "/" + queueId);
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
} else {
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
@@ -282,14 +330,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
return sb.toString();
}
- private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo,
- String queueId, List<String> wals, boolean isDeleted, boolean hdfs) throws Exception {
-
+ private String formatQueue(String regionserver, ReplicationQueues replicationQueues,
+ ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
+ boolean hdfs) throws Exception {
StringBuilder sb = new StringBuilder();
-
List<String> deadServers ;
sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
+ sb.append(" Queue znode: " + queueId + "\n");
sb.append(" PeerID: " + queueInfo.getPeerId() + "\n");
sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n");
deadServers = queueInfo.getDeadRegionServers();
@@ -300,6 +348,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
sb.append(" Was deleted: " + isDeleted + "\n");
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
+ peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
+
for (String wal : wals) {
long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0"
@@ -324,11 +374,18 @@ public class DumpReplicationQueues extends Configured implements Tool {
try {
fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
} catch (IOException e) {
- LOG.warn("WAL " + wal + " couldn't be found, skipping");
- break;
+ if (e instanceof FileNotFoundException) {
+ numWalsNotFound++;
+ LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
+ } else {
+ LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
+ }
+ continue;
}
size += fileStatus.getLen();
}
+
+ totalSizeOfWALs += size;
return size;
}
@@ -347,9 +404,15 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
}
- private static void usage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- LOG.error(errorMsg);
+ private static class WarnOnlyStoppable implements Stoppable {
+ @Override
+ public void stop(String why) {
+ LOG.warn("DumpReplicationQueue received stop, ignoring. Reason: " + why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
}
}
}