You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/29 06:04:36 UTC
svn commit: r1439744 - in
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication:
ReplicationZookeeper.java regionserver/ReplicationSourceManager.java
Author: tedyu
Date: Tue Jan 29 05:04:36 2013
New Revision: 1439744
URL: http://svn.apache.org/viewvc?rev=1439744&view=rev
Log:
HBASE-2611 Handle RS that fails while processing the failure of another one (Himanshu)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1439744&r1=1439743&r2=1439744&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Tue Jan 29 05:04:36 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -791,6 +792,58 @@ public class ReplicationZookeeper implem
}
/**
+ * It "atomically" copies all the hlogs queues from another region server and returns them all
+ * sorted per peer cluster (appended with the dead server's znode).
+ * @param znode
+ * @return HLog queues sorted per peer cluster
+ */
+ public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+ SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+ String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
+ List<String> peerIdsToProcess = null;
+ List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+ try {
+ peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+ if (peerIdsToProcess == null) return null; // node already processed
+ for (String peerId : peerIdsToProcess) {
+ String newPeerId = peerId + "-" + znode;
+ String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
+ // check the logs queue for the old peer cluster
+ String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+ List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+ if (hlogs == null || hlogs.size() == 0) continue; // empty log queue.
+ // create the new cluster znode
+ SortedSet<String> logQueue = new TreeSet<String>();
+ queues.put(newPeerId, logQueue);
+ ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+ listOfOps.add(op);
+ // get the offset of the logs and set it to new znodes
+ for (String hlog : hlogs) {
+ String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+ byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+ LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+ String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+ listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+ // add ops for deleting
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
+ logQueue.add(hlog);
+ }
+ // add delete op for peer
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+ }
+ // add delete op for dead rs
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
+ LOG.debug(" The multi list size is: " + listOfOps.size());
+ ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+ LOG.info("Atomically moved the dead regionserver logs. ");
+ } catch (KeeperException e) {
+ // Multi call failed; it looks like some other regionserver took away the logs.
+ LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+ }
+ return queues;
+ }
+
+ /**
* This method copies all the hlogs queues from another region server
* and returns them all sorted per peer cluster (appended with the dead
* server's znode)
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1439744&r1=1439743&r2=1439744&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Tue Jan 29 05:04:36 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -581,14 +582,22 @@ public class ReplicationSourceManager {
LOG.info("Not transferring queue since we are shutting down");
return;
}
- if (!zkHelper.lockOtherRS(rsZnode)) {
- return;
+ SortedMap<String, SortedSet<String>> newQueues = null;
+
+ // check whether there is multi support. If yes, use it.
+ if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+ LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue");
+ newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode);
+ } else {
+ LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+ if (!zkHelper.lockOtherRS(rsZnode)) {
+ return;
+ }
+ newQueues = zkHelper.copyQueuesFromRS(rsZnode);
+ zkHelper.deleteRsQueues(rsZnode);
}
- LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
- SortedMap<String, SortedSet<String>> newQueues =
- zkHelper.copyQueuesFromRS(rsZnode);
- zkHelper.deleteRsQueues(rsZnode);
- if (newQueues == null || newQueues.size() == 0) {
+ // process of copying over the failed queue is completed.
+ if (newQueues.size() == 0) {
return;
}