You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/09/24 22:33:20 UTC

svn commit: r1389561 [1/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/handler/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/zookeeper/ test...

Author: jxiang
Date: Mon Sep 24 20:33:19 2012
New Revision: 1389561

URL: http://svn.apache.org/viewvc?rev=1389561&view=rev
Log:
HBASE-6381 AssignmentManager should use the same logic for clean startup and failover

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java   (with props)
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Mon Sep 24 20:33:19 2012
@@ -19,12 +19,10 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +31,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -62,13 +61,11 @@ import org.apache.hadoop.hbase.master.ha
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
-import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
 import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -99,13 +96,13 @@ public class AssignmentManager extends Z
   public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
       -1, -1L);
 
-  protected Server master;
+  protected Server server;
 
   private ServerManager serverManager;
 
   private CatalogTracker catalogTracker;
 
-  private TimeoutMonitor timeoutMonitor;
+  final TimeoutMonitor timeoutMonitor;
 
   private TimerUpdater timerUpdater;
 
@@ -114,11 +111,6 @@ public class AssignmentManager extends Z
   final private KeyLocker<String> locker = new KeyLocker<String>();
 
   /**
-   * Used for assignment only.  TODO: revisit the assign lock scheme
-   */
-  final private KeyLocker<String> assignLocker = new KeyLocker<String>();
-
-  /**
    * Map of regions to reopen after the schema of a table is changed. Key -
    * encoded region name, value - HRegionInfo
    */
@@ -138,11 +130,6 @@ public class AssignmentManager extends Z
 
   private final ZKTable zkTable;
 
-  // store all the table names in disabling state
-  Set<String> disablingTables = new HashSet<String>();
-  // store all the enabling state tablenames.
-  Set<String> enablingTables = new HashSet<String>();
-
   /**
    * Contains the server which need to update timer, these servers will be
    * handled by {@link TimerUpdater}
@@ -158,62 +145,59 @@ public class AssignmentManager extends Z
   private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
 
-  /**
-   * Set when we are doing master failover processing; cleared when failover
-   * completes.
-   */
-  private volatile boolean failover = false;
-
-  // Set holding all the regions which got processed while RIT was not
-  // populated during master failover.
-  private Map<String, HRegionInfo> failoverProcessedRegions =
-    new HashMap<String, HRegionInfo>();
+  // metrics instance to send metrics for RITs
+  MasterMetrics masterMetrics;
 
-   // metrics instance to send metrics for RITs
-   MasterMetrics masterMetrics;
+  private final RegionStates regionStates;
 
-   private final RegionStates regionStates;
+  /**
+   * Indicator that AssignmentManager has recovered the region states so
+   * that ServerShutdownHandler can be fully enabled and re-assign regions
+   * of dead servers. This ensures that when re-assignment happens,
+   * AssignmentManager has proper region states.
+   */
+  final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
 
   /**
    * Constructs a new assignment manager.
    *
-   * @param master
+   * @param server
    * @param serverManager
    * @param catalogTracker
    * @param service
    * @throws KeeperException
    * @throws IOException
    */
-  public AssignmentManager(Server master, ServerManager serverManager,
+  public AssignmentManager(Server server, ServerManager serverManager,
       CatalogTracker catalogTracker, final LoadBalancer balancer,
       final ExecutorService service, MasterMetrics metrics) throws KeeperException, IOException {
-    super(master.getZooKeeper());
-    this.master = master;
+    super(server.getZooKeeper());
+    this.server = server;
     this.serverManager = serverManager;
     this.catalogTracker = catalogTracker;
     this.executorService = service;
     this.regionsToReopen = Collections.synchronizedMap
                            (new HashMap<String, HRegionInfo> ());
-    Configuration conf = master.getConfiguration();
+    Configuration conf = server.getConfiguration();
     this.timeoutMonitor = new TimeoutMonitor(
       conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
-      master, serverManager,
+      server, serverManager,
       conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
     this.timerUpdater = new TimerUpdater(conf.getInt(
-        "hbase.master.assignment.timerupdater.period", 10000), master);
+        "hbase.master.assignment.timerupdater.period", 10000), server);
     Threads.setDaemonThreadRunning(timerUpdater.getThread(),
-        master.getServerName() + ".timerUpdater");
-    this.zkTable = new ZKTable(this.master.getZooKeeper());
+        server.getServerName() + ".timerUpdater");
+    this.zkTable = new ZKTable(this.watcher);
     this.maximumAssignmentAttempts =
-      this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
+      this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
     this.balancer = balancer;
     this.threadPoolExecutorService = Executors.newCachedThreadPool();
     this.masterMetrics = metrics;// can be null only with tests.
-    this.regionStates = new RegionStates(master, serverManager);
+    this.regionStates = new RegionStates(server, serverManager);
   }
 
   void startTimeOutMonitor() {
-    Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), master.getServerName()
+    Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
         + ".timeoutMonitor");
   }
 
@@ -283,7 +267,7 @@ public class AssignmentManager extends Z
   public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
   throws IOException {
     List <HRegionInfo> hris =
-      MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
+      MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName);
     Integer pending = 0;
     for (HRegionInfo hri : hris) {
       String name = hri.getEncodedName();
@@ -295,17 +279,23 @@ public class AssignmentManager extends Z
     }
     return new Pair<Integer, Integer>(pending, hris.size());
   }
+
   /**
-   * Reset all unassigned znodes.  Called on startup of master.
-   * Call {@link #assignAllUserRegions()} after root and meta have been assigned.
-   * @throws IOException
-   * @throws KeeperException
+   * Used by ServerShutdownHandler to make sure AssignmentManager has completed
+   * the failover cleanup before re-assigning regions of dead servers, so that
+   * when re-assignment happens, AssignmentManager has proper region states.
    */
-  void cleanoutUnassigned() throws IOException, KeeperException {
-    // Cleanup any existing ZK nodes and start watching
-    ZKAssign.deleteAllNodes(watcher);
-    ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
-      this.watcher.assignmentZNode);
+  public boolean isFailoverCleanupDone() {
+    return failoverCleanupDone.get();
+  }
+
+  /**
+   * Marks failover cleanup as completed and notifies the server manager
+   * to process any queued-up dead servers.
+   */
+  void failoverCleanupDone() {
+    failoverCleanupDone.set(true);
+    serverManager.processQueuedDeadServers();
   }
 
   /**
@@ -327,17 +317,15 @@ public class AssignmentManager extends Z
 
     // Scan META to build list of existing regions, servers, and assignment
     // Returns servers who have not checked in (assumed dead) and their regions
-    Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions();
+    Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
 
     // This method will assign all user regions if a clean server startup or
-    // it will reconstitute master state and cleanup any leftovers from
+    // it will reconstruct master state and cleanup any leftovers from
     // previous master process.
     processDeadServersAndRegionsInTransition(deadServers);
 
-    // Recover the tables that were not fully moved to DISABLED state.
-    // These tables are in DISABLING state when the master restarted/switched.
-    boolean isWatcherCreated = recoverTableInDisablingState(this.disablingTables);
-    recoverTableInEnablingState(this.enablingTables, isWatcherCreated);
+    recoverTableInDisablingState();
+    recoverTableInEnablingState();
   }
 
   /**
@@ -352,58 +340,47 @@ public class AssignmentManager extends Z
    * @throws InterruptedException
    */
   void processDeadServersAndRegionsInTransition(
-      final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
-  throws KeeperException, IOException, InterruptedException {
-    List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
+      final Map<ServerName, List<HRegionInfo>> deadServers)
+          throws KeeperException, IOException, InterruptedException {
+    List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
       watcher.assignmentZNode);
 
     if (nodes == null) {
       String errorMessage = "Failed to get the children from ZK";
-      master.abort(errorMessage, new IOException(errorMessage));
+      server.abort(errorMessage, new IOException(errorMessage));
       return;
     }
 
-    // Run through all regions.  If they are not assigned and not in RIT, then
-    // its a clean cluster startup, else its a failover.
-    Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
-    for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
-      if (!e.getKey().isMetaTable() && e.getValue() != null) {
-        LOG.debug("Found " + e + " out on cluster");
-        this.failover = true;
-        break;
-      }
-      if (nodes.contains(e.getKey().getEncodedName())) {
-        LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
-        // Could be a meta region.
-        this.failover = true;
-        break;
-      }
-    }
+    boolean failover = !serverManager.getDeadServers().isEmpty();
 
-    // Remove regions in RIT, they are possibly being processed by
-    // ServerShutdownHandler.
-    // no lock concurrent access ok: some threads may be adding/removing items but its java-valid
-    nodes.removeAll(regionStates.getRegionsInTransition().keySet());
-
-    // If some dead servers are processed by ServerShutdownHandler, we shouldn't
-    // assign all user regions( some would be assigned by
-    // ServerShutdownHandler), consider it as a failover
-    if (!this.serverManager.getDeadServers().isEmpty()) {
-      this.failover = true;
+    if (!failover) {
+      // Run through all regions.  If they are not assigned and not in RIT, then
+      // it's a clean cluster startup, else it's a failover.
+      Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
+      for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
+        if (!e.getKey().isMetaTable() && e.getValue() != null) {
+          LOG.debug("Found " + e + " out on cluster");
+          failover = true;
+          break;
+        }
+        if (nodes.contains(e.getKey().getEncodedName())) {
+          LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
+          // Could be a meta region.
+          failover = true;
+          break;
+        }
+      }
     }
 
     // If we found user regions out on cluster, its a failover.
-    if (this.failover) {
+    if (failover) {
       LOG.info("Found regions out on cluster or in RIT; failover");
       // Process list of dead servers and regions in RIT.
       // See HBASE-4580 for more information.
       processDeadServersAndRecoverLostRegions(deadServers, nodes);
-      this.failover = false;
-      failoverProcessedRegions.clear();
     } else {
       // Fresh cluster startup.
       LOG.info("Clean cluster startup. Assigning userregions");
-      cleanoutUnassigned();
       assignAllUserRegions();
     }
   }
@@ -425,7 +402,7 @@ public class AssignmentManager extends Z
       processRegionInTransition(hri.getEncodedName(), hri, null);
     if (!intransistion) return intransistion;
     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
-    while (!this.master.isStopped() &&
+    while (!this.server.isStopped() &&
       this.regionStates.isRegionInTransition(hri.getEncodedName())) {
       // We put a timeout because we may have the region getting in just between the test
       //  and the waitForUpdate
@@ -445,139 +422,143 @@ public class AssignmentManager extends Z
    * @throws IOException
    */
   boolean processRegionInTransition(final String encodedRegionName,
-      final HRegionInfo regionInfo, final Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers)
-  throws KeeperException, IOException {
-    Stat stat = new Stat();
-    byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
-    if (data == null) return false;
-    RegionTransition rt;
+      final HRegionInfo regionInfo, final Map<ServerName, List<HRegionInfo>> deadServers)
+          throws KeeperException, IOException {
+    // We need a lock here to ensure that we will not put the same region twice
+    // It has no reason to be a lock shared with the other operations.
+    // We can do the lock on the region only, instead of a global lock: what we want to ensure
+    // is that we don't have two threads working on the same region.
+    Lock lock = locker.acquireLock(encodedRegionName);
     try {
-      rt = RegionTransition.parseFrom(data);
-    } catch (DeserializationException e) {
-      LOG.warn("Failed parse znode data", e);
-      return false;
-    }
-    HRegionInfo hri = regionInfo;
-    if (hri == null) {
-      if ((hri = getHRegionInfo(rt.getRegionName())) == null) return false;
+      Stat stat = new Stat();
+      byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
+      if (data == null) return false;
+      RegionTransition rt;
+      try {
+        rt = RegionTransition.parseFrom(data);
+      } catch (DeserializationException e) {
+        LOG.warn("Failed parse znode data", e);
+        return false;
+      }
+      HRegionInfo hri = regionInfo;
+      if (hri == null) {
+        hri = regionStates.getRegionInfo(rt.getRegionName());
+        if (hri == null) return false;
+      }
+      processRegionsInTransition(rt, hri, deadServers, stat.getVersion());
+      return true;
+    } finally {
+      lock.unlock();
     }
-    processRegionsInTransition(rt, hri, deadServers, stat.getVersion());
-    return true;
   }
 
+  /**
+   * This call is invoked only during failover mode, while processing zk
+   * assignment nodes. The per-region lock is acquired by the caller.
+   *
+   * It should be private, but it is also used by some tests.
+   */
   void processRegionsInTransition(final RegionTransition rt, final HRegionInfo regionInfo,
-      final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers, int expectedVersion)
-  throws KeeperException {
+      final Map<ServerName, List<HRegionInfo>> deadServers, int expectedVersion)
+          throws KeeperException {
     EventType et = rt.getEventType();
     // Get ServerName.  Could be null.
     ServerName sn = rt.getServerName();
     String encodedRegionName = regionInfo.getEncodedName();
     LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
 
-    // We need a lock here to ensure that we will not put the same region twice
-    // It has no reason to be a lock shared with the other operations.
-    // We can do the lock on the region only, instead of a global lock: what we want to ensure
-    // is that we don't have two threads working on the same region.
-    Lock lock = locker.acquireLock(encodedRegionName);
-    try {
-      if (regionStates.isRegionInTransition(encodedRegionName)
-          || failoverProcessedRegions.containsKey(encodedRegionName)) {
-        // Just return
-        return;
+    if (regionStates.isRegionInTransition(encodedRegionName)) {
+      // Just return
+      return;
+    }
+    switch (et) {
+    case M_ZK_REGION_CLOSING:
+      // If zk node of the region was updated by a live server skip this
+      // region and just add it into RIT.
+      if (isOnDeadServer(regionInfo, deadServers)
+          && !serverManager.isServerOnline(sn)) {
+        // If was on dead server, its closed now. Force to OFFLINE and this
+        // will get it reassigned if appropriate
+        forceOffline(regionInfo, rt);
+      } else {
+        // Just insert region into RIT.
+        // If this never updates the timeout will trigger new assignment
+        regionStates.updateRegionState(rt, RegionState.State.CLOSING);
       }
-      switch (et) {
-      case M_ZK_REGION_CLOSING:
-        // If zk node of the region was updated by a live server skip this
-        // region and just add it into RIT.
-        if (isOnDeadServer(regionInfo, deadServers) && !isServerOnline(sn)) {
-          // If was on dead server, its closed now. Force to OFFLINE and this
-          // will get it reassigned if appropriate
-          forceOffline(regionInfo, rt);
-        } else {
-          // Just insert region into RIT.
-          // If this never updates the timeout will trigger new assignment
-          regionStates.updateRegionState(rt, RegionState.State.CLOSING);
-        }
-        failoverProcessedRegions.put(encodedRegionName, regionInfo);
-        break;
+      break;
 
-      case RS_ZK_REGION_CLOSED:
-      case RS_ZK_REGION_FAILED_OPEN:
-        // Region is closed, insert into RIT and handle it
-        addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
-        failoverProcessedRegions.put(encodedRegionName, regionInfo);
-        break;
+    case RS_ZK_REGION_CLOSED:
+    case RS_ZK_REGION_FAILED_OPEN:
+      // Region is closed, insert into RIT and handle it
+      addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
+      break;
 
-      case M_ZK_REGION_OFFLINE:
-        // If zk node of the region was updated by a live server skip this
-        // region and just add it into RIT.
-        if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) {
-          // Region is offline, insert into RIT and handle it like a closed
-          addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
-        } else if (sn != null && !isServerOnline(sn)) {
-          // to handle cases where offline node is created but sendRegionOpen
-          // RPC is not yet sent
-          addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
-        } else {
-          regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
-        }
-        failoverProcessedRegions.put(encodedRegionName, regionInfo);
-        break;
+    case M_ZK_REGION_OFFLINE:
+      // If zk node of the region was updated by a live server skip this
+      // region and just add it into RIT.
+      if (isOnDeadServer(regionInfo, deadServers)
+          && (sn == null || !serverManager.isServerOnline(sn))) {
+        // Region is offline, insert into RIT and handle it like a closed
+        addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
+      } else if (sn != null && !serverManager.isServerOnline(sn)) {
+        // to handle cases where offline node is created but sendRegionOpen
+        // RPC is not yet sent
+        addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
+      } else {
+        regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
+      }
+      break;
 
-      case RS_ZK_REGION_OPENING:
-        // TODO: Could check if it was on deadServers.  If it was, then we could
-        // do what happens in TimeoutMonitor when it sees this condition.
-        // Just insert region into RIT
-        // If this never updates the timeout will trigger new assignment
-        if (regionInfo.isMetaTable()) {
-          regionStates.updateRegionState(rt, RegionState.State.OPENING);
-          // If ROOT or .META. table is waiting for timeout monitor to assign
-          // it may take lot of time when the assignment.timeout.period is
-          // the default value which may be very long.  We will not be able
-          // to serve any request during this time.
-          // So we will assign the ROOT and .META. region immediately.
-          processOpeningState(regionInfo);
-          break;
-        } else if (deadServers.keySet().contains(sn)) {
-          // if the region is found on a dead server, we can assign
-          // it to a new RS. (HBASE-5882)
-          processOpeningState(regionInfo);
-          break;
-        }
+    case RS_ZK_REGION_OPENING:
+      // TODO: Could check if it was on deadServers.  If it was, then we could
+      // do what happens in TimeoutMonitor when it sees this condition.
+      // Just insert region into RIT
+      // If this never updates the timeout will trigger new assignment
+      if (regionInfo.isMetaTable()) {
         regionStates.updateRegionState(rt, RegionState.State.OPENING);
-        failoverProcessedRegions.put(encodedRegionName, regionInfo);
-        break;
-
-      case RS_ZK_REGION_OPENED:
-        // Region is opened, insert into RIT and handle it
-        regionStates.updateRegionState(rt, RegionState.State.OPEN);
-        // sn could be null if this server is no longer online.  If
-        // that is the case, just let this RIT timeout; it'll be assigned
-        // to new server then.
-        if (sn == null) {
-          LOG.warn("Region in transition " + regionInfo.getEncodedName() +
-            " references a null server; letting RIT timeout so will be " +
-            "assigned elsewhere");
-        } else if (!serverManager.isServerOnline(sn)
-            && (isOnDeadServer(regionInfo, deadServers)
-                || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
-          forceOffline(regionInfo, rt);
-        } else {
-          new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion).process();
-        }
-        failoverProcessedRegions.put(encodedRegionName, regionInfo);
-        break;
-      case RS_ZK_REGION_SPLITTING:
-        LOG.debug("Processed region in state : " + et);
+        // If ROOT or .META. table is waiting for timeout monitor to assign
+        // it may take lot of time when the assignment.timeout.period is
+        // the default value which may be very long.  We will not be able
+        // to serve any request during this time.
+        // So we will assign the ROOT and .META. region immediately.
+        processOpeningState(regionInfo);
         break;
-      case RS_ZK_REGION_SPLIT:
-        LOG.debug("Processed region in state : " + et);
+      } else if (deadServers != null
+          && deadServers.keySet().contains(sn)) {
+        // if the region is found on a dead server, we can assign
+        // it to a new RS. (HBASE-5882)
+        processOpeningState(regionInfo);
         break;
-      default:
-        throw new IllegalStateException("Received region in state :" + et + " is not valid");
       }
-    } finally {
-      lock.unlock();
+      regionStates.updateRegionState(rt, RegionState.State.OPENING);
+      break;
+
+    case RS_ZK_REGION_OPENED:
+      // Region is opened, insert into RIT and handle it
+      regionStates.updateRegionState(rt, RegionState.State.OPEN);
+      // sn could be null if this server is no longer online.  If
+      // that is the case, just let this RIT timeout; it'll be assigned
+      // to new server then.
+      if (sn == null) {
+        LOG.warn("Region in transition " + regionInfo.getEncodedName() +
+          " references a null server; letting RIT timeout so will be " +
+          "assigned elsewhere");
+      } else if (!serverManager.isServerOnline(sn)
+          && (isOnDeadServer(regionInfo, deadServers)
+              || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
+        forceOffline(regionInfo, rt);
+      } else {
+        new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
+      }
+      break;
+    case RS_ZK_REGION_SPLITTING:
+      LOG.debug("Processed region in state : " + et);
+      break;
+    case RS_ZK_REGION_SPLIT:
+      LOG.debug("Processed region in state : " + et);
+      break;
+    default:
+      throw new IllegalStateException("Received region in state :" + et + " is not valid");
     }
   }
 
@@ -598,7 +579,7 @@ public class AssignmentManager extends Z
     LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
       " was on deadserver; forcing offline");
     ZKAssign.createOrForceNodeOffline(this.watcher, hri,
-      this.master.getServerName());
+      this.server.getServerName());
     addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
   }
 
@@ -612,7 +593,7 @@ public class AssignmentManager extends Z
   private void addToRITandCallClose(final HRegionInfo hri,
                                     final RegionState.State state, final RegionTransition oldData) {
     regionStates.updateRegionState(oldData, state);
-    new ClosedRegionHandler(this.master, this, hri).process();
+    new ClosedRegionHandler(this.server, this, hri).process();
   }
 
   /**
@@ -632,12 +613,11 @@ public class AssignmentManager extends Z
    * @return True if the passed regionInfo in the passed map of deadServers?
    */
   private boolean isOnDeadServer(final HRegionInfo regionInfo,
-      final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers) {
+      final Map<ServerName, List<HRegionInfo>> deadServers) {
     if (deadServers == null) return false;
-    for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer:
-        deadServers.entrySet()) {
-      for (Pair<HRegionInfo, Result> e: deadServer.getValue()) {
-        if (e.getFirst().equals(regionInfo)) return true;
+    for (List<HRegionInfo> deadRegions: deadServers.values()) {
+      if (deadRegions.contains(regionInfo)) {
+        return true;
       }
     }
     return false;
@@ -654,7 +634,6 @@ public class AssignmentManager extends Z
    * @param expectedVersion
    */
   private void handleRegion(final RegionTransition rt, int expectedVersion) {
-    HRegionInfo hri = null;
     if (rt == null) {
       LOG.warn("Unexpected NULL input " + rt);
       return;
@@ -675,7 +654,7 @@ public class AssignmentManager extends Z
     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
     // Verify this is a known server
     if (!serverManager.isServerOnline(sn) &&
-      !this.master.getServerName().equals(sn)
+      !this.server.getServerName().equals(sn)
       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
       LOG.warn("Attempted to handle region transition for server but " +
         "server is not online: " + prettyPrintedRegionName);
@@ -739,21 +718,14 @@ public class AssignmentManager extends Z
             break;
           }
           // Run handler to do the rest of the SPLIT handling.
-          this.executorService.submit(new SplitRegionHandler(master, this,
+          this.executorService.submit(new SplitRegionHandler(server, this,
             regionState.getRegion(), sn, daughters));
           break;
 
         case M_ZK_REGION_CLOSING:
-          hri = checkIfInFailover(regionState, encodedName, regionName);
-          if (hri != null) {
-            regionState = regionStates.updateRegionState(
-              hri, RegionState.State.CLOSING, createTime, sn);
-            failoverProcessedRegions.put(encodedName, hri);
-            break;
-          }
           // Should see CLOSING after we have asked it to CLOSE or additional
           // times after already being in state of CLOSING
-          if (regionState == null ||
+          if (regionState != null &&
               (!regionState.isPendingClose() && !regionState.isClosing())) {
             LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
               " from server " + sn + " but region was in " +
@@ -766,18 +738,8 @@ public class AssignmentManager extends Z
           break;
 
         case RS_ZK_REGION_CLOSED:
-          hri = checkIfInFailover(regionState, encodedName, regionName);
-          if (hri != null) {
-            regionState = regionStates.updateRegionState(
-              hri, RegionState.State.CLOSED, createTime, sn);
-            removeClosedRegion(regionState.getRegion());
-            new ClosedRegionHandler(master, this, regionState.getRegion())
-              .process();
-            failoverProcessedRegions.put(encodedName, hri);
-            break;
-          }
           // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
-          if (regionState == null ||
+          if (regionState != null &&
               (!regionState.isPendingClose() && !regionState.isClosing())) {
             LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
                 " from server " + sn + " but region was in " +
@@ -788,23 +750,16 @@ public class AssignmentManager extends Z
           // Handle CLOSED by assigning elsewhere or stopping if a disable
           // If we got here all is good.  Need to update RegionState -- else
           // what follows will fail because not in expected state.
-          regionStates.updateRegionState(rt, RegionState.State.CLOSED);
-          removeClosedRegion(regionState.getRegion());
-          this.executorService.submit(new ClosedRegionHandler(master,
-            this, regionState.getRegion()));
+          regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
+          if (regionState != null) {
+            removeClosedRegion(regionState.getRegion());
+            this.executorService.submit(new ClosedRegionHandler(server,
+              this, regionState.getRegion()));
+          }
           break;
 
         case RS_ZK_REGION_FAILED_OPEN:
-          hri = checkIfInFailover(regionState, encodedName, regionName);
-          if (hri != null) {
-            regionState = regionStates.updateRegionState(
-              hri, RegionState.State.CLOSED, createTime, sn);
-            new ClosedRegionHandler(master, this, regionState.getRegion())
-              .process();
-            failoverProcessedRegions.put(encodedName, hri);
-            break;
-          }
-          if (regionState == null ||
+          if (regionState != null &&
               (!regionState.isPendingOpen() && !regionState.isOpening())) {
             LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName +
                 " from server " + sn + " but region was in " +
@@ -812,25 +767,20 @@ public class AssignmentManager extends Z
             return;
           }
           // Handle this the same as if it were opened and then closed.
-          regionStates.updateRegionState(rt, RegionState.State.CLOSED);
+          regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
           // When there are more than one region server a new RS is selected as the
           // destination and the same is updated in the regionplan. (HBASE-5546)
-          getRegionPlan(regionState, sn, true);
-          this.executorService.submit(new ClosedRegionHandler(master,
-            this, regionState.getRegion()));
+          if (regionState != null) {
+            getRegionPlan(regionState, sn, true);
+            this.executorService.submit(new ClosedRegionHandler(server,
+              this, regionState.getRegion()));
+          }
           break;
 
         case RS_ZK_REGION_OPENING:
-          hri = checkIfInFailover(regionState, encodedName, regionName);
-          if (hri != null) {
-            regionState = regionStates.updateRegionState(
-              hri, RegionState.State.OPENING, createTime, sn);
-            failoverProcessedRegions.put(encodedName, hri);
-            break;
-          }
           // Should see OPENING after we have asked it to OPEN or additional
           // times after already being in state of OPENING
-          if (regionState == null ||
+          if (regionState != null &&
               (!regionState.isPendingOpen() && !regionState.isOpening())) {
             LOG.warn("Received OPENING for region " +
                 prettyPrintedRegionName +
@@ -844,16 +794,8 @@ public class AssignmentManager extends Z
           break;
 
         case RS_ZK_REGION_OPENED:
-          hri = checkIfInFailover(regionState, encodedName, regionName);
-          if (hri != null) {
-            regionState = regionStates.updateRegionState(
-              hri, RegionState.State.OPEN, createTime, sn);
-            new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion).process();
-            failoverProcessedRegions.put(encodedName, hri);
-            break;
-          }
           // Should see OPENED after OPENING but possible after PENDING_OPEN
-          if (regionState == null ||
+          if (regionState != null &&
               (!regionState.isPendingOpen() && !regionState.isOpening())) {
             LOG.warn("Received OPENED for region " +
                 prettyPrintedRegionName +
@@ -863,9 +805,11 @@ public class AssignmentManager extends Z
             return;
           }
           // Handle OPENED by removing from transition and deleted zk node
-          regionStates.updateRegionState(rt, RegionState.State.OPEN);
-          this.executorService.submit(
-            new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion));
+          regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
+          if (regionState != null) {
+            this.executorService.submit(new OpenedRegionHandler(
+              server, this, regionState.getRegion(), sn, expectedVersion));
+          }
           break;
 
         default:
@@ -877,44 +821,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Checks whether the callback came while RIT was not yet populated during
-   * master failover.
-   * @param regionState
-   * @param encodedName
-   * @param data
-   * @return hri
-   */
-  private HRegionInfo checkIfInFailover(RegionState regionState,
-      String encodedName, final byte [] regionName) {
-    if (regionState == null && this.failover &&
-        (failoverProcessedRegions.containsKey(encodedName) == false ||
-          failoverProcessedRegions.get(encodedName) == null)) {
-      HRegionInfo hri = this.failoverProcessedRegions.get(encodedName);
-      if (hri == null) hri = getHRegionInfo(regionName);
-      return hri;
-    }
-    return null;
-  }
-
-  /**
-   * Gets the HRegionInfo from the META table
-   * @param  regionName
-   * @return HRegionInfo hri for the region
-   */
-  private HRegionInfo getHRegionInfo(final byte [] regionName) {
-    Pair<HRegionInfo, ServerName> p = null;
-    try {
-      p = MetaReader.getRegion(catalogTracker, regionName);
-      if (p == null) return null;
-      return p.getFirst();
-    } catch (IOException e) {
-      master.abort("Aborting because error occoured while reading " +
-        Bytes.toStringBinary(regionName) + " from .META.", e);
-      return null;
-    }
-  }
-
-  /**
    * @return Returns true if this RegionState is splittable; i.e. the
    * RegionState is currently in splitting state or pending_close or
    * null (Anything else will return false). (Anything else will return false).
@@ -1032,16 +938,16 @@ public class AssignmentManager extends Z
       RegionTransition rt = RegionTransition.parseFrom(data);
       handleRegion(rt, stat.getVersion());
     } catch (KeeperException e) {
-      master.abort("Unexpected ZK exception reading unassigned node data", e);
+      server.abort("Unexpected ZK exception reading unassigned node data", e);
     } catch (DeserializationException e) {
-      master.abort("Unexpected exception deserializing node data", e);
+      server.abort("Unexpected exception deserializing node data", e);
     }
   }
 
   @Override
   public void nodeDeleted(final String path) {
     if (path.startsWith(this.watcher.assignmentZNode)) {
-      String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path);
+      String regionName = ZKAssign.getRegionName(this.watcher, path);
       RegionState rs = regionStates.getRegionTransitionState(regionName);
       if (rs != null) {
         HRegionInfo regionInfo = rs.getRegion();
@@ -1091,7 +997,7 @@ public class AssignmentManager extends Z
         ZKUtil.listChildrenAndWatchThem(watcher,
             watcher.assignmentZNode);
       } catch(KeeperException e) {
-        master.abort("Unexpected ZK exception reading unassigned children", e);
+        server.abort("Unexpected ZK exception reading unassigned children", e);
       }
     }
   }
@@ -1105,9 +1011,9 @@ public class AssignmentManager extends Z
    * @param sn
    */
   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
-    if (!isServerOnline(sn)) {
+    if (!serverManager.isServerOnline(sn)) {
       LOG.warn("A region was opened on a dead server, ServerName=" +
-        sn.getServerName() + ", region=" + regionInfo.getEncodedName());
+        sn + ", region=" + regionInfo.getEncodedName());
     }
 
     regionStates.regionOnline(regionInfo, sn);
@@ -1190,7 +1096,7 @@ public class AssignmentManager extends Z
       LOG.debug("Tried to delete closed node for " + regionInfo + " but it " +
           "does not exist so just offlining");
     } catch (KeeperException e) {
-      this.master.abort("Error deleting CLOSED node in ZK", e);
+      this.server.abort("Error deleting CLOSED node in ZK", e);
     }
     regionOffline(regionInfo);
   }
@@ -1242,14 +1148,9 @@ public class AssignmentManager extends Z
         region.getRegionNameAsString());
       return;
     }
-    RegionState state = forceRegionStateToOffline(region,
-        hijack);
-    // TODO: we can't synchronized on state any more since it could
-    // be an new instance.  We need to reconsider how to avoid
-    // double/multiple assignments.
-    // This is to prevent double assignments? Does it work?
+    RegionState state = forceRegionStateToOffline(region, hijack);
     String encodedName = region.getEncodedName();
-    Lock lock = assignLocker.acquireLock(encodedName);
+    Lock lock = locker.acquireLock(encodedName);
     try {
       assign(region, state, setOfflineInZK, forceNewPlan, hijack);
     } finally {
@@ -1296,7 +1197,7 @@ public class AssignmentManager extends Z
     }
     // Wait until all unassigned nodes have been put up and watchers set.
     int total = regions.size();
-    for (int oldCounter = 0; true;) {
+    for (int oldCounter = 0; !server.isStopped();) {
       int count = counter.get();
       if (oldCounter != count) {
         LOG.info(destination.toString() + " unassigned znodes=" + count +
@@ -1304,16 +1205,20 @@ public class AssignmentManager extends Z
         oldCounter = count;
       }
       if (count == total) break;
-      Threads.sleep(1);
+      Threads.sleep(10);
     }
+    if (server.isStopped()) {
+      return false;
+    }
+
     // Move on to open regions.
     try {
       // Send OPEN RPC. If it fails on a IOE or RemoteException, the
       // TimeoutMonitor will pick up the pieces.
       long maxWaitTime = System.currentTimeMillis() +
-        this.master.getConfiguration().
+        this.server.getConfiguration().
           getLong("hbase.regionserver.rpc.startup.waittime", 60000);
-      while (!this.master.isStopped()) {
+      while (!this.server.isStopped()) {
         try {
           List<RegionOpeningState> regionOpeningStateList = this.serverManager
               .sendRegionOpen(destination, regions);
@@ -1343,7 +1248,7 @@ public class AssignmentManager extends Z
             if (now > maxWaitTime) throw e;
             LOG.debug("Server is not yet up; waiting up to " +
                 (maxWaitTime - now) + "ms", e);
-            Thread.sleep(1000);
+            Thread.sleep(100);
           }
 
           throw decodedException;
@@ -1362,52 +1267,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Bulk assign regions to available servers if any with retry, else assign
-   * region singly.
-   *
-   * @param regions all regions to assign
-   * @param servers all available servers
-   */
-  public void assign(List<HRegionInfo> regions, List<ServerName> servers) {
-    LOG.info("Quickly assigning " + regions.size() + " region(s) across "
-        + servers.size() + " server(s)");
-    Map<ServerName, List<HRegionInfo>> bulkPlan = balancer
-        .roundRobinAssignment(regions, servers);
-    if (bulkPlan == null || bulkPlan.isEmpty()) {
-      LOG.info("Failed getting bulk plan, assigning region singly");
-      for (HRegionInfo region : regions) {
-        assign(region, true);
-      }
-      return;
-    }
-    Map<ServerName, List<HRegionInfo>> failedPlans = new HashMap<ServerName, List<HRegionInfo>>();
-    for (Map.Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) {
-      try {
-        if (!assign(e.getKey(), e.getValue())) {
-          failedPlans.put(e.getKey(), e.getValue());
-        }
-      } catch (Throwable t) {
-        LOG.warn("Failed bulking assigning " + e.getValue().size()
-            + " region(s) to " + e.getKey().getServerName()
-            + ", and continue to bulk assign others", t);
-        failedPlans.put(e.getKey(), e.getValue());
-      }
-    }
-    if (!failedPlans.isEmpty()) {
-      servers.removeAll(failedPlans.keySet());
-      List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
-      for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
-        LOG.info("Failed assigning " + e.getValue().size()
-            + " regions to server " + e.getKey() + ", reassigning them");
-        reassigningRegions.addAll(e.getValue());
-      }
-      for (HRegionInfo region : reassigningRegions) {
-        assign(region, true, true);
-      }
-    }
-  }
-
-  /**
    * Callback handler for create unassigned znodes used during bulk assign.
    */
   static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
@@ -1428,8 +1287,10 @@ public class AssignmentManager extends Z
 
     @Override
     public void processResult(int rc, String path, Object ctx, String name) {
-      if (rc != 0) {
-        // Thisis resultcode.  If non-zero, need to resubmit.
+      if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+        LOG.warn("Node for " + path + " already exists");
+      } else if (rc != 0) {
+        // This is the result code.  If non-zero, need to resubmit.
         LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
           "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
         this.zkw.abort("Connectionloss writing unassigned at " + path +
@@ -1565,7 +1426,7 @@ public class AssignmentManager extends Z
       if (setOfflineInZK && versionOfOfflineNode == -1) {
         return;
       }
-      if (this.master.isStopped()) {
+      if (this.server.isStopped()) {
         LOG.debug("Server stopped; skipping assign of " + state);
         return;
       }
@@ -1643,14 +1504,14 @@ public class AssignmentManager extends Z
         + " to " + sn);
     String encodedRegionName = region.getEncodedName();
     try {
-      ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
+      ZKAssign.deleteOfflineNode(watcher, encodedRegionName);
     } catch (KeeperException.NoNodeException e) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("The unassigned node " + encodedRegionName
             + " doesnot exist.");
       }
     } catch (KeeperException e) {
-      master.abort(
+      server.abort(
           "Error deleting OFFLINED node in ZK for transition ZK node ("
               + encodedRegionName + ")", e);
     }
@@ -1688,7 +1549,7 @@ public class AssignmentManager extends Z
     if (!hijack && !state.isClosed() && !state.isOffline()) {
       if (!regionAlreadyInTransitionException ) {
         String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
-        this.master.abort(msg, new IllegalStateException(msg));
+        this.server.abort(msg, new IllegalStateException(msg));
         return -1;
       } else {
         LOG.debug("Unexpected state : " + state
@@ -1705,7 +1566,7 @@ public class AssignmentManager extends Z
     // For all other cases we can change the in-memory state to OFFLINE.
     if (hijack &&
         (state.getState().equals(RegionState.State.PENDING_OPEN) ||
-            state.getState().equals(RegionState.State.OPENING))) {
+          state.getState().equals(RegionState.State.OPENING))) {
       regionStates.updateRegionState(state.getRegion(),
         RegionState.State.PENDING_OPEN);
       allowZNodeCreation = false;
@@ -1717,8 +1578,8 @@ public class AssignmentManager extends Z
     int versionOfOfflineNode = -1;
     try {
       // get the version after setting the znode to OFFLINE
-      versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
-          state.getRegion(), this.master.getServerName(),
+      versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
+        state.getRegion(), this.server.getServerName(),
           hijack, allowZNodeCreation);
       if (versionOfOfflineNode == -1) {
         LOG.warn("Attempted to create/force node into OFFLINE state before "
@@ -1726,7 +1587,7 @@ public class AssignmentManager extends Z
         return -1;
       }
     } catch (KeeperException e) {
-      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
+      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
       return -1;
     }
     return versionOfOfflineNode;
@@ -1741,20 +1602,20 @@ public class AssignmentManager extends Z
   boolean asyncSetOfflineInZooKeeper(final RegionState state,
       final AsyncCallback.StringCallback cb, final Object ctx) {
     if (!state.isClosed() && !state.isOffline()) {
-      this.master.abort("Unexpected state trying to OFFLINE; " + state,
+      this.server.abort("Unexpected state trying to OFFLINE; " + state,
         new IllegalStateException());
       return false;
     }
     regionStates.updateRegionState(
       state.getRegion(), RegionState.State.OFFLINE);
     try {
-      ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
-        this.master.getServerName(), cb, ctx);
+      ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
+        this.server.getServerName(), cb, ctx);
     } catch (KeeperException e) {
       if (e instanceof NodeExistsException) {
         LOG.warn("Node for " + state.getRegion() + " already exists");
-      } else {
-        master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
+      } else { 
+        server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
       }
       return false;
     }
@@ -1832,17 +1693,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Loop through the deadNotExpired server list and remove them from the
-   * servers.
-   * @param servers
-   * @deprecated the method is now available in ServerManager - deprecated in 0.96
-   */
-  @Deprecated
-  void removeDeadNotExpiredServers(List<ServerName> servers) {
-    this.serverManager.removeDeadNotExpiredServers(servers);
-  }
-
-  /**
    * Unassign the list of regions. Configuration knobs:
    * hbase.bulk.waitbetween.reopen indicates the number of milliseconds to
    * wait before unassigning another region from this region server
@@ -1851,7 +1701,7 @@ public class AssignmentManager extends Z
    * @throws InterruptedException
    */
   public void unassign(List<HRegionInfo> regions) {
-    int waitTime = this.master.getConfiguration().getInt(
+    int waitTime = this.server.getConfiguration().getInt(
         "hbase.bulk.waitbetween.reopen", 0);
     for (HRegionInfo region : regions) {
       if (regionStates.isRegionInTransition(region))
@@ -1929,7 +1779,7 @@ public class AssignmentManager extends Z
         // Create the znode in CLOSING state
         try {
           versionOfClosingNode = ZKAssign.createNodeClosing(
-            master.getZooKeeper(), region, master.getServerName());
+            watcher, region, server.getServerName());
           if (versionOfClosingNode == -1) {
             LOG.debug("Attempting to unassign region " +
                 region.getRegionNameAsString() + " but ZK closing node "
@@ -1963,7 +1813,7 @@ public class AssignmentManager extends Z
             }
           }
           // If we get here, don't understand whats going on -- abort.
-          master.abort("Unexpected ZK exception creating node CLOSING", e);
+          server.abort("Unexpected ZK exception creating node CLOSING", e);
           return;
         }
         state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
@@ -2011,7 +1861,7 @@ public class AssignmentManager extends Z
         // Presume that master has stale data.  Presume remote side just split.
         // Presume that the split message when it comes in will fix up the master's
         // in memory cluster state.
-        if (checkIfRegionBelongsToDisabling(region)) {
+        if (getZKTable().isDisablingTable(region.getTableNameAsString())) {
           // Remove from the regionsinTransition map
           LOG.info("While trying to recover the table "
               + region.getTableNameAsString()
@@ -2041,9 +1891,9 @@ public class AssignmentManager extends Z
    */
   public void deleteClosingOrClosedNode(HRegionInfo region) {
     try {
-      if (!ZKAssign.deleteNode(master.getZooKeeper(), region.getEncodedName(),
+      if (!ZKAssign.deleteNode(watcher, region.getEncodedName(),
           EventHandler.EventType.M_ZK_REGION_CLOSING)) {
-        boolean deleteNode = ZKAssign.deleteNode(master.getZooKeeper(), region
+        boolean deleteNode = ZKAssign.deleteNode(watcher, region
             .getEncodedName(), EventHandler.EventType.RS_ZK_REGION_CLOSED);
         // TODO : We don't abort if the delete node returns false. Is there any
         // such corner case?
@@ -2056,7 +1906,7 @@ public class AssignmentManager extends Z
       LOG.debug("CLOSING/CLOSED node for the region " + region.getEncodedName()
           + " already deleted");
     } catch (KeeperException ke) {
-      master.abort(
+      server.abort(
           "Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
               + region.getEncodedName(), ke);
       return;
@@ -2074,7 +1924,7 @@ public class AssignmentManager extends Z
     boolean result = false;
     // This may fail if the SPLIT or SPLITTING znode gets cleaned up before we
     // can get data from it.
-    byte [] data = ZKAssign.getData(master.getZooKeeper(), path);
+    byte [] data = ZKAssign.getData(watcher, path);
     if (data == null) return false;
     RegionTransition rt = RegionTransition.parseFrom(data);
     switch (rt.getEventType()) {
@@ -2098,7 +1948,7 @@ public class AssignmentManager extends Z
    */
   public void waitForAssignment(HRegionInfo regionInfo)
   throws InterruptedException {
-    while(!this.master.isStopped() &&
+    while(!this.server.isStopped() &&
         !regionStates.isRegionAssigned(regionInfo)) {
       // We should receive a notification, but it's
       //  better to have a timeout to recheck the condition here:
@@ -2118,7 +1968,7 @@ public class AssignmentManager extends Z
    * @throws KeeperException
    */
   public void assignRoot() throws KeeperException {
-    RootRegionTracker.deleteRootLocation(this.master.getZooKeeper());
+    RootRegionTracker.deleteRootLocation(this.watcher);
     assign(HRegionInfo.ROOT_REGIONINFO, true);
   }
 
@@ -2136,38 +1986,32 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Assigns all user regions to online servers. Use round-robin assignment.
-   *
-   * @param regions
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void assignUserRegionsToOnlineServers(List<HRegionInfo> regions)
-      throws IOException,
-      InterruptedException {
-    List<ServerName> destServers = serverManager.createDestinationServersList();
-    assignUserRegions(regions, destServers);
-  }
-
-  /**
-   * Assigns all user regions, if any.  Used during cluster startup.
+   * Assigns the specified regions, if any, using a round-robin plan.
    * <p>
    * This is a synchronous call and will return once every region has been
    * assigned.  If anything fails, an exception is thrown
    * @throws InterruptedException
    * @throws IOException
    */
-  public void assignUserRegions(List<HRegionInfo> regions, List<ServerName> servers)
-  throws IOException, InterruptedException {
-    if (regions == null)
+  public void assign(List<HRegionInfo> regions)
+        throws IOException, InterruptedException {
+    if (regions == null || regions.isEmpty()) {
       return;
-    Map<ServerName, List<HRegionInfo>> bulkPlan = null;
+    }
+    List<ServerName> servers = serverManager.createDestinationServersList();
+    if (servers == null || servers.isEmpty()) {
+      throw new IOException("Found no destination server to assign region(s)");
+    }
+
     // Generate a round-robin bulk assignment plan
-    bulkPlan = balancer.roundRobinAssignment(regions, servers);
-    LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " +
-               servers.size() + " server(s)");
+    Map<ServerName, List<HRegionInfo>> bulkPlan
+      = balancer.roundRobinAssignment(regions, servers);
+
+    LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across "
+      + servers.size() + " server(s)");
+
     // Use fixed count thread pool assigning.
-    BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
+    BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
     ba.bulkAssign();
     LOG.info("Bulk assigning done");
   }
@@ -2192,165 +2036,53 @@ public class AssignmentManager extends Z
    * should be shutdown.
    * @throws InterruptedException
    * @throws IOException
+   * @throws KeeperException
    */
-  public void assignAllUserRegions() throws IOException, InterruptedException {
+  private void assignAllUserRegions()
+      throws IOException, InterruptedException, KeeperException {
+    // Cleanup any existing ZK nodes and start watching
+    ZKAssign.deleteAllNodes(watcher);
+    ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
+      this.watcher.assignmentZNode);
+    failoverCleanupDone();
+
     // Skip assignment for regions of tables in DISABLING state because during clean cluster startup
     // no RS is alive and regions map also doesn't have any information about the regions.
     // See HBASE-6281.
-    Set<String> disablingAndDisabledTables = new HashSet<String>(this.disablingTables);
-    disablingAndDisabledTables.addAll(this.zkTable.getDisabledTables());
+    Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
+    disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
     // Scan META for all user regions, skipping any disabled tables
-    Map<HRegionInfo, ServerName> allRegions = MetaReader.fullScan(catalogTracker,
-        disablingAndDisabledTables, true);
+    Map<HRegionInfo, ServerName> allRegions = MetaReader.fullScan(
+      catalogTracker, disabledOrDisablingOrEnabling, true);
     if (allRegions == null || allRegions.isEmpty()) return;
 
-    // Get all available servers
-    List<ServerName> destServers = serverManager.createDestinationServersList();
-
-    // If there are no servers we need not proceed with region assignment.
-    if (destServers.isEmpty()) return;
-
     // Determine what type of assignment to do on startup
-    boolean retainAssignment = master.getConfiguration().
+    boolean retainAssignment = server.getConfiguration().
       getBoolean("hbase.master.startup.retainassign", true);
 
-    Map<ServerName, List<HRegionInfo>> bulkPlan = null;
     if (retainAssignment) {
+      List<ServerName> servers = serverManager.createDestinationServersList();
+      if (servers == null || servers.isEmpty()) {
+        throw new IOException("Found no destination server to assign region(s)");
+      }
+
       // Reuse existing assignment info
-      bulkPlan = balancer.retainAssignment(allRegions, destServers);
+      Map<ServerName, List<HRegionInfo>> bulkPlan =
+        balancer.retainAssignment(allRegions, servers);
+
+      LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
+        servers.size() + " server(s), retainAssignment=true");
+      BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
+      ba.bulkAssign();
+      LOG.info("Bulk assigning done");
     } else {
-      // assign regions in round-robin fashion
-      assignUserRegions(new ArrayList<HRegionInfo>(allRegions.keySet()), destServers);
-      for (HRegionInfo hri : allRegions.keySet()) {
-        setEnabledTable(hri);
-      }
-      return;
+      List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
+      assign(regions);
     }
-    LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
-      destServers.size() + " server(s), retainAssignment=" + retainAssignment);
 
-    // Use fixed count thread pool assigning.
-    BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
-    ba.bulkAssign();
     for (HRegionInfo hri : allRegions.keySet()) {
       setEnabledTable(hri);
     }
-    LOG.info("Bulk assigning done");
-  }
-
-  /**
-   * Run bulk assign on startup.  Does one RCP per regionserver passing a
-   * batch of reginons using {@link SingleServerBulkAssigner}.
-   * Uses default {@link #getUncaughtExceptionHandler()}
-   * which will abort the Server if exception.
-   */
-  static class StartupBulkAssigner extends BulkAssigner {
-    final Map<ServerName, List<HRegionInfo>> bulkPlan;
-    final AssignmentManager assignmentManager;
-
-    StartupBulkAssigner(final Server server,
-        final Map<ServerName, List<HRegionInfo>> bulkPlan,
-        final AssignmentManager am) {
-      super(server);
-      this.bulkPlan = bulkPlan;
-      this.assignmentManager = am;
-    }
-
-    @Override
-    public boolean bulkAssign(boolean sync) throws InterruptedException,
-        IOException {
-      // Disable timing out regions in transition up in zk while bulk assigning.
-      this.assignmentManager.timeoutMonitor.bulkAssign(true);
-      try {
-        return super.bulkAssign(sync);
-      } finally {
-        // Reenable timing out regions in transition up in zi.
-        this.assignmentManager.timeoutMonitor.bulkAssign(false);
-      }
-    }
-
-    @Override
-    protected String getThreadNamePrefix() {
-      return this.server.getServerName() + "-StartupBulkAssigner";
-    }
-
-    @Override
-    protected void populatePool(java.util.concurrent.ExecutorService pool) {
-      for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
-        pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
-          this.assignmentManager));
-      }
-    }
-
-    /**
-     *
-     * @param timeout How long to wait.
-     * @return true if done.
-     */
-    @Override
-    protected boolean waitUntilDone(final long timeout)
-    throws InterruptedException {
-      Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
-      for (List<HRegionInfo> regionList : bulkPlan.values()) {
-        regionSet.addAll(regionList);
-      }
-      return this.assignmentManager.waitUntilNoRegionsInTransition(timeout, regionSet);
-    }
-
-    @Override
-    protected long getTimeoutOnRIT() {
-      // Guess timeout.  Multiply the number of regions on a random server
-      // by how long we thing one region takes opening.
-      long perRegionOpenTimeGuesstimate =
-        this.server.getConfiguration().getLong("hbase.bulk.assignment.perregion.open.time", 1000);
-      int regionsPerServer =
-        this.bulkPlan.entrySet().iterator().next().getValue().size();
-      long timeout = perRegionOpenTimeGuesstimate * regionsPerServer;
-      LOG.debug("Timeout-on-RIT=" + timeout);
-      return timeout;
-    }
-  }
-
-  /**
-   * Bulk user region assigner.
-   * If failed assign, lets timeout in RIT do cleanup.
-   */
-  static class GeneralBulkAssigner extends StartupBulkAssigner {
-    GeneralBulkAssigner(final Server server,
-        final Map<ServerName, List<HRegionInfo>> bulkPlan,
-        final AssignmentManager am) {
-      super(server, bulkPlan, am);
-    }
-
-    @Override
-    protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
-      return new UncaughtExceptionHandler() {
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-          LOG.warn("Assigning regions in " + t.getName(), e);
-        }
-      };
-    }
-  }
-
-  /**
-   * Manage bulk assigning to a server.
-   */
-  static class SingleServerBulkAssigner implements Runnable {
-    private final ServerName regionserver;
-    private final List<HRegionInfo> regions;
-    private final AssignmentManager assignmentManager;
-
-    SingleServerBulkAssigner(final ServerName regionserver,
-        final List<HRegionInfo> regions, final AssignmentManager am) {
-      this.regionserver = regionserver;
-      this.regions = regions;
-      this.assignmentManager = am;
-    }
-    @Override
-    public void run() {
-      this.assignmentManager.assign(this.regionserver, this.regions);
-    }
   }
 
   /**
@@ -2369,7 +2101,7 @@ public class AssignmentManager extends Z
     // state of the Master.
     final long endTime = System.currentTimeMillis() + timeout;
 
-    while (!this.master.isStopped() && regionStates.isRegionsInTransition()
+    while (!this.server.isStopped() && regionStates.isRegionsInTransition()
         && endTime > System.currentTimeMillis()) {
       regionStates.waitForUpdate(100);
     }
@@ -2378,35 +2110,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Wait until no regions from set regions are in transition.
-   * @param timeout How long to wait.
-   * @param regions set of regions to wait for. It will be modified by this method.
-   * @return True if none of the regions in the set is in transition
-   * @throws InterruptedException
-   */
-  boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions)
-    throws InterruptedException {
-    final long endTime = System.currentTimeMillis() + timeout;
-
-    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
-    while (!regions.isEmpty() && !this.master.isStopped() && endTime > System.currentTimeMillis()) {
-      Iterator<HRegionInfo> regionInfoIterator = regions.iterator();
-      while (regionInfoIterator.hasNext()) {
-        HRegionInfo hri = regionInfoIterator.next();
-        if (!regionStates.isRegionInTransition(hri)) {
-          regionInfoIterator.remove();
-        }
-      }
-
-      if (!regions.isEmpty()) {
-        regionStates.waitForUpdate(100);
-      }
-    }
-
-    return regions.isEmpty();
-  }
-
-  /**
    * Rebuild the list of user regions and assignment information.
    * <p>
    * Returns a map of servers that are not found to be online and the regions
@@ -2415,19 +2118,22 @@ public class AssignmentManager extends Z
    *         in META
    * @throws IOException
    */
-  Map<ServerName, List<Pair<HRegionInfo, Result>>> rebuildUserRegions()
-  throws IOException, KeeperException {
+  Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
+    Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
+    Set<String> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
+    disabledOrEnablingTables.addAll(enablingTables);
+    Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
+    disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
+
     // Region assignment from META
     List<Result> results = MetaReader.fullScan(this.catalogTracker);
     // Get any new but slow to checkin region server that joined the cluster
     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
     // Map of offline servers and their regions to be returned
-    Map<ServerName, List<Pair<HRegionInfo,Result>>> offlineServers =
-      new TreeMap<ServerName, List<Pair<HRegionInfo, Result>>>();
+    Map<ServerName, List<HRegionInfo>> offlineServers =
+      new TreeMap<ServerName, List<HRegionInfo>>();
     // Iterate regions in META
     for (Result result : results) {
-      boolean disabled = false;
-      boolean disablingOrEnabling = false;
       Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
       if (region == null) continue;
       HRegionInfo regionInfo = region.getFirst();
@@ -2446,29 +2152,25 @@ public class AssignmentManager extends Z
         // from ENABLED state when application calls disableTable.
         // It can't be in DISABLED state, because DISABLED states transitions
         // from DISABLING state.
-        if (false == checkIfRegionsBelongsToEnabling(regionInfo)) {
+        if (!enablingTables.contains(tableName)) {
           LOG.warn("Region " + regionInfo.getEncodedName() +
             " has null regionLocation." + " But its table " + tableName +
             " isn't in ENABLING state.");
         }
-        addTheTablesInPartialState(this.disablingTables, this.enablingTables, regionInfo,
-            tableName);
       } else if (!onlineServers.contains(regionLocation)) {
         // Region is located on a server that isn't online
-        List<Pair<HRegionInfo, Result>> offlineRegions =
-          offlineServers.get(regionLocation);
+        List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
         if (offlineRegions == null) {
-          offlineRegions = new ArrayList<Pair<HRegionInfo,Result>>(1);
+          offlineRegions = new ArrayList<HRegionInfo>(1);
           offlineServers.put(regionLocation, offlineRegions);
         }
-        offlineRegions.add(new Pair<HRegionInfo,Result>(regionInfo, result));
-        disabled = checkIfRegionBelongsToDisabled(regionInfo);
-        disablingOrEnabling = addTheTablesInPartialState(this.disablingTables,
-            this.enablingTables, regionInfo, tableName);
+        offlineRegions.add(regionInfo);
         // need to enable the table if not disabled or disabling or enabling
         // this will be used in rolling restarts
-        enableTableIfNotDisabledOrDisablingOrEnabling(disabled,
-            disablingOrEnabling, tableName);
+        if (!disabledOrDisablingOrEnabling.contains(tableName)
+            && !getZKTable().isEnabledTable(tableName)) {
+          setEnabledTable(tableName);
+        }
       } else {
         // If region is in offline and split state check the ZKNode
         if (regionInfo.isOffline() && regionInfo.isSplit()) {
@@ -2476,7 +2178,7 @@ public class AssignmentManager extends Z
               .getEncodedName());
           Stat stat = new Stat();
           byte[] data = ZKUtil.getDataNoWatch(this.watcher, node, stat);
-          // If znode does not exist dont consider this region
+          // If znode does not exist, don't consider this region
           if (data == null) {
             LOG.debug("Region "	+  regionInfo.getRegionNameAsString()
                + " split is completed. Hence need not add to regions list");
@@ -2484,92 +2186,56 @@ public class AssignmentManager extends Z
           }
         }
         // Region is being served and on an active server
-        // add only if region not in disabled and enabling table
-        if (!checkIfRegionBelongsToDisabled(regionInfo)
-            && !checkIfRegionsBelongsToEnabling(regionInfo)) {
+        // add only if region not in disabled or enabling table
+        if (!disabledOrEnablingTables.contains(tableName)) {
           regionStates.regionOnline(regionInfo, regionLocation);
         }
-        disablingOrEnabling = addTheTablesInPartialState(this.disablingTables,
-            this.enablingTables, regionInfo, tableName);
-        disabled = checkIfRegionBelongsToDisabled(regionInfo);
         // need to enable the table if not disabled or disabling or enabling
         // this will be used in rolling restarts
-        enableTableIfNotDisabledOrDisablingOrEnabling(disabled,
-            disablingOrEnabling, tableName);
+        if (!disabledOrDisablingOrEnabling.contains(tableName)
+            && !getZKTable().isEnabledTable(tableName)) {
+          setEnabledTable(tableName);
+        }
       }
     }
     return offlineServers;
   }
 
-  private void enableTableIfNotDisabledOrDisablingOrEnabling(boolean disabled,
-      boolean disablingOrEnabling, String tableName) {
-    if (!disabled && !disablingOrEnabling
-        && !getZKTable().isEnabledTable(tableName)) {
-      setEnabledTable(tableName);
-    }
-  }
-
-  private Boolean addTheTablesInPartialState(Set<String> disablingTables,
-      Set<String> enablingTables, HRegionInfo regionInfo,
-      String disablingTableName) {
-    if (checkIfRegionBelongsToDisabling(regionInfo)) {
-      disablingTables.add(disablingTableName);
-      return true;
-    } else if (checkIfRegionsBelongsToEnabling(regionInfo)) {
-      enablingTables.add(disablingTableName);
-      return true;
-    }
-    return false;
-  }
-
   /**
    * Recover the tables that were not fully moved to DISABLED state. These
    * tables are in DISABLING state when the master restarted/switched.
    *
-   * @param disablingTables
-   * @return
    * @throws KeeperException
    * @throws TableNotFoundException
    * @throws IOException
    */
-  private boolean recoverTableInDisablingState(Set<String> disablingTables)
+  private void recoverTableInDisablingState()
       throws KeeperException, TableNotFoundException, IOException {
-    boolean isWatcherCreated = false;
+    Set<String> disablingTables = ZKTable.getDisablingTables(watcher);
     if (disablingTables.size() != 0) {
-      // Create a watcher on the zookeeper node
-      ZKUtil.listChildrenAndWatchForNewChildren(watcher,
-          watcher.assignmentZNode);
-      isWatcherCreated = true;
       for (String tableName : disablingTables) {
         // Recover by calling DisableTableHandler
         LOG.info("The table " + tableName
             + " is in DISABLING state.  Hence recovering by moving the table"
             + " to DISABLED state.");
-        new DisableTableHandler(this.master, tableName.getBytes(),
+        new DisableTableHandler(this.server, tableName.getBytes(),
             catalogTracker, this, true).process();
       }
     }
-    return isWatcherCreated;
   }
 
   /**
    * Recover the tables that are not fully moved to ENABLED state. These tables
    * are in ENABLING state when the master restarted/switched
    *
-   * @param enablingTables
-   * @param isWatcherCreated
    * @throws KeeperException
    * @throws TableNotFoundException
    * @throws IOException
    */
-  private void recoverTableInEnablingState(Set<String> enablingTables,
-      boolean isWatcherCreated) throws KeeperException, TableNotFoundException,
-      IOException {
+  private void recoverTableInEnablingState()
+      throws KeeperException, TableNotFoundException, IOException {
+    Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
     if (enablingTables.size() != 0) {
-      if (false == isWatcherCreated) {
-        ZKUtil.listChildrenAndWatchForNewChildren(watcher,
-            watcher.assignmentZNode);
-      }
       for (String tableName : enablingTables) {
         // Recover by calling EnableTableHandler
         LOG.info("The table " + tableName
@@ -2577,27 +2243,12 @@ public class AssignmentManager extends Z
             + " to ENABLED state.");
         // enableTable in sync way during master startup,
         // no need to invoke coprocessor
-        new EnableTableHandler(this.master, tableName.getBytes(),
+        new EnableTableHandler(this.server, tableName.getBytes(),
             catalogTracker, this, true).process();
       }
     }
   }
 
-  private boolean checkIfRegionsBelongsToEnabling(HRegionInfo regionInfo) {
-    String tableName = regionInfo.getTableNameAsString();
-    return getZKTable().isEnablingTable(tableName);
-  }
-
-  private boolean checkIfRegionBelongsToDisabled(HRegionInfo regionInfo) {
-    String tableName = regionInfo.getTableNameAsString();
-    return getZKTable().isDisabledTable(tableName);
-  }
-
-  private boolean checkIfRegionBelongsToDisabling(HRegionInfo regionInfo) {
-    String tableName = regionInfo.getTableNameAsString();
-    return getZKTable().isDisablingTable(tableName);
-  }
-
   /**
    * Processes list of dead servers from result of META scan and regions in RIT
    * <p>
@@ -2615,82 +2266,35 @@ public class AssignmentManager extends Z
    * @throws KeeperException
    */
   private void processDeadServersAndRecoverLostRegions(
-      Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers, List<String> nodes)
+      Map<ServerName, List<HRegionInfo>> deadServers, List<String> nodes)
   throws IOException, KeeperException {
-    processDeadServers(deadServers, nodes);
+    if (deadServers != null) {
+      for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
+        ServerName serverName = server.getKey();
+        if (!serverManager.isServerDead(serverName)) {
+          serverManager.expireServer(serverName); // Let SSH do region re-assign
+        }
+      }
+    }
+    nodes = ZKUtil.listChildrenAndWatchForNewChildren(
+      this.watcher, this.watcher.assignmentZNode);
     if (!nodes.isEmpty()) {
       for (String encodedRegionName : nodes) {
         processRegionInTransition(encodedRegionName, null, deadServers);
       }
     }
-  }
-
-  private void processDeadServers(Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
-      final List<String> nodes)
-  throws KeeperException, IOException {
-    if (deadServers == null) return;
-    Set<ServerName> actualDeadServers = this.serverManager.getDeadServers();
-    for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer: deadServers.entrySet()) {
-      // skip regions of dead servers because SSH will process regions during rs expiration.
-      // see HBASE-5916
-      if (actualDeadServers.contains(deadServer.getKey())) {
-        for (Pair<HRegionInfo, Result> deadRegion : deadServer.getValue()) {
-          nodes.remove(deadRegion.getFirst().getEncodedName());
-        }
-        continue;
-      }
-      List<Pair<HRegionInfo, Result>> regions = deadServer.getValue();
-      for (Pair<HRegionInfo, Result> region : regions) {
-        HRegionInfo regionInfo = region.getFirst();
-        Result result = region.getSecond();
-        try {
-          // If region was in transition (was in zk) force it offline for reassign.  Check if node
-          // up in zk at all first.
-          if (ZKUtil.checkExists(this.watcher,
-              ZKAssign.getPath(this.watcher, regionInfo.getEncodedName())) != -1) {
-            byte [] data = ZKAssign.getData(watcher, regionInfo.getEncodedName());
-            if (data == null) {
-              LOG.warn("No data in znode for " + regionInfo.getEncodedName());
-              continue;
-            }
-            RegionTransition rt;
-            try {
-              rt = RegionTransition.parseFrom(data);
-            } catch (DeserializationException e) {
-              LOG.warn("Failed parse of znode data for " + regionInfo.getEncodedName(), e);
-              continue;
-            }
 
-            // If zk node of this region has been updated by a live server,
-            // we consider that this region is being handled.
-            // So we should skip it and process it in processRegionsInTransition.
-            ServerName sn = rt.getServerName();
-            if (isServerOnline(sn)) {
-              LOG.info("The region " + regionInfo.getEncodedName() + "is being handled on " + sn);
-              continue;
-            }
-          }
-          // Process with existing RS shutdown code
-          boolean assign = ServerShutdownHandler.processDeadRegion(
-              regionInfo, result, this, this.catalogTracker);
-          if (assign) {
-            ZKAssign.createOrForceNodeOffline(watcher, regionInfo,
-                master.getServerName());
-            if (!nodes.contains(regionInfo.getEncodedName())) {
-              nodes.add(regionInfo.getEncodedName());
-            }
-          }
-        } catch (KeeperException.NoNodeException nne) {
-          // This is fine
-        }
-      }
-    }
+    // Now we can safely claim failover cleanup completed and enable
+    // ServerShutdownHandler for further processing. The nodes (below)
+    // in transition, if any, are for regions not related to those
+    // dead servers at all, and can be done in parallel to SSH.
+    failoverCleanupDone();
   }
 
   /**
    * Set Regions in transitions metrics.
    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
-   * This iterator is not fail fast, wich may lead to stale read; but that's better than
+   * This iterator is not fail fast, which may lead to stale read; but that's better than
    * creating a copy of the map for metrics computation, as this method will be invoked
    * on a frequent interval.
    */
@@ -2699,7 +2303,7 @@ public class AssignmentManager extends Z
     int totalRITs = 0;
     int totalRITsOverThreshold = 0;
     long oldestRITTime = 0;
-    int ritThreshold = this.master.getConfiguration().
+    int ritThreshold = this.server.getConfiguration().
       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
     for (RegionState state: regionStates.getRegionsInTransition().values()) {
       totalRITs++;
@@ -2738,11 +2342,11 @@ public class AssignmentManager extends Z
     RegionState rs = null;
     // There is already a timeout monitor on regions in transition so I
     // should not have to have one here too?
-    while(!this.master.isStopped() && regionStates.isRegionInTransition(hri)) {
+    while(!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
       LOG.info("Waiting on " + rs + " to clear regions-in-transition");
-      regionStates.waitForUpdate(1000);
+      regionStates.waitForUpdate(100);
     }
-    if (this.master.isStopped()) {
+    if (this.server.isStopped()) {
       LOG.info("Giving up wait on regions in " +
         "transition because stoppable.isStopped is set");
     }
@@ -2876,16 +2480,7 @@ public class AssignmentManager extends Z
       case OPEN:
         LOG.error("Region has been OPEN for too long, " +
             "we don't know where region was opened so can't do anything");
-        // TODO: do we need synchronization here?
-        // could not synchronized on regionState since it can be
-        // an new instance
-        String encodedName = regionState.getRegion().getEncodedName();
-        Lock lock = assignLocker.acquireLock(encodedName);
-        try {
-          regionState.updateTimestampToNow();
-        } finally {
-          lock.unlock();
-        }
+        regionState.updateTimestampToNow();
         break;
 
       case PENDING_CLOSE:
@@ -2973,13 +2568,13 @@ public class AssignmentManager extends Z
   public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
     RegionTransition rt = null;
     try {
-      byte [] data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName());
+      byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
       // This call can legitimately come by null
       rt = data == null? null: RegionTransition.parseFrom(data);
     } catch (KeeperException e) {
-      master.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
+      server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
     } catch (DeserializationException e) {
-      master.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
+      server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
     }
 
     ServerName addressFromZK = rt != null? rt.getServerName():  null;
@@ -3065,14 +2660,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Check whether the RegionServer is online.
-   * @param serverName
-   * @return True if online.
-   */
-  public boolean isServerOnline(ServerName serverName) {
-    return serverName != null && this.serverManager.isServerOnline(serverName);
-  }
-  /**
    * Shutdown the threadpool executor service
    */
   public void shutdown() {
@@ -3089,7 +2676,7 @@ public class AssignmentManager extends Z
       String errorMsg = "Unable to ensure that the table " + tableName
           + " will be" + " enabled because of a ZooKeeper issue";
       LOG.error(errorMsg);
-      this.master.abort(errorMsg, e);
+      this.server.abort(errorMsg, e);
     }
   }
 }