You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2017/04/05 03:59:37 UTC
hbase git commit: HBASE-12770 Don't transfer all the queued hlogs of
a dead server to the same alive server
Repository: hbase
Updated Branches:
refs/heads/branch-1.3 ea3907da7 -> fd297e280
HBASE-12770 Don't transfer all the queued hlogs of a dead server to the same alive server
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fd297e28
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fd297e28
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fd297e28
Branch: refs/heads/branch-1.3
Commit: fd297e280f25c26346c3343d6ea1be4f0362821e
Parents: ea3907d
Author: Phil Yang <ud...@gmail.com>
Authored: Thu Aug 4 19:33:01 2016 +0800
Committer: Mikhail Antonov <an...@apache.org>
Committed: Tue Apr 4 20:03:14 2017 -0700
----------------------------------------------------------------------
.../hbase/replication/ReplicationQueues.java | 28 +-
.../replication/ReplicationQueuesZKImpl.java | 253 +++++++++++--------
.../regionserver/ReplicationSourceManager.java | 25 +-
.../replication/TestReplicationStateBasic.java | 16 +-
.../TestReplicationSourceManager.java | 37 ++-
5 files changed, 231 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 507367b..1b1c770 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -23,6 +23,7 @@ import java.util.SortedMap;
import java.util.SortedSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
@@ -94,14 +95,33 @@ public interface ReplicationQueues {
List<String> getAllQueues();
/**
- * Take ownership for the set of queues belonging to a dead region server.
+ * Checks if the provided znode is the same as this region server's
+ * @param regionserver the id of the region server
+ * @return if this is this rs's znode
+ */
+ boolean isThisOurRegionServer(String regionserver);
+
+ /**
+ * Get queueIds from a dead region server, whose queues has not been claimed by other region
+ * servers.
+ * @return empty if the queue exists but no children, null if the queue does not exist.
+ */
+ List<String> getUnClaimedQueueIds(String regionserver);
+
+ /**
+ * Take ownership for the queue identified by queueId and belongs to a dead region server.
* @param regionserver the id of the dead region server
- * @return A SortedMap of the queues that have been claimed, including a SortedSet of WALs in
- * each queue. Returns an empty map if no queues were failed-over.
+ * @param queueId the id of the queue
+ * @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue.
*/
- SortedMap<String, SortedSet<String>> claimQueues(String regionserver);
+ Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
/**
+ * Remove the znode of region server if the queue is empty.
+ * @param regionserver
+ */
+ void removeReplicatorIfQueueIsEmpty(String regionserver);
+ /**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index c366a74..559ab41 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
@@ -174,24 +175,6 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
@Override
- public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
- SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
- // check whether there is multi support. If yes, use it.
- if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
- LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
- newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
- } else {
- LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
- if (!lockOtherRS(regionserverZnode)) {
- return newQueues;
- }
- newQueues = copyQueuesFromRS(regionserverZnode);
- deleteAnotherRSQueues(regionserverZnode);
- }
- return newQueues;
- }
-
- @Override
public void removeAllQueues() {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
@@ -229,6 +212,74 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
return listOfQueues;
}
+ @Override
+ public boolean isThisOurRegionServer(String regionserver) {
+ return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
+ }
+
+ @Override
+ public List<String> getUnClaimedQueueIds(String regionserver) {
+ if (isThisOurRegionServer(regionserver)) {
+ return null;
+ }
+ String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+ List<String> queues = null;
+ try {
+ queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to getUnClaimedQueueIds for " + regionserver, e);
+ }
+ if (queues != null) {
+ queues.remove(RS_LOCK_ZNODE);
+ }
+ return queues;
+ }
+
+ @Override
+ public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
+ if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+ LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
+ return moveQueueUsingMulti(regionserver, queueId);
+ } else {
+ LOG.info("Moving " + regionserver + "/" + queueId + "'s wals to my queue");
+ if (!lockOtherRS(regionserver)) {
+ LOG.info("Can not take the lock now");
+ return null;
+ }
+ Pair<String, SortedSet<String>> newQueues;
+ try {
+ newQueues = copyQueueFromLockedRS(regionserver, queueId);
+ removeQueueFromLockedRS(regionserver, queueId);
+ } finally {
+ unlockOtherRS(regionserver);
+ }
+ return newQueues;
+ }
+ }
+
+ private void removeQueueFromLockedRS(String znode, String peerId) {
+ String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+ String peerPath = ZKUtil.joinZNode(nodePath, peerId);
+ try {
+ ZKUtil.deleteNodeRecursively(this.zookeeper, peerPath);
+ } catch (KeeperException e) {
+ LOG.warn("Remove copied queue failed", e);
+ }
+ }
+
+ @Override
+ public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+ String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+ try {
+ List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
+ if (list != null && list.size() == 0){
+ ZKUtil.deleteNode(this.zookeeper, rsPath);
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Got error while removing replicator", e);
+ }
+ }
+
/**
* Try to set a lock in another region server's znode.
* @param znode the server names of the other server
@@ -271,6 +322,16 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
}
+ private void unlockOtherRS(String znode){
+ String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
+ String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+ try {
+ ZKUtil.deleteNode(this.zookeeper, p);
+ } catch (KeeperException e) {
+ this.abortable.abort("Remove lock failed", e);
+ }
+ }
+
/**
* Delete all the replication queues for a given region server.
* @param regionserverZnode The znode of the region server to delete.
@@ -308,38 +369,30 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
* It "atomically" copies all the wals queues from another region server and returns them all
* sorted per peer cluster (appended with the dead server's znode).
* @param znode pertaining to the region server to copy the queues from
- * @return WAL queues sorted per peer cluster
*/
- private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
- SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
- // hbase/replication/rs/deadrs
- String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
- List<String> peerIdsToProcess = null;
- List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+ private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
try {
- peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
- if (peerIdsToProcess == null) return queues; // node already processed
- for (String peerId : peerIdsToProcess) {
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
- if (!peerExists(replicationQueueInfo.getPeerId())) {
- // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
- // this will cause the whole multi op fail.
- // NodeFailoverWorker will skip the orphaned queues.
- LOG.warn("Peer " + peerId
- + " didn't exist, will move its queue to avoid the failure of multi op");
- }
- String newPeerId = peerId + "-" + znode;
- String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
- // check the logs queue for the old peer cluster
- String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
- List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
- if (wals == null || wals.size() == 0) {
- listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
- continue; // empty log queue.
- }
+ // hbase/replication/rs/deadrs
+ String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+ List<ZKUtilOp> listOfOps = new ArrayList<>();
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ if (!peerExists(replicationQueueInfo.getPeerId())) {
+ // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
+ // this will cause the whole multi op fail.
+ // NodeFailoverWorker will skip the orphaned queues.
+ LOG.warn("Peer " + peerId +
+ " didn't exist, will move its queue to avoid the failure of multi op");
+ }
+ String newPeerId = peerId + "-" + znode;
+ String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
+ // check the logs queue for the old peer cluster
+ String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+ List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+ SortedSet<String> logQueue = new TreeSet<>();
+ if (wals == null || wals.size() == 0) {
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+ } else {
// create the new cluster znode
- SortedSet<String> logQueue = new TreeSet<String>();
- queues.put(newPeerId, logQueue);
ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
listOfOps.add(op);
// get the offset of the logs and set it to new znodes
@@ -349,98 +402,86 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
- // add ops for deleting
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
logQueue.add(wal);
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+
+ if (LOG.isTraceEnabled())
+ LOG.trace(" The multi list size is: " + listOfOps.size());
}
- // add delete op for dead rs, this will update the cversion of the parent.
- // The reader will make optimistic locking with this to get a consistent
- // snapshot
- listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
- if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
- if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. ");
+ if (LOG.isTraceEnabled())
+ LOG.trace("Atomically moved the dead regionserver logs. ");
+ return new Pair<>(newPeerId, logQueue);
} catch (KeeperException e) {
// Multi call failed; it looks like some other regionserver took away the logs.
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
- queues.clear();
} catch (InterruptedException e) {
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
- queues.clear();
Thread.currentThread().interrupt();
}
- return queues;
+ return null;
}
/**
- * This methods copies all the wals queues from another region server and returns them all sorted
+ * This methods moves all the wals queues from another region server and returns them all sorted
* per peer cluster (appended with the dead server's znode)
* @param znode server names to copy
- * @return all wals for all peers of that cluster, null if an error occurred
+ * @return all wals for the peer of that cluster, null if an error occurred
*/
- private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+ private Pair<String, SortedSet<String>> copyQueueFromLockedRS(String znode, String peerId) {
// TODO this method isn't atomic enough, we could start copying and then
// TODO fail for some reason and we would end up with znodes we don't want.
- SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
try {
String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
- List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
- // We have a lock znode in there, it will count as one.
- if (clusters == null || clusters.size() <= 1) {
- return queues;
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ String clusterPath = ZKUtil.joinZNode(nodePath, peerId);
+ if (!peerExists(replicationQueueInfo.getPeerId())) {
+ LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
+ // Protection against moving orphaned queues
+ return null;
}
- // The lock isn't a peer cluster, remove it
- clusters.remove(RS_LOCK_ZNODE);
- for (String cluster : clusters) {
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
- if (!peerExists(replicationQueueInfo.getPeerId())) {
- LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
- // Protection against moving orphaned queues
- continue;
- }
- // We add the name of the recovered RS to the new znode, we can even
- // do that for queues that were recovered 10 times giving a znode like
- // number-startcode-number-otherstartcode-number-anotherstartcode-etc
- String newCluster = cluster + "-" + znode;
- String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
- String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
- List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
- // That region server didn't have anything to replicate for this cluster
- if (wals == null || wals.size() == 0) {
- continue;
- }
- ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+ // We add the name of the recovered RS to the new znode, we can even
+ // do that for queues that were recovered 10 times giving a znode like
+ // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+ String newCluster = peerId + "-" + znode;
+ String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
+
+ List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+ // That region server didn't have anything to replicate for this cluster
+ if (wals == null || wals.size() == 0) {
+ return null;
+ }
+ ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
HConstants.EMPTY_BYTE_ARRAY);
- SortedSet<String> logQueue = new TreeSet<String>();
- queues.put(newCluster, logQueue);
- for (String wal : wals) {
- String z = ZKUtil.joinZNode(clusterPath, wal);
- byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
- long position = 0;
- try {
- position = ZKUtil.parseWALPositionFrom(positionBytes);
- } catch (DeserializationException e) {
- LOG.warn("Failed parse of wal position from the following znode: " + z
- + ", Exception: " + e);
- }
- LOG.debug("Creating " + wal + " with data " + position);
- String child = ZKUtil.joinZNode(newClusterZnode, wal);
- // Position doesn't actually change, we are just deserializing it for
- // logging, so just use the already serialized version
- ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
- logQueue.add(wal);
+ SortedSet<String> logQueue = new TreeSet<>();
+ for (String wal : wals) {
+ String z = ZKUtil.joinZNode(clusterPath, wal);
+ byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+ long position = 0;
+ try {
+ position = ZKUtil.parseWALPositionFrom(positionBytes);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse of wal position from the following znode: " + z
+ + ", Exception: " + e);
}
+ LOG.debug("Creating " + wal + " with data " + position);
+ String child = ZKUtil.joinZNode(newClusterZnode, wal);
+ // Position doesn't actually change, we are just deserializing it for
+ // logging, so just use the already serialized version
+ ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, child, positionBytes);
+ logQueue.add(wal);
}
+ return new Pair<>(newCluster, logQueue);
} catch (KeeperException e) {
- this.abortable.abort("Copy queues from rs", e);
+ LOG.warn("Got exception in copyQueueFromLockedRS: ", e);
} catch (InterruptedException e) {
LOG.warn(e);
Thread.currentThread().interrupt();
}
- return queues;
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index c4dc800..b7a5839 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
@@ -64,6 +63,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
/**
@@ -672,9 +672,28 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Not transferring queue since we are shutting down");
return;
}
- SortedMap<String, SortedSet<String>> newQueues = null;
- newQueues = this.rq.claimQueues(rsZnode);
+ Map<String, SortedSet<String>> newQueues = new HashMap<>();
+ List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
+ while (peers != null && !peers.isEmpty()) {
+ Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
+ peers.get(rand.nextInt(peers.size())));
+ long sleep = sleepBeforeFailover/2;
+ if (peer != null) {
+ newQueues.put(peer.getFirst(), peer.getSecond());
+ sleep = sleepBeforeFailover;
+ }
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting before transferring a queue.");
+ Thread.currentThread().interrupt();
+ }
+ peers = rq.getUnClaimedQueueIds(rsZnode);
+ }
+ if (peers != null) {
+ rq.removeReplicatorIfQueueIsEmpty(rsZnode);
+ }
// Copying over the failed queue is completed.
if (newQueues.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index c8e234e..e0fe10f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -124,7 +122,7 @@ public abstract class TestReplicationStateBasic {
assertNull(rq1.getAllQueues());
assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
assertNull(rq1.getLogsInQueue("bogus"));
- assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
+ assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString()));
rq1.setLogPosition("bogus", "bogus", 5L);
@@ -143,15 +141,21 @@ public abstract class TestReplicationStateBasic {
assertEquals(1, rq2.getAllQueues().size());
assertEquals(5, rq3.getAllQueues().size());
- assertEquals(0, rq3.claimQueues(server1).size());
+ assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
+ rq3.removeReplicatorIfQueueIsEmpty(server1);
assertEquals(2, rq3.getListOfReplicators().size());
- SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
+ List<String> queues = rq2.getUnClaimedQueueIds(server3);
assertEquals(5, queues.size());
+ for(String queue: queues) {
+ rq2.claimQueue(server3, queue);
+ }
+ rq2.removeReplicatorIfQueueIsEmpty(server3);
assertEquals(1, rq2.getListOfReplicators().size());
// Try to claim our own queues
- assertEquals(0, rq2.claimQueues(server2).size());
+ assertNull(rq2.getUnClaimedQueueIds(server2));
+ rq2.removeReplicatorIfQueueIsEmpty(server2);
assertEquals(6, rq2.getAllQueues().size());
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 012ef36..7614b0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -80,6 +81,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -352,7 +354,7 @@ public class TestReplicationSourceManager {
manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
new Long(1), new Long(2)));
w1.start();
- w1.join(5000);
+ w1.join(10000);
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -385,18 +387,26 @@ public class TestReplicationSourceManager {
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
rq1.init(s1.getServerName().toString());
- SortedMap<String, SortedSet<String>> testMap =
- rq1.claimQueues(server.getServerName().getServerName());
+ String serverName = server.getServerName().getServerName();
+ List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
+ rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
+ rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+
ReplicationQueues rq2 =
ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
rq2.init(s2.getServerName().toString());
- testMap = rq2.claimQueues(s1.getServerName().getServerName());
+ serverName = s1.getServerName().getServerName();
+ unclaimed = rq2.getUnClaimedQueueIds(serverName);
+ rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
+ rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
ReplicationQueues rq3 =
ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
rq3.init(s3.getServerName().toString());
- testMap = rq3.claimQueues(s2.getServerName().getServerName());
-
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
+ serverName = s2.getServerName().getServerName();
+ unclaimed = rq3.getUnClaimedQueueIds(serverName);
+ String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
+ rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
List<String> result = replicationQueueInfo.getDeadRegionServers();
// verify
@@ -432,7 +442,11 @@ public class TestReplicationSourceManager {
ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
int v0 = client.getQueuesZNodeCversion();
- rq1.claimQueues(s0.getServerName().getServerName());
+ List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
+ for(String queue : queues) {
+ rq1.claimQueue(s0.getServerName().getServerName(), queue);
+ }
+ rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
int v1 = client.getQueuesZNodeCversion();
// cversion should increased by 1 since a child node is deleted
assertEquals(v0 + 1, v1);
@@ -611,7 +625,12 @@ public class TestReplicationSourceManager {
@Override
public void run() {
try {
- logZnodesMap = rq.claimQueues(deadRsZnode);
+ logZnodesMap = new TreeMap<>();
+ List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
+ for(String queue:queues){
+ Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
+ logZnodesMap.put(pair.getFirst(), pair.getSecond());
+ }
server.abort("Done with testing", null);
} catch (Exception e) {
LOG.error("Got exception while running NodeFailoverWorker", e);