You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2011/03/23 23:23:54 UTC
svn commit: r1084785 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
Author: jdcryans
Date: Wed Mar 23 22:23:54 2011
New Revision: 1084785
URL: http://svn.apache.org/viewvc?rev=1084785&view=rev
Log:
HBASE-3640 [replication] Transferring queues shouldn't be done inline with RS startup
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1084785&r1=1084784&r2=1084785&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Mar 23 22:23:54 2011
@@ -182,6 +182,7 @@ Release 0.90.2 - Unreleased
HBASE-3496 HFile CLI Improvements
HBASE-3596 [replication] Wait a few seconds before transferring queues
HBASE-3600 Update our jruby to 1.6.0
+ HBASE-3640 [replication] Transferring queues shouldn't be done inline with RS startup
Release 0.90.1 - February 9th, 2011
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1084785&r1=1084784&r2=1084785&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Wed Mar 23 22:23:54 2011
@@ -27,8 +27,13 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -78,6 +83,8 @@ public class ReplicationSourceManager {
private final Path oldLogDir;
// The number of ms that we wait before moving znodes, HBASE-3596
private final long sleepBeforeFailover;
+ // Homemade executor service for replication
+ private final ThreadPoolExecutor executor;
/**
* Creates a replication manager and sets the watch on all the other
@@ -116,6 +123,17 @@ public class ReplicationSourceManager {
new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
this.zkHelper.listPeersIdsAndWatch();
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+ // It's preferable to failover 1 RS at a time, but with good zk servers
+ // more could be processed at the same time.
+ int nbWorkers = conf.getInt("replication.executor.workers", 1);
+ // use a short 100ms sleep since this could be done inline with a RS startup
+ // even if we fail, other region servers can take care of it
+ this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
+ 100, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+ tfb.setNameFormat("ReplicationExecutor-%d");
+ this.executor.setThreadFactory(tfb.build());
}
/**
@@ -199,6 +217,7 @@ public class ReplicationSourceManager {
* Terminate the replication on this region server
*/
public void join() {
+ this.executor.shutdown();
if (this.sources.size() == 0) {
this.zkHelper.deleteOwnRSZNode();
}
@@ -298,50 +317,12 @@ public class ReplicationSourceManager {
* @param rsZnode
*/
public void transferQueues(String rsZnode) {
- // Wait a bit before transferring the queues, we may be shutting down.
- // This sleep may not be enough in some cases.
+ NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
try {
- Thread.sleep(this.sleepBeforeFailover);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting before transferring a queue.");
- Thread.currentThread().interrupt();
- }
- // We try to lock that rs' queue directory
- if (this.stopper.isStopped()) {
- LOG.info("Not transferring queue since we are shutting down");
- return;
- }
- if (!this.zkHelper.lockOtherRS(rsZnode)) {
- return;
- }
- LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
- SortedMap<String, SortedSet<String>> newQueues =
- this.zkHelper.copyQueuesFromRS(rsZnode);
- this.zkHelper.deleteRsQueues(rsZnode);
- if (newQueues == null || newQueues.size() == 0) {
- return;
- }
-
- for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
- String peerId = entry.getKey();
- try {
- ReplicationSourceInterface src = getReplicationSource(this.conf,
- this.fs, this, this.stopper, this.replicating, peerId);
- if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
- src.terminate("Recovered queue doesn't belong to any current peer");
- break;
- }
- this.oldsources.add(src);
- for (String hlog : entry.getValue()) {
- src.enqueueLog(new Path(this.oldLogDir, hlog));
- }
- // TODO set it to what's in ZK
- src.setSourceEnabled(true);
- src.startup();
- } catch (IOException e) {
- // TODO manage it
- LOG.error("Failed creating a source", e);
- }
+ this.executor.execute(transfer);
+ } catch (RejectedExecutionException ex) {
+ LOG.info("Cancelling the transfer of " + rsZnode +
+ " because of " + ex.getMessage());
}
}
@@ -525,6 +506,73 @@ public class ReplicationSourceManager {
}
/**
+ * Class responsible for setting up new ReplicationSources to take care of the
+ * queues from dead region servers.
+ */
+ class NodeFailoverWorker extends Thread {
+
+ private String rsZnode;
+
+ /**
+ *
+ * @param rsZnode
+ */
+ public NodeFailoverWorker(String rsZnode) {
+ super("Failover-for-"+rsZnode);
+ this.rsZnode = rsZnode;
+ }
+
+ @Override
+ public void run() {
+ // Wait a bit before transferring the queues, we may be shutting down.
+ // This sleep may not be enough in some cases.
+ try {
+ Thread.sleep(sleepBeforeFailover);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting before transferring a queue.");
+ Thread.currentThread().interrupt();
+ }
+ // We try to lock that rs' queue directory
+ if (stopper.isStopped()) {
+ LOG.info("Not transferring queue since we are shutting down");
+ return;
+ }
+ if (!zkHelper.lockOtherRS(rsZnode)) {
+ return;
+ }
+ LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+ SortedMap<String, SortedSet<String>> newQueues =
+ zkHelper.copyQueuesFromRS(rsZnode);
+ zkHelper.deleteRsQueues(rsZnode);
+ if (newQueues == null || newQueues.size() == 0) {
+ return;
+ }
+
+ for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+ String peerId = entry.getKey();
+ try {
+ ReplicationSourceInterface src = getReplicationSource(conf,
+ fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+ if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+ src.terminate("Recovered queue doesn't belong to any current peer");
+ break;
+ }
+ oldsources.add(src);
+ for (String hlog : entry.getValue()) {
+ src.enqueueLog(new Path(oldLogDir, hlog));
+ }
+ // TODO set it to what's in ZK
+ src.setSourceEnabled(true);
+ src.startup();
+ } catch (IOException e) {
+ // TODO manage it
+ LOG.error("Failed creating a source", e);
+ }
+ }
+ }
+ }
+
+ /**
* Get the directory where hlogs are archived
* @return the directory where hlogs are archived
*/