You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/10/25 19:32:08 UTC
svn commit: r1402226 - in
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase:
HRegionInfo.java master/AssignmentManager.java master/OfflineCallback.java
Author: jxiang
Date: Thu Oct 25 17:32:08 2012
New Revision: 1402226
URL: http://svn.apache.org/viewvc?rev=1402226&view=rev
Log:
HBASE-6977 Multithread processing ZK assignment events
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=1402226&r1=1402225&r2=1402226&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java Thu Oct 25 17:32:08 2012
@@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1402226&r1=1402225&r2=1402226&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Oct 25 17:32:08 2012
@@ -32,7 +32,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -76,7 +77,6 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
@@ -98,7 +98,7 @@ public class AssignmentManager extends Z
public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
-1, -1L);
- protected Server server;
+ protected final Server server;
private ServerManager serverManager;
@@ -145,6 +145,9 @@ public class AssignmentManager extends Z
//Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService;
+ // A set of ZK event workers. Each one is a single-threaded executor service.
+ private java.util.concurrent.ExecutorService[] zkEventWorkers;
+
private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
@@ -161,18 +164,6 @@ public class AssignmentManager extends Z
*/
final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
- // A temp ZK watcher for bulk assigner to avoid deadlock,
- // will be removed in HBASE-6977
- //
- // A separate ZK watcher used for async ZK node offline.
- // We can't use that exiting one because it could lead to
- // deadlocks if its event thread asks for a locker held by a bulk
- // assigner thread. This watcher is just for async ZK node offline.
- // In HBASE-6977, we are going to process assignment ZK events
- // outside of ZK event thread, so there won't be deadlock
- // threat anymore. That's when this watcher to be removed.
- private final ZooKeeperWatcher asyncOfflineZKWatcher;
-
/**
* Constructs a new assignment manager.
*
@@ -206,13 +197,20 @@ public class AssignmentManager extends Z
this.maximumAttempts =
this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = balancer;
- this.threadPoolExecutorService = Executors.newCachedThreadPool();
+ int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
+ this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
+ maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("hbase-am"));
this.masterMetrics = metrics;// can be null only with tests.
this.regionStates = new RegionStates(server, serverManager);
- // A temp ZK watcher for bulk assigner to avoid deadlock,
- // will be removed in HBASE-6977
- asyncOfflineZKWatcher = new ZooKeeperWatcher(conf,
- "async offline ZK watcher", server);
+
+ int workers = conf.getInt("hbase.assignment.zkevent.workers", 5);
+ zkEventWorkers = new java.util.concurrent.ExecutorService[workers];
+ ThreadFactory threadFactory =
+ Threads.newDaemonThreadFactory("am-zkevent-worker");
+ for (int i = 0; i < workers; i++) {
+ zkEventWorkers[i] = Threads.getBoundedCachedThreadPool(
+ 1, 60L, TimeUnit.SECONDS, threadFactory);
+ }
}
void startTimeOutMonitor() {
@@ -923,56 +921,47 @@ public class AssignmentManager extends Z
handleAssignmentEvent(path);
}
- private void handleAssignmentEvent(final String path) {
- if (!path.startsWith(watcher.assignmentZNode)) return;
- try {
- Stat stat = new Stat();
- byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
- if (data == null) return;
- RegionTransition rt = RegionTransition.parseFrom(data);
- handleRegion(rt, stat.getVersion());
- } catch (KeeperException e) {
- server.abort("Unexpected ZK exception reading unassigned node data", e);
- } catch (DeserializationException e) {
- server.abort("Unexpected exception deserializing node data", e);
- }
- }
-
@Override
public void nodeDeleted(final String path) {
- if (path.startsWith(this.watcher.assignmentZNode)) {
- String regionName = ZKAssign.getRegionName(this.watcher, path);
- Lock lock = locker.acquireLock(regionName);
- try {
- RegionState rs = regionStates.getRegionTransitionState(regionName);
- if (rs != null) {
- HRegionInfo regionInfo = rs.getRegion();
- if (rs.isSplit()) {
- LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
- "clearing from RIT; rs=" + rs);
- regionOffline(rs.getRegion());
- } else {
- LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
- + " has been deleted.");
- if (rs.isOpened()) {
- ServerName serverName = rs.getServerName();
- regionOnline(regionInfo, serverName);
- LOG.info("The master has opened the region "
- + regionInfo.getRegionNameAsString() + " that was online on "
- + serverName);
- if (this.getZKTable().isDisablingOrDisabledTable(
- regionInfo.getTableNameAsString())) {
- LOG.debug("Opened region "
- + regionInfo.getRegionNameAsString() + " but "
- + "this table is disabled, triggering close of region");
- unassign(regionInfo);
+ if (path.startsWith(watcher.assignmentZNode)) {
+ int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
+ zkEventWorkers[wi].submit(new Runnable() {
+ @Override
+ public void run() {
+ String regionName = ZKAssign.getRegionName(watcher, path);
+ Lock lock = locker.acquireLock(regionName);
+ try {
+ RegionState rs = regionStates.getRegionTransitionState(regionName);
+ if (rs == null) return;
+
+ HRegionInfo regionInfo = rs.getRegion();
+ if (rs.isSplit()) {
+ LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
+ "clearing from RIT; rs=" + rs);
+ regionOffline(rs.getRegion());
+ } else {
+ LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
+ + " has been deleted.");
+ if (rs.isOpened()) {
+ ServerName serverName = rs.getServerName();
+ regionOnline(regionInfo, serverName);
+ LOG.info("The master has opened the region "
+ + regionInfo.getRegionNameAsString() + " that was online on "
+ + serverName);
+ if (getZKTable().isDisablingOrDisabledTable(
+ regionInfo.getTableNameAsString())) {
+ LOG.debug("Opened region "
+ + regionInfo.getRegionNameAsString() + " but "
+ + "this table is disabled, triggering close of region");
+ unassign(regionInfo);
+ }
}
}
+ } finally {
+ lock.unlock();
}
}
- } finally {
- lock.unlock();
- }
+ });
}
}
@@ -990,14 +979,32 @@ public class AssignmentManager extends Z
*/
@Override
public void nodeChildrenChanged(String path) {
- if(path.equals(watcher.assignmentZNode)) {
- try {
- // Just make sure we see the changes for the new znodes
- ZKUtil.listChildrenAndWatchThem(watcher,
- watcher.assignmentZNode);
- } catch(KeeperException e) {
- server.abort("Unexpected ZK exception reading unassigned children", e);
- }
+ if (path.equals(watcher.assignmentZNode)) {
+ int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
+ zkEventWorkers[wi].submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Just make sure we see the changes for the new znodes
+ List<String> children =
+ ZKUtil.listChildrenAndWatchForNewChildren(
+ watcher, watcher.assignmentZNode);
+ if (children != null) {
+ for (String child : children) {
+ // If the region is in transition, we already have a watch
+ // on it, so there is no need to watch it again. As far as I know,
+ // this is currently needed only to watch splitting nodes.
+ if (!regionStates.isRegionInTransition(child)) {
+ ZKUtil.watchAndCheckExists(watcher,
+ ZKUtil.joinZNode(watcher.assignmentZNode, child));
+ }
+ }
+ }
+ } catch(KeeperException e) {
+ server.abort("Unexpected ZK exception reading unassigned children", e);
+ }
+ }
+ });
}
}
@@ -1024,6 +1031,37 @@ public class AssignmentManager extends Z
}
/**
+ * Pass the assignment event to a worker for processing.
+ * Each worker is a single thread executor service. The reason
+ * for just one thread is to make sure all events for a given
+ * region are processed in order.
+ *
+ * @param path path of the znode the event is for; ignored unless it is under the assignment znode
+ */
+ private void handleAssignmentEvent(final String path) {
+ if (path.startsWith(watcher.assignmentZNode)) {
+ int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
+ zkEventWorkers[wi].submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Stat stat = new Stat();
+ byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
+ if (data == null) return;
+
+ RegionTransition rt = RegionTransition.parseFrom(data);
+ handleRegion(rt, stat.getVersion());
+ } catch (KeeperException e) {
+ server.abort("Unexpected ZK exception reading unassigned node data", e);
+ } catch (DeserializationException e) {
+ server.abort("Unexpected exception deserializing node data", e);
+ }
+ }
+ });
+ }
+ }
+
+ /**
* Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
* will update timers for this server in background
* @param sn
@@ -1175,14 +1213,13 @@ public class AssignmentManager extends Z
AtomicInteger counter = new AtomicInteger(0);
Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
OfflineCallback cb = new OfflineCallback(
- regionStates, asyncOfflineZKWatcher, destination, counter, offlineNodesVersions);
+ watcher, destination, counter, offlineNodesVersions);
Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
List<RegionState> states = new ArrayList<RegionState>(regions.size());
for (HRegionInfo region : regions) {
String encodedRegionName = region.getEncodedName();
RegionState state = forceRegionStateToOffline(region, true);
- if (state != null && asyncSetOfflineInZooKeeper(
- state, asyncOfflineZKWatcher, cb, destination)) {
+ if (state != null && asyncSetOfflineInZooKeeper(state, cb, destination)) {
RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
plans.put(encodedRegionName, plan);
states.add(state);
@@ -1228,12 +1265,6 @@ public class AssignmentManager extends Z
Lock lock = locks.remove(encodedRegionName);
lock.unlock();
} else {
- try { // Set the ZK watcher explicitly
- ZKAssign.getData(this.watcher, encodedRegionName);
- } catch (KeeperException e) {
- server.abort("Unexpected exception watching ZKAssign node", e);
- return false;
- }
regionStates.updateRegionState(region,
RegionState.State.PENDING_OPEN, destination);
regionOpenInfos.add(new Pair<HRegionInfo, Integer>(
@@ -2640,8 +2671,9 @@ public class AssignmentManager extends Z
* Shutdown the threadpool executor service
*/
public void shutdown() {
- if (null != threadPoolExecutorService) {
- this.threadPoolExecutorService.shutdown();
+ threadPoolExecutorService.shutdownNow();
+ for (int i = 0, n = zkEventWorkers.length; i < n; i++) {
+ zkEventWorkers[i].shutdownNow();
}
}
@@ -2664,8 +2696,7 @@ public class AssignmentManager extends Z
* updating zk).
*/
private boolean asyncSetOfflineInZooKeeper(final RegionState state,
- final ZooKeeperWatcher zkw, final AsyncCallback.StringCallback cb,
- final ServerName destination) {
+ final AsyncCallback.StringCallback cb, final ServerName destination) {
if (!state.isClosed() && !state.isOffline()) {
this.server.abort("Unexpected state trying to OFFLINE; " + state,
new IllegalStateException());
@@ -2674,7 +2705,7 @@ public class AssignmentManager extends Z
regionStates.updateRegionState(
state.getRegion(), RegionState.State.OFFLINE);
try {
- ZKAssign.asyncCreateNodeOffline(zkw, state.getRegion(),
+ ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
destination, cb, state);
} catch (KeeperException e) {
if (e instanceof NodeExistsException) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java?rev=1402226&r1=1402225&r2=1402226&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java Thu Oct 25 17:32:08 2012
@@ -41,13 +41,15 @@ public class OfflineCallback implements
private final ExistCallback callBack;
private final ZooKeeperWatcher zkw;
private final ServerName destination;
+ private final AtomicInteger counter;
- OfflineCallback(final RegionStates regionStates,
- final ZooKeeperWatcher zkw, final ServerName destination,
- final AtomicInteger counter, final Map<String, Integer> offlineNodesVersions) {
+ OfflineCallback(final ZooKeeperWatcher zkw,
+ final ServerName destination, final AtomicInteger counter,
+ final Map<String, Integer> offlineNodesVersions) {
this.callBack = new ExistCallback(
- regionStates, counter, destination, offlineNodesVersions);
+ destination, counter, offlineNodesVersions);
this.destination = destination;
+ this.counter = counter;
this.zkw = zkw;
}
@@ -59,13 +61,12 @@ public class OfflineCallback implements
// This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
- this.zkw.abort("Connectionloss writing unassigned at " + path +
- ", rc=" + rc, null);
+ this.counter.addAndGet(1);
return;
}
+
if (LOG.isDebugEnabled()) {
- LOG.debug("rs=" + (RegionState)ctx
- + ", server=" + this.destination.toString());
+ LOG.debug("rs=" + ctx + ", server=" + destination);
}
// Async exists to set a watcher so we'll get triggered when
// unassigned node changes.
@@ -80,17 +81,15 @@ public class OfflineCallback implements
static class ExistCallback implements StatCallback {
private final Log LOG = LogFactory.getLog(ExistCallback.class);
private final Map<String, Integer> offlineNodesVersions;
- private final RegionStates regionStates;
private final AtomicInteger counter;
private ServerName destination;
- ExistCallback(final RegionStates regionStates,
- final AtomicInteger counter, ServerName destination,
+ ExistCallback(final ServerName destination,
+ final AtomicInteger counter,
final Map<String, Integer> offlineNodesVersions) {
this.offlineNodesVersions = offlineNodesVersions;
- this.regionStates = regionStates;
- this.counter = counter;
this.destination = destination;
+ this.counter = counter;
}
@Override
@@ -99,24 +98,16 @@ public class OfflineCallback implements
// This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
+ this.counter.addAndGet(1);
return;
}
- RegionState state = (RegionState)ctx;
+
if (LOG.isDebugEnabled()) {
- LOG.debug("rs=" + state
- + ", server=" + this.destination.toString());
+ LOG.debug("rs=" + ctx + ", server=" + destination);
}
- // Transition RegionState to PENDING_OPEN here in master; means we've
- // sent the open. We're a little ahead of ourselves here since we've not
- // yet sent out the actual open but putting this state change after the
- // call to open risks our writing PENDING_OPEN after state has been moved
- // to OPENING by the regionserver.
- HRegionInfo region = state.getRegion();
+ HRegionInfo region = ((RegionState)ctx).getRegion();
offlineNodesVersions.put(
region.getEncodedName(), Integer.valueOf(stat.getVersion()));
- regionStates.updateRegionState(region,
- RegionState.State.PENDING_OPEN, destination);
-
this.counter.addAndGet(1);
}
}