You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2011/03/23 23:23:54 UTC

svn commit: r1084785 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Author: jdcryans
Date: Wed Mar 23 22:23:54 2011
New Revision: 1084785

URL: http://svn.apache.org/viewvc?rev=1084785&view=rev
Log:
HBASE-3640  [replication] Transferring queues shouldn't be done inline with RS startup

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1084785&r1=1084784&r2=1084785&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Mar 23 22:23:54 2011
@@ -182,6 +182,7 @@ Release 0.90.2 - Unreleased
    HBASE-3496  HFile CLI Improvements
    HBASE-3596  [replication] Wait a few seconds before transferring queues
    HBASE-3600  Update our jruby to 1.6.0
+   HBASE-3640  [replication] Transferring queues shouldn't be done inline with RS startup
 
 Release 0.90.1 - February 9th, 2011
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1084785&r1=1084784&r2=1084785&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Wed Mar 23 22:23:54 2011
@@ -27,8 +27,13 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -78,6 +83,8 @@ public class ReplicationSourceManager {
   private final Path oldLogDir;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
+  // Homemade executor service for replication (runs the queue failovers)
+  private final ThreadPoolExecutor executor;
 
   /**
    * Creates a replication manager and sets the watch on all the other
@@ -116,6 +123,17 @@ public class ReplicationSourceManager {
         new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
     this.zkHelper.listPeersIdsAndWatch();
     this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+    // It's preferable to failover 1 RS at a time, but with good zk servers
+    // more could be processed at the same time.
+    int nbWorkers = conf.getInt("replication.executor.workers", 1);
+    // 100ms is the idle keep-alive for threads beyond the core pool size; with
+    // core == max it is unused. Even if a transfer fails, other RSs can do it.
+    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
+        100, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setNameFormat("ReplicationExecutor-%d");
+    this.executor.setThreadFactory(tfb.build());
   }
 
   /**
@@ -199,6 +217,7 @@ public class ReplicationSourceManager {
    * Terminate the replication on this region server
    */
   public void join() {
+    this.executor.shutdown();
     if (this.sources.size() == 0) {
       this.zkHelper.deleteOwnRSZNode();
     }
@@ -298,50 +317,15 @@ public class ReplicationSourceManager {
    * @param rsZnode
    */
   public void transferQueues(String rsZnode) {
-    // Wait a bit before transferring the queues, we may be shutting down.
-    // This sleep may not be enough in some cases.
+    // Hand the failover to the executor so the caller (e.g. region server
+    // startup) is not blocked by the worker's sleep and ZooKeeper calls.
+    NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
     try {
-      Thread.sleep(this.sleepBeforeFailover);
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting before transferring a queue.");
-      Thread.currentThread().interrupt();
-    }
-    // We try to lock that rs' queue directory
-    if (this.stopper.isStopped()) {
-      LOG.info("Not transferring queue since we are shutting down");
-      return;
-    }
-    if (!this.zkHelper.lockOtherRS(rsZnode)) {
-      return;
-    }
-    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
-    SortedMap<String, SortedSet<String>> newQueues =
-        this.zkHelper.copyQueuesFromRS(rsZnode);
-    this.zkHelper.deleteRsQueues(rsZnode);
-    if (newQueues == null || newQueues.size() == 0) {
-      return;
-    }
-
-    for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
-      String peerId = entry.getKey();
-      try {
-        ReplicationSourceInterface src = getReplicationSource(this.conf,
-            this.fs, this, this.stopper, this.replicating, peerId);
-        if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
-          src.terminate("Recovered queue doesn't belong to any current peer");
-          break;
-        }
-        this.oldsources.add(src);
-        for (String hlog : entry.getValue()) {
-          src.enqueueLog(new Path(this.oldLogDir, hlog));
-        }
-        // TODO set it to what's in ZK
-        src.setSourceEnabled(true);
-        src.startup();
-      } catch (IOException e) {
-        // TODO manage it
-        LOG.error("Failed creating a source", e);
-      }
+      this.executor.execute(transfer);
+    } catch (RejectedExecutionException ex) {
+      // The executor rejects new work once join() has shut it down.
+      LOG.info("Cancelling the transfer of " + rsZnode +
+          " because of " + ex.getMessage());
     }
   }

@@ -525,6 +509,86 @@ public class ReplicationSourceManager {
   }

   /**
+   * Worker that sets up new ReplicationSources to take over the replication
+   * queues of a dead region server. It is submitted to the replication
+   * executor by transferQueues, so the failover runs on an executor thread
+   * instead of the caller's thread.
+   */
+  class NodeFailoverWorker implements Runnable {
+
+    private final String rsZnode;
+
+    /**
+     * @param rsZnode znode of the dead region server whose replication queues
+     * should be moved to this region server
+     */
+    public NodeFailoverWorker(String rsZnode) {
+      this.rsZnode = rsZnode;
+    }
+
+    /**
+     * Waits {@code sleepBeforeFailover} ms, then, unless this server is
+     * stopping, tries to lock the dead server's queues. If the lock is
+     * obtained, the queues are copied to this server, removed from the dead
+     * server, and a recovered source is started for each queue whose peer
+     * cluster still exists.
+     */
+    @Override
+    public void run() {
+      // Wait a bit before transferring the queues, we may be shutting down.
+      // This sleep may not be enough in some cases.
+      try {
+        Thread.sleep(sleepBeforeFailover);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting before transferring a queue.");
+        Thread.currentThread().interrupt();
+      }
+      // We try to lock that rs' queue directory
+      if (stopper.isStopped()) {
+        LOG.info("Not transferring queue since we are shutting down");
+        return;
+      }
+      if (!zkHelper.lockOtherRS(rsZnode)) {
+        // Lock not acquired (presumably another region server is handling
+        // this failover), so there is nothing for us to do.
+        return;
+      }
+      LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+      SortedMap<String, SortedSet<String>> newQueues =
+          zkHelper.copyQueuesFromRS(rsZnode);
+      zkHelper.deleteRsQueues(rsZnode);
+      if (newQueues == null || newQueues.isEmpty()) {
+        return;
+      }
+
+      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+        String peerId = entry.getKey();
+        try {
+          ReplicationSourceInterface src = getReplicationSource(conf,
+              fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+          if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+            src.terminate("Recovered queue doesn't belong to any current peer");
+            // Skip only this queue: the remaining ones may belong to live
+            // peers, and deleteRsQueues above already removed them from the
+            // dead server, so abandoning them would leave them unreplicated.
+            continue;
+          }
+          oldsources.add(src);
+          for (String hlog : entry.getValue()) {
+            src.enqueueLog(new Path(oldLogDir, hlog));
+          }
+          // TODO set it to what's in ZK
+          src.setSourceEnabled(true);
+          src.startup();
+        } catch (IOException e) {
+          // TODO manage it
+          LOG.error("Failed creating a source", e);
+        }
+      }
+    }
+  }
+
+  /**
    * Get the directory where hlogs are archived
    * @return the directory where hlogs are archived
    */