You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/01/29 19:48:59 UTC

svn commit: r1440054 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication: ReplicationZookeeper.java regionserver/ReplicationSourceManager.java

Author: larsh
Date: Tue Jan 29 18:48:58 2013
New Revision: 1440054

URL: http://svn.apache.org/viewvc?rev=1440054&view=rev
Log:
HBASE-2611 Handle RS that fails while processing the failure of another one (Himanshu Vashishtha)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1440054&r1=1440053&r2=1440054&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Tue Jan 29 18:48:58 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
@@ -659,6 +660,58 @@ public class ReplicationZookeeper {
   }
 
   /**
+   * Moves every hlog queue of a dead region server under rsServerNameZnode in one
+   * ZKUtil.multiOrSequential batch and returns them keyed by peer id + "-" + znode.
+   * @param znode dead region server's znode name, relative to rsZNode
+   * @return HLog queues sorted per peer cluster; empty (never null) when nothing was moved
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+    String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    try {
+      List<String> peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+      if (peerIdsToProcess == null) return queues; // already processed; never hand back null
+      for (String peerId : peerIdsToProcess) {
+        String newPeerId = peerId + "-" + znode;
+        String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
+        // check the logs queue for the old peer cluster
+        String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+        if (hlogs == null || hlogs.size() == 0) {
+          // empty log queue: still remove its znode, or deleting the dead rs znode below fails
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+          continue;
+        }
+        // create the new cluster znode
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newPeerId, logQueue);
+        listOfOps.add(ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY));
+        // get the offset of the logs and set it to new znodes
+        for (String hlog : hlogs) {
+          String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode)); // delete op for old hlog
+          logQueue.add(hlog);
+        }
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); // delete op for peer
+      }
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); // delete op for dead rs
+      LOG.debug(" The multi list size is: " + listOfOps.size());
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+      LOG.info("Atomically moved the dead regionserver logs. ");
+    } catch (KeeperException e) {
+      // Multi call failed; it looks like some other regionserver took away the logs.
+      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+      queues.clear(); // nothing was moved, so report no queues to the caller
+    }
+    return queues;
+  }
+
+  /**
    * This methods copies all the hlogs queues from another region server
    * and returns them all sorted per peer cluster (appended with the dead
    * server's znode)

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1440054&r1=1440053&r2=1440054&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Tue Jan 29 18:48:58 2013
@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -581,14 +582,22 @@ public class ReplicationSourceManager {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
-      if (!zkHelper.lockOtherRS(rsZnode)) {
-        return;
+      SortedMap<String, SortedSet<String>> newQueues = null;
+
+      // check whether there is multi support. If yes, use it.
+      if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+        LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue");
+        newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode);
+      } else {
+        LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+        if (!zkHelper.lockOtherRS(rsZnode)) {
+          return;
+        }
+        newQueues = zkHelper.copyQueuesFromRS(rsZnode);
+        zkHelper.deleteRsQueues(rsZnode);
       }
-      LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
-      SortedMap<String, SortedSet<String>> newQueues =
-          zkHelper.copyQueuesFromRS(rsZnode);
-      zkHelper.deleteRsQueues(rsZnode);
-      if (newQueues == null || newQueues.size() == 0) {
+      // process of copying over the failed queue is completed.
+      if (newQueues.size() == 0) {
         return;
       }