You are viewing a plain text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@hbase.apache.org by te...@apache.org on 2015/10/28 03:40:39 UTC
hbase git commit: HBASE-12769 Replication fails to delete all
corresponding zk nodes when peer is removed (Jianwei Cui)
Repository: hbase
Updated Branches:
refs/heads/master e24d03b10 -> 210c3dd93
HBASE-12769 Replication fails to delete all corresponding zk nodes when peer is removed (Jianwei Cui)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/210c3dd9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/210c3dd9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/210c3dd9
Branch: refs/heads/master
Commit: 210c3dd93748b5de65301f2cca2342f36e169b78
Parents: e24d03b
Author: tedyu <yu...@gmail.com>
Authored: Tue Oct 27 19:40:40 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Oct 27 19:40:40 2015 -0700
----------------------------------------------------------------------
.../client/replication/ReplicationAdmin.java | 7 +-
.../hbase/replication/ReplicationFactory.java | 7 +-
.../replication/ReplicationPeersZKImpl.java | 25 +++-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 39 +++++-
.../hbase/util/hbck/ReplicationChecker.java | 134 +++++++++++++++++++
.../replication/TestReplicationAdmin.java | 34 +++++
.../hadoop/hbase/util/BaseTestHBaseFsck.java | 3 +-
.../hadoop/hbase/util/TestHBaseFsckOneRS.java | 59 +++++++-
.../hadoop/hbase/util/hbck/HbckTestingUtil.java | 11 +-
9 files changed, 303 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index b33e64d..8bd1267 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -125,11 +125,12 @@ public class ReplicationAdmin implements Closeable {
try {
zkw = createZooKeeperWatcher();
try {
- this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
- this.replicationPeers.init();
this.replicationQueuesClient =
ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
this.replicationQueuesClient.init();
+ this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
+ this.replicationQueuesClient, this.connection);
+ this.replicationPeers.init();
} catch (Exception exception) {
if (zkw != null) {
zkw.close();
@@ -187,7 +188,7 @@ public class ReplicationAdmin implements Closeable {
this.replicationPeers.addPeer(id,
new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
}
-
+
/**
* Add a new remote slave cluster for replication.
* @param id a short name that identifies the cluster
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index f115a39..91e77ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -42,7 +42,12 @@ public class ReplicationFactory {
public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
Abortable abortable) {
- return new ReplicationPeersZKImpl(zk, conf, abortable);
+ return getReplicationPeers(zk, conf, null, abortable);
+ }
+
+ public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+ final ReplicationQueuesClient queuesClient, Abortable abortable) {
+ return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
}
public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index a223531..1884469 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -81,14 +81,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeerZKImpl> peerClusters;
private final String tableCFsNodeName;
+ private final ReplicationQueuesClient queuesClient;
private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
- Abortable abortable) {
+ final ReplicationQueuesClient queuesClient, Abortable abortable) {
super(zk, conf, abortable);
this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
+ this.queuesClient = queuesClient;
}
@Override
@@ -116,6 +118,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
throw new IllegalArgumentException("Found invalid peer name:" + id);
}
+ checkQueuesDeleted(id);
+
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
@@ -561,5 +565,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return ProtobufUtil.prependPBMagic(bytes);
}
-
+ private void checkQueuesDeleted(String peerId) throws ReplicationException {
+ if (queuesClient == null) return;
+ try {
+ List<String> replicators = queuesClient.getListOfReplicators();
+ for (String replicator : replicators) {
+ List<String> queueIds = queuesClient.getAllQueues(replicator);
+ for (String queueId : queueIds) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ if (queueInfo.getPeerId().equals(peerId)) {
+ throw new ReplicationException("undeleted queue for peerId: " + peerId
+ + ", replicator: " + replicator + ", queueId: " + queueId);
+ }
+ }
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index e55b53f..88c5427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
+import org.apache.hadoop.hbase.util.hbck.ReplicationChecker;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
@@ -246,6 +247,7 @@ public class HBaseFsck extends Configured implements Closeable {
private boolean fixReferenceFiles = false; // fix lingering reference store file
private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
private boolean fixTableLocks = false; // fix table locks which are expired
+ private boolean fixReplication = false; // fix undeleted replication queues for removed peer
private boolean fixAny = false; // Set to true if any of the fix is required.
// limit checking/fixes to listed tables, if empty attempt to check/fix all
@@ -702,6 +704,8 @@ public class HBaseFsck extends Configured implements Closeable {
checkAndFixTableLocks();
+ checkAndFixReplication();
+
// Remove the hbck lock
unlockHbck();
@@ -3257,12 +3261,29 @@ public class HBaseFsck extends Configured implements Closeable {
}
private void checkAndFixTableLocks() throws IOException {
- TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors);
+ ZooKeeperWatcher zkw = createZooKeeperWatcher();
+ TableLockChecker checker = new TableLockChecker(zkw, errors);
checker.checkTableLocks();
if (this.fixTableLocks) {
checker.fixExpiredTableLocks();
}
+ zkw.close();
+ }
+
+ private void checkAndFixReplication() throws IOException {
+ ZooKeeperWatcher zkw = createZooKeeperWatcher();
+ try {
+ ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors);
+ checker.checkUnDeletedQueues();
+
+ if (checker.hasUnDeletedQueues() && this.fixReplication) {
+ checker.fixUnDeletedQueues();
+ setShouldRerun();
+ }
+ } finally {
+ zkw.close();
+ }
}
/**
@@ -3801,7 +3822,7 @@ public class HBaseFsck extends Configured implements Closeable {
HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, ORPHAN_TABLE_STATE,
- NO_TABLE_STATE
+ NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE
}
void clear();
void report(String message);
@@ -4202,6 +4223,14 @@ public class HBaseFsck extends Configured implements Closeable {
fixTableLocks = shouldFix;
fixAny |= shouldFix;
}
+
+ /**
+ * Set replication fix mode.
+ */
+ public void setFixReplication(boolean shouldFix) {
+ fixReplication = shouldFix;
+ fixAny |= shouldFix;
+ }
/**
* Check if we should rerun fsck again. This checks if we've tried to
@@ -4462,6 +4491,10 @@ public class HBaseFsck extends Configured implements Closeable {
out.println(" Table lock options");
out.println(" -fixTableLocks Deletes table locks held for a long time (hbase.table.lock.expire.ms, 10min by default)");
+ out.println("");
+ out.println(" Replication options");
+ out.println(" -fixReplication Deletes replication queues for removed peers");
+
out.flush();
errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
@@ -4647,6 +4680,8 @@ public class HBaseFsck extends Configured implements Closeable {
setRegionBoundariesCheck();
} else if (cmd.equals("-fixTableLocks")) {
setFixTableLocks(true);
+ } else if (cmd.equals("-fixReplication")) {
+ setFixReplication(true);
} else if (cmd.startsWith("-")) {
errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
return printUsageAndExit();
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
new file mode 100644
index 0000000..bf44a50
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.hbck;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.util.HBaseFsck;
+import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * Check and fix undeleted replication queues for removed peerId.
+ */
+@InterfaceAudience.Private
+public class ReplicationChecker {
+ private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
+ private ErrorReporter errorReporter;
+ private ReplicationQueuesClient queuesClient;
+ private ReplicationPeers replicationPeers;
+ private ReplicationQueueDeletor queueDeletor;
+ // replicator with its queueIds for removed peers
+ private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
+
+ public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
+ ErrorReporter errorReporter) throws IOException {
+ try {
+ this.errorReporter = errorReporter;
+ this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
+ this.queuesClient.init();
+ this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
+ connection);
+ this.replicationPeers.init();
+ this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
+ } catch (ReplicationException e) {
+ throw new IOException("failed to construct ReplicationChecker", e);
+ }
+ }
+
+ public boolean hasUnDeletedQueues() {
+ return errorReporter.getErrorList()
+ .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+ }
+
+ public void checkUnDeletedQueues() throws IOException {
+ Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds());
+ try {
+ List<String> replicators = this.queuesClient.getListOfReplicators();
+ for (String replicator : replicators) {
+ List<String> queueIds = this.queuesClient.getAllQueues(replicator);
+ for (String queueId : queueIds) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ if (!peerIds.contains(queueInfo.getPeerId())) {
+ if (!undeletedQueueIds.containsKey(replicator)) {
+ undeletedQueueIds.put(replicator, new ArrayList<String>());
+ }
+ undeletedQueueIds.get(replicator).add(queueId);
+
+ String msg = "Undeleted replication queue for removed peer found: "
+ + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+ queueInfo.getPeerId(), replicator, queueId);
+ errorReporter.reportError(
+ HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
+ }
+ }
+ }
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
+ }
+ }
+
+ private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
+ public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
+ super(zk, conf, abortable);
+ }
+
+ public void removeQueue(String replicator, String queueId) throws IOException {
+ String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
+ queueId);
+ try {
+ ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
+ LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
+ } catch (KeeperException e) {
+ throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
+ + queueId);
+ }
+ }
+ }
+
+ public void fixUnDeletedQueues() throws IOException {
+ for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+ String replicator = replicatorAndQueueIds.getKey();
+ for (String queueId : replicatorAndQueueIds.getValue()) {
+ queueDeletor.removeQueue(replicator, queueId);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index e126205..e187b9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -26,8 +26,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -114,6 +117,37 @@ public class TestReplicationAdmin {
admin.removePeer(ID_SECOND);
assertEquals(0, admin.getPeersCount());
}
+
+ @Test
+ public void testAddPeerWithUnDeletedQueues() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null);
+ ReplicationQueues repQueues =
+ ReplicationFactory.getReplicationQueues(zkw, conf, null);
+ repQueues.init("server1");
+
+ // add queue for ID_ONE
+ repQueues.addLog(ID_ONE, "file1");
+ try {
+ admin.addPeer(ID_ONE, KEY_ONE);
+ fail();
+ } catch (ReplicationException e) {
+ // OK!
+ }
+ repQueues.removeQueue(ID_ONE);
+ assertEquals(0, repQueues.getAllQueues().size());
+
+ // add recovered queue for ID_ONE
+ repQueues.addLog(ID_ONE + "-server2", "file1");
+ try {
+ admin.addPeer(ID_ONE, KEY_ONE);
+ fail();
+ } catch (ReplicationException e) {
+ // OK!
+ }
+ repQueues.removeAllQueues();
+ zkw.close();
+ }
/**
* basic checks that when we add a peer that it is enabled, and that we can disable
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
index 7459a7d..8e8bb41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java
@@ -498,7 +498,8 @@ public class BaseTestHBaseFsck {
// fix hole
assertErrors(
- doFsck(conf, false, true, false, false, false, false, false, false, false, false, null),
+ doFsck(conf, false, true, false, false, false, false, false, false, false, false, false,
+ null),
new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
ERROR_CODE.NOT_IN_META_OR_DEPLOYED });
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
index a44ccd1..df3c69c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.hfile.TestHFile;
import org.apache.hadoop.hbase.master.AssignmentManager;
@@ -50,10 +51,13 @@ import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -862,7 +866,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
// for some time until children references are deleted. HBCK erroneously sees this as
// overlapping regions
HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false,
- false, null);
+ false, false, null);
assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported
// assert that the split hbase:meta entry is still there.
@@ -941,7 +945,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
// now fix it. The fix should not revert the region split, but add daughters to META
hbck = doFsck(conf, true, true, false, false, false, false, false, false, false,
- false, null);
+ false, false, null);
assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {
HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
@@ -1474,4 +1478,55 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
writeLock.release(); // release for clean state
tableLockManager.tableDeleted(tableName);
}
+
+ @Test(timeout=180000)
+ public void testCheckReplication() throws Exception {
+ // check no errors
+ HBaseFsck hbck = doFsck(conf, false);
+ assertNoErrors(hbck);
+
+ // create peer
+ ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
+ Assert.assertEquals(0, replicationAdmin.getPeersCount());
+ int zkPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+ HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+ replicationAdmin.addPeer("1", "127.0.0.1:" + zkPort + ":/hbase");
+ replicationAdmin.getPeersCount();
+ Assert.assertEquals(1, replicationAdmin.getPeersCount());
+
+ // create replicator
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection);
+ ReplicationQueues repQueues =
+ ReplicationFactory.getReplicationQueues(zkw, conf, connection);
+ repQueues.init("server1");
+ // queues for current peer, no errors
+ repQueues.addLog("1", "file1");
+ repQueues.addLog("1-server2", "file1");
+ Assert.assertEquals(2, repQueues.getAllQueues().size());
+ hbck = doFsck(conf, false);
+ assertNoErrors(hbck);
+
+ // queues for removed peer
+ repQueues.addLog("2", "file1");
+ repQueues.addLog("2-server2", "file1");
+ Assert.assertEquals(4, repQueues.getAllQueues().size());
+ hbck = doFsck(conf, false);
+ assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {
+ HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+ HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE });
+
+ // fix the case
+ hbck = doFsck(conf, true);
+ hbck = doFsck(conf, false);
+ assertNoErrors(hbck);
+ // ensure only "2" is deleted
+ Assert.assertEquals(2, repQueues.getAllQueues().size());
+ Assert.assertNull(repQueues.getLogsInQueue("2"));
+ Assert.assertNull(repQueues.getLogsInQueue("2-server2"));
+
+ replicationAdmin.removePeer("1");
+ repQueues.removeAllQueues();
+ zkw.close();
+ replicationAdmin.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index a28378e..d1e774e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -40,13 +40,13 @@ public class HbckTestingUtil {
public static HBaseFsck doFsck(
Configuration conf, boolean fix, TableName table) throws Exception {
- return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
+ return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
}
- public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments,
- boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps,
- boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile,
- boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
+ public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
+ boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
+ boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
+ boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, Boolean fixReplication,
TableName table) throws Exception {
HBaseFsck fsck = new HBaseFsck(conf, exec);
try {
@@ -62,6 +62,7 @@ public class HbckTestingUtil {
fsck.setFixReferenceFiles(fixReferenceFiles);
fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
fsck.setFixTableLocks(fixTableLocks);
+ fsck.setFixReplication(fixReplication);
if (table != null) {
fsck.includeTable(table);
}