You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/10/25 19:32:08 UTC

svn commit: r1402226 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase: HRegionInfo.java master/AssignmentManager.java master/OfflineCallback.java

Author: jxiang
Date: Thu Oct 25 17:32:08 2012
New Revision: 1402226

URL: http://svn.apache.org/viewvc?rev=1402226&view=rev
Log:
HBASE-6977 Multithread processing ZK assignment events

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=1402226&r1=1402225&r2=1402226&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java Thu Oct 25 17:32:08 2012
@@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1402226&r1=1402225&r2=1402226&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Oct 25 17:32:08 2012
@@ -32,7 +32,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -76,7 +77,6 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
@@ -98,7 +98,7 @@ public class AssignmentManager extends Z
   public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
       -1, -1L);
 
-  protected Server server;
+  protected final Server server;
 
   private ServerManager serverManager;
 
@@ -145,6 +145,9 @@ public class AssignmentManager extends Z
   //Thread pool executor service for timeout monitor
   private java.util.concurrent.ExecutorService threadPoolExecutorService;
 
+  // A set of ZK event workers. Each is a single-threaded executor service
+  private java.util.concurrent.ExecutorService[] zkEventWorkers;
+
   private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
 
@@ -161,18 +164,6 @@ public class AssignmentManager extends Z
    */
   final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
 
-  // A temp ZK watcher for bulk assigner to avoid deadlock,
-  // will be removed in HBASE-6977
-  //
-  // A separate ZK watcher used for async ZK node offline.
-  // We can't use that exiting one because it could lead to
-  // deadlocks if its event thread asks for a locker held by a bulk
-  // assigner thread. This watcher is just for async ZK node offline.
-  // In HBASE-6977, we are going to process assignment ZK events
-  // outside of ZK event thread, so there won't be deadlock
-  // threat anymore.  That's when this watcher to be removed.
-  private final ZooKeeperWatcher asyncOfflineZKWatcher;
-
   /**
    * Constructs a new assignment manager.
    *
@@ -206,13 +197,20 @@ public class AssignmentManager extends Z
     this.maximumAttempts =
       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
     this.balancer = balancer;
-    this.threadPoolExecutorService = Executors.newCachedThreadPool();
+    int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
+    this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
+      maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("hbase-am"));
     this.masterMetrics = metrics;// can be null only with tests.
     this.regionStates = new RegionStates(server, serverManager);
-    // A temp ZK watcher for bulk assigner to avoid deadlock,
-    // will be removed in HBASE-6977
-    asyncOfflineZKWatcher = new ZooKeeperWatcher(conf,
-      "async offline ZK watcher", server);
+
+    int workers = conf.getInt("hbase.assignment.zkevent.workers", 5);
+    zkEventWorkers = new java.util.concurrent.ExecutorService[workers];
+    ThreadFactory threadFactory =
+      Threads.newDaemonThreadFactory("am-zkevent-worker");
+    for (int i = 0; i < workers; i++) {
+      zkEventWorkers[i] = Threads.getBoundedCachedThreadPool(
+        1, 60L, TimeUnit.SECONDS, threadFactory);
+    }
   }
 
   void startTimeOutMonitor() {
@@ -923,56 +921,47 @@ public class AssignmentManager extends Z
     handleAssignmentEvent(path);
   }
 
-  private void handleAssignmentEvent(final String path) {
-    if (!path.startsWith(watcher.assignmentZNode)) return;
-    try {
-      Stat stat = new Stat();
-      byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
-      if (data == null) return;
-      RegionTransition rt = RegionTransition.parseFrom(data);
-      handleRegion(rt, stat.getVersion());
-    } catch (KeeperException e) {
-      server.abort("Unexpected ZK exception reading unassigned node data", e);
-    } catch (DeserializationException e) {
-      server.abort("Unexpected exception deserializing node data", e);
-    }
-  }
-
   @Override
   public void nodeDeleted(final String path) {
-    if (path.startsWith(this.watcher.assignmentZNode)) {
-      String regionName = ZKAssign.getRegionName(this.watcher, path);
-      Lock lock = locker.acquireLock(regionName);
-      try {
-        RegionState rs = regionStates.getRegionTransitionState(regionName);
-        if (rs != null) {
-          HRegionInfo regionInfo = rs.getRegion();
-          if (rs.isSplit()) {
-            LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
-              "clearing from RIT; rs=" + rs);
-            regionOffline(rs.getRegion());
-          } else {
-            LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
-                + " has been deleted.");
-            if (rs.isOpened()) {
-              ServerName serverName = rs.getServerName();
-              regionOnline(regionInfo, serverName);
-              LOG.info("The master has opened the region "
-                + regionInfo.getRegionNameAsString() + " that was online on "
-                + serverName);
-              if (this.getZKTable().isDisablingOrDisabledTable(
-                  regionInfo.getTableNameAsString())) {
-                LOG.debug("Opened region "
-                    + regionInfo.getRegionNameAsString() + " but "
-                    + "this table is disabled, triggering close of region");
-                unassign(regionInfo);
+    if (path.startsWith(watcher.assignmentZNode)) {
+      int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
+      zkEventWorkers[wi].submit(new Runnable() {
+        @Override
+        public void run() {
+          String regionName = ZKAssign.getRegionName(watcher, path);
+          Lock lock = locker.acquireLock(regionName);
+          try {
+            RegionState rs = regionStates.getRegionTransitionState(regionName);
+            if (rs == null) return;
+
+            HRegionInfo regionInfo = rs.getRegion();
+            if (rs.isSplit()) {
+              LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
+                "clearing from RIT; rs=" + rs);
+              regionOffline(rs.getRegion());
+            } else {
+              LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
+                  + " has been deleted.");
+              if (rs.isOpened()) {
+                ServerName serverName = rs.getServerName();
+                regionOnline(regionInfo, serverName);
+                LOG.info("The master has opened the region "
+                  + regionInfo.getRegionNameAsString() + " that was online on "
+                  + serverName);
+                if (getZKTable().isDisablingOrDisabledTable(
+                    regionInfo.getTableNameAsString())) {
+                  LOG.debug("Opened region "
+                      + regionInfo.getRegionNameAsString() + " but "
+                      + "this table is disabled, triggering close of region");
+                  unassign(regionInfo);
+                }
               }
             }
+          } finally {
+            lock.unlock();
           }
         }
-      } finally {
-        lock.unlock();
-      }
+      });
     }
   }
 
@@ -990,14 +979,32 @@ public class AssignmentManager extends Z
    */
   @Override
   public void nodeChildrenChanged(String path) {
-    if(path.equals(watcher.assignmentZNode)) {
-      try {
-        // Just make sure we see the changes for the new znodes
-        ZKUtil.listChildrenAndWatchThem(watcher,
-            watcher.assignmentZNode);
-      } catch(KeeperException e) {
-        server.abort("Unexpected ZK exception reading unassigned children", e);
-      }
+    if (path.equals(watcher.assignmentZNode)) {
+      int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
+      zkEventWorkers[wi].submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // Just make sure we see the changes for the new znodes
+            List<String> children =
+              ZKUtil.listChildrenAndWatchForNewChildren(
+                watcher, watcher.assignmentZNode);
+            if (children != null) {
+              for (String child : children) {
+                // If the region is in transition, we already have a watch
+                // on it, so there is no need to watch it again. As far as we
+                // know, this is only needed to watch splitting nodes.
+                if (!regionStates.isRegionInTransition(child)) {
+                  ZKUtil.watchAndCheckExists(watcher,
+                    ZKUtil.joinZNode(watcher.assignmentZNode, child));
+                }
+              }
+            }
+          } catch(KeeperException e) {
+            server.abort("Unexpected ZK exception reading unassigned children", e);
+          }
+        }
+      });
     }
   }
 
@@ -1024,6 +1031,37 @@ public class AssignmentManager extends Z
   }
 
   /**
+   * Pass the assignment event to a worker for processing.
+   * Each worker is a single thread executor service.  The reason
+   * for just one thread is to make sure all events for a given
+   * region are processed in order.
+   *
+   * @param path
+   */
+  private void handleAssignmentEvent(final String path) {
+    if (path.startsWith(watcher.assignmentZNode)) {
+      int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
+      zkEventWorkers[wi].submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Stat stat = new Stat();
+            byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
+            if (data == null) return;
+
+            RegionTransition rt = RegionTransition.parseFrom(data);
+            handleRegion(rt, stat.getVersion());
+          } catch (KeeperException e) {
+            server.abort("Unexpected ZK exception reading unassigned node data", e);
+          } catch (DeserializationException e) {
+            server.abort("Unexpected exception deserializing node data", e);
+          }
+        }
+      });
+    }
+  }
+
+  /**
    * Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
    * will update timers for this server in background
    * @param sn
@@ -1175,14 +1213,13 @@ public class AssignmentManager extends Z
       AtomicInteger counter = new AtomicInteger(0);
       Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
       OfflineCallback cb = new OfflineCallback(
-        regionStates, asyncOfflineZKWatcher, destination, counter, offlineNodesVersions);
+        watcher, destination, counter, offlineNodesVersions);
       Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
       List<RegionState> states = new ArrayList<RegionState>(regions.size());
       for (HRegionInfo region : regions) {
         String encodedRegionName = region.getEncodedName();
         RegionState state = forceRegionStateToOffline(region, true);
-        if (state != null && asyncSetOfflineInZooKeeper(
-            state, asyncOfflineZKWatcher, cb, destination)) {
+        if (state != null && asyncSetOfflineInZooKeeper(state, cb, destination)) {
           RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
           plans.put(encodedRegionName, plan);
           states.add(state);
@@ -1228,12 +1265,6 @@ public class AssignmentManager extends Z
           Lock lock = locks.remove(encodedRegionName);
           lock.unlock();
         } else {
-          try { // Set the ZK watcher explicitly
-            ZKAssign.getData(this.watcher, encodedRegionName);
-          } catch (KeeperException e) {
-            server.abort("Unexpected exception watching ZKAssign node", e);
-            return false;
-          }
           regionStates.updateRegionState(region,
             RegionState.State.PENDING_OPEN, destination);
           regionOpenInfos.add(new Pair<HRegionInfo, Integer>(
@@ -2640,8 +2671,9 @@ public class AssignmentManager extends Z
    * Shutdown the threadpool executor service
    */
   public void shutdown() {
-    if (null != threadPoolExecutorService) {
-      this.threadPoolExecutorService.shutdown();
+    threadPoolExecutorService.shutdownNow();
+    for (int i = 0, n = zkEventWorkers.length; i < n; i++) {
+      zkEventWorkers[i].shutdownNow();
     }
   }
 
@@ -2664,8 +2696,7 @@ public class AssignmentManager extends Z
    * updating zk).
    */
   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
-      final ZooKeeperWatcher zkw, final AsyncCallback.StringCallback cb,
-      final ServerName destination) {
+      final AsyncCallback.StringCallback cb, final ServerName destination) {
     if (!state.isClosed() && !state.isOffline()) {
       this.server.abort("Unexpected state trying to OFFLINE; " + state,
         new IllegalStateException());
@@ -2674,7 +2705,7 @@ public class AssignmentManager extends Z
     regionStates.updateRegionState(
       state.getRegion(), RegionState.State.OFFLINE);
     try {
-      ZKAssign.asyncCreateNodeOffline(zkw, state.getRegion(),
+      ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
         destination, cb, state);
     } catch (KeeperException e) {
       if (e instanceof NodeExistsException) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java?rev=1402226&r1=1402225&r2=1402226&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java Thu Oct 25 17:32:08 2012
@@ -41,13 +41,15 @@ public class OfflineCallback implements 
   private final ExistCallback callBack;
   private final ZooKeeperWatcher zkw;
   private final ServerName destination;
+  private final AtomicInteger counter;
 
-  OfflineCallback(final RegionStates regionStates,
-      final ZooKeeperWatcher zkw, final ServerName destination,
-      final AtomicInteger counter, final Map<String, Integer> offlineNodesVersions) {
+  OfflineCallback(final ZooKeeperWatcher zkw,
+      final ServerName destination, final AtomicInteger counter,
+      final Map<String, Integer> offlineNodesVersions) {
     this.callBack = new ExistCallback(
-      regionStates, counter, destination, offlineNodesVersions);
+      destination, counter, offlineNodesVersions);
     this.destination = destination;
+    this.counter = counter;
     this.zkw = zkw;
   }
 
@@ -59,13 +61,12 @@ public class OfflineCallback implements 
       // This is result code.  If non-zero, need to resubmit.
       LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
         "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
-      this.zkw.abort("Connectionloss writing unassigned at " + path +
-        ", rc=" + rc, null);
+      this.counter.addAndGet(1);
       return;
     }
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("rs=" + (RegionState)ctx
-        + ", server=" + this.destination.toString());
+      LOG.debug("rs=" + ctx + ", server=" + destination);
     }
     // Async exists to set a watcher so we'll get triggered when
     // unassigned node changes.
@@ -80,17 +81,15 @@ public class OfflineCallback implements 
   static class ExistCallback implements StatCallback {
     private final Log LOG = LogFactory.getLog(ExistCallback.class);
     private final Map<String, Integer> offlineNodesVersions;
-    private final RegionStates regionStates;
     private final AtomicInteger counter;
     private ServerName destination;
 
-    ExistCallback(final RegionStates regionStates,
-        final AtomicInteger counter, ServerName destination,
+    ExistCallback(final ServerName destination,
+        final AtomicInteger counter,
         final Map<String, Integer> offlineNodesVersions) {
       this.offlineNodesVersions = offlineNodesVersions;
-      this.regionStates = regionStates;
-      this.counter = counter;
       this.destination = destination;
+      this.counter = counter;
     }
 
     @Override
@@ -99,24 +98,16 @@ public class OfflineCallback implements 
         // This is result code.  If non-zero, need to resubmit.
         LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
           "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
+        this.counter.addAndGet(1);
         return;
       }
-      RegionState state = (RegionState)ctx;
+
       if (LOG.isDebugEnabled()) {
-        LOG.debug("rs=" + state
-          + ", server=" + this.destination.toString());
+        LOG.debug("rs=" + ctx + ", server=" + destination);
       }
-      // Transition RegionState to PENDING_OPEN here in master; means we've
-      // sent the open.  We're a little ahead of ourselves here since we've not
-      // yet sent out the actual open but putting this state change after the
-      // call to open risks our writing PENDING_OPEN after state has been moved
-      // to OPENING by the regionserver.
-      HRegionInfo region = state.getRegion();
+      HRegionInfo region = ((RegionState)ctx).getRegion();
       offlineNodesVersions.put(
         region.getEncodedName(), Integer.valueOf(stat.getVersion()));
-      regionStates.updateRegionState(region,
-        RegionState.State.PENDING_OPEN, destination);
-
       this.counter.addAndGet(1);
     }
   }