You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/12/10 12:33:51 UTC
hbase git commit: HBASE-16336 Removing peers seems to be leaving
spare queues (Guanghao Zhang)
Repository: hbase
Updated Branches:
refs/heads/master b554e0541 -> 67420fe21
HBASE-16336 Removing peers seems to be leaving spare queues (Guanghao Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67420fe2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67420fe2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67420fe2
Branch: refs/heads/master
Commit: 67420fe21dfd104f23fd74f83f955dea2f971e71
Parents: b554e05
Author: tedyu <yu...@gmail.com>
Authored: Sat Dec 10 04:33:40 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Sat Dec 10 04:33:40 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/master/HMaster.java | 17 ++
.../cleaner/ReplicationZKNodeCleaner.java | 202 +++++++++++++++++++
.../cleaner/ReplicationZKNodeCleanerChore.java | 55 +++++
.../hbase/util/hbck/ReplicationChecker.java | 143 +++----------
.../cleaner/TestReplicationZKNodeCleaner.java | 115 +++++++++++
5 files changed, 417 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 970744d..167a029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -96,6 +96,8 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@@ -135,6 +137,8 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -320,6 +324,7 @@ public class HMaster extends HRegionServer implements MasterServices {
CatalogJanitor catalogJanitorChore;
private ReplicationMetaCleaner replicationMetaCleaner;
+ private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1008,6 +1013,17 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.trace("Started service threads");
}
+ // Start replication zk node cleaner
+ if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
+ ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) {
+ try {
+ replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
+ new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
+ getChoreService().scheduleChore(replicationZKNodeCleanerChore);
+ } catch (Exception e) {
+ LOG.error("start replicationZKNodeCleanerChore failed", e);
+ }
+ }
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
getChoreService().scheduleChore(replicationMetaCleaner);
}
@@ -1043,6 +1059,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
+ if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
new file mode 100644
index 0000000..c0a1b75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Used to clean the replication queues belonging to the peer which does not exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleaner {
+ private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleaner.class);
+ private final ZooKeeperWatcher zkw;
+ private final ReplicationQueuesClient queuesClient;
+ private final ReplicationPeers replicationPeers;
+ private final ReplicationQueueDeletor queueDeletor;
+
+  /**
+   * Creates and initializes the replication queues client and the replication peers on top of
+   * the given ZooKeeper watcher, and creates the deletor used to remove queue znodes.
+   * @param conf passed through to the replication factory calls and to the deletor
+   * @param zkw ZooKeeper watcher kept by this cleaner and handed to the helpers it creates
+   * @param abortable passed through to the replication factory calls and to the deletor
+   * @throws IOException wrapping any exception raised while creating or initializing the helpers
+   */
+  public ReplicationZKNodeCleaner(Configuration conf, ZooKeeperWatcher zkw, Abortable abortable)
+      throws IOException {
+    this.zkw = zkw;
+    try {
+      ReplicationQueuesClientArguments clientArgs =
+          new ReplicationQueuesClientArguments(conf, abortable, zkw);
+      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(clientArgs);
+      this.queuesClient.init();
+      this.replicationPeers =
+          ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, abortable);
+      this.replicationPeers.init();
+      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
+    } catch (Exception e) {
+      throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
+    }
+  }
+
+  /**
+   * Walks the queues of every replicator and collects those whose peer id is not in the current
+   * list of peer ids.
+   * @return map from replicator to the ids of its queues that belong to removed peers; empty
+   *         (never null) when no such queue is found
+   * @throws IOException if a KeeperException occurs while listing replicators or their queues
+   */
+  public Map<String, List<String>> getUnDeletedQueues() throws IOException {
+    Map<String, List<String>> undeletedQueues = new HashMap<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    try {
+      List<String> replicators = this.queuesClient.getListOfReplicators();
+      // NOTE(review): the ZooKeeper-backed client appears to return null (not an empty list)
+      // when the znode it lists does not exist -- confirm. Guard to avoid an NPE below.
+      if (replicators == null) {
+        return undeletedQueues;
+      }
+      for (String replicator : replicators) {
+        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
+        if (queueIds == null) {
+          // Presumably the replicator's znode went away between the two listings; skip it.
+          continue;
+        }
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<String>()).add(
+              queueId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Undeleted replication queue for removed peer found: "
+                  + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+                    queueInfo.getPeerId(), replicator, queueId));
+            }
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException("Failed to get the replication queues of all replicators", ke);
+    }
+    return undeletedQueues;
+  }
+
+  /**
+   * Collects the peer ids reported by the queues client for the hfile-refs znode that are not in
+   * the current list of peer ids.
+   * @return ids of removed peers that still have an hfile-refs queue; empty (never null) when the
+   *         hfile-refs znode does not exist or nothing is left over
+   * @throws IOException if a KeeperException occurs while reading the hfile-refs znode
+   */
+  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
+    Set<String> undeletedHFileRefsQueue = new HashSet<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
+    try {
+      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
+        // Return the empty set rather than null so the result can be iterated, or handed to
+        // removeHFileRefsQueues(Set), without a null check.
+        return undeletedHFileRefsQueue;
+      }
+      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
+      Set<String> peers = new HashSet<>(listOfPeers);
+      peers.removeAll(peerIds);
+      undeletedHFileRefsQueue.addAll(peers);
+    } catch (KeeperException e) {
+      throw new IOException("Failed to get list of all peers from hfile-refs znode "
+          + hfileRefsZNode, e);
+    }
+    return undeletedHFileRefsQueue;
+  }
+
+  /**
+   * Deletes queue znodes that belong to removed peers. It extends ReplicationStateZKBase and
+   * uses the inherited queuesZNode, hfileRefsZNode and zookeeper members. It is a non-static
+   * inner class because each delete first re-checks the enclosing cleaner's replicationPeers.
+   */
+  private class ReplicationQueueDeletor extends ReplicationStateZKBase {
+
+    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
+      super(zk, conf, abortable);
+    }
+
+    /**
+     * Recursively deletes the znode of one replication queue, but only if the queue's peer id is
+     * not in the list of peer ids at the time of the call.
+     * @param replicator The regionserver which has undeleted queue
+     * @param queueId The undeleted queue id
+     * @throws IOException if ZooKeeper reports a KeeperException; the KeeperException is attached
+     *           as the cause
+     */
+    public void removeQueue(final String replicator, final String queueId) throws IOException {
+      String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
+        queueId);
+      try {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        // Re-read the peer ids right before deleting: the caller's list of undeleted queues may
+        // have been computed earlier.
+        if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
+          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
+          LOG.info("Successfully removed replication queue, replicator: " + replicator
+              + ", queueId: " + queueId);
+        }
+      } catch (KeeperException e) {
+        // Keep the ZooKeeper failure as the cause so the reason for the failure is not lost.
+        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
+            + queueId, e);
+      }
+    }
+
+    /**
+     * Recursively deletes one child of the hfile-refs znode, but only if its id is not in the
+     * list of peer ids at the time of the call.
+     * @param hfileRefsQueueId The undeleted hfile-refs queue id
+     * @throws IOException if ZooKeeper reports a KeeperException; the KeeperException is attached
+     *           as the cause
+     */
+    public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
+      String node = ZKUtil.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
+      try {
+        if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
+          ZKUtil.deleteNodeRecursively(this.zookeeper, node);
+          LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
+              + hfileRefsZNode);
+        }
+      } catch (KeeperException e) {
+        // Keep the ZooKeeper failure as the cause so the reason for the failure is not lost.
+        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+            + " from path " + hfileRefsZNode, e);
+      }
+    }
+
+    /** @return the hfile-refs znode path held by this deletor */
+    String getHfileRefsZNode() {
+      return this.hfileRefsZNode;
+    }
+  }
+
+  /**
+   * Deletes the zk node of every given replication queue through the queue deletor. The first
+   * failure is propagated and the remaining queues are not attempted.
+   * @param undeletedQueues map from replicator to the ids of its queues for removed peers
+   * @throws IOException if deleting a queue fails
+   */
+  public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
+    for (Entry<String, List<String>> entry : undeletedQueues.entrySet()) {
+      final List<String> queueIds = entry.getValue();
+      for (String queueId : queueIds) {
+        queueDeletor.removeQueue(entry.getKey(), queueId);
+      }
+    }
+  }
+
+  /**
+   * Deletes the zk node of every given hfile-refs queue through the queue deletor. The first
+   * failure is propagated and the remaining queues are not attempted.
+   * @param undeletedHFileRefsQueues ids of the hfile-refs queues to delete, as returned by
+   *          {@link #getUnDeletedHFileRefsQueues()}; null is treated like an empty set
+   * @throws IOException if deleting a queue fails
+   */
+  public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
+    if (undeletedHFileRefsQueues == null) {
+      // Tolerate a null set so callers can pass a lookup result straight through.
+      return;
+    }
+    for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
+      queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
new file mode 100644
index 0000000..4bc1244
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Chore that will clean the replication queues belonging to the peer which does not exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleanerChore extends ScheduledChore {
+ private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleanerChore.class);
+ private final ReplicationZKNodeCleaner cleaner;
+
+ public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
+ ReplicationZKNodeCleaner cleaner) {
+ super("ReplicationZKNodeCleanerChore", stopper, period);
+ this.cleaner = cleaner;
+ }
+
+ @Override
+ protected void chore() {
+ try {
+ Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+ cleaner.removeQueues(undeletedQueues);
+ } catch (IOException e) {
+ LOG.warn("Failed to clean replication zk node", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 4a430ec..4815f63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.util.hbck;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -27,161 +26,75 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
/*
* Check and fix undeleted replication queues for removed peerId.
*/
@InterfaceAudience.Private
public class ReplicationChecker {
- private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
- private final ZooKeeperWatcher zkw;
private final ErrorReporter errorReporter;
- private final ReplicationQueuesClient queuesClient;
- private final ReplicationPeers replicationPeers;
- private final ReplicationQueueDeletor queueDeletor;
// replicator with its queueIds for removed peers
- private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
+ private Map<String, List<String>> undeletedQueueIds = new HashMap<>();
// replicator with its undeleted queueIds for removed peers in hfile-refs queue
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
- private final String hfileRefsZNode;
+ private final ReplicationZKNodeCleaner cleaner;
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
ErrorReporter errorReporter) throws IOException {
- try {
- this.zkw = zkw;
- this.errorReporter = errorReporter;
- this.queuesClient = ReplicationFactory.getReplicationQueuesClient(
- new ReplicationQueuesClientArguments(conf, connection, zkw));
- this.queuesClient.init();
- this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
- connection);
- this.replicationPeers.init();
- this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
- } catch (Exception e) {
- throw new IOException("failed to construct ReplicationChecker", e);
- }
-
- String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
- String replicationZNode = ZKUtil.joinZNode(this.zkw.znodePaths.baseZNode, replicationZNodeName);
- String hfileRefsZNodeName =
- conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
- ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
- hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
+ this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
+ this.errorReporter = errorReporter;
}
public boolean hasUnDeletedQueues() {
- return errorReporter.getErrorList()
- .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+ return errorReporter.getErrorList().contains(
+ HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
}
public void checkUnDeletedQueues() throws IOException {
- Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
- try {
- List<String> replicators = this.queuesClient.getListOfReplicators();
- for (String replicator : replicators) {
- List<String> queueIds = this.queuesClient.getAllQueues(replicator);
- for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (!peerIds.contains(queueInfo.getPeerId())) {
- if (!undeletedQueueIds.containsKey(replicator)) {
- undeletedQueueIds.put(replicator, new ArrayList<String>());
- }
- undeletedQueueIds.get(replicator).add(queueId);
-
- String msg = "Undeleted replication queue for removed peer found: "
- + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
- queueInfo.getPeerId(), replicator, queueId);
- errorReporter.reportError(
- HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
- }
- }
- }
- } catch (KeeperException ke) {
- throw new IOException(ke);
- }
-
- checkUnDeletedHFileRefsQueues(peerIds);
- }
-
- private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
- try {
- if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
- return;
- }
- List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
- Set<String> peers = new HashSet<>(listOfPeers);
- peers.removeAll(peerIds);
- if (!peers.isEmpty()) {
- undeletedHFileRefsQueueIds.addAll(peers);
- String msg =
- "Undeleted replication hfile-refs queue for removed peer found: "
- + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
+ undeletedQueueIds = cleaner.getUnDeletedQueues();
+ for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+ String replicator = replicatorAndQueueIds.getKey();
+ for (String queueId : replicatorAndQueueIds.getValue()) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ String msg = "Undeleted replication queue for removed peer found: "
+ + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
+ replicator, queueId);
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
msg);
}
- } catch (KeeperException e) {
- throw new IOException("Failed to get list of all peers from hfile-refs znode "
- + hfileRefsZNode, e);
}
- }
- private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
- public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
- super(zk, conf, abortable);
- }
+ checkUnDeletedHFileRefsQueues();
+ }
- public void removeQueue(String replicator, String queueId) throws IOException {
- String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
- queueId);
- try {
- ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
- LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
- } catch (KeeperException e) {
- throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
- + queueId);
- }
+ private void checkUnDeletedHFileRefsQueues() throws IOException {
+ undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
+ if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
+ String msg = "Undeleted replication hfile-refs queue for removed peer found: "
+ + undeletedHFileRefsQueueIds + " under hfile-refs node";
+ errorReporter
+ .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
}
}
public void fixUnDeletedQueues() throws IOException {
- for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
- String replicator = replicatorAndQueueIds.getKey();
- for (String queueId : replicatorAndQueueIds.getValue()) {
- queueDeletor.removeQueue(replicator, queueId);
- }
+ if (!undeletedQueueIds.isEmpty()) {
+ cleaner.removeQueues(undeletedQueueIds);
}
fixUnDeletedHFileRefsQueue();
}
private void fixUnDeletedHFileRefsQueue() throws IOException {
- for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
- String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
- try {
- ZKUtil.deleteNodeRecursively(this.zkw, node);
- LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
- + hfileRefsZNode);
- } catch (KeeperException e) {
- throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
- + " from path " + hfileRefsZNode);
- }
+ if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
+ cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/67420fe2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
new file mode 100644
index 0000000..e11143d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationZKNodeCleaner {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private final String ID_ONE = "1";
+ private final String SERVER_ONE = "server1";
+ private final String ID_TWO = "2";
+ private final String SERVER_TWO = "server2";
+
+ private final Configuration conf;
+ private final ZooKeeperWatcher zkw;
+ private final ReplicationQueues repQueues;
+
+ public TestReplicationZKNodeCleaner() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ zkw = new ZooKeeperWatcher(conf, "TestReplicationZKNodeCleaner", null);
+ repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
+ zkw));
+ assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testReplicationZKNodeCleaner() throws Exception {
+ repQueues.init(SERVER_ONE);
+ // add queue for ID_ONE which isn't exist
+ repQueues.addLog(ID_ONE, "file1");
+
+ ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
+ Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+ assertEquals(1, undeletedQueues.size());
+ assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+ assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
+ assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+
+ // add a recovery queue for ID_TWO which isn't exist
+ repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+ undeletedQueues = cleaner.getUnDeletedQueues();
+ assertEquals(1, undeletedQueues.size());
+ assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+ assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
+ assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+ assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
+
+ cleaner.removeQueues(undeletedQueues);
+ undeletedQueues = cleaner.getUnDeletedQueues();
+ assertEquals(0, undeletedQueues.size());
+ }
+
+ @Test
+ public void testReplicationZKNodeCleanerChore() throws Exception {
+ repQueues.init(SERVER_ONE);
+ // add queue for ID_ONE which isn't exist
+ repQueues.addLog(ID_ONE, "file1");
+ // add a recovery queue for ID_TWO which isn't exist
+ repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+ // Wait the cleaner chore to run
+ Thread.sleep(20000);
+
+ ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
+ assertEquals(0, cleaner.getUnDeletedQueues().size());
+ }
+}