Posted to commits@hbase.apache.org by jx...@apache.org on 2014/08/07 01:22:51 UTC

[07/10] HBASE-11611 Clean up ZK-based region assignment
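
For readers skimming the patch below: it removes the ZooKeeper-driven assignment path, so AssignmentManager no longer extends ZooKeeperListener, the setOfflineInZK flag disappears from assign(), and the open/close handshake with the regionserver is done over RPC only. A minimal sketch of the caller-facing API after the change; the wrapper class and method names here are illustrative, not part of the patch, and assume the post-patch hbase-server classes on the classpath:

  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.master.AssignmentManager;

  public class AssignAfterZkCleanupSketch {
    // Sketch only: with this patch, assign() no longer takes a setOfflineInZK
    // flag, so no OFFLINE znode is created before the OPEN RPC is sent.
    static void assignRegion(AssignmentManager am, HRegionInfo hri, boolean forceNewPlan) {
      if (forceNewPlan) {
        am.assign(hri, true);  // per the patch javadoc: use care, may cause double assignment
      } else {
        am.assign(hri);        // equivalent to assign(hri, false)
      }
    }
  }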

http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 4803227..c234767 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,7 +35,6 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,24 +53,16 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.TableStateManager;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
-import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
-import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
-import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
-import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
-import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -82,10 +72,8 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
-import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
-import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
@@ -94,42 +82,25 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
-import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.Triple;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.data.Stat;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.LinkedHashMultimap;
 
 /**
  * Manages and performs region assignment.
- * <p>
- * Monitors ZooKeeper for events related to regions in transition.
- * <p>
- * Handles existing regions in transition during master failover.
+ * Related communications with regionserver are all done over RPC.
  */
 @InterfaceAudience.Private
-public class AssignmentManager extends ZooKeeperListener {
+public class AssignmentManager {
   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
 
-  public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
-      -1, -1L);
-
   static final String ALREADY_IN_TRANSITION_WAITTIME
     = "hbase.assignment.already.intransition.waittime";
   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
@@ -187,21 +158,9 @@ public class AssignmentManager extends ZooKeeperListener {
 
   private final ExecutorService executorService;
 
-  // For unit tests, keep track of calls to ClosedRegionHandler
-  private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
-
-  // For unit tests, keep track of calls to OpenedRegionHandler
-  private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
-
-  //Thread pool executor service for timeout monitor
+  // Thread pool executor service. TODO, consolidate with executorService?
   private java.util.concurrent.ExecutorService threadPoolExecutorService;
 
-  // A bunch of ZK events workers. Each is a single thread executor service
-  private final java.util.concurrent.ExecutorService zkEventWorkers;
-
-  private List<EventType> ignoreStatesRSOffline = Arrays.asList(
-      EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
-
   private final RegionStates regionStates;
 
   // The threshold to use bulk assigning. Using bulk assignment
@@ -236,9 +195,6 @@ public class AssignmentManager extends ZooKeeperListener {
   private final ConcurrentHashMap<String, AtomicInteger>
     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
 
-  // A flag to indicate if we are using ZK for region assignment
-  private final boolean useZKForAssignment;
-
   // In case not using ZK for region assignment, region states
   // are persisted in meta with a state store
   private final RegionStateStore regionStateStore;
@@ -261,15 +217,14 @@ public class AssignmentManager extends ZooKeeperListener {
    * @param service Executor service
    * @param metricsMaster metrics manager
    * @param tableLockManager TableLock manager
-   * @throws KeeperException
+   * @throws CoordinatedStateException
    * @throws IOException
    */
   public AssignmentManager(Server server, ServerManager serverManager,
       final LoadBalancer balancer,
       final ExecutorService service, MetricsMaster metricsMaster,
-      final TableLockManager tableLockManager) throws KeeperException,
-        IOException, CoordinatedStateException {
-    super(server.getZooKeeper());
+      final TableLockManager tableLockManager)
+          throws IOException, CoordinatedStateException {
     this.server = server;
     this.serverManager = serverManager;
     this.executorService = service;
@@ -307,14 +262,8 @@ public class AssignmentManager extends ZooKeeperListener {
     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
 
-    int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
-    ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
-    zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
-            TimeUnit.SECONDS, threadFactory);
-    this.tableLockManager = tableLockManager;
-
     this.metricsAssignmentManager = new MetricsAssignmentManager();
-    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
+    this.tableLockManager = tableLockManager;
   }
 
   /**
@@ -406,9 +355,9 @@ public class AssignmentManager extends ZooKeeperListener {
    */
   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
       throws IOException {
-    List <HRegionInfo> hris =
-      MetaTableAccessor.getTableRegions(this.watcher, this.server.getShortCircuitConnection(),
-        tableName, true);
+    List <HRegionInfo> hris = MetaTableAccessor.getTableRegions(
+      this.server.getZooKeeper(), this.server.getShortCircuitConnection(),
+      tableName, true);
     Integer pending = 0;
     for (HRegionInfo hri : hris) {
       String name = hri.getEncodedName();
@@ -476,10 +425,6 @@ public class AssignmentManager extends ZooKeeperListener {
     // previous master process.
     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
 
-    if (!useZKForAssignment) {
-      // Not use ZK for assignment any more, remove the ZNode
-      ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
-    }
     recoverTableInDisablingState();
     recoverTableInEnablingState();
     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
@@ -493,22 +438,12 @@ public class AssignmentManager extends ZooKeeperListener {
    * startup, will assign all user regions.
    * @param deadServers
    *          Map of dead servers and their regions. Can be null.
-   * @throws KeeperException
    * @throws IOException
    * @throws InterruptedException
+   * @throws CoordinatedStateException
    */
-  boolean processDeadServersAndRegionsInTransition(
-      final Set<ServerName> deadServers) throws KeeperException,
-        IOException, InterruptedException, CoordinatedStateException {
-    List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
-      watcher.assignmentZNode);
-
-    if (useZKForAssignment && nodes == null) {
-      String errorMessage = "Failed to get the children from ZK";
-      server.abort(errorMessage, new IOException(errorMessage));
-      return true; // Doesn't matter in this case
-    }
-
+  boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
+      throws IOException, InterruptedException, CoordinatedStateException {
     boolean failover = !serverManager.getDeadServers().isEmpty();
     if (failover) {
       // This may not be a failover actually, especially if meta is on this master.
@@ -517,36 +452,28 @@ public class AssignmentManager extends ZooKeeperListener {
       }
     } else {
       // If any one region except meta is assigned, it's a failover.
-      for (HRegionInfo hri: regionStates.getRegionAssignments().keySet()) {
-        if (!hri.isMetaTable()) {
+      Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+      for (Map.Entry<HRegionInfo, ServerName> en:
+          regionStates.getRegionAssignments().entrySet()) {
+        HRegionInfo hri = en.getKey();
+        if (!hri.isMetaTable()
+            && onlineServers.contains(en.getValue())) {
           LOG.debug("Found " + hri + " out on cluster");
           failover = true;
           break;
         }
       }
-    }
-    if (!failover && nodes != null) {
-      // If any one region except meta is in transition, it's a failover.
-      for (String encodedName: nodes) {
-        RegionState regionState = regionStates.getRegionState(encodedName);
-        if (regionState != null && !regionState.getRegion().isMetaRegion()) {
-          LOG.debug("Found " + regionState + " in RITs");
-          failover = true;
-          break;
-        }
-      }
-    }
-    if (!failover && !useZKForAssignment) {
-      // If any region except meta is in transition on a live server, it's a failover.
-      Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
-      if (!regionsInTransition.isEmpty()) {
-        Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
-        for (RegionState regionState: regionsInTransition.values()) {
-          if (!regionState.getRegion().isMetaRegion()
-              && onlineServers.contains(regionState.getServerName())) {
-            LOG.debug("Found " + regionState + " in RITs");
-            failover = true;
-            break;
+      if (!failover) {
+        // If any region except meta is in transition on a live server, it's a failover.
+        Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
+        if (!regionsInTransition.isEmpty()) {
+          for (RegionState regionState: regionsInTransition.values()) {
+            if (!regionState.getRegion().isMetaRegion()
+                && onlineServers.contains(regionState.getServerName())) {
+              LOG.debug("Found " + regionState + " in RITs");
+              failover = true;
+              break;
+            }
           }
         }
       }
@@ -596,19 +523,8 @@ public class AssignmentManager extends ZooKeeperListener {
     // Now region states are restored
     regionStateStore.start();
 
-    // If we found user regions out on cluster, its a failover.
     if (failover) {
-      LOG.info("Found regions out on cluster or in RIT; presuming failover");
-      // Process list of dead servers and regions in RIT.
-      // See HBASE-4580 for more information.
-      processDeadServersAndRecoverLostRegions(deadServers);
-    }
-
-    if (!failover && useZKForAssignment) {
-      // Cleanup any existing ZK nodes and start watching
-      ZKAssign.deleteAllNodes(watcher);
-      ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
-        this.watcher.assignmentZNode);
+      processDeadServers(deadServers);
     }
 
     // Now we can safely claim failover cleanup completed and enable
@@ -632,254 +548,6 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
-   * If region is up in zk in transition, then do fixup and block and wait until
-   * the region is assigned and out of transition.  Used on startup for
-   * catalog regions.
-   * @param hri Region to look for.
-   * @return True if we processed a region in transition else false if region
-   * was not up in zk in transition.
-   * @throws InterruptedException
-   * @throws KeeperException
-   * @throws IOException
-   */
-  boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
-      throws InterruptedException, KeeperException, IOException {
-    String encodedRegionName = hri.getEncodedName();
-    if (!processRegionInTransition(encodedRegionName, hri)) {
-      return false; // The region is not in transition
-    }
-    LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
-    while (!this.server.isStopped() &&
-        this.regionStates.isRegionInTransition(encodedRegionName)) {
-      RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
-      if (state == null || !serverManager.isServerOnline(state.getServerName())) {
-        // The region is not in transition, or not in transition on an online
-        // server. Doesn't help to block here any more. Caller need to
-        // verify the region is actually assigned.
-        break;
-      }
-      this.regionStates.waitForUpdate(100);
-    }
-    return true;
-  }
-
-  /**
-   * Process failover of new master for region <code>encodedRegionName</code>
-   * up in zookeeper.
-   * @param encodedRegionName Region to process failover for.
-   * @param regionInfo If null we'll go get it from meta table.
-   * @return True if we processed <code>regionInfo</code> as a RIT.
-   * @throws KeeperException
-   * @throws IOException
-   */
-  boolean processRegionInTransition(final String encodedRegionName,
-      final HRegionInfo regionInfo) throws KeeperException, IOException {
-    // We need a lock here to ensure that we will not put the same region twice
-    // It has no reason to be a lock shared with the other operations.
-    // We can do the lock on the region only, instead of a global lock: what we want to ensure
-    // is that we don't have two threads working on the same region.
-    Lock lock = locker.acquireLock(encodedRegionName);
-    try {
-      Stat stat = new Stat();
-      byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
-      if (data == null) return false;
-      RegionTransition rt;
-      try {
-        rt = RegionTransition.parseFrom(data);
-      } catch (DeserializationException e) {
-        LOG.warn("Failed parse znode data", e);
-        return false;
-      }
-      HRegionInfo hri = regionInfo;
-      if (hri == null) {
-        // The region info is not passed in. We will try to find the region
-        // from region states map/meta based on the encoded region name. But we
-        // may not be able to find it. This is valid for online merge that
-        // the region may have not been created if the merge is not completed.
-        // Therefore, it is not in meta at master recovery time.
-        hri = regionStates.getRegionInfo(rt.getRegionName());
-        EventType et = rt.getEventType();
-        if (hri == null && et != EventType.RS_ZK_REGION_MERGING
-            && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
-          LOG.warn("Couldn't find the region in recovering " + rt);
-          return false;
-        }
-      }
-
-      // TODO: This code is tied to ZK anyway, so for now leaving it as is,
-      // will refactor when whole region assignment will be abstracted from ZK
-      BaseCoordinatedStateManager cp =
-        (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
-      OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
-
-      ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
-        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
-      zkOrd.setVersion(stat.getVersion());
-      zkOrd.setServerName(cp.getServer().getServerName());
-
-      return processRegionsInTransition(
-        rt, hri, openRegionCoordination, zkOrd);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * This call is invoked only (1) master assign meta;
-   * (2) during failover mode startup, zk assignment node processing.
-   * The locker is set in the caller. It returns true if the region
-   * is in transition for sure, false otherwise.
-   *
-   * It should be private but it is used by some test too.
-   */
-  boolean processRegionsInTransition(
-      final RegionTransition rt, final HRegionInfo regionInfo,
-      OpenRegionCoordination coordination,
-      final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
-    EventType et = rt.getEventType();
-    // Get ServerName.  Could not be null.
-    final ServerName sn = rt.getServerName();
-    final byte[] regionName = rt.getRegionName();
-    final String encodedName = HRegionInfo.encodeRegionName(regionName);
-    final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
-    LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
-
-    if (regionStates.isRegionInTransition(encodedName)
-        && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
-      LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
-        + et + ", does nothing since the region is already in transition "
-        + regionStates.getRegionTransitionState(encodedName));
-      // Just return
-      return true;
-    }
-    if (!serverManager.isServerOnline(sn)) {
-      // It was transitioning on a dead server, so it's closed now.
-      // Force to OFFLINE and put it in transition, but not assign it
-      // since log splitting for the dead server is not done yet.
-      LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
-        " was on deadserver; forcing offline");
-      if (regionStates.isRegionOnline(regionInfo)) {
-        // Meta could still show the region is assigned to the previous
-        // server. If that server is online, when we reload the meta, the
-        // region is put back to online, we need to offline it.
-        regionStates.regionOffline(regionInfo);
-        sendRegionClosedNotification(regionInfo);
-      }
-      // Put it back in transition so that SSH can re-assign it
-      regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
-
-      if (regionInfo.isMetaRegion()) {
-        // If it's meta region, reset the meta location.
-        // So that master knows the right meta region server.
-        MetaTableLocator.setMetaLocation(watcher, sn);
-      } else {
-        // No matter the previous server is online or offline,
-        // we need to reset the last region server of the region.
-        regionStates.setLastRegionServerOfRegion(sn, encodedName);
-        // Make sure we know the server is dead.
-        if (!serverManager.isServerDead(sn)) {
-          serverManager.expireServer(sn);
-        }
-      }
-      return false;
-    }
-    switch (et) {
-      case M_ZK_REGION_CLOSING:
-        // Insert into RIT & resend the query to the region server: may be the previous master
-        // died before sending the query the first time.
-        final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
-        this.executorService.submit(
-          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
-            @Override
-            public void process() throws IOException {
-              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
-              try {
-                final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
-                  .getVersion();
-                unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
-                if (regionStates.isRegionOffline(regionInfo)) {
-                  assign(regionInfo, true);
-                }
-              } finally {
-                lock.unlock();
-              }
-            }
-          });
-        break;
-
-      case RS_ZK_REGION_CLOSED:
-      case RS_ZK_REGION_FAILED_OPEN:
-        // Region is closed, insert into RIT and handle it
-        regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
-        if (!replicasToClose.contains(regionInfo)) {
-          invokeAssign(regionInfo);
-        } else {
-          offlineDisabledRegion(regionInfo);
-        }
-        break;
-
-      case M_ZK_REGION_OFFLINE:
-        // Insert in RIT and resend to the regionserver
-        regionStates.updateRegionState(rt, State.PENDING_OPEN);
-        final RegionState rsOffline = regionStates.getRegionState(regionInfo);
-        this.executorService.submit(
-          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
-            @Override
-            public void process() throws IOException {
-              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
-              try {
-                RegionPlan plan = new RegionPlan(regionInfo, null, sn);
-                addPlan(encodedName, plan);
-                assign(rsOffline, false, false);
-              } finally {
-                lock.unlock();
-              }
-            }
-          });
-        break;
-
-      case RS_ZK_REGION_OPENING:
-        regionStates.updateRegionState(rt, State.OPENING);
-        break;
-
-      case RS_ZK_REGION_OPENED:
-        // Region is opened, insert into RIT and handle it
-        // This could be done asynchronously, we would need then to acquire the lock in the
-        //  handler.
-        regionStates.updateRegionState(rt, State.OPEN);
-        new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
-        break;
-      case RS_ZK_REQUEST_REGION_SPLIT:
-      case RS_ZK_REGION_SPLITTING:
-      case RS_ZK_REGION_SPLIT:
-        // Splitting region should be online. We could have skipped it during
-        // user region rebuilding since we may consider the split is completed.
-        // Put it in SPLITTING state to avoid complications.
-        regionStates.regionOnline(regionInfo, sn);
-        regionStates.updateRegionState(rt, State.SPLITTING);
-        if (!handleRegionSplitting(
-            rt, encodedName, prettyPrintedRegionName, sn)) {
-          deleteSplittingNode(encodedName, sn);
-        }
-        break;
-      case RS_ZK_REQUEST_REGION_MERGE:
-      case RS_ZK_REGION_MERGING:
-      case RS_ZK_REGION_MERGED:
-        if (!handleRegionMerging(
-            rt, encodedName, prettyPrintedRegionName, sn)) {
-          deleteMergingNode(encodedName, sn);
-        }
-        break;
-      default:
-        throw new IllegalStateException("Received region in state:" + et + " is not valid.");
-    }
-    LOG.info("Processed region " + prettyPrintedRegionName + " in state "
-      + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
-      + "server: " + sn);
-    return true;
-  }
-
-  /**
    * When a region is closed, it should be removed from the regionsToReopen
    * @param hri HRegionInfo of the region which was closed
    */
@@ -889,247 +557,6 @@ public class AssignmentManager extends ZooKeeperListener {
     }
   }
 
-  /**
-   * Handles various states an unassigned node can be in.
-   * <p>
-   * Method is called when a state change is suspected for an unassigned node.
-   * <p>
-   * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
-   * yet).
-   * @param rt region transition
-   * @param coordination coordination for opening region
-   * @param ord details about opening region
-   */
-  void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
-                    OpenRegionCoordination.OpenRegionDetails ord) {
-    if (rt == null) {
-      LOG.warn("Unexpected NULL input for RegionTransition rt");
-      return;
-    }
-    final ServerName sn = rt.getServerName();
-    // Check if this is a special HBCK transition
-    if (sn.equals(HBCK_CODE_SERVERNAME)) {
-      handleHBCK(rt);
-      return;
-    }
-    final long createTime = rt.getCreateTime();
-    final byte[] regionName = rt.getRegionName();
-    String encodedName = HRegionInfo.encodeRegionName(regionName);
-    String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
-    // Verify this is a known server
-    if (!serverManager.isServerOnline(sn)
-      && !ignoreStatesRSOffline.contains(rt.getEventType())) {
-      LOG.warn("Attempted to handle region transition for server but " +
-        "it is not online: " + prettyPrintedRegionName + ", " + rt);
-      return;
-    }
-
-    RegionState regionState =
-      regionStates.getRegionState(encodedName);
-    long startTime = System.currentTimeMillis();
-    if (LOG.isDebugEnabled()) {
-      boolean lateEvent = createTime < (startTime - 15000);
-      LOG.debug("Handling " + rt.getEventType() +
-        ", server=" + sn + ", region=" +
-        (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
-        (lateEvent ? ", which is more than 15 seconds late" : "") +
-        ", current_state=" + regionState);
-    }
-    // We don't do anything for this event,
-    // so separate it out, no need to lock/unlock anything
-    if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
-      return;
-    }
-
-    // We need a lock on the region as we could update it
-    Lock lock = locker.acquireLock(encodedName);
-    try {
-      RegionState latestState =
-        regionStates.getRegionState(encodedName);
-      if ((regionState == null && latestState != null)
-          || (regionState != null && latestState == null)
-          || (regionState != null && latestState != null
-            && latestState.getState() != regionState.getState())) {
-        LOG.warn("Region state changed from " + regionState + " to "
-          + latestState + ", while acquiring lock");
-      }
-      long waitedTime = System.currentTimeMillis() - startTime;
-      if (waitedTime > 5000) {
-        LOG.warn("Took " + waitedTime + "ms to acquire the lock");
-      }
-      regionState = latestState;
-      switch (rt.getEventType()) {
-      case RS_ZK_REQUEST_REGION_SPLIT:
-      case RS_ZK_REGION_SPLITTING:
-      case RS_ZK_REGION_SPLIT:
-        if (!handleRegionSplitting(
-            rt, encodedName, prettyPrintedRegionName, sn)) {
-          deleteSplittingNode(encodedName, sn);
-        }
-        break;
-
-      case RS_ZK_REQUEST_REGION_MERGE:
-      case RS_ZK_REGION_MERGING:
-      case RS_ZK_REGION_MERGED:
-        // Merged region is a new region, we can't find it in the region states now.
-        // However, the two merging regions are not new. They should be in state for merging.
-        if (!handleRegionMerging(
-            rt, encodedName, prettyPrintedRegionName, sn)) {
-          deleteMergingNode(encodedName, sn);
-        }
-        break;
-
-      case M_ZK_REGION_CLOSING:
-        // Should see CLOSING after we have asked it to CLOSE or additional
-        // times after already being in state of CLOSING
-        if (regionState == null
-            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
-          LOG.warn("Received CLOSING for " + prettyPrintedRegionName
-            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
-            + regionStates.getRegionState(encodedName));
-          return;
-        }
-        // Transition to CLOSING (or update stamp if already CLOSING)
-        regionStates.updateRegionState(rt, State.CLOSING);
-        break;
-
-      case RS_ZK_REGION_CLOSED:
-        // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
-        if (regionState == null
-            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
-          LOG.warn("Received CLOSED for " + prettyPrintedRegionName
-            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
-            + regionStates.getRegionState(encodedName));
-          return;
-        }
-        // Handle CLOSED by assigning elsewhere or stopping if a disable
-        // If we got here all is good.  Need to update RegionState -- else
-        // what follows will fail because not in expected state.
-        new ClosedRegionHandler(server, this, regionState.getRegion()).process();
-        updateClosedRegionHandlerTracker(regionState.getRegion());
-        break;
-
-        case RS_ZK_REGION_FAILED_OPEN:
-          if (regionState == null
-              || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
-            LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
-              + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
-              + regionStates.getRegionState(encodedName));
-            return;
-          }
-          AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
-          if (failedOpenCount == null) {
-            failedOpenCount = new AtomicInteger();
-            // No need to use putIfAbsent, or extra synchronization since
-            // this whole handleRegion block is locked on the encoded region
-            // name, and failedOpenTracker is updated only in this block
-            failedOpenTracker.put(encodedName, failedOpenCount);
-          }
-          if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
-            regionStates.updateRegionState(rt, State.FAILED_OPEN);
-            // remove the tracking info to save memory, also reset
-            // the count for next open initiative
-            failedOpenTracker.remove(encodedName);
-          } else {
-            // Handle this the same as if it were opened and then closed.
-            regionState = regionStates.updateRegionState(rt, State.CLOSED);
-            if (regionState != null) {
-              // When there are more than one region server a new RS is selected as the
-              // destination and the same is updated in the regionplan. (HBASE-5546)
-              try {
-                getRegionPlan(regionState.getRegion(), sn, true);
-                new ClosedRegionHandler(server, this, regionState.getRegion()).process();
-              } catch (HBaseIOException e) {
-                LOG.warn("Failed to get region plan", e);
-              }
-            }
-          }
-          break;
-
-        case RS_ZK_REGION_OPENING:
-          // Should see OPENING after we have asked it to OPEN or additional
-          // times after already being in state of OPENING
-          if (regionState == null
-              || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
-            LOG.warn("Received OPENING for " + prettyPrintedRegionName
-              + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
-              + regionStates.getRegionState(encodedName));
-            return;
-          }
-          // Transition to OPENING (or update stamp if already OPENING)
-          regionStates.updateRegionState(rt, State.OPENING);
-          break;
-
-        case RS_ZK_REGION_OPENED:
-          // Should see OPENED after OPENING but possible after PENDING_OPEN.
-          if (regionState == null
-              || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
-            LOG.warn("Received OPENED for " + prettyPrintedRegionName
-              + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
-              + regionStates.getRegionState(encodedName));
-
-            if (regionState != null) {
-              // Close it without updating the internal region states,
-              // so as not to create double assignments in unlucky scenarios
-              // mentioned in OpenRegionHandler#process
-              unassign(regionState.getRegion(), null, -1, null, false, sn);
-            }
-            return;
-          }
-          // Handle OPENED by removing from transition and deleted zk node
-          regionState = regionStates.updateRegionState(rt, State.OPEN);
-          if (regionState != null) {
-            failedOpenTracker.remove(encodedName); // reset the count, if any
-            new OpenedRegionHandler(
-              server, this, regionState.getRegion(), coordination, ord).process();
-            updateOpenedRegionHandlerTracker(regionState.getRegion());
-          }
-          break;
-
-        default:
-          throw new IllegalStateException("Received event is not valid.");
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  //For unit tests only
-  boolean wasClosedHandlerCalled(HRegionInfo hri) {
-    AtomicBoolean b = closedRegionHandlerCalled.get(hri);
-    //compareAndSet to be sure that unit tests don't see stale values. Means,
-    //we will return true exactly once unless the handler code resets to true
-    //this value.
-    return b == null ? false : b.compareAndSet(true, false);
-  }
-
-  //For unit tests only
-  boolean wasOpenedHandlerCalled(HRegionInfo hri) {
-    AtomicBoolean b = openedRegionHandlerCalled.get(hri);
-    //compareAndSet to be sure that unit tests don't see stale values. Means,
-    //we will return true exactly once unless the handler code resets to true
-    //this value.
-    return b == null ? false : b.compareAndSet(true, false);
-  }
-
-  //For unit tests only
-  void initializeHandlerTrackers() {
-    closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
-    openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
-  }
-
-  void updateClosedRegionHandlerTracker(HRegionInfo hri) {
-    if (closedRegionHandlerCalled != null) { //only for unit tests this is true
-      closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
-    }
-  }
-
-  void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
-    if (openedRegionHandlerCalled != null) { //only for unit tests this is true
-      openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
-    }
-  }
-
   // TODO: processFavoredNodes might throw an exception, for e.g., if the
   // meta could not be contacted/updated. We need to see how seriously to treat
   // this problem as. Should we fail the current assignment. We should be able
@@ -1150,264 +577,6 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
-   * Handle a ZK unassigned node transition triggered by HBCK repair tool.
-   * <p>
-   * This is handled in a separate code path because it breaks the normal rules.
-   * @param rt
-   */
-  @SuppressWarnings("deprecation")
-  private void handleHBCK(RegionTransition rt) {
-    String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
-    LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
-      ", server=" + rt.getServerName() + ", region=" +
-      HRegionInfo.prettyPrint(encodedName));
-    RegionState regionState = regionStates.getRegionTransitionState(encodedName);
-    switch (rt.getEventType()) {
-      case M_ZK_REGION_OFFLINE:
-        HRegionInfo regionInfo;
-        if (regionState != null) {
-          regionInfo = regionState.getRegion();
-        } else {
-          try {
-            byte [] name = rt.getRegionName();
-            Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
-              this.server.getShortCircuitConnection(), name);
-            regionInfo = p.getFirst();
-          } catch (IOException e) {
-            LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
-            return;
-          }
-        }
-        LOG.info("HBCK repair is triggering assignment of region=" +
-            regionInfo.getRegionNameAsString());
-        // trigger assign, node is already in OFFLINE so don't need to update ZK
-        assign(regionInfo, false);
-        break;
-
-      default:
-        LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
-        break;
-    }
-
-  }
-
-  // ZooKeeper events
-
-  /**
-   * New unassigned node has been created.
-   *
-   * <p>This happens when an RS begins the OPENING or CLOSING of a region by
-   * creating an unassigned node.
-   *
-   * <p>When this happens we must:
-   * <ol>
-   *   <li>Watch the node for further events</li>
-   *   <li>Read and handle the state in the node</li>
-   * </ol>
-   */
-  @Override
-  public void nodeCreated(String path) {
-    handleAssignmentEvent(path);
-  }
-
-  /**
-   * Existing unassigned node has had data changed.
-   *
-   * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
-   * OPENING/OPENED and CLOSING/CLOSED.
-   *
-   * <p>When this happens we must:
-   * <ol>
-   *   <li>Watch the node for further events</li>
-   *   <li>Read and handle the state in the node</li>
-   * </ol>
-   */
-  @Override
-  public void nodeDataChanged(String path) {
-    handleAssignmentEvent(path);
-  }
-
-
-  // We  don't want to have two events on the same region managed simultaneously.
-  // For this reason, we need to wait if an event on the same region is currently in progress.
-  // So we track the region names of the events in progress, and we keep a waiting list.
-  private final Set<String> regionsInProgress = new HashSet<String>();
-  // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
-  //  this as we want the events to be managed in the same order as we received them.
-  private final LinkedHashMultimap <String, RegionRunnable>
-      zkEventWorkerWaitingList = LinkedHashMultimap.create();
-
-  /**
-   * A specific runnable that works only on a region.
-   */
-  private interface RegionRunnable extends Runnable{
-    /**
-     * @return - the name of the region it works on.
-     */
-    String getRegionName();
-  }
-
-  /**
-   * Submit a task, ensuring that there is only one task at a time that working on a given region.
-   * Order is respected.
-   */
-  protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
-
-    synchronized (regionsInProgress) {
-      // If we're there is already a task with this region, we add it to the
-      //  waiting list and return.
-      if (regionsInProgress.contains(regRunnable.getRegionName())) {
-        synchronized (zkEventWorkerWaitingList){
-          zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
-        }
-        return;
-      }
-
-      // No event in progress on this region => we can submit a new task immediately.
-      regionsInProgress.add(regRunnable.getRegionName());
-      zkEventWorkers.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            regRunnable.run();
-          } finally {
-            // now that we have finished, let's see if there is an event for the same region in the
-            //  waiting list. If it's the case, we can now submit it to the pool.
-            synchronized (regionsInProgress) {
-              regionsInProgress.remove(regRunnable.getRegionName());
-              synchronized (zkEventWorkerWaitingList) {
-                java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
-                    regRunnable.getRegionName());
-                if (!waiting.isEmpty()) {
-                  // We want the first object only. The only way to get it is through an iterator.
-                  RegionRunnable toSubmit = waiting.iterator().next();
-                  zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
-                  zkEventWorkersSubmit(toSubmit);
-                }
-              }
-            }
-          }
-        }
-      });
-    }
-  }
-
-  @Override
-  public void nodeDeleted(final String path) {
-    if (path.startsWith(watcher.assignmentZNode)) {
-      final String regionName = ZKAssign.getRegionName(watcher, path);
-      zkEventWorkersSubmit(new RegionRunnable() {
-        @Override
-        public String getRegionName() {
-          return regionName;
-        }
-
-        @Override
-        public void run() {
-          Lock lock = locker.acquireLock(regionName);
-          try {
-            RegionState rs = regionStates.getRegionTransitionState(regionName);
-            if (rs == null) {
-              rs = regionStates.getRegionState(regionName);
-              if (rs == null || !rs.isMergingNew()) {
-                // MergingNew is an offline state
-                return;
-              }
-            }
-
-            HRegionInfo regionInfo = rs.getRegion();
-            String regionNameStr = regionInfo.getRegionNameAsString();
-            LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
-
-            boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
-                ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
-
-            ServerName serverName = rs.getServerName();
-            if (serverManager.isServerOnline(serverName)) {
-              if (rs.isOnServer(serverName)
-                  && (rs.isOpened() || rs.isSplitting())) {
-                regionOnline(regionInfo, serverName);
-                if (disabled) {
-                  // if server is offline, no hurt to unassign again
-                  LOG.info("Opened " + regionNameStr
-                    + "but this table is disabled, triggering close of region");
-                  unassign(regionInfo);
-                }
-              } else if (rs.isMergingNew()) {
-                synchronized (regionStates) {
-                  String p = regionInfo.getEncodedName();
-                  PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
-                  if (regions != null) {
-                    onlineMergingRegion(disabled, regions.getFirst(), serverName);
-                    onlineMergingRegion(disabled, regions.getSecond(), serverName);
-                  }
-                }
-              }
-            }
-          } finally {
-            lock.unlock();
-          }
-        }
-
-        private void onlineMergingRegion(boolean disabled,
-            final HRegionInfo hri, final ServerName serverName) {
-          RegionState regionState = regionStates.getRegionState(hri);
-          if (regionState != null && regionState.isMerging()
-              && regionState.isOnServer(serverName)) {
-            regionOnline(regionState.getRegion(), serverName);
-            if (disabled) {
-              unassign(hri);
-            }
-          }
-        }
-      });
-    }
-  }
-
-  /**
-   * New unassigned node has been created.
-   *
-   * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
-   * region by creating a znode.
-   *
-   * <p>When this happens we must:
-   * <ol>
-   *   <li>Watch the node for further children changed events</li>
-   *   <li>Watch all new children for changed events</li>
-   * </ol>
-   */
-  @Override
-  public void nodeChildrenChanged(String path) {
-    if (path.equals(watcher.assignmentZNode)) {
-      zkEventWorkers.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            // Just make sure we see the changes for the new znodes
-            List<String> children =
-              ZKUtil.listChildrenAndWatchForNewChildren(
-                watcher, watcher.assignmentZNode);
-            if (children != null) {
-              Stat stat = new Stat();
-              for (String child : children) {
-                // if region is in transition, we already have a watch
-                // on it, so no need to watch it again. So, as I know for now,
-                // this is needed to watch splitting nodes only.
-                if (!regionStates.isRegionInTransition(child)) {
-                  ZKAssign.getDataAndWatch(watcher, child, stat);
-                }
-              }
-            }
-          } catch (KeeperException e) {
-            server.abort("Unexpected ZK exception reading unassigned children", e);
-          }
-        }
-      });
-    }
-  }
-
-
-  /**
    * Marks the region as online.  Removes it from regions in transition and
    * updates the in-memory assignment information.
    * <p>
@@ -1432,55 +601,6 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
-   * Pass the assignment event to a worker for processing.
-   * Each worker is a single thread executor service.  The reason
-   * for just one thread is to make sure all events for a given
-   * region are processed in order.
-   *
-   * @param path
-   */
-  private void handleAssignmentEvent(final String path) {
-    if (path.startsWith(watcher.assignmentZNode)) {
-      final String regionName = ZKAssign.getRegionName(watcher, path);
-
-      zkEventWorkersSubmit(new RegionRunnable() {
-        @Override
-        public String getRegionName() {
-          return regionName;
-        }
-
-        @Override
-        public void run() {
-          try {
-            Stat stat = new Stat();
-            byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
-            if (data == null) return;
-
-            RegionTransition rt = RegionTransition.parseFrom(data);
-
-            // TODO: This code is tied to ZK anyway, so for now leaving it as is,
-            // will refactor when whole region assignment will be abstracted from ZK
-            BaseCoordinatedStateManager csm =
-              (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
-            OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
-
-            ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
-              new ZkOpenRegionCoordination.ZkOpenRegionDetails();
-            zkOrd.setVersion(stat.getVersion());
-            zkOrd.setServerName(csm.getServer().getServerName());
-
-            handleRegion(rt, openRegionCoordination, zkOrd);
-          } catch (KeeperException e) {
-            server.abort("Unexpected ZK exception reading unassigned node data", e);
-          } catch (DeserializationException e) {
-            server.abort("Unexpected exception deserializing node data", e);
-          }
-        }
-      });
-    }
-  }
-
-  /**
    * Marks the region as offline.  Removes it from regions in transition and
    * removes in-memory assignment information.
    * <p>
@@ -1492,15 +612,6 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   public void offlineDisabledRegion(HRegionInfo regionInfo) {
-    if (useZKForAssignment) {
-      // Disabling so should not be reassigned, just delete the CLOSED node
-      LOG.debug("Table being disabled so deleting ZK node and removing from " +
-        "regions in transition, skipping assignment of region " +
-          regionInfo.getRegionNameAsString());
-      String encodedName = regionInfo.getEncodedName();
-      deleteNodeInStates(encodedName, "closed", null,
-        EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
-    }
     replicasToClose.remove(regionInfo);
     regionOffline(regionInfo);
   }
@@ -1517,23 +628,19 @@ public class AssignmentManager extends ZooKeeperListener {
    * Updates the RegionState and sends the OPEN RPC.
    * <p>
    * This will only succeed if the region is in transition and in a CLOSED or
-   * OFFLINE state or not in transition (in-memory not zk), and of course, the
-   * chosen server is up and running (It may have just crashed!).  If the
-   * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
+   * OFFLINE state or not in transition, and of course, the
+   * chosen server is up and running (It may have just crashed!).
    *
    * @param region server to be assigned
-   * @param setOfflineInZK whether ZK node should be created/transitioned to an
-   *                       OFFLINE state before assigning the region
    */
-  public void assign(HRegionInfo region, boolean setOfflineInZK) {
-    assign(region, setOfflineInZK, false);
+  public void assign(HRegionInfo region) {
+    assign(region, false);
   }
 
   /**
    * Use care with forceNewPlan. It could cause double assignment.
    */
-  public void assign(HRegionInfo region,
-      boolean setOfflineInZK, boolean forceNewPlan) {
+  public void assign(HRegionInfo region, boolean forceNewPlan) {
     if (isDisabledorDisablingRegionInRIT(region)) {
       return;
     }
@@ -1553,7 +660,7 @@ public class AssignmentManager extends ZooKeeperListener {
             + " is dead but not processed yet");
           return;
         }
-        assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
+        assign(state, forceNewPlan);
       }
     } finally {
       lock.unlock();
@@ -1583,12 +690,8 @@ public class AssignmentManager extends ZooKeeperListener {
       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
       try {
-        AtomicInteger counter = new AtomicInteger(0);
-        Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
-        OfflineCallback cb = new OfflineCallback(
-          watcher, destination, counter, offlineNodesVersions);
-        Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
-        List<RegionState> states = new ArrayList<RegionState>(regions.size());
+        Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
+        List<RegionState> states = new ArrayList<RegionState>(regionCount);
         for (HRegionInfo region : regions) {
           String encodedName = region.getEncodedName();
           if (!isDisabledorDisablingRegionInRIT(region)) {
@@ -1600,8 +703,7 @@ public class AssignmentManager extends ZooKeeperListener {
                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
                   + " is dead but not processed yet");
                 onDeadServer = true;
-              } else if (!useZKForAssignment
-                  || asyncSetOfflineInZooKeeper(state, cb, destination)) {
+              } else {
                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
                 plans.put(encodedName, plan);
                 states.add(state);
@@ -1610,8 +712,8 @@ public class AssignmentManager extends ZooKeeperListener {
             }
             // Reassign if the region wasn't on a dead server
             if (!onDeadServer) {
-              LOG.info("failed to force region state to offline or "
-                + "failed to set it offline in ZK, will reassign later: " + region);
+              LOG.info("failed to force region state to offline, "
+                + "will reassign later: " + region);
               failedToOpenRegions.add(region); // assign individually later
             }
           }
@@ -1621,21 +723,6 @@ public class AssignmentManager extends ZooKeeperListener {
           lock.unlock();
         }
 
-        if (useZKForAssignment) {
-          // Wait until all unassigned nodes have been put up and watchers set.
-          int total = states.size();
-          for (int oldCounter = 0; !server.isStopped();) {
-            int count = counter.get();
-            if (oldCounter != count) {
-              LOG.debug(destination.toString() + " unassigned znodes=" + count +
-                " of total=" + total + "; oldCounter=" + oldCounter);
-              oldCounter = count;
-            }
-            if (count >= total) break;
-            Thread.sleep(5);
-          }
-        }
-
         if (server.isStopped()) {
           return false;
         }
@@ -1644,27 +731,18 @@ public class AssignmentManager extends ZooKeeperListener {
         // that unnecessary timeout on RIT is reduced.
         this.addPlans(plans);
 
-        List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
-          new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
+        List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
+          new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
         for (RegionState state: states) {
           HRegionInfo region = state.getRegion();
-          String encodedRegionName = region.getEncodedName();
-          Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
-          if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
-            LOG.warn("failed to offline in zookeeper: " + region);
-            failedToOpenRegions.add(region); // assign individually later
-            Lock lock = locks.remove(encodedRegionName);
-            lock.unlock();
-          } else {
-            regionStates.updateRegionState(
-              region, State.PENDING_OPEN, destination);
-            List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
-            if (this.shouldAssignRegionsWithFavoredNodes) {
-              favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
-            }
-            regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
-              region, nodeVersion, favoredNodes));
+          regionStates.updateRegionState(
+            region, State.PENDING_OPEN, destination);
+          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+          if (this.shouldAssignRegionsWithFavoredNodes) {
+            favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
           }
+          regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
+            region, favoredNodes));
         }
 
         // Move on to open regions.
@@ -1686,15 +764,8 @@ public class AssignmentManager extends ZooKeeperListener {
                 RegionOpeningState openingState = regionOpeningStateList.get(k);
                 if (openingState != RegionOpeningState.OPENED) {
                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
-                  if (openingState == RegionOpeningState.ALREADY_OPENED) {
-                    processAlreadyOpenedRegion(region, destination);
-                  } else if (openingState == RegionOpeningState.FAILED_OPENING) {
-                    // Failed opening this region, reassign it later
-                    failedToOpenRegions.add(region);
-                  } else {
-                    LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
-                      + openingState + " in assigning region " + region);
-                  }
+                  // Failed opening this region, reassign it later
+                  failedToOpenRegions.add(region);
                 }
               }
               break;
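
Since there is no znode to reconcile, the bulk-open response handling also loses the ALREADY_OPENED branch: anything other than OPENED simply goes on the retry list. A minimal stand-alone sketch of that filtering step follows; the enum and the region names are illustrative, not the HBase classes.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BulkOpenResponseSketch {
      enum OpeningState { OPENED, FAILED_OPENING }   // stand-in for RegionOpeningState

      public static void main(String[] args) {
        List<String> requested = Arrays.asList("r1", "r2", "r3");   // order matches the request list
        List<OpeningState> responses = Arrays.asList(
            OpeningState.OPENED, OpeningState.FAILED_OPENING, OpeningState.OPENED);

        List<String> failedToOpenRegions = new ArrayList<String>();
        for (int k = 0; k < responses.size(); k++) {
          if (responses.get(k) != OpeningState.OPENED) {
            failedToOpenRegions.add(requested.get(k));   // assign individually later
          }
        }
        System.out.println("to reassign later: " + failedToOpenRegions);
      }
    }
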
@@ -1771,8 +842,7 @@ public class AssignmentManager extends ZooKeeperListener {
    * on an unexpected server scenario, for example)
    */
   private void unassign(final HRegionInfo region,
-      final RegionState state, final int versionOfClosingNode,
-      final ServerName dest, final boolean transitionInZK,
+      final RegionState state, final ServerName dest,
       final ServerName src) {
     ServerName server = src;
     if (state != null) {
@@ -1788,10 +858,6 @@ public class AssignmentManager extends ZooKeeperListener {
       if (!serverManager.isServerOnline(server)) {
         LOG.debug("Offline " + region.getRegionNameAsString()
           + ", no need to unassign since it's on a dead server: " + server);
-        if (transitionInZK) {
-          // delete the node. if no node exists need not bother.
-          deleteClosingOrClosedNode(region, server);
-        }
         if (state != null) {
           regionOffline(region);
         }
@@ -1799,16 +865,9 @@ public class AssignmentManager extends ZooKeeperListener {
       }
       try {
         // Send CLOSE RPC
-        if (serverManager.sendRegionClose(server, region,
-          versionOfClosingNode, dest, transitionInZK)) {
+        if (serverManager.sendRegionClose(server, region, dest)) {
           LOG.debug("Sent CLOSE to " + server + " for region " +
             region.getRegionNameAsString());
-          if (useZKForAssignment && !transitionInZK && state != null) {
-            // Retry to make sure the region is
-            // closed so as to avoid double assignment.
-            unassign(region, state, versionOfClosingNode,
-              dest, transitionInZK, src);
-          }
           return;
         }
         // This never happens. Currently regionserver close always return true.
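
The close side is simplified the same way: there is no closing-znode version and no transitionInZK flag, just a CLOSE RPC with an optional destination. A hedged, stand-alone sketch of the decision the patched unassign makes, with booleans standing in for the isServerOnline/sendRegionClose calls visible in the diff:

    public class UnassignSketch {
      // The shape of the decision, not the ServerManager API.
      static String unassign(String region, boolean serverOnline, boolean closeAccepted) {
        if (!serverOnline) {
          return "offline " + region + " in memory only (server already dead)";
        }
        if (closeAccepted) {
          return "CLOSE sent for " + region + "; wait for the region server to report back";
        }
        return "retry later: " + region;   // current region servers always accept, so rarely hit
      }

      public static void main(String[] args) {
        System.out.println(unassign("region-a", false, false));
        System.out.println(unassign("region-b", true, true));
      }
    }
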
@@ -1825,9 +884,6 @@ public class AssignmentManager extends ZooKeeperListener {
             || t instanceof ServerNotRunningYetException) {
           LOG.debug("Offline " + region.getRegionNameAsString()
             + ", it's not any more on " + server, t);
-          if (transitionInZK) {
-            deleteClosingOrClosedNode(region, server);
-          }
           if (state != null) {
             regionOffline(region);
           }
@@ -1840,9 +896,6 @@ public class AssignmentManager extends ZooKeeperListener {
             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
           } else {
-            // RS is already processing this region, only need to update the timestamp
-            LOG.debug("update " + state + " the timestamp.");
-            state.updateTimestampToNow();
             if (maxWaitTime < 0) {
               maxWaitTime =
                   EnvironmentEdgeManager.currentTimeMillis()
@@ -1898,7 +951,6 @@ public class AssignmentManager extends ZooKeeperListener {
       state = regionStates.createRegionState(region);
     }
 
-    ServerName sn = state.getServerName();
     if (forceNewPlan && LOG.isDebugEnabled()) {
       LOG.debug("Force region state offline " + state);
     }
@@ -1916,7 +968,7 @@ public class AssignmentManager extends ZooKeeperListener {
       }
     case FAILED_CLOSE:
     case FAILED_OPEN:
-      unassign(region, state, -1, null, false, null);
+      unassign(region, state, null, null);
       state = regionStates.getRegionState(region);
       if (state.isFailedClose()) {
         // If we can't close the region, we can't re-assign
@@ -1926,21 +978,6 @@ public class AssignmentManager extends ZooKeeperListener {
         return null;
       }
     case OFFLINE:
-      // This region could have been open on this server
-      // for a while. If the server is dead and not processed
-      // yet, we can move on only if the meta shows the
-      // region is not on this server actually, or on a server
-      // not dead, or dead and processed already.
-      // In case not using ZK, we don't need this check because
-      // we have the latest info in memory, and the caller
-      // will do another round checking any way.
-      if (useZKForAssignment
-          && regionStates.isServerDeadAndNotProcessed(sn)
-          && wasRegionOnDeadServerByMeta(region, sn)) {
-        LOG.info("Skip assigning " + region.getRegionNameAsString()
-          + ", it is on a dead but not processed yet server: " + sn);
-        return null;
-      }
     case CLOSED:
       break;
     default:
@@ -1951,49 +988,15 @@ public class AssignmentManager extends ZooKeeperListener {
     return state;
   }
 
-  @SuppressWarnings("deprecation")
-  private boolean wasRegionOnDeadServerByMeta(
-      final HRegionInfo region, final ServerName sn) {
-    try {
-      if (region.isMetaRegion()) {
-        ServerName server = this.server.getMetaTableLocator().
-          getMetaRegionLocation(this.server.getZooKeeper());
-        return regionStates.isServerDeadAndNotProcessed(server);
-      }
-      while (!server.isStopped()) {
-        try {
-          this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
-          Result r = MetaTableAccessor.getRegionResult(server.getShortCircuitConnection(),
-            region.getRegionName());
-          if (r == null || r.isEmpty()) return false;
-          ServerName server = HRegionInfo.getServerName(r);
-          return regionStates.isServerDeadAndNotProcessed(server);
-        } catch (IOException ioe) {
-          LOG.info("Received exception accessing hbase:meta during force assign "
-            + region.getRegionNameAsString() + ", retrying", ioe);
-        }
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.info("Interrupted accessing hbase:meta", e);
-    }
-    // Call is interrupted or server is stopped.
-    return regionStates.isServerDeadAndNotProcessed(sn);
-  }
-
   /**
    * Caller must hold lock on the passed <code>state</code> object.
    * @param state
-   * @param setOfflineInZK
    * @param forceNewPlan
    */
-  private void assign(RegionState state,
-      final boolean setOfflineInZK, final boolean forceNewPlan) {
+  private void assign(RegionState state, boolean forceNewPlan) {
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
       Configuration conf = server.getConfiguration();
-      RegionState currentState = state;
-      int versionOfOfflineNode = -1;
       RegionPlan plan = null;
       long maxWaitTime = -1;
       HRegionInfo region = state.getRegion();
@@ -2027,14 +1030,6 @@ public class AssignmentManager extends ZooKeeperListener {
           regionStates.updateRegionState(region, State.FAILED_OPEN);
           return;
         }
-        if (setOfflineInZK && versionOfOfflineNode == -1) {
-          // get the version of the znode after setting it to OFFLINE.
-          // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
-          versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
-          if (versionOfOfflineNode != -1) {
-            if (isDisabledorDisablingRegionInRIT(region)) {
-              return;
-            }
             // In case of assignment from EnableTableHandler table state is ENABLING. Any how
             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
             // try to set to ENABLED directly then client API may think table is enabled.
@@ -2047,22 +1042,10 @@ public class AssignmentManager extends ZooKeeperListener {
               LOG.debug("Setting table " + tableName + " to ENABLED state.");
               setEnabledTable(tableName);
             }
-          }
-        }
-        if (setOfflineInZK && versionOfOfflineNode == -1) {
-          LOG.info("Unable to set offline in ZooKeeper to assign " + region);
-          // Setting offline in ZK must have been failed due to ZK racing or some
-          // exception which may make the server to abort. If it is ZK racing,
-          // we should retry since we already reset the region state,
-          // existing (re)assignment will fail anyway.
-          if (!server.isAborted()) {
-            continue;
-          }
-        }
         LOG.info("Assigning " + region.getRegionNameAsString() +
             " to " + plan.getDestination().toString());
         // Transition RegionState to PENDING_OPEN
-        currentState = regionStates.updateRegionState(region,
+        regionStates.updateRegionState(region,
           State.PENDING_OPEN, plan.getDestination());
 
         boolean needNewPlan;
@@ -2074,7 +1057,7 @@ public class AssignmentManager extends ZooKeeperListener {
             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
           }
           regionOpenState = serverManager.sendRegionOpen(
-              plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
+              plan.getDestination(), region, favoredNodes);
 
           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
             // Failed opening this region, looping again on a new server.
@@ -2084,9 +1067,6 @@ public class AssignmentManager extends ZooKeeperListener {
                 "try=" + i + " of " + this.maximumAttempts);
           } else {
             // we're done
-            if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
-              processAlreadyOpenedRegion(region, plan.getDestination());
-            }
             return;
           }
 
@@ -2186,8 +1166,7 @@ public class AssignmentManager extends ZooKeeperListener {
             // Clean out plan we failed execute and one that doesn't look like it'll
             // succeed anyways; we need a new plan!
             // Transition back to OFFLINE
-            currentState = regionStates.updateRegionState(region, State.OFFLINE);
-            versionOfOfflineNode = -1;
+            regionStates.updateRegionState(region, State.OFFLINE);
             plan = newPlan;
           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
               previousException instanceof FailedServerException) {
@@ -2213,17 +1192,6 @@ public class AssignmentManager extends ZooKeeperListener {
     }
   }
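
Stripped of the znode bookkeeping, the single-region assign boils down to: pick a plan, mark the region PENDING_OPEN, send the open RPC, and loop on a possibly new plan if the open fails, up to this.maximumAttempts. A compact stand-alone sketch of that retry loop; the random destination and the fake RPC are illustrative only.

    import java.util.Random;

    public class AssignRetrySketch {
      enum OpeningState { OPENED, FAILED_OPENING }   // stand-in for RegionOpeningState

      private static final Random RANDOM = new Random();

      // Pretend RPC: succeeds most of the time, like serverManager.sendRegionOpen in the patch.
      static OpeningState sendRegionOpen(String server, String region) {
        return RANDOM.nextInt(4) == 0 ? OpeningState.FAILED_OPENING : OpeningState.OPENED;
      }

      public static void main(String[] args) {
        String region = "region-a";
        int maximumAttempts = 10;                    // stands in for this.maximumAttempts
        for (int i = 1; i <= maximumAttempts; i++) {
          String destination = "rs" + (1 + RANDOM.nextInt(3)) + ",16020,1";  // the "plan"
          // In the patch, the state is moved to PENDING_OPEN for the destination here.
          OpeningState state = sendRegionOpen(destination, region);
          if (state == OpeningState.OPENED) {
            System.out.println(region + " assigned to " + destination + " on try " + i);
            return;
          }
          // FAILED_OPENING: loop again, possibly with a new plan / destination.
          System.out.println("open failed on " + destination + ", try=" + i + " of " + maximumAttempts);
        }
        System.out.println(region + " could not be assigned; marked FAILED_OPEN");
      }
    }
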
 
-  private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
-    // Remove region from in-memory transition and unassigned node from ZK
-    // While trying to enable the table the regions of the table were
-    // already enabled.
-    LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
-      + " to " + sn);
-    String encodedName = region.getEncodedName();
-    deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
-    regionStates.regionOnline(region, sn);
-  }
-
   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
     if (this.tableStateManager.isTableState(region.getTable(),
         ZooKeeperProtos.Table.State.DISABLED,
@@ -2237,37 +1205,6 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
-   * Set region as OFFLINED up in zookeeper
-   *
-   * @param state
-   * @return the version of the offline node if setting of the OFFLINE node was
-   *         successful, -1 otherwise.
-   */
-  private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
-    if (!state.isClosed() && !state.isOffline()) {
-      String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
-      this.server.abort(msg, new IllegalStateException(msg));
-      return -1;
-    }
-    regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
-    int versionOfOfflineNode;
-    try {
-      // get the version after setting the znode to OFFLINE
-      versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
-        state.getRegion(), destination);
-      if (versionOfOfflineNode == -1) {
-        LOG.warn("Attempted to create/force node into OFFLINE state before "
-            + "completing assignment but failed to do so for " + state);
-        return -1;
-      }
-    } catch (KeeperException e) {
-      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
-      return -1;
-    }
-    return versionOfOfflineNode;
-  }
-
-  /**
    * @param region the region to assign
    * @return Plan for passed <code>region</code> (If none currently, it creates one or
    * if no servers to assign, it returns null).
@@ -2388,7 +1325,6 @@ public class AssignmentManager extends ZooKeeperListener {
 
     String encodedName = region.getEncodedName();
     // Grab the state of this region and synchronize on it
-    int versionOfClosingNode = -1;
     // We need a lock here as we're going to do a put later and we don't want multiple states
     //  creation
     ReentrantLock lock = locker.acquireLock(encodedName);
@@ -2404,56 +1340,12 @@ public class AssignmentManager extends ZooKeeperListener {
           // Offline region will be reassigned below
           return;
         }
-        // Create the znode in CLOSING state
-        try {
-          if (state == null || state.getServerName() == null) {
-            // We don't know where the region is, offline it.
-            // No need to send CLOSE RPC
-            LOG.warn("Attempting to unassign a region not in RegionStates"
-              + region.getRegionNameAsString() + ", offlined");
-            regionOffline(region);
-            return;
-          }
-          if (useZKForAssignment) {
-            versionOfClosingNode = ZKAssign.createNodeClosing(
-              watcher, region, state.getServerName());
-            if (versionOfClosingNode == -1) {
-              LOG.info("Attempting to unassign " +
-                region.getRegionNameAsString() + " but ZK closing node "
-                + "can't be created.");
-              reassign = false; // not unassigned at all
-              return;
-            }
-          }
-        } catch (KeeperException e) {
-          if (e instanceof NodeExistsException) {
-            // Handle race between master initiated close and regionserver
-            // orchestrated splitting. See if existing node is in a
-            // SPLITTING or SPLIT state.  If so, the regionserver started
-            // an op on node before we could get our CLOSING in.  Deal.
-            NodeExistsException nee = (NodeExistsException)e;
-            String path = nee.getPath();
-            try {
-              if (isSplitOrSplittingOrMergedOrMerging(path)) {
-                LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
-                  "skipping unassign because region no longer exists -- its split or merge");
-                reassign = false; // no need to reassign for split/merged region
-                return;
-              }
-            } catch (KeeperException.NoNodeException ke) {
-              LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
-                "; presuming split and that the region to unassign, " +
-                encodedName + ", no longer exists -- confirm", ke);
-              return;
-            } catch (KeeperException ke) {
-              LOG.error("Unexpected zk state", ke);
-            } catch (DeserializationException de) {
-              LOG.error("Failed parse", de);
-            }
-          }
-          // If we get here, don't understand whats going on -- abort.
-          server.abort("Unexpected ZK exception creating node CLOSING", e);
-          reassign = false; // heading out already
+        if (state == null || state.getServerName() == null) {
+          // We don't know where the region is, offline it.
+          // No need to send CLOSE RPC
+          LOG.warn("Attempting to unassign a region not in RegionStates"
+            + region.getRegionNameAsString() + ", offlined");
+          regionOffline(region);
           return;
         }
         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
@@ -2468,7 +1360,6 @@ public class AssignmentManager extends ZooKeeperListener {
         if (state.isFailedClose()) {
           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
         }
-        state.updateTimestampToNow();
       } else {
         LOG.debug("Attempting to unassign " +
           region.getRegionNameAsString() + " but it is " +
@@ -2476,13 +1367,13 @@ public class AssignmentManager extends ZooKeeperListener {
         return;
       }
 
-      unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
+      unassign(region, state, dest, null);
     } finally {
       lock.unlock();
 
       // Region is expected to be reassigned afterwards
       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
-        assign(region, true);
+        assign(region);
       }
     }
   }
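
The caller-facing unassign keeps its contract: close the region under the per-region lock and, unless it is a replica scheduled for close or reassignment was suppressed, hand it straight back to assign() once it shows as offline. A small sketch of that close-then-reassign handshake, with plain stand-ins rather than RegionStates:

    import java.util.HashSet;
    import java.util.Set;

    public class CloseThenReassignSketch {
      public static void main(String[] args) {
        String region = "region-a";
        Set<String> replicasToClose = new HashSet<String>();  // replicas being closed for good
        boolean reassign = true;                               // false when the region is going away
        boolean regionOffline = true;                          // would come from regionStates.isRegionOffline

        // unassign(...) would run here, under the per-region lock.
        // After the lock is released, the region is expected to be reassigned:
        if (!replicasToClose.contains(region) && reassign && regionOffline) {
          System.out.println("assign(" + region + ")  // back through the normal assignment path");
        } else {
          System.out.println(region + " stays offline");
        }
      }
    }
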
@@ -2492,48 +1383,6 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
-   * @param region regioninfo of znode to be deleted.
-   */
-  public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
-    String encodedName = region.getEncodedName();
-    deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
-      EventType.RS_ZK_REGION_CLOSED);
-  }
-
-  /**
-   * @param path
-   * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
-   * @throws KeeperException Can happen if the znode went away in meantime.
-   * @throws DeserializationException
-   */
-  private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
-      throws KeeperException, DeserializationException {
-    boolean result = false;
-    // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
-    // cleaned up before we can get data from it.
-    byte [] data = ZKAssign.getData(watcher, path);
-    if (data == null) {
-      LOG.info("Node " + path + " is gone");
-      return false;
-    }
-    RegionTransition rt = RegionTransition.parseFrom(data);
-    switch (rt.getEventType()) {
-    case RS_ZK_REQUEST_REGION_SPLIT:
-    case RS_ZK_REGION_SPLIT:
-    case RS_ZK_REGION_SPLITTING:
-    case RS_ZK_REQUEST_REGION_MERGE:
-    case RS_ZK_REGION_MERGED:
-    case RS_ZK_REGION_MERGING:
-      result = true;
-      break;
-    default:
-      LOG.info("Node " + path + " is in " + rt.getEventType());
-      break;
-    }
-    return result;
-  }
-
-  /**
    * Used by unit tests. Return the number of regions opened so far in the life
    * of the master. Increases by one every time the master opens a region
    * @return the counter value of the number of regions opened so far
@@ -2577,8 +1426,8 @@ public class AssignmentManager extends ZooKeeperListener {
    * @throws KeeperException
    */
   public void assignMeta() throws KeeperException {
-    this.server.getMetaTableLocator().deleteMetaLocation(this.watcher);
-    assign(HRegionInfo.FIRST_META_REGIONINFO, true);
+    this.server.getMetaTableLocator().deleteMetaLocation(this.server.getZooKeeper());
+    assign(HRegionInfo.FIRST_META_REGIONINFO);
   }
 
   /**
@@ -2735,30 +1584,6 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
-   * Wait until no regions in transition.
-   * @param timeout How long to wait.
-   * @return True if nothing in regions in transition.
-   * @throws InterruptedException
-   */
-  boolean waitUntilNoRegionsInTransition(final long timeout)
-      throws InterruptedException {
-    // Blocks until there are no regions in transition. It is possible that
-    // there
-    // are regions in transition immediately after this returns but guarantees
-    // that if it returns without an exception that there was a period of time
-    // with no regions in transition from the point-of-view of the in-memory
-    // state of the Master.
-    final long endTime = System.currentTimeMillis() + timeout;
-
-    while (!this.server.isStopped() && regionStates.isRegionsInTransition()
-        && endTime > System.currentTimeMillis()) {
-      regionStates.waitForUpdate(100);
-    }
-
-    return !regionStates.isRegionsInTransition();
-  }
-
-  /**
    * Rebuild the list of user regions and assignment information.
    * <p>
    * Returns a set of servers that are not found to be online that hosted
@@ -2829,16 +1654,11 @@ public class AssignmentManager extends ZooKeeperListener {
         if (!onlineServers.contains(regionLocation)) {
           // Region is located on a server that isn't online
           offlineServers.add(regionLocation);
-          if (useZKForAssignment) {
-            regionStates.regionOffline(regionInfo);
-          }
         } else if (!disabledOrEnablingTables.contains(tableName)) {
           // Region is being served and on an active server
           // add only if region not in disabled or enabling table
           regionStates.regionOnline(regionInfo, regionLocation);
           balancer.regionOnline(regionInfo, regionLocation);
-        } else if (useZKForAssignment) {
-          regionStates.regionOffline(regionInfo);
         }
         // need to enable the table if not disabled or disabling or enabling
         // this will be used in rolling restarts
@@ -2911,21 +1731,12 @@ public class AssignmentManager extends ZooKeeperListener {
 
   /**
    * Processes list of dead servers from result of hbase:meta scan and regions in RIT
-   * <p>
-   * This is used for failover to recover the lost regions that belonged to
-   * RegionServers which failed while there was no active master or regions
-   * that were in RIT.
-   * <p>
-   *
    *
    * @param deadServers
    *          The list of dead servers which failed while there was no active
    *          master. Can be null.
-   * @throws IOException
-   * @throws KeeperException
    */
-  private void processDeadServersAndRecoverLostRegions(
-      Set<ServerName> deadServers) throws IOException, KeeperException {
+  private void processDeadServers(Set<ServerName> deadServers) {
     if (deadServers != null && !deadServers.isEmpty()) {
       for (ServerName serverName: deadServers) {
         if (!serverManager.isServerDead(serverName)) {
@@ -2934,36 +1745,27 @@ public class AssignmentManager extends ZooKeeperListener {
       }
     }
 
-    List<String> nodes = useZKForAssignment ?
-      ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
-      : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
-    if (nodes != null && !nodes.isEmpty()) {
-      for (String encodedRegionName : nodes) {
-        processRegionInTransition(encodedRegionName, null);
+    // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
+    // in case the RPC call is not sent out yet before the master was shut down
+    // since we update the state before we send the RPC call. We can't update
+    // the state after the RPC call. Otherwise, we don't know what's happened
+    // to the region if the master dies right after the RPC call is out.
+    Map<String, RegionState> rits = regionStates.getRegionsInTransition();
+    for (RegionState regionState: rits.values()) {
+      if (!serverManager.isServerOnline(regionState.getServerName())) {
+        continue; // SSH will handle it
       }
-    } else if (!useZKForAssignment) {
-      // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
-      // in case the RPC call is not sent out yet before the master was shut down
-      // since we update the state before we send the RPC call. We can't update
-      // the state after the RPC call. Otherwise, we don't know what's happened
-      // to the region if the master dies right after the RPC call is out.
-      Map<String, RegionState> rits = regionStates.getRegionsInTransition();
-      for (RegionState regionState: rits.values()) {
-        if (!serverManager.isServerOnline(regionState.getServerName())) {
-          continue; // SSH will handle it
-        }
-        State state = regionState.getState();
-        LOG.info("Processing " + regionState);
-        switch (state) {
-        case PENDING_OPEN:
-          retrySendRegionOpen(regionState);
-          break;
-        case PENDING_CLOSE:
-          retrySendRegionClose(regionState);
-          break;
-        default:
-          // No process for other states
-        }
+      State state = regionState.getState();
+      LOG.info("Processing " + regionState);
+      switch (state) {
+      case PENDING_OPEN:
+        retrySendRegionOpen(regionState);
+        break;
+      case PENDING_CLOSE:
+        retrySendRegionClose(regionState);
+        break;
+      default:
+        // No process for other states
       }
     }
   }
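
Because the master now updates the in-memory state before the RPC goes out, a restart can leave PENDING_OPEN/PENDING_CLOSE entries whose RPC may never have been sent; processDeadServers replays them. A self-contained sketch of that replay, with a plain map standing in for regionStates.getRegionsInTransition():

    import java.util.HashMap;
    import java.util.Map;

    public class RitReplaySketch {
      enum State { PENDING_OPEN, PENDING_CLOSE, OPENING, CLOSING }

      public static void main(String[] args) {
        // Stand-in for the RIT map: encoded region name -> state.
        Map<String, State> rits = new HashMap<String, State>();
        rits.put("region-a", State.PENDING_OPEN);
        rits.put("region-b", State.PENDING_CLOSE);
        rits.put("region-c", State.OPENING);

        for (Map.Entry<String, State> e : rits.entrySet()) {
          // In the patch, regions whose server is not online are skipped: SSH handles those.
          switch (e.getValue()) {
          case PENDING_OPEN:
            System.out.println("retrySendRegionOpen for " + e.getKey());
            break;
          case PENDING_CLOSE:
            System.out.println("retrySendRegionClose for " + e.getKey());
            break;
          default:
            // Other states are already in flight on the region server; nothing to resend.
            break;
          }
        }
      }
    }
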
@@ -2992,7 +1794,7 @@ public class AssignmentManager extends ZooKeeperListener {
                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
                 }
                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
-                  serverName, hri, -1, favoredNodes);
+                  serverName, hri, favoredNodes);
 
                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
                   // Failed opening this region, this means the target server didn't get
@@ -3045,7 +1847,7 @@ public class AssignmentManager extends ZooKeeperListener {
             while (serverManager.isServerOnline(serverName)
                 && !server.isStopped() && !server.isAborted()) {
               try {
-                if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
+                if (!serverManager.sendRegionClose(serverName, hri, null)) {
                   // This means the region is still on the target server
                   LOG.debug("Got false in retry sendRegionClose for "
                     + regionState + ", re-close it");
@@ -3179,43 +1981,26 @@ public class AssignmentManager extends ZooKeeperListener {
 
   /**
    * Check if the shutdown server carries the specific region.
-   * We have a bunch of places that store region location
-   * Those values aren't consistent. There is a delay of notification.
-   * The location from zookeeper unassigned node has the most recent data;
-   * but the node could be deleted after the region is opened by AM.
-   * The AM's info could be old when OpenedRegionHandler
-   * processing hasn't finished yet when server shutdown occurs.
    * @return whether the serverName currently hosts the region
    */
   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
-    RegionTransition rt = null;
-    try {
-      byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
-      // This call can legitimately come by null
-      rt = data == null? null: RegionTransition.parseFrom(data);
-    } catch (KeeperException e) {
-      server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
-    } catch (DeserializationException e) {
-      server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
-    }
-
-    ServerName addressFromZK = rt != null? rt.getServerName():  null;
-    if (addressFromZK != null) {
-      // if we get something from ZK, we will use the data
-      boolean matchZK = addressFromZK.equals(serverName);
-      LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
-        " current=" + serverName + ", matches=" + matchZK);
-      return matchZK;
+    RegionState regionState = regionStates.getRegionTransitionState(hri);
+    ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
+    if (transitionAddr != null) {
+      boolean matchTransitionAddr = transitionAddr.equals(serverName);
+      LOG.debug("Checking region=" + hri.getRegionNameAsString()
+        + ", transitioning on server=" + matchTransitionAddr
+        + " server being checked: " + serverName
+        + ", matches=" + matchTransitionAddr);
+      return matchTransitionAddr;
     }
 
-    ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
-    boolean matchAM = (addressFromAM != null &&
-      addressFromAM.equals(serverName));
-    LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
-      " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
-      " server being checked: " + serverName);
-
-    return matchAM;
+    ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
+    boolean matchAssignedAddr = serverName.equals(assignedAddr);
+    LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
+      + " is on server=" + assignedAddr + ", server being checked: "
+      + serverName);
+    return matchAssignedAddr;
   }
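
isCarryingRegion now answers purely from the master's in-memory view: prefer the server recorded in the region's transition state, and only fall back to the last assigned server. A stand-alone sketch of that two-step lookup, with maps standing in for RegionStates:

    import java.util.HashMap;
    import java.util.Map;

    public class IsCarryingRegionSketch {
      // Stand-ins for RegionStates: server named in an in-flight transition, and last assigned server.
      static final Map<String, String> transitionServer = new HashMap<String, String>();
      static final Map<String, String> assignedServer = new HashMap<String, String>();

      static boolean isCarryingRegion(String serverName, String region) {
        String fromTransition = transitionServer.get(region);
        if (fromTransition != null) {
          return fromTransition.equals(serverName);      // most recent information wins
        }
        return serverName.equals(assignedServer.get(region));
      }

      public static void main(String[] args) {
        assignedServer.put("region-a", "rs1,16020,1");
        transitionServer.put("region-b", "rs2,16020,1");
        System.out.println(isCarryingRegion("rs1,16020,1", "region-a"));  // true, from assignment
        System.out.println(isCarryingRegion("rs1,16020,1", "region-b"));  // false, transition says rs2
      }
    }
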
 
   /**
@@ -3237,7 +2022,7 @@ public class AssignmentManager extends ZooKeeperListener {
         }
       }
     }
-    List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
+    List<HRegionInfo> regions = regionStates.serverOffline(sn);
     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
       HRegionInfo hri = it.next();
       String encodedName = hri.getEncodedName();
@@ -3255,12 +2040,6 @@ public class AssignmentManager extends ZooKeeperListener {
             + " on the dead server any more: " + sn);
           it.remove();
         } else {
-          try {
-            // Delete the ZNode if exists
-            ZKAssign.deleteNodeFailSilent(watcher, hri);
-          } catch (KeeperException ke) {
-            server.abort("Unexpected ZK exception deleting node " + hri, ke);
-          }
           if (tableStateManager.isTableState(hri.getTable(),
               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
             regionStates.regionOffline(hri);
@@ -3317,12 +2096,7 @@ public class AssignmentManager extends ZooKeeperListener {
    * Shutdown the threadpool executor service
    */
   public void shutdown() {
-    // It's an immediate shutdown, so we're clearing the remaining tasks.
-    synchronized (zkEventWorkerWaitingList){
-      zkEventWorkerWaitingList.clear();
-    }
     threadPoolExecutorService.shutdownNow();
-    zkEventWorkers.shutdownNow();
     regionStateStore.stop();
   }
 
@@ -3339,65 +2113,6 @@ public class AssignmentManager extends ZooKeeperListener {
     }
   }
 
-  /**
-   * Set region as OFFLINED up in zookeeper asynchronously.
-   * @param state
-   * @return True if we succeeded, false otherwise (State was incorrect or failed
-   * updating zk).
-   */
-  private boolean asyncSetOfflineInZooKeeper(final RegionState state,
-      final AsyncCallback.StringCallback cb, final ServerName destination) {
-    if (!state.isClosed() && !state.isOffline()) {
-      this.server.abort("Unexpected state trying to OFFLINE; " + state,
-        new IllegalStateException());
-      return false;
-    }
-    regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
-    try {
-      ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
-        destination, cb, state);
-    } catch (KeeperException e) {
-      if (e instanceof NodeExistsException) {
-        LOG.warn("Node for " + state.getRegion() + " already exists");
-      } else {
-        server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
-      }
-      return false;
-    }
-    return true;
-  }
-
-  private boolean deleteNodeInStates(String encodedName,
-      String desc, ServerName sn, EventType... types) {
-    try {
-      for (EventType et: types) {
-        if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
-          return true;
-        }
-      }
-      LOG.info("Failed to delete the " + desc + " node for "
-        + encodedName + ". The node type may not match");
-    } catch (NoNodeException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
-      }
-    } catch (KeeperException ke) {
-      server.abort("Unexpected ZK exception deleting " + desc
-        + " node for the region " + encodedName, ke);
-    }
-    return false;
-  }
-
-  private void deleteMergingNode(String encodedName, ServerName sn) {
-    deleteNodeInStates

<TRUNCATED>