You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/07/04 11:28:13 UTC
hbase git commit: HBASE-16135 PeerClusterZnode under rs of removed peer may never be deleted
Repository: hbase
Updated Branches:
refs/heads/master d22c23c39 -> 6944a17ad
HBASE-16135 PeerClusterZnode under rs of removed peer may never be deleted
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6944a17a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6944a17a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6944a17a
Branch: refs/heads/master
Commit: 6944a17ad4f039d05f76e1f75136bd121776e809
Parents: d22c23c
Author: zhangduo <zh...@apache.org>
Authored: Fri Jul 1 13:41:01 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Jul 4 19:26:27 2016 +0800
----------------------------------------------------------------------
.../replication/ReplicationQueuesZKImpl.java | 8 ++-
.../regionserver/ReplicationSourceManager.java | 63 +++++++++++++-------
.../TestReplicationSourceManager.java | 39 ++++++++++--
.../TestReplicationSourceManagerZkImpl.java | 3 -
...tTableBasedReplicationSourceManagerImpl.java | 2 -
5 files changed, 81 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6944a17a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index a3635e4..baea74f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -316,9 +316,11 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
for (String peerId : peerIdsToProcess) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
if (!peerExists(replicationQueueInfo.getPeerId())) {
- LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
- // Protection against moving orphaned queues
- continue;
+ // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
+ // this will cause the whole multi op fail.
+ // NodeFailoverWorker will skip the orphaned queues.
+ LOG.warn("Peer " + peerId
+ + " didn't exist, will move its queue to avoid the failure of multi op");
}
String newPeerId = peerId + "-" + znode;
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
http://git-wip-us.apache.org/repos/asf/hbase/blob/6944a17a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 07ee46a..143d6e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
@@ -344,6 +345,11 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.oldsources;
}
+ @VisibleForTesting
+ List<String> getAllQueues() {
+ return replicationQueues.getAllQueues();
+ }
+
void preLogRoll(Path newLog) throws IOException {
recordLog(newLog);
String logName = newLog.getName();
@@ -371,8 +377,8 @@ public class ReplicationSourceManager implements ReplicationListener {
String logName = logPath.getName();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
// update replication queues on ZK
- synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
- // the to-be-removed peer
+ // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
+ synchronized (replicationPeers) {
for (String id : replicationPeers.getConnectedPeerIds()) {
try {
this.replicationQueues.addLog(id, logName);
@@ -528,24 +534,28 @@ public class ReplicationSourceManager implements ReplicationListener {
+ sources.size() + " and another "
+ oldsources.size() + " that were recovered");
String terminateMessage = "Replication stream was removed by a user";
- List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
List<ReplicationSourceInterface> oldSourcesToDelete =
new ArrayList<ReplicationSourceInterface>();
- // First close all the recovered sources for this peer
- for (ReplicationSourceInterface src : oldsources) {
- if (id.equals(src.getPeerClusterId())) {
- oldSourcesToDelete.add(src);
+ // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
+ // see NodeFailoverWorker.run
+ synchronized (oldsources) {
+ // First close all the recovered sources for this peer
+ for (ReplicationSourceInterface src : oldsources) {
+ if (id.equals(src.getPeerClusterId())) {
+ oldSourcesToDelete.add(src);
+ }
+ }
+ for (ReplicationSourceInterface src : oldSourcesToDelete) {
+ src.terminate(terminateMessage);
+ closeRecoveredQueue(src);
}
- }
- for (ReplicationSourceInterface src : oldSourcesToDelete) {
- src.terminate(terminateMessage);
- closeRecoveredQueue((src));
}
LOG.info("Number of deleted recovered sources for " + id + ": "
+ oldSourcesToDelete.size());
// Now look for the one on this cluster
- synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
- // for the to-be-removed peer
+ List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
+ // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
+ synchronized (this.replicationPeers) {
for (ReplicationSourceInterface src : this.sources) {
if (id.equals(src.getPeerClusterId())) {
srcToRemove.add(src);
@@ -603,9 +613,12 @@ public class ReplicationSourceManager implements ReplicationListener {
private final UUID clusterId;
/**
- *
* @param rsZnode
*/
+ public NodeFailoverWorker(String rsZnode) {
+ this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
+ }
+
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final UUID clusterId) {
super("Failover-for-"+rsZnode);
@@ -661,6 +674,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
if (peer == null || peerConfig == null) {
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
+ replicationQueues.removeQueue(peerId);
continue;
}
// track sources in walsByIdRecoveredQueues
@@ -680,15 +694,20 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
server, peerId, this.clusterId, peerConfig, peer);
- if (!this.rp.getConnectedPeerIds().contains((src.getPeerClusterId()))) {
- src.terminate("Recovered queue doesn't belong to any current peer");
- break;
- }
- oldsources.add(src);
- for (String wal : walsSet) {
- src.enqueueLog(new Path(oldLogDir, wal));
+ // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
+ // see removePeer
+ synchronized (oldsources) {
+ if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
+ src.terminate("Recovered queue doesn't belong to any current peer");
+ closeRecoveredQueue(src);
+ continue;
+ }
+ oldsources.add(src);
+ for (String wal : walsSet) {
+ src.enqueueLog(new Path(oldLogDir, wal));
+ }
+ src.startup();
}
- src.startup();
} catch (IOException e) {
// TODO manage it
LOG.error("Failed creating a source", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/6944a17a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index f60982e..4442bbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,8 +84,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import com.google.common.collect.Sets;
@@ -187,15 +189,24 @@ public abstract class TestReplicationSourceManager {
utility.shutdownMiniCluster();
}
- @Before
- public void setUp() throws Exception {
+ @Rule
+ public TestName testName = new TestName();
+
+ private void cleanLogDir() throws IOException {
fs.delete(logDir, true);
fs.delete(oldLogDir, true);
}
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("Start " + testName.getMethodName());
+ cleanLogDir();
+ }
+
@After
public void tearDown() throws Exception {
- setUp();
+ LOG.info("End " + testName.getMethodName());
+ cleanLogDir();
}
@Test
@@ -274,7 +285,6 @@ public abstract class TestReplicationSourceManager {
@Test
public void testClaimQueues() throws Exception {
- LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("hostname0.example.org");
@@ -355,6 +365,27 @@ public abstract class TestReplicationSourceManager {
}
@Test
+ public void testCleanupUnknownPeerZNode() throws Exception {
+ final Server server = new DummyServer("hostname2.example.org");
+ ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
+ new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
+ rq.init(server.getServerName().toString());
+ // populate some znodes in the peer znode
+ // add log to an unknown peer
+ String group = "testgroup";
+ rq.addLog("2", group + ".log1");
+ rq.addLog("2", group + ".log2");
+
+ NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
+ w1.run();
+
+ // The log of the unknown peer should be removed from zk
+ for (String peer : manager.getAllQueues()) {
+ assertTrue(peer.startsWith("1"));
+ }
+ }
+
+ @Test
public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
// 1. Get the bulk load wal edit event
http://git-wip-us.apache.org/repos/asf/hbase/blob/6944a17a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index f47e986..75ed835 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -64,7 +64,6 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
// Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
@Test
public void testNodeFailoverDeadServerParsing() throws Exception {
- LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationQueues repQueues =
@@ -114,8 +113,6 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
@Test
public void testFailoverDeadServerCversionChange() throws Exception {
- LOG.debug("testFailoverDeadServerCversionChange");
-
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server s0 = new DummyServer("cversion-change0.example.org");
ReplicationQueues repQueues =
http://git-wip-us.apache.org/repos/asf/hbase/blob/6944a17a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
index 20f5cc8..e606257 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;