You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/05/16 02:56:36 UTC
svn commit: r1483136 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/replication/
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ hbase-s...
Author: jdcryans
Date: Thu May 16 00:56:35 2013
New Revision: 1483136
URL: http://svn.apache.org/r1483136
Log:
HBASE-7380 [replication] When transferring queues, check if the peer still
exists before copying the znodes
Added:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java?rev=1483136&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java Thu May 16 00:56:35 2013
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ServerName;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Parses the name of a znode representing a replication queue. The peer id is always
+ * extracted; when the name has the recovered form (id-servername-*), the dead region
+ * servers that were part of the queue's history are extracted as well.
+ */
+public class ReplicationQueueInfo {
+ private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class);
+
+ private final String peerId;
+ private final String peerClusterZnode; // the znode name exactly as passed to the constructor
+ private boolean queueRecovered; // true when the znode name contains at least one "-"
+ // Names of the dead region servers parsed from the znode name; stays empty if not recovered
+ private List<String> deadRegionServers = new ArrayList<String>();
+
+ /**
+ * @param znode either the id of the peer cluster, or the handling history of a
+ * recovered queue in the form id-servername-* (it is split at the first "-" only)
+ */
+ public ReplicationQueueInfo(String znode) {
+ this.peerClusterZnode = znode;
+ String[] parts = znode.split("-", 2);
+ this.queueRecovered = parts.length != 1;
+ this.peerId = this.queueRecovered ?
+ parts[0] : peerClusterZnode;
+ if (parts.length >= 2) {
+ // everything after the first "-" is the list of dead servers
+ extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
+ }
+ }
+
+ /**
+ * Parse dead server names from the znode string. A server name can itself contain "-", e.g.
+ * "ip-10-46-221-101.ec2.internal", so a "-" only ends a name once two "," were seen in it,
+ * as in: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
+ */
+ private static void
+ extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
+
+ if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
+
+ // a "-" is a valid server name delimiter only after two "," were seen in the current name
+ int seenCommaCnt = 0;
+ int startIndex = 0;
+ int len = deadServerListStr.length();
+
+ for (int i = 0; i < len; i++) {
+ switch (deadServerListStr.charAt(i)) {
+ case ',':
+ seenCommaCnt += 1;
+ break;
+ case '-':
+ if(seenCommaCnt>=2) {
+ if (i > startIndex) {
+ String serverName = deadServerListStr.substring(startIndex, i);
+ if(ServerName.isFullServerName(serverName)){
+ result.add(serverName);
+ } else {
+ LOG.error("Found invalid server name:" + serverName); // logged and left out of result
+ }
+ startIndex = i + 1;
+ }
+ seenCommaCnt = 0;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ // add the text after the last delimiter as the final name (needs at least 2 chars left)
+ if(startIndex < len - 1){
+ String serverName = deadServerListStr.substring(startIndex, len);
+ if(ServerName.isFullServerName(serverName)){
+ result.add(serverName);
+ } else {
+ LOG.error("Found invalid server name at the end:" + serverName);
+ }
+ }
+
+ LOG.debug("Found dead servers:" + result);
+ }
+
+ public List<String> getDeadRegionServers() {
+ return Collections.unmodifiableList(this.deadRegionServers); // callers get a read-only view
+ }
+
+ public String getPeerId() {
+ return this.peerId; // text before the first "-", or the whole znode name if there is none
+ }
+
+ public String getPeerClusterZnode() {
+ return this.peerClusterZnode;
+ }
+
+ public boolean isQueueRecovered() {
+ return queueRecovered;
+ }
+}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java Thu May 16 00:56:35 2013
@@ -258,6 +258,12 @@ public class ReplicationQueuesZKImpl ext
peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
if (peerIdsToProcess == null) return queues; // node already processed
for (String peerId : peerIdsToProcess) {
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ if (!peerExists(replicationQueueInfo.getPeerId())) {
+ LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
+ // Protection against moving orphaned queues
+ continue;
+ }
String newPeerId = peerId + "-" + znode;
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
// check the logs queue for the old peer cluster
@@ -319,6 +325,12 @@ public class ReplicationQueuesZKImpl ext
// The lock isn't a peer cluster, remove it
clusters.remove(RS_LOCK_ZNODE);
for (String cluster : clusters) {
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
+ if (!peerExists(replicationQueueInfo.getPeerId())) {
+ LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
+ // Protection against moving orphaned queues
+ continue;
+ }
// We add the name of the recovered RS to the new znode, we can even
// do that for queues that were recovered 10 times giving a znode like
// number-startcode-number-otherstartcode-number-anotherstartcode-etc
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java Thu May 16 00:56:35 2013
@@ -78,4 +78,9 @@ public abstract class ReplicationStateZK
}
return result;
}
+
+ public boolean peerExists(String id) throws KeeperException { // id: the peer id to look up
+ return ZKUtil.checkExists(this.zookeeper, // NOTE(review): presumably negative when absent - confirm
+ ZKUtil.joinZNode(this.peersZNode, id)) >= 0; // checks the znode for id under peersZNode
+ }
}
\ No newline at end of file
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Thu May 16 00:56:35 2013
@@ -82,7 +82,7 @@ import java.util.concurrent.atomic.Atomi
* </pre>
*/
@InterfaceAudience.Private
-public class ReplicationZookeeper implements Closeable {
+public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
private static final Log LOG =
LogFactory.getLog(ReplicationZookeeper.class);
@@ -132,6 +132,7 @@ public class ReplicationZookeeper implem
*/
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
final ZooKeeperWatcher zk) throws KeeperException {
+ super(zk, conf, abortable);
this.conf = conf;
this.zookeeper = zk;
setZNodes(abortable);
@@ -151,6 +152,7 @@ public class ReplicationZookeeper implem
*/
public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
throws IOException, KeeperException {
+ super(server.getZooKeeper(), server.getConfiguration(), server);
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
@@ -536,11 +538,6 @@ public class ReplicationZookeeper implem
}
}
- private boolean peerExists(String id) throws KeeperException {
- return ZKUtil.checkExists(this.zookeeper,
- ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
- }
-
/**
* Enable replication to the peer
*
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu May 16 00:56:35 2013
@@ -25,7 +25,6 @@ import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -94,6 +94,7 @@ public class ReplicationSource extends T
private Random random;
// should we replicate or not?
private AtomicBoolean replicating;
+ private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
// The manager of all sources to which we ping back our progress
@@ -123,10 +124,6 @@ public class ReplicationSource extends T
private long totalReplicatedEdits = 0;
// The znode we currently play with
private String peerClusterZnode;
- // Indicates if this queue is recovered (and will be deleted when depleted)
- private boolean queueRecovered;
- // List of all the dead region servers that had this queue (if recovered)
- private List<String> deadRegionServers = new ArrayList<String>();
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
@@ -196,100 +193,22 @@ public class ReplicationSource extends T
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
-
- // Finally look if this is a recovered queue
- this.checkIfQueueRecovered(peerClusterZnode);
- }
-
- // The passed znode will be either the id of the peer cluster or
- // the handling story of that queue in the form of id-servername-*
- //
- // package access for testing
- void checkIfQueueRecovered(String peerClusterZnode) {
- String[] parts = peerClusterZnode.split("-", 2);
- this.queueRecovered = parts.length != 1;
- this.peerId = this.queueRecovered ?
- parts[0] : peerClusterZnode;
this.peerClusterZnode = peerClusterZnode;
-
- if (parts.length < 2) {
- // not queue recovered situation
- return;
- }
-
- // extract dead servers
- extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
+ this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
+ // ReplicationQueueInfo parses the peerId out of the znode for us
+ this.peerId = this.replicationQueueInfo.getPeerId();
}
-
- /**
- * for tests only
- */
- List<String> getDeadRegionServers() {
- return Collections.unmodifiableList(this.deadRegionServers);
- }
-
- /**
- * Parse dead server names from znode string servername can contain "-" such as
- * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
- * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
- */
- private static void
- extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
-
- if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
-
- // valid server name delimiter "-" has to be after "," in a server name
- int seenCommaCnt = 0;
- int startIndex = 0;
- int len = deadServerListStr.length();
- for (int i = 0; i < len; i++) {
- switch (deadServerListStr.charAt(i)) {
- case ',':
- seenCommaCnt += 1;
- break;
- case '-':
- if(seenCommaCnt>=2) {
- if (i > startIndex) {
- String serverName = deadServerListStr.substring(startIndex, i);
- if(ServerName.isFullServerName(serverName)){
- result.add(serverName);
- } else {
- LOG.error("Found invalid server name:" + serverName);
- }
- startIndex = i + 1;
- }
- seenCommaCnt = 0;
- }
- break;
- default:
- break;
- }
- }
-
- // add tail
- if(startIndex < len - 1){
- String serverName = deadServerListStr.substring(startIndex, len);
- if(ServerName.isFullServerName(serverName)){
- result.add(serverName);
- } else {
- LOG.error("Found invalid server name at the end:" + serverName);
- }
- }
-
- LOG.debug("Found dead servers:" + result);
- }
-
/**
* Select a number of peers at random using the ratio. Mininum 1.
*/
private void chooseSinks() {
this.currentPeers.clear();
- List<ServerName> addresses = this.zkHelper.getSlavesAddresses(peerId);
+ List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
Set<ServerName> setOfAddr = new HashSet<ServerName>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
- " rs from peer cluster # " + peerId);
+ " rs from peer cluster # " + this.peerId);
for (int i = 0; i < nbPeers; i++) {
ServerName sn;
// Make sure we get one address that we don't already have
@@ -333,13 +252,13 @@ public class ReplicationSource extends T
// If this is recovered, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
- if (this.queueRecovered) {
+ if (this.replicationQueueInfo.isQueueRecovered()) {
try {
this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName()));
} catch (KeeperException e) {
this.terminate("Couldn't get the position of this recovered queue " +
- peerClusterZnode, e);
+ this.peerClusterZnode, e);
}
}
// Loop until we close down
@@ -374,7 +293,7 @@ public class ReplicationSource extends T
//We take the snapshot now so that we are protected against races
//where a new file gets enqueued while the current file is being processed
//(and where we just finished reading the current file).
- if (!this.queueRecovered && queue.size() == 0) {
+ if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
currentWALisBeingWrittenTo = true;
}
// Open a reader on it
@@ -401,24 +320,24 @@ public class ReplicationSource extends T
continue;
}
} catch (IOException ioe) {
- LOG.warn(peerClusterZnode + " Got: ", ioe);
+ LOG.warn(this.peerClusterZnode + " Got: ", ioe);
gotIOE = true;
if (ioe.getCause() instanceof EOFException) {
boolean considerDumping = false;
- if (this.queueRecovered) {
+ if (this.replicationQueueInfo.isQueueRecovered()) {
try {
FileStatus stat = this.fs.getFileStatus(this.currentPath);
if (stat.getLen() == 0) {
- LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+ LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
}
considerDumping = true;
} catch (IOException e) {
- LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+ LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
}
} else if (currentNbEntries != 0) {
- LOG.warn(peerClusterZnode + " Got EOF while reading, " +
- "looks like this file is broken? " + currentPath);
+ LOG.warn(this.peerClusterZnode +
+ " Got EOF while reading, " + "looks like this file is broken? " + currentPath);
considerDumping = true;
currentNbEntries = 0;
}
@@ -446,7 +365,7 @@ public class ReplicationSource extends T
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(),
- queueRecovered, currentWALisBeingWrittenTo);
+ this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -465,7 +384,7 @@ public class ReplicationSource extends T
LOG.debug("Attempt to close connection failed", e);
}
}
- LOG.debug("Source exiting " + peerId);
+ LOG.debug("Source exiting " + this.peerId);
metrics.clear();
}
@@ -571,10 +490,11 @@ public class ReplicationSource extends T
try {
this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
- if (this.queueRecovered) {
+ if (this.replicationQueueInfo.isQueueRecovered()) {
// We didn't find the log in the archive directory, look if it still
// exists in the dead RS folder (there could be a chain of failures
// to look at)
+ List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
for (String curDeadServerName : deadRegionServers) {
Path deadRsDirectory =
@@ -621,7 +541,7 @@ public class ReplicationSource extends T
}
} catch (IOException ioe) {
if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
- LOG.warn(peerClusterZnode + " Got: ", ioe);
+ LOG.warn(this.peerClusterZnode + " Got: ", ioe);
this.reader = null;
if (ioe.getCause() instanceof NullPointerException) {
// Workaround for race condition in HDFS-4380
@@ -645,7 +565,8 @@ public class ReplicationSource extends T
* may be empty, and we don't want to retry that.
*/
private boolean isCurrentLogEmpty() {
- return (this.repLogReader.getPosition() == 0 && !queueRecovered && queue.size() == 0);
+ return (this.repLogReader.getPosition() == 0 &&
+ !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
}
/**
@@ -724,7 +645,7 @@ public class ReplicationSource extends T
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(),
- queueRecovered, currentWALisBeingWrittenTo);
+ this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
this.totalReplicatedEdits += currentNbEntries;
@@ -788,7 +709,8 @@ public class ReplicationSource extends T
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
- return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId);
+ return this.replicating.get() &&
+ this.zkHelper.getPeerEnabled(this.peerId);
}
/**
@@ -804,7 +726,7 @@ public class ReplicationSource extends T
this.repLogReader.finishCurrentFile();
this.reader = null;
return true;
- } else if (this.queueRecovered) {
+ } else if (this.replicationQueueInfo.isQueueRecovered()) {
this.manager.closeRecoveredQueue(this);
LOG.info("Finished recovering the queue");
this.running = false;
@@ -823,7 +745,8 @@ public class ReplicationSource extends T
}
};
Threads.setDaemonThreadRunning(
- this, n + ".replicationSource," + peerClusterZnode, handler);
+ this, n + ".replicationSource," +
+ this.peerClusterZnode, handler);
}
public void terminate(String reason) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java Thu May 16 00:56:35 2013
@@ -45,6 +45,7 @@ public class TestReplicationStateZKImpl
private static Configuration conf;
private static HBaseTestingUtility utility;
private static ZooKeeperWatcher zkw;
+ private static String replicationZNode;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -52,6 +53,8 @@ public class TestReplicationStateZKImpl
utility.startMiniZKCluster();
conf = utility.getConfiguration();
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
+ String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+ replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
}
@Before
@@ -63,12 +66,14 @@ public class TestReplicationStateZKImpl
rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
+ String peersZnode = ZKUtil.joinZNode(replicationZNode, "peers");
+ for (int i = 1; i < 6; i++) {
+ ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(peersZnode, "qId"+i));
+ }
}
@After
public void tearDown() throws KeeperException, IOException {
- String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
- String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Thu May 16 00:56:35 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
@@ -293,9 +294,8 @@ public class TestReplicationSourceManage
testMap = rz3.claimQueues(s2.getServerName().getServerName());
rz3.close();
- ReplicationSource s = new ReplicationSource();
- s.checkIfQueueRecovered(testMap.firstKey());
- List<String> result = s.getDeadRegionServers();
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
+ List<String> result = replicationQueueInfo.getDeadRegionServers();
// verify
assertTrue(result.contains(server.getServerName().getServerName()));