You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by as...@apache.org on 2016/06/03 13:15:02 UTC
hbase git commit: HBASE-15888 Extend HBASE-12769 for bulk load data replication
Repository: hbase
Updated Branches:
refs/heads/master 72d3f2a86 -> 0cbce0762
HBASE-15888 Extend HBASE-12769 for bulk load data replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0cbce076
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0cbce076
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0cbce076
Branch: refs/heads/master
Commit: 0cbce07626b77d9aa75a16f5e52c19428865dce7
Parents: 72d3f2a
Author: Ashish Singhi <as...@apache.org>
Authored: Fri Jun 3 18:42:00 2016 +0530
Committer: Ashish Singhi <as...@apache.org>
Committed: Fri Jun 3 18:42:00 2016 +0530
----------------------------------------------------------------------
.../replication/ReplicationPeersZKImpl.java | 6 ++
.../hbase/util/hbck/ReplicationChecker.java | 59 ++++++++++++++++++--
2 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0cbce076/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 09d2100..15265d9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -550,6 +550,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
}
}
+ // Check for hfile-refs queue
+ if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
+ && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+ throw new ReplicationException("Undeleted queue for peerId: " + peerId
+ + ", found in hfile-refs node path " + hfileRefsZNode);
+ }
} catch (KeeperException e) {
throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0cbce076/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 89f2557..e472558 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -51,16 +51,21 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class ReplicationChecker {
private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
+ private final ZooKeeperWatcher zkw;
private final ErrorReporter errorReporter;
private final ReplicationQueuesClient queuesClient;
private final ReplicationPeers replicationPeers;
private final ReplicationQueueDeletor queueDeletor;
// replicator with its queueIds for removed peers
private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
-
+ // ids of removed peers that still have a queue under the hfile-refs znode
+ private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
+ private final String hfileRefsZNode;
+
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
ErrorReporter errorReporter) throws IOException {
try {
+ this.zkw = zkw;
this.errorReporter = errorReporter;
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
this.queuesClient.init();
@@ -71,6 +76,13 @@ public class ReplicationChecker {
} catch (ReplicationException e) {
throw new IOException("failed to construct ReplicationChecker", e);
}
+
+ String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+ String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName);
+ String hfileRefsZNodeName =
+ conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+ ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
+ hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
}
public boolean hasUnDeletedQueues() {
@@ -103,13 +115,37 @@ public class ReplicationChecker {
} catch (KeeperException ke) {
throw new IOException(ke);
}
+
+ checkUnDeletedHFileRefsQueues(peerIds);
+ }
+
+ private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException { // reports hfile-refs queues whose id is not in peerIds
+ try {
+ if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
+ return; // -1 from checkExists: skip the check (znode presumably absent)
+ }
+ List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
+ Set<String> peers = new HashSet<>(listOfPeers);
+ peers.removeAll(peerIds); // keep only the ids that are not present in peerIds
+ if (!peers.isEmpty()) {
+ undeletedHFileRefsQueueIds.addAll(peers); // remembered for fixUnDeletedHFileRefsQueue()
+ String msg =
+ "Undeleted replication hfile-refs queue for removed peer found: "
+ + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
+ errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+ msg);
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Failed to get list of all peers from hfile-refs znode "
+ + hfileRefsZNode, e);
+ }
}
-
+
private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
super(zk, conf, abortable);
}
-
+
public void removeQueue(String replicator, String queueId) throws IOException {
String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
queueId);
@@ -122,7 +158,7 @@ public class ReplicationChecker {
}
}
}
-
+
public void fixUnDeletedQueues() throws IOException {
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
String replicator = replicatorAndQueueIds.getKey();
@@ -130,5 +166,20 @@ public class ReplicationChecker {
queueDeletor.removeQueue(replicator, queueId);
}
}
+ fixUnDeletedHFileRefsQueue();
+ }
+
+ private void fixUnDeletedHFileRefsQueue() throws IOException { // deletes each queue znode recorded in undeletedHFileRefsQueueIds
+ for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
+ String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
+ try {
+ ZKUtil.deleteNodeRecursively(this.zkw, node);
+ LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
+ + hfileRefsZNode);
+ } catch (KeeperException e) { // chain the ZooKeeper failure as the cause so it is not lost
+ throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+ + " from path " + hfileRefsZNode, e);
+ }
+ }
}
}