You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/10/20 05:57:50 UTC

svn commit: r1400358 [1/4] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/handler/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apac...

Author: jxiang
Date: Sat Oct 20 03:57:49 2012
New Revision: 1400358

URL: http://svn.apache.org/viewvc?rev=1400358&view=rev
Log:
HBASE-6611 Forcing region state offline cause double assignment

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java   (with props)
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
    hbase/trunk/hbase-server/src/main/protobuf/Admin.proto
    hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto
    hbase/trunk/hbase-server/src/main/ruby/shell/commands/assign.rb
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java Sat Oct 20 03:57:49 2012
@@ -54,7 +54,7 @@ public class RegionTransition {
   }
 
   public ServerName getServerName() {
-    return ProtobufUtil.toServerName(this.rt.getOriginServerName());
+    return ProtobufUtil.toServerName(this.rt.getServerName());
   }
 
   public long getCreateTime() {
@@ -105,7 +105,7 @@ public class RegionTransition {
         setHostName(sn.getHostname()).setPort(sn.getPort()).setStartCode(sn.getStartcode()).build();
     ZooKeeperProtos.RegionTransition.Builder builder = ZooKeeperProtos.RegionTransition.newBuilder().
       setEventTypeCode(type.getCode()).setRegionName(ByteString.copyFrom(regionName)).
-        setOriginServerName(pbsn);
+        setServerName(pbsn);
     builder.setCreateTime(System.currentTimeMillis());
     if (payload != null) builder.setPayload(ByteString.copyFrom(payload));
     return new RegionTransition(builder.build());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java Sat Oct 20 03:57:49 2012
@@ -42,7 +42,7 @@ public class AssignCallable implements C
 
   @Override
   public Object call() throws Exception {
-    assignmentManager.assign(hri, true, true, true);
+    assignmentManager.assign(hri, true, true);
     return null;
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Sat Oct 20 03:57:49 2012
@@ -23,12 +23,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -117,9 +119,10 @@ public class AssignmentManager extends Z
   private final Map <String, HRegionInfo> regionsToReopen;
 
   /*
-   * Maximum times we recurse an assignment.  See below in {@link #assign()}.
+   * Maximum times we recurse an assignment/unassignment.
+   * See below in {@link #assign()} and {@link #unassign()}.
    */
-  private final int maximumAssignmentAttempts;
+  private final int maximumAttempts;
 
   /** Plans for region movement. Key is the encoded version of a region name*/
   // TODO: When do plans get cleaned out?  Ever? In server open and in server
@@ -158,6 +161,18 @@ public class AssignmentManager extends Z
    */
   final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
 
+  // A temp ZK watcher for bulk assigner to avoid deadlock,
+  // will be removed in HBASE-6977
+  //
+  // A separate ZK watcher used for async ZK node offline.
+  // We can't use the existing one because it could lead to
+  // deadlocks if its event thread asks for a locker held by a bulk
+  // assigner thread. This watcher is just for async ZK node offline.
+  // In HBASE-6977, we are going to process assignment ZK events
+  // outside of ZK event thread, so there won't be deadlock
+  // threat anymore.  That's when this watcher will be removed.
+  private final ZooKeeperWatcher asyncOfflineZKWatcher;
+
   /**
    * Constructs a new assignment manager.
    *
@@ -180,20 +195,24 @@ public class AssignmentManager extends Z
                            (new HashMap<String, HRegionInfo> ());
     Configuration conf = server.getConfiguration();
     this.timeoutMonitor = new TimeoutMonitor(
-      conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
+      conf.getInt("hbase.master.assignment.timeoutmonitor.period", 60000),
       server, serverManager,
-      conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
+      conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1200000));
     this.timerUpdater = new TimerUpdater(conf.getInt(
-        "hbase.master.assignment.timerupdater.period", 10000), server);
+      "hbase.master.assignment.timerupdater.period", 10000), server);
     Threads.setDaemonThreadRunning(timerUpdater.getThread(),
-        server.getServerName() + ".timerUpdater");
+      server.getServerName() + ".timerUpdater");
     this.zkTable = new ZKTable(this.watcher);
-    this.maximumAssignmentAttempts =
+    this.maximumAttempts =
       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
     this.balancer = balancer;
     this.threadPoolExecutorService = Executors.newCachedThreadPool();
     this.masterMetrics = metrics;// can be null only with tests.
     this.regionStates = new RegionStates(server, serverManager);
+    // A temp ZK watcher for bulk assigner to avoid deadlock,
+    // will be removed in HBASE-6977
+    asyncOfflineZKWatcher = new ZooKeeperWatcher(conf,
+      "async offline ZK watcher", server);
   }
 
   void startTimeOutMonitor() {
@@ -265,7 +284,7 @@ public class AssignmentManager extends Z
    * @throws IOException
    */
   public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
-  throws IOException {
+      throws IOException {
     List <HRegionInfo> hris =
       MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName);
     Integer pending = 0;
@@ -397,9 +416,8 @@ public class AssignmentManager extends Z
    * @throws IOException
    */
   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
-  throws InterruptedException, KeeperException, IOException {
-    boolean intransistion =
-      processRegionInTransition(hri.getEncodedName(), hri, null);
+      throws InterruptedException, KeeperException, IOException {
+    boolean intransistion = processRegionInTransition(hri.getEncodedName(), hri);
     if (!intransistion) return intransistion;
     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
     while (!this.server.isStopped() &&
@@ -416,14 +434,12 @@ public class AssignmentManager extends Z
    * up in zookeeper.
    * @param encodedRegionName Region to process failover for.
    * @param regionInfo If null we'll go get it from meta table.
-   * @param deadServers Can be null
    * @return True if we processed <code>regionInfo</code> as a RIT.
    * @throws KeeperException
    * @throws IOException
    */
   boolean processRegionInTransition(final String encodedRegionName,
-      final HRegionInfo regionInfo, final Map<ServerName, List<HRegionInfo>> deadServers)
-          throws KeeperException, IOException {
+      final HRegionInfo regionInfo) throws KeeperException, IOException {
     // We need a lock here to ensure that we will not put the same region twice
     // It has no reason to be a lock shared with the other operations.
     // We can do the lock on the region only, instead of a global lock: what we want to ensure
@@ -445,7 +461,7 @@ public class AssignmentManager extends Z
         hri = regionStates.getRegionInfo(rt.getRegionName());
         if (hri == null) return false;
       }
-      processRegionsInTransition(rt, hri, deadServers, stat.getVersion());
+      processRegionsInTransition(rt, hri, stat.getVersion());
       return true;
     } finally {
       lock.unlock();
@@ -453,16 +469,17 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * This call is invoked only during failover mode, zk assignment node processing.
+   * This call is invoked only (1) when the master assigns root and meta;
+   * (2) during failover mode startup, zk assignment node processing.
    * The locker is set in the caller.
    *
    * It should be private but it is used by some test too.
    */
-  void processRegionsInTransition(final RegionTransition rt, final HRegionInfo regionInfo,
-      final Map<ServerName, List<HRegionInfo>> deadServers, int expectedVersion)
-          throws KeeperException {
+  void processRegionsInTransition(
+      final RegionTransition rt, final HRegionInfo regionInfo,
+      int expectedVersion) throws KeeperException {
     EventType et = rt.getEventType();
-    // Get ServerName.  Could be null.
+    // Get ServerName.  It cannot be null.
     ServerName sn = rt.getServerName();
     String encodedRegionName = regionInfo.getEncodedName();
     LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
@@ -475,9 +492,8 @@ public class AssignmentManager extends Z
     case M_ZK_REGION_CLOSING:
       // If zk node of the region was updated by a live server skip this
       // region and just add it into RIT.
-      if (isOnDeadServer(regionInfo, deadServers)
-          && !serverManager.isServerOnline(sn)) {
-        // If was on dead server, its closed now. Force to OFFLINE and this
+      if (!serverManager.isServerOnline(sn)) {
+        // If the server was not online, it's closed now. Force to OFFLINE and this
         // will get it reassigned if appropriate
         forceOffline(regionInfo, rt);
       } else {
@@ -496,58 +512,42 @@ public class AssignmentManager extends Z
     case M_ZK_REGION_OFFLINE:
       // If zk node of the region was updated by a live server skip this
       // region and just add it into RIT.
-      if (isOnDeadServer(regionInfo, deadServers)
-          && (sn == null || !serverManager.isServerOnline(sn))) {
+      if (!serverManager.isServerOnline(sn)) {
         // Region is offline, insert into RIT and handle it like a closed
         addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
-      } else if (sn != null && !serverManager.isServerOnline(sn)) {
-        // to handle cases where offline node is created but sendRegionOpen
-        // RPC is not yet sent
-        addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
       } else {
+        // Just insert region into RIT.
+        // If this never updates the timeout will trigger new assignment
         regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
       }
       break;
 
     case RS_ZK_REGION_OPENING:
-      // TODO: Could check if it was on deadServers.  If it was, then we could
-      // do what happens in TimeoutMonitor when it sees this condition.
-      // Just insert region into RIT
-      // If this never updates the timeout will trigger new assignment
-      if (regionInfo.isMetaTable()) {
+      if (regionInfo.isMetaTable() || !serverManager.isServerOnline(sn)) {
         regionStates.updateRegionState(rt, RegionState.State.OPENING);
         // If ROOT or .META. table is waiting for timeout monitor to assign
         // it may take lot of time when the assignment.timeout.period is
         // the default value which may be very long.  We will not be able
         // to serve any request during this time.
         // So we will assign the ROOT and .META. region immediately.
+        // For a user region, if the server is not online, it takes
+        // some time for timeout monitor to kick in.  We know the region
+        // won't open. So we will assign the opening
+        // region immediately too.
         processOpeningState(regionInfo);
-        break;
-      } else if (deadServers != null
-          && deadServers.keySet().contains(sn)) {
-        // if the region is found on a dead server, we can assign
-        // it to a new RS. (HBASE-5882)
-        processOpeningState(regionInfo);
-        break;
+      } else {
+        // Just insert region into RIT.
+        // If this never updates the timeout will trigger new assignment
+        regionStates.updateRegionState(rt, RegionState.State.OPENING);
       }
-      regionStates.updateRegionState(rt, RegionState.State.OPENING);
       break;
 
     case RS_ZK_REGION_OPENED:
-      // Region is opened, insert into RIT and handle it
-      regionStates.updateRegionState(rt, RegionState.State.OPEN);
-      // sn could be null if this server is no longer online.  If
-      // that is the case, just let this RIT timeout; it'll be assigned
-      // to new server then.
-      if (sn == null) {
-        LOG.warn("Region in transition " + regionInfo.getEncodedName() +
-          " references a null server; letting RIT timeout so will be " +
-          "assigned elsewhere");
-      } else if (!serverManager.isServerOnline(sn)
-          && (isOnDeadServer(regionInfo, deadServers)
-              || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
+      if (!serverManager.isServerOnline(sn)) {
         forceOffline(regionInfo, rt);
       } else {
+        // Region is opened, insert into RIT and handle it
+        regionStates.updateRegionState(rt, RegionState.State.OPEN);
         new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
       }
       break;
@@ -562,7 +562,6 @@ public class AssignmentManager extends Z
     }
   }
 
-
   /**
    * Put the region <code>hri</code> into an offline state up in zk.
    *
@@ -573,13 +572,12 @@ public class AssignmentManager extends Z
    * @throws KeeperException
    */
   private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt)
-  throws KeeperException {
+      throws KeeperException {
     // If was on dead server, its closed now.  Force to OFFLINE and then
     // handle it like a close; this will get it reassigned if appropriate
     LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
       " was on deadserver; forcing offline");
-    ZKAssign.createOrForceNodeOffline(this.watcher, hri,
-      this.server.getServerName());
+    ZKAssign.createOrForceNodeOffline(this.watcher, hri, oldRt.getServerName());
     addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
   }
 
@@ -591,7 +589,7 @@ public class AssignmentManager extends Z
    * @param oldData
    */
   private void addToRITandCallClose(final HRegionInfo hri,
-                                    final RegionState.State state, final RegionTransition oldData) {
+      final RegionState.State state, final RegionTransition oldData) {
     regionStates.updateRegionState(oldData, state);
     new ClosedRegionHandler(this.server, this, hri).process();
   }
@@ -607,23 +605,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * @param regionInfo
-   * @param deadServers Map of deadServers and the regions they were carrying;
-   * can be null.
-   * @return True if the passed regionInfo in the passed map of deadServers?
-   */
-  private boolean isOnDeadServer(final HRegionInfo regionInfo,
-      final Map<ServerName, List<HRegionInfo>> deadServers) {
-    if (deadServers == null) return false;
-    for (List<HRegionInfo> deadRegions: deadServers.values()) {
-      if (deadRegions.contains(regionInfo)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * Handles various states an unassigned node can be in.
    * <p>
    * Method is called when a state change is suspected for an unassigned node.
@@ -639,10 +620,6 @@ public class AssignmentManager extends Z
       return;
     }
     final ServerName sn = rt.getServerName();
-    if (sn == null) {
-      LOG.warn("Null servername: " + rt);
-      return;
-    }
     // Check if this is a special HBCK transition
     if (sn.equals(HBCK_CODE_SERVERNAME)) {
       handleHBCK(rt);
@@ -653,30 +630,48 @@ public class AssignmentManager extends Z
     String encodedName = HRegionInfo.encodeRegionName(regionName);
     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
     // Verify this is a known server
-    if (!serverManager.isServerOnline(sn) &&
-      !this.server.getServerName().equals(sn)
+    if (!serverManager.isServerOnline(sn)
       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
       LOG.warn("Attempted to handle region transition for server but " +
         "server is not online: " + prettyPrintedRegionName);
       return;
     }
 
-    // We need a lock on the region as we could update it
-    Lock lock = locker.acquireLock(encodedName);
-    try {
-      // Printing if the event was created a long time ago helps debugging
-      boolean lateEvent = createTime < (System.currentTimeMillis() - 15000);
-      RegionState regionState = regionStates.getRegionTransitionState(encodedName);
+    RegionState regionState =
+      regionStates.getRegionTransitionState(encodedName);
+    long startTime = System.currentTimeMillis();
+    if (LOG.isDebugEnabled()) {
+      boolean lateEvent = createTime < (startTime - 15000);
       LOG.debug("Handling transition=" + rt.getEventType() +
         ", server=" + sn + ", region=" +
         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
         (lateEvent ? ", which is more than 15 seconds late" : "") +
         ", current state from region state map =" + regionState);
-      switch (rt.getEventType()) {
-        case M_ZK_REGION_OFFLINE:
-          // Nothing to do.
-          break;
+    }
+    // We don't do anything for this event,
+    // so separate it out, no need to lock/unlock anything
+    if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
+      return;
+    }
 
+    // We need a lock on the region as we could update it
+    Lock lock = locker.acquireLock(encodedName);
+    try {
+      RegionState latestState =
+        regionStates.getRegionTransitionState(encodedName);
+      if ((regionState == null && latestState != null)
+          || (regionState != null && latestState == null)
+          || (regionState != null && latestState != null
+            && latestState.getState() != regionState.getState())) {
+        LOG.warn("Region state changed from " + regionState + " to "
+          + latestState + ", while acquiring lock");
+      }
+      long waitedTime = System.currentTimeMillis() - startTime;
+      if (waitedTime > 5000) {
+        LOG.warn("Took " + waitedTime + "ms to acquire the lock");
+      }
+      regionState = latestState;
+      switch (rt.getEventType()) {
         case RS_ZK_REGION_SPLITTING:
           if (!isInStateForSplitting(regionState)) break;
           regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
@@ -725,12 +720,12 @@ public class AssignmentManager extends Z
         case M_ZK_REGION_CLOSING:
           // Should see CLOSING after we have asked it to CLOSE or additional
           // times after already being in state of CLOSING
-          if (regionState != null &&
-              (!regionState.isPendingClose() && !regionState.isClosing())) {
-            LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
-              " from server " + sn + " but region was in " +
-              " the state " + regionState + " and not " +
-              "in expected PENDING_CLOSE or CLOSING states");
+          if (regionState != null
+              && !regionState.isPendingCloseOrClosingOnServer(sn)) {
+            LOG.warn("Received CLOSING for region " + prettyPrintedRegionName
+              + " from server " + sn + " but region was in the state " + regionState
+              + " and not in expected PENDING_CLOSE or CLOSING states,"
+              + " or not on the expected server");
             return;
           }
           // Transition to CLOSING (or update stamp if already CLOSING)
@@ -739,12 +734,12 @@ public class AssignmentManager extends Z
 
         case RS_ZK_REGION_CLOSED:
           // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
-          if (regionState != null &&
-              (!regionState.isPendingClose() && !regionState.isClosing())) {
-            LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
-                " from server " + sn + " but region was in " +
-                " the state " + regionState + " and not " +
-                "in expected PENDING_CLOSE or CLOSING states");
+          if (regionState != null
+              && !regionState.isPendingCloseOrClosingOnServer(sn)) {
+            LOG.warn("Received CLOSED for region " + prettyPrintedRegionName
+              + " from server " + sn + " but region was in the state " + regionState
+              + " and not in expected PENDING_CLOSE or CLOSING states,"
+              + " or not on the expected server");
             return;
           }
           // Handle CLOSED by assigning elsewhere or stopping if a disable
@@ -759,11 +754,12 @@ public class AssignmentManager extends Z
           break;
 
         case RS_ZK_REGION_FAILED_OPEN:
-          if (regionState != null &&
-              (!regionState.isPendingOpen() && !regionState.isOpening())) {
-            LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName +
-                " from server " + sn + " but region was in " +
-                " the state " + regionState + " and not in PENDING_OPEN or OPENING");
+          if (regionState != null
+              && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+            LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName
+              + " from server " + sn + " but region was in the state " + regionState
+              + " and not in expected PENDING_OPEN or OPENING states,"
+              + " or not on the expected server");
             return;
           }
           // Handle this the same as if it were opened and then closed.
@@ -771,7 +767,7 @@ public class AssignmentManager extends Z
           // When there are more than one region server a new RS is selected as the
           // destination and the same is updated in the regionplan. (HBASE-5546)
           if (regionState != null) {
-            getRegionPlan(regionState, sn, true);
+            getRegionPlan(regionState.getRegion(), sn, true);
             this.executorService.submit(new ClosedRegionHandler(server,
               this, regionState.getRegion()));
           }
@@ -780,13 +776,12 @@ public class AssignmentManager extends Z
         case RS_ZK_REGION_OPENING:
           // Should see OPENING after we have asked it to OPEN or additional
           // times after already being in state of OPENING
-          if (regionState != null &&
-              (!regionState.isPendingOpen() && !regionState.isOpening())) {
-            LOG.warn("Received OPENING for region " +
-                prettyPrintedRegionName +
-                " from server " + sn + " but region was in " +
-                " the state " + regionState + " and not " +
-                "in expected PENDING_OPEN or OPENING states");
+          if (regionState != null
+              && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+            LOG.warn("Received OPENING for region " + prettyPrintedRegionName
+              + " from server " + sn + " but region was in the state " + regionState
+              + " and not in expected PENDING_OPEN or OPENING states,"
+              + " or not on the expected server");
             return;
           }
           // Transition to OPENING (or update stamp if already OPENING)
@@ -795,13 +790,12 @@ public class AssignmentManager extends Z
 
         case RS_ZK_REGION_OPENED:
           // Should see OPENED after OPENING but possible after PENDING_OPEN
-          if (regionState != null &&
-              (!regionState.isPendingOpen() && !regionState.isOpening())) {
-            LOG.warn("Received OPENED for region " +
-                prettyPrintedRegionName +
-                " from server " + sn + " but region was in " +
-                " the state " + regionState + " and not " +
-                "in expected PENDING_OPEN or OPENING states");
+          if (regionState != null
+              && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+            LOG.warn("Received OPENED for region " + prettyPrintedRegionName
+              + " from server " + sn + " but region was in the state " + regionState
+              + " and not in expected PENDING_OPEN or OPENING states,"
+              + " or not on the expected server");
             return;
           }
           // Handle OPENED by removing from transition and deleted zk node
@@ -948,31 +942,36 @@ public class AssignmentManager extends Z
   public void nodeDeleted(final String path) {
     if (path.startsWith(this.watcher.assignmentZNode)) {
       String regionName = ZKAssign.getRegionName(this.watcher, path);
-      RegionState rs = regionStates.getRegionTransitionState(regionName);
-      if (rs != null) {
-        HRegionInfo regionInfo = rs.getRegion();
-        if (rs.isSplit()) {
-          LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
-            "clearing from RIT; rs=" + rs);
-          regionOffline(rs.getRegion());
-        } else {
-          LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
-              + " has been deleted.");
-          if (rs.isOpened()) {
-            ServerName serverName = rs.getServerName();
-            regionOnline(regionInfo, serverName);
-            LOG.info("The master has opened the region "
-              + regionInfo.getRegionNameAsString() + " that was online on "
-              + serverName);
-            if (this.getZKTable().isDisablingOrDisabledTable(
-                regionInfo.getTableNameAsString())) {
-              LOG.debug("Opened region "
-                  + regionInfo.getRegionNameAsString() + " but "
-                  + "this table is disabled, triggering close of region");
-              unassign(regionInfo);
+      Lock lock = locker.acquireLock(regionName);
+      try {
+        RegionState rs = regionStates.getRegionTransitionState(regionName);
+        if (rs != null) {
+          HRegionInfo regionInfo = rs.getRegion();
+          if (rs.isSplit()) {
+            LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
+              "clearing from RIT; rs=" + rs);
+            regionOffline(rs.getRegion());
+          } else {
+            LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
+                + " has been deleted.");
+            if (rs.isOpened()) {
+              ServerName serverName = rs.getServerName();
+              regionOnline(regionInfo, serverName);
+              LOG.info("The master has opened the region "
+                + regionInfo.getRegionNameAsString() + " that was online on "
+                + serverName);
+              if (this.getZKTable().isDisablingOrDisabledTable(
+                  regionInfo.getTableNameAsString())) {
+                LOG.debug("Opened region "
+                    + regionInfo.getRegionNameAsString() + " but "
+                    + "this table is disabled, triggering close of region");
+                unassign(regionInfo);
+              }
             }
           }
         }
+      } finally {
+        lock.unlock();
       }
     }
   }
@@ -1125,22 +1124,12 @@ public class AssignmentManager extends Z
     assign(region, setOfflineInZK, false);
   }
 
-  public void assign(HRegionInfo region, boolean setOfflineInZK,
-      boolean forceNewPlan) {
-    assign(region, setOfflineInZK, forceNewPlan, false);
-  }
-
   /**
-   * @param region
-   * @param setOfflineInZK
-   * @param forceNewPlan
-   * @param hijack True if new assignment is needed, false otherwise
+   * Use care with forceNewPlan. It could cause double assignment.
    */
-  public void assign(HRegionInfo region, boolean setOfflineInZK,
-      boolean forceNewPlan, boolean hijack) {
-    // If hijack is true do not call disableRegionIfInRIT as
-    // we have not yet moved the znode to OFFLINE state.
-    if (!hijack && isDisabledorDisablingRegionInRIT(region)) {
+  public void assign(HRegionInfo region,
+      boolean setOfflineInZK, boolean forceNewPlan) {
+    if (!setOfflineInZK && isDisabledorDisablingRegionInRIT(region)) {
       return;
     }
     if (this.serverManager.isClusterShutdown()) {
@@ -1148,11 +1137,13 @@ public class AssignmentManager extends Z
         region.getRegionNameAsString());
       return;
     }
-    RegionState state = forceRegionStateToOffline(region, hijack);
     String encodedName = region.getEncodedName();
     Lock lock = locker.acquireLock(encodedName);
     try {
-      assign(region, state, setOfflineInZK, forceNewPlan, hijack);
+      RegionState state = forceRegionStateToOffline(region, forceNewPlan);
+      if (state != null) {
+        assign(state, setOfflineInZK, forceNewPlan);
+      }
     } finally {
       lock.unlock();
     }
@@ -1166,226 +1157,251 @@ public class AssignmentManager extends Z
    */
   boolean assign(final ServerName destination,
       final List<HRegionInfo> regions) {
-    if (regions.size() == 0) {
+    int regionCount = regions.size();
+    if (regionCount == 0) {
       return true;
     }
-    LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
+    LOG.debug("Bulk assigning " + regionCount + " region(s) to " +
       destination.toString());
 
-    List<RegionState> states = new ArrayList<RegionState>(regions.size());
+    Set<String> encodedNames = new HashSet<String>(regionCount);
     for (HRegionInfo region : regions) {
-      states.add(forceRegionStateToOffline(region));
+      encodedNames.add(region.getEncodedName());
     }
-    // Add region plans, so we can updateTimers when one region is opened so
-    // that unnecessary timeout on RIT is reduced.
-    Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
-    for (HRegionInfo region : regions) {
-      plans.put(region.getEncodedName(), new RegionPlan(region, null,
-          destination));
-    }
-    this.addPlans(plans);
 
-    // Presumption is that only this thread will be updating the state at this
-    // time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
-    AtomicInteger counter = new AtomicInteger(0);
-    CreateUnassignedAsyncCallback cb =
-      new CreateUnassignedAsyncCallback(regionStates, this.watcher, destination, counter);
-    for (RegionState state: states) {
-      if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
-        return false;
+    List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
+    Map<String, Lock> locks = locker.acquireLocks(encodedNames);
+    try {
+      AtomicInteger counter = new AtomicInteger(0);
+      Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
+      OfflineCallback cb = new OfflineCallback(
+        regionStates, asyncOfflineZKWatcher, destination, counter, offlineNodesVersions);
+      Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
+      List<RegionState> states = new ArrayList<RegionState>(regions.size());
+      for (HRegionInfo region : regions) {
+        String encodedRegionName = region.getEncodedName();
+        RegionState state = forceRegionStateToOffline(region, true);
+        if (state != null && asyncSetOfflineInZooKeeper(
+            state, asyncOfflineZKWatcher, cb, destination)) {
+          RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
+          plans.put(encodedRegionName, plan);
+          states.add(state);
+        } else {
+          LOG.warn("failed to force region state to offline or "
+            + "failed to set it offline in ZK, will reassign later: " + region);
+          failedToOpenRegions.add(region); // assign individually later
+          Lock lock = locks.remove(encodedRegionName);
+          lock.unlock();
+        }
       }
-    }
-    // Wait until all unassigned nodes have been put up and watchers set.
-    int total = regions.size();
-    for (int oldCounter = 0; !server.isStopped();) {
-      int count = counter.get();
-      if (oldCounter != count) {
-        LOG.info(destination.toString() + " unassigned znodes=" + count +
-          " of total=" + total);
-        oldCounter = count;
+
+      // Wait until all unassigned nodes have been put up and watchers set.
+      int total = states.size();
+      for (int oldCounter = 0; !server.isStopped();) {
+        int count = counter.get();
+        if (oldCounter != count) {
+          LOG.info(destination.toString() + " unassigned znodes=" + count +
+            " of total=" + total);
+          oldCounter = count;
+        }
+        if (count >= total) break;
+        Threads.sleep(5);
       }
-      if (count == total) break;
-      Threads.sleep(10);
-    }
-    if (server.isStopped()) {
-      return false;
-    }
 
-    // Move on to open regions.
-    try {
-      // Send OPEN RPC. If it fails on a IOE or RemoteException, the
-      // TimeoutMonitor will pick up the pieces.
-      long maxWaitTime = System.currentTimeMillis() +
-        this.server.getConfiguration().
-          getLong("hbase.regionserver.rpc.startup.waittime", 60000);
-      while (!this.server.isStopped()) {
-        try {
-          List<RegionOpeningState> regionOpeningStateList = this.serverManager
-              .sendRegionOpen(destination, regions);
-          if (regionOpeningStateList == null) {
-            // Failed getting RPC connection to this server
+      if (server.isStopped()) {
+        return false;
+      }
+
+      // Add region plans, so we can updateTimers when one region is opened so
+      // that unnecessary timeout on RIT is reduced.
+      this.addPlans(plans);
+
+      List<Pair<HRegionInfo, Integer>> regionOpenInfos =
+        new ArrayList<Pair<HRegionInfo, Integer>>(states.size());
+      for (RegionState state: states) {
+        HRegionInfo region = state.getRegion();
+        String encodedRegionName = region.getEncodedName();
+        Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
+        if (nodeVersion == null || nodeVersion.intValue() == -1) {
+          LOG.warn("failed to offline in zookeeper: " + region);
+          failedToOpenRegions.add(region); // assign individually later
+          Lock lock = locks.remove(encodedRegionName);
+          lock.unlock();
+        } else {
+          try { // Set the ZK watcher explicitly
+            ZKAssign.getData(this.watcher, encodedRegionName);
+          } catch (KeeperException e) {
+            server.abort("Unexpected exception watching ZKAssign node", e);
             return false;
           }
-          for (int i = 0; i < regionOpeningStateList.size(); i++) {
-            if (regionOpeningStateList.get(i) == RegionOpeningState.ALREADY_OPENED) {
-              processAlreadyOpenedRegion(regions.get(i), destination);
-            } else if (regionOpeningStateList.get(i) == RegionOpeningState.FAILED_OPENING) {
-              // Failed opening this region, reassign it
-              assign(regions.get(i), true, true);
+          regionStates.updateRegionState(region,
+            RegionState.State.PENDING_OPEN, destination);
+          regionOpenInfos.add(new Pair<HRegionInfo, Integer>(
+            region, nodeVersion));
+        }
+      }
+
+      // Move on to open regions.
+      try {
+        // Send OPEN RPC. If it fails on a IOE or RemoteException, the
+        // TimeoutMonitor will pick up the pieces.
+        long maxWaitTime = System.currentTimeMillis() +
+          this.server.getConfiguration().
+            getLong("hbase.regionserver.rpc.startup.waittime", 60000);
+        while (!this.server.isStopped()) {
+          try {
+            List<RegionOpeningState> regionOpeningStateList = serverManager
+              .sendRegionOpen(destination, regionOpenInfos);
+            if (regionOpeningStateList == null) {
+              // Failed getting RPC connection to this server
+              return false;
             }
+            for (int i = 0, n = regionOpeningStateList.size(); i < n; i++) {
+              RegionOpeningState openingState = regionOpeningStateList.get(i);
+              if (openingState != RegionOpeningState.OPENED) {
+                HRegionInfo region = regionOpenInfos.get(i).getFirst();
+                if (openingState == RegionOpeningState.ALREADY_OPENED) {
+                  processAlreadyOpenedRegion(region, destination);
+                } else if (openingState == RegionOpeningState.FAILED_OPENING) {
+                  // Failed opening this region, reassign it later
+                  failedToOpenRegions.add(region);
+                } else {
+                  LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
+                    + openingState + " in assigning region " + region);
+                }
+              }
+            }
+            break;
+          } catch (IOException e) {
+            if (e instanceof RemoteException) {
+              e = ((RemoteException)e).unwrapRemoteException();
+            }
+            if (e instanceof RegionServerStoppedException) {
+              LOG.warn("The region server was shut down, ", e);
+              // No need to retry, the region server is a goner.
+              return false;
+            } else if (e instanceof ServerNotRunningYetException) {
+              // This is the one exception to retry.  For all else we should just fail
+              // the startup.
+              long now = System.currentTimeMillis();
+              if (now < maxWaitTime) {
+                LOG.debug("Server is not yet up; waiting up to " +
+                  (maxWaitTime - now) + "ms", e);
+                Thread.sleep(100);
+                continue;
+              }
+            }
+            throw e;
           }
-          break;
-        } catch (RemoteException e) {
-          IOException decodedException = e.unwrapRemoteException();
-          if (decodedException instanceof RegionServerStoppedException) {
-            LOG.warn("The region server was shut down, ", decodedException);
-            // No need to retry, the region server is a goner.
-            return false;
-          } else if (decodedException instanceof ServerNotRunningYetException) {
-            // This is the one exception to retry.  For all else we should just fail
-            // the startup.
-            long now = System.currentTimeMillis();
-            if (now > maxWaitTime) throw e;
-            LOG.debug("Server is not yet up; waiting up to " +
-                (maxWaitTime - now) + "ms", e);
-            Thread.sleep(100);
-          }
-
-          throw decodedException;
         }
-      }
-    } catch (IOException e) {
-      // Can be a socket timeout, EOF, NoRouteToHost, etc
-      LOG.info("Unable to communicate with the region server in order" +
+      } catch (IOException e) {
+        // Can be a socket timeout, EOF, NoRouteToHost, etc
+        LOG.info("Unable to communicate with the region server in order" +
           " to assign regions", e);
-      return false;
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-    LOG.debug("Bulk assigning done for " + destination.toString());
-    return true;
-  }
-
-  /**
-   * Callback handler for create unassigned znodes used during bulk assign.
-   */
-  static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
-    private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
-    private final RegionStates regionStates;
-    private final ZooKeeperWatcher zkw;
-    private final ServerName destination;
-    private final AtomicInteger counter;
-
-    CreateUnassignedAsyncCallback(final RegionStates regionStates,
-        final ZooKeeperWatcher zkw, final ServerName destination,
-        final AtomicInteger counter) {
-      this.regionStates = regionStates;
-      this.zkw = zkw;
-      this.destination = destination;
-      this.counter = counter;
+        return false;
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    } finally {
+      for (Lock lock : locks.values()) {
+        lock.unlock();
+      }
     }
 
-    @Override
-    public void processResult(int rc, String path, Object ctx, String name) {
-      if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
-        LOG.warn("Node for " + path + " already exists");
-      } else if (rc != 0) {
-        // This is result code.  If non-zero, need to resubmit.
-        LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
-          "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
-        this.zkw.abort("Connectionloss writing unassigned at " + path +
-          ", rc=" + rc, null);
-        return;
+    if (!failedToOpenRegions.isEmpty()) {
+      for (HRegionInfo region : failedToOpenRegions) {
+        invokeAssign(region);
       }
-      LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
-      // Async exists to set a watcher so we'll get triggered when
-      // unassigned node changes.
-      this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
-        new ExistsUnassignedAsyncCallback(regionStates, counter, destination), ctx);
     }
+    LOG.debug("Bulk assigning done for " + destination.toString());
+    return true;
   }
 
   /**
-   * Callback handler for the exists call that sets watcher on unassigned znodes.
-   * Used during bulk assign on startup.
+   * Send CLOSE RPC if the server is online, otherwise, offline the region
    */
-  static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
-    private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
-    private final RegionStates regionStates;
-    private final AtomicInteger counter;
-    private ServerName destination;
-
-    ExistsUnassignedAsyncCallback(final RegionStates regionStates,
-        final AtomicInteger counter, ServerName destination) {
-      this.regionStates = regionStates;
-      this.counter = counter;
-      this.destination = destination;
+  private void unassign(final HRegionInfo region,
+      final RegionState state, final int versionOfClosingNode,
+      final ServerName dest, final boolean transitionInZK) {
+    // Send CLOSE RPC
+    ServerName server = state.getServerName();
+    // ClosedRegionHandler can remove the server from this.regions
+    if (!serverManager.isServerOnline(server)) {
+      // Delete the node; if no node exists, there is nothing to do.
+      deleteClosingOrClosedNode(region);
+      regionOffline(region);
+      return;
     }
 
-    @Override
-    public void processResult(int rc, String path, Object ctx, Stat stat) {
-      if (rc != 0) {
-        // This is result code.  If non-zero, need to resubmit.
-        LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
-          "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
-        return;
+    for (int i = 1; i <= this.maximumAttempts; i++) {
+      try {
+        if (serverManager.sendRegionClose(server, region,
+          versionOfClosingNode, dest, transitionInZK)) {
+          LOG.debug("Sent CLOSE to " + server + " for region " +
+            region.getRegionNameAsString());
+          return;
+        }
+        // This never happens. Currently regionserver close always returns true.
+        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
+          region.getRegionNameAsString());
+      } catch (Throwable t) {
+        if (t instanceof RemoteException) {
+          t = ((RemoteException)t).unwrapRemoteException();
+        }
+        if (t instanceof NotServingRegionException) {
+          deleteClosingOrClosedNode(region);
+          regionOffline(region);
+          return;
+        } else if (t instanceof RegionAlreadyInTransitionException) {
+          // RS is already processing this region, only need to update the timestamp
+          LOG.debug("update " + state + " the timestamp.");
+          state.updateTimestampToNow();
+        }
+        LOG.info("Server " + server + " returned " + t + " for "
+          + region.getRegionNameAsString() + ", try=" + i
+          + " of " + this.maximumAttempts, t);
+        // Presume retry or server will expire.
       }
-      RegionState state = (RegionState)ctx;
-      LOG.debug("rs=" + state);
-      // Transition RegionState to PENDING_OPEN here in master; means we've
-      // sent the open.  We're a little ahead of ourselves here since we've not
-      // yet sent out the actual open but putting this state change after the
-      // call to open risks our writing PENDING_OPEN after state has been moved
-      // to OPENING by the regionserver.
-      regionStates.updateRegionState(state.getRegion(),
-        RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
-        destination);
-      this.counter.addAndGet(1);
     }
   }
 
   /**
-   * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
-   * @param region
-   * @return Amended RegionState.
-   */
-  private RegionState forceRegionStateToOffline(final HRegionInfo region) {
-    return forceRegionStateToOffline(region, false);
-  }
-
-  /**
-   * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
-   * @param region
-   * @param hijack
-   * @return Amended RegionState.
+   * Set region to OFFLINE unless it is opening and forceNewPlan is false.
    */
-  private RegionState forceRegionStateToOffline(final HRegionInfo region,
-      boolean hijack) {
-    String encodedName = region.getEncodedName();
-
-    Lock lock = locker.acquireLock(encodedName);
-    try {
-      RegionState state = regionStates.getRegionTransitionState(encodedName);
-      if (state == null) {
-        state = regionStates.updateRegionState(
-          region, RegionState.State.OFFLINE);
-      } else {
-        // If we are reassigning the node do not force in-memory state to OFFLINE.
-        // Based on the znode state we will decide if to change in-memory state to
-        // OFFLINE or not. It will be done before setting znode to OFFLINE state.
-
-        // We often get here with state == CLOSED because ClosedRegionHandler will
-        // assign on its tail as part of the handling of a region close.
-        if (!hijack) {
+  private RegionState forceRegionStateToOffline(
+      final HRegionInfo region, final boolean forceNewPlan) {
+    RegionState state = regionStates.getRegionState(region);
+    if (state == null) {
+      LOG.warn("Assigning a region not in region states: " + region);
+      state = regionStates.createRegionState(region);
+    } else {
+      switch (state.getState()) {
+      case OPEN:
+      case OPENING:
+      case PENDING_OPEN:
+        if (!forceNewPlan) {
+          LOG.debug("Attempting to assign region " +
+            region + " but it is already in transition: " + state);
+          return null;
+        }
+      case CLOSING:
+      case PENDING_CLOSE:
+        unassign(region, state, -1, null, false);
+      case CLOSED:
+        if (!state.isOffline()) {
           LOG.debug("Forcing OFFLINE; was=" + state);
           state = regionStates.updateRegionState(
             region, RegionState.State.OFFLINE);
         }
+      case OFFLINE:
+        break;
+      default:
+        LOG.error("Trying to assign region " + region
+          + ", which is in state " + state);
+        return null;
       }
-      return state;
-    } finally {
-      lock.unlock();
     }
+    return state;
   }
 
   /**
@@ -1393,35 +1409,41 @@ public class AssignmentManager extends Z
    * @param state
    * @param setOfflineInZK
    * @param forceNewPlan
-   * @param hijack
    */
-  private void assign(final HRegionInfo region, final RegionState state,
-      final boolean setOfflineInZK, final boolean forceNewPlan,
-      boolean hijack) {
-    boolean regionAlreadyInTransitionException = false;
-    boolean serverNotRunningYet = false;
+  private void assign(RegionState state,
+      final boolean setOfflineInZK, final boolean forceNewPlan) {
     RegionState currentState = state;
+    int versionOfOfflineNode = -1;
+    RegionPlan plan = null;
     long maxRegionServerStartupWaitTime = -1;
-    for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
-      int versionOfOfflineNode = -1;
-      if (setOfflineInZK) {
+    HRegionInfo region = state.getRegion();
+    for (int i = 1; i <= this.maximumAttempts; i++) {
+      if (plan == null) { // Get a server for the region at first
+        plan = getRegionPlan(region, forceNewPlan);
+      }
+      if (plan == null) {
+        LOG.debug("Unable to determine a plan to assign " + region);
+        this.timeoutMonitor.setAllRegionServersOffline(true);
+        return; // Should get reassigned later when RIT times out.
+      }
+      if (setOfflineInZK && versionOfOfflineNode == -1) {
         // get the version of the znode after setting it to OFFLINE.
         // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
-        versionOfOfflineNode = setOfflineInZooKeeper(currentState, hijack);
+        versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
         if (versionOfOfflineNode != -1) {
           if (isDisabledorDisablingRegionInRIT(region)) {
             return;
           }
           // In case of assignment from EnableTableHandler table state is ENABLING. Any how
           // EnableTableHandler will set ENABLED after assigning all the table regions. If we
-          // try to set to ENABLED directly then client api may think table is enabled.
+          // try to set to ENABLED directly then client API may think table is enabled.
           // When we have a case such as all the regions are added directly into .META. and we call
           // assignRegion then we need to make the table ENABLED. Hence in such case the table
           // will not be in ENABLING or ENABLED state.
           String tableName = region.getTableNameAsString();
           if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
             LOG.debug("Setting table " + tableName + " to ENABLED state.");
-            setEnabledTable(region);
+            setEnabledTable(tableName);
           }
         }
       }
@@ -1429,29 +1451,21 @@ public class AssignmentManager extends Z
         return;
       }
       if (this.server.isStopped()) {
-        LOG.debug("Server stopped; skipping assign of " + state);
+        LOG.debug("Server stopped; skipping assign of " + region);
         return;
       }
-      RegionPlan plan = getRegionPlan(state,
-        !regionAlreadyInTransitionException && !serverNotRunningYet && forceNewPlan);
-      if (plan == null) {
-        LOG.debug("Unable to determine a plan to assign " + state);
-        this.timeoutMonitor.setAllRegionServersOffline(true);
-        return; // Should get reassigned later when RIT times out.
-      }
       try {
-        LOG.info("Assigning region " + state.getRegion().getRegionNameAsString() +
+        LOG.info("Assigning region " + region.getRegionNameAsString() +
           " to " + plan.getDestination().toString());
         // Transition RegionState to PENDING_OPEN
-        currentState = regionStates.updateRegionState(state.getRegion(),
-          RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
-          plan.getDestination());
+        currentState = regionStates.updateRegionState(region,
+          RegionState.State.PENDING_OPEN, plan.getDestination());
         // Send OPEN RPC. This can fail if the server on other end is is not up.
         // Pass the version that was obtained while setting the node to OFFLINE.
         RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
-            .getDestination(), state.getRegion(), versionOfOfflineNode);
+            .getDestination(), region, versionOfOfflineNode);
         if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
-          processAlreadyOpenedRegion(state.getRegion(), plan.getDestination());
+          processAlreadyOpenedRegion(region, plan.getDestination());
         } else if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
           // Failed opening this region
           throw new Exception("Get regionOpeningState=" + regionOpenState);
@@ -1461,13 +1475,14 @@ public class AssignmentManager extends Z
         if (t instanceof RemoteException) {
           t = ((RemoteException) t).unwrapRemoteException();
         }
-        regionAlreadyInTransitionException = false;
-        serverNotRunningYet = false;
+        boolean regionAlreadyInTransitionException = false;
+        boolean serverNotRunningYet = false;
+        boolean socketTimedOut = false;
         if (t instanceof RegionAlreadyInTransitionException) {
           regionAlreadyInTransitionException = true;
           if (LOG.isDebugEnabled()) {
             LOG.debug("Failed assignment in: " + plan.getDestination() + " due to "
-                + t.getMessage());
+              + t.getMessage());
           }
         } else if (t instanceof ServerNotRunningYetException) {
           if (maxRegionServerStartupWaitTime < 0) {
@@ -1488,52 +1503,72 @@ public class AssignmentManager extends Z
             }
           } catch (InterruptedException ie) {
             LOG.warn("Failed to assign "
-              + state.getRegion().getRegionNameAsString() + " since interrupted", ie);
+              + region.getRegionNameAsString() + " since interrupted", ie);
             Thread.currentThread().interrupt();
             return;
           }
-        }
-        if (t instanceof java.net.SocketTimeoutException 
+        } else if (t instanceof java.net.SocketTimeoutException
             && this.serverManager.isServerOnline(plan.getDestination())) {
-          LOG.warn("Call openRegion() to " + plan.getDestination()
+          // If the socket timed out but the region server is still online,
+          // the openRegion RPC may have been accepted by the server with only
+          // the response failing to come back.  So we retry opening the
+          // region on the same server to avoid a possible double
+          // assignment.
+          socketTimedOut = true;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Call openRegion() to " + plan.getDestination()
               + " has timed out when trying to assign "
               + region.getRegionNameAsString()
               + ", but the region might already be opened on "
               + plan.getDestination() + ".", t);
-          return;
+          }
         }
+
         LOG.warn("Failed assignment of "
-          + state.getRegion().getRegionNameAsString()
+          + region.getRegionNameAsString()
           + " to "
           + plan.getDestination()
           + ", trying to assign "
-          + (regionAlreadyInTransitionException || serverNotRunningYet
-            ? "to the same region server because of "
-            + "RegionAlreadyInTransitionException/ServerNotRunningYetException;"
-            : "elsewhere instead; ")
-          + "retry=" + i, t);
-        // Clean out plan we failed execute and one that doesn't look like it'll
-        // succeed anyways; we need a new plan!
-        // Transition back to OFFLINE
-        currentState = regionStates.updateRegionState(
-          state.getRegion(), RegionState.State.OFFLINE);
+          + (regionAlreadyInTransitionException || serverNotRunningYet || socketTimedOut
+            ? "to the same region server because of RegionAlreadyInTransitionException"
+              + "/ServerNotRunningYetException/SocketTimeoutException;"
+              : "elsewhere instead; ")
+          + "try=" + i + " of " + this.maximumAttempts, t);
+
+        if (i == this.maximumAttempts) {
+          // Don't reset the region state or get a new plan any more.
+          // This is the last try.
+          continue;
+        }
+
         // If region opened on destination of present plan, reassigning to new
         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
         // reassigning to same RS.
         RegionPlan newPlan = plan;
-        if (!regionAlreadyInTransitionException && !serverNotRunningYet) {
+        if (!(regionAlreadyInTransitionException
+            || serverNotRunningYet || socketTimedOut)) {
           // Force a new plan and reassign. Will return null if no servers.
           // The new plan could be the same as the existing plan since we don't
           // exclude the server of the original plan, which should not be
           // excluded since it could be the only server up now.
-          newPlan = getRegionPlan(state, true);
+          newPlan = getRegionPlan(region, true);
         }
         if (newPlan == null) {
           this.timeoutMonitor.setAllRegionServersOffline(true);
           LOG.warn("Unable to find a viable location to assign region " +
-            state.getRegion().getRegionNameAsString());
+            region.getRegionNameAsString());
           return;
         }
+        if (plan != newPlan
+            && !plan.getDestination().equals(newPlan.getDestination())) {
+          // Clean out the plan we failed to execute, one that doesn't look like it'll
+          // succeed anyway; we need a new plan!
+          // Transition back to OFFLINE
+          currentState = regionStates.updateRegionState(
+            region, RegionState.State.OFFLINE);
+          versionOfOfflineNode = -1;
+          plan = newPlan;
+        }
       }
     }
   }
@@ -1577,44 +1612,22 @@ public class AssignmentManager extends Z
    * Set region as OFFLINED up in zookeeper
    *
    * @param state
-   * @param hijack
-   *          - true if needs to be hijacked and reassigned, false otherwise.
    * @return the version of the offline node if setting of the OFFLINE node was
    *         successful, -1 otherwise.
    */
-  int setOfflineInZooKeeper(final RegionState state, boolean hijack) {
-    // In case of reassignment the current state in memory need not be
-    // OFFLINE. 
-    if (!hijack && !state.isClosed() && !state.isOffline()) {
+  private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
+    if (!state.isClosed() && !state.isOffline()) {
       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
       this.server.abort(msg, new IllegalStateException(msg));
       return -1;
     }
-    boolean allowZNodeCreation = false;
-    // Under reassignment if the current state is PENDING_OPEN
-    // or OPENING then refresh the in-memory state to PENDING_OPEN. This is
-    // important because if the region was in
-    // RS_OPENING state for a long time the master will try to force the znode
-    // to OFFLINE state meanwhile the RS could have opened the corresponding
-    // region and the state in znode will be RS_ZK_REGION_OPENED.
-    // For all other cases we can change the in-memory state to OFFLINE.
-    if (hijack &&
-        (state.getState().equals(RegionState.State.PENDING_OPEN) ||
-          state.getState().equals(RegionState.State.OPENING))) {
-      regionStates.updateRegionState(state.getRegion(),
-        RegionState.State.PENDING_OPEN);
-      allowZNodeCreation = false;
-    } else {
-      regionStates.updateRegionState(state.getRegion(),
-        RegionState.State.OFFLINE);
-      allowZNodeCreation = true;
-    }
+    regionStates.updateRegionState(state.getRegion(),
+      RegionState.State.OFFLINE);
     int versionOfOfflineNode = -1;
     try {
       // get the version after setting the znode to OFFLINE
       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
-        state.getRegion(), this.server.getServerName(),
-          hijack, allowZNodeCreation);
+        state.getRegion(), destination);
       if (versionOfOfflineNode == -1) {
         LOG.warn("Attempted to create/force node into OFFLINE state before "
             + "completing assignment but failed to do so for " + state);
@@ -1628,57 +1641,28 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Set region as OFFLINED up in zookeeper asynchronously.
-   * @param state
-   * @return True if we succeeded, false otherwise (State was incorrect or failed
-   * updating zk).
-   */
-  boolean asyncSetOfflineInZooKeeper(final RegionState state,
-      final AsyncCallback.StringCallback cb, final Object ctx) {
-    if (!state.isClosed() && !state.isOffline()) {
-      this.server.abort("Unexpected state trying to OFFLINE; " + state,
-        new IllegalStateException());
-      return false;
-    }
-    regionStates.updateRegionState(
-      state.getRegion(), RegionState.State.OFFLINE);
-    try {
-      ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
-        this.server.getServerName(), cb, ctx);
-    } catch (KeeperException e) {
-      if (e instanceof NodeExistsException) {
-        LOG.warn("Node for " + state.getRegion() + " already exists");
-      } else { 
-        server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
-      }
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * @param state
-   * @return Plan for passed <code>state</code> (If none currently, it creates one or
+   * @param region the region to assign
+   * @return Plan for passed <code>region</code> (If none currently, it creates one or
    * if no servers to assign, it returns null).
    */
-  RegionPlan getRegionPlan(final RegionState state,
+  private RegionPlan getRegionPlan(final HRegionInfo region,
       final boolean forceNewPlan) {
-    return getRegionPlan(state, null, forceNewPlan);
+    return getRegionPlan(region, null, forceNewPlan);
   }
 
   /**
-   * @param state
+   * @param region the region to assign
    * @param serverToExclude Server to exclude (we know its bad). Pass null if
    * all servers are thought to be assignable.
    * @param forceNewPlan If true, then if an existing plan exists, a new plan
    * will be generated.
-   * @return Plan for passed <code>state</code> (If none currently, it creates one or
+   * @return Plan for passed <code>region</code> (If none currently, it creates one or
    * if no servers to assign, it returns null).
    */
-  RegionPlan getRegionPlan(final RegionState state,
+  private RegionPlan getRegionPlan(final HRegionInfo region,
       final ServerName serverToExclude, final boolean forceNewPlan) {
     // Pickup existing plan or make a new one
-    final String encodedName = state.getRegion().getEncodedName();
+    final String encodedName = region.getEncodedName();
     final List<ServerName> destServers =
       serverManager.createDestinationServersList(serverToExclude);
 
@@ -1696,9 +1680,8 @@ public class AssignmentManager extends Z
       existingPlan = this.regionPlans.get(encodedName);
 
       if (existingPlan != null && existingPlan.getDestination() != null) {
-        LOG.debug("Found an existing plan for " +
-            state.getRegion().getRegionNameAsString() +
-       " destination server is " + existingPlan.getDestination().toString());
+        LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
+          + " destination server is " + existingPlan.getDestination());
       }
 
       if (forceNewPlan
@@ -1706,15 +1689,15 @@ public class AssignmentManager extends Z
           || existingPlan.getDestination() == null
           || !destServers.contains(existingPlan.getDestination())) {
         newPlan = true;
-        randomPlan = new RegionPlan(state.getRegion(), null,
-            balancer.randomAssignment(state.getRegion(), destServers));
+        randomPlan = new RegionPlan(region, null,
+            balancer.randomAssignment(region, destServers));
         this.regionPlans.put(encodedName, randomPlan);
       }
     }
 
     if (newPlan) {
       LOG.debug("No previous transition plan was found (or we are ignoring " +
-        "an existing plan) for " + state.getRegion().getRegionNameAsString() +
+        "an existing plan) for " + region.getRegionNameAsString() +
         " so generated a random one; " + randomPlan + "; " +
         serverManager.countOfRegionServers() +
                " (online=" + serverManager.getOnlineServers().size() +
@@ -1722,8 +1705,8 @@ public class AssignmentManager extends Z
         return randomPlan;
       }
     LOG.debug("Using pre-existing plan for region " +
-               state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
-      return existingPlan;
+      region.getRegionNameAsString() + "; plan=" + existingPlan);
+    return existingPlan;
   }
 
   /**
@@ -1794,13 +1777,6 @@ public class AssignmentManager extends Z
     LOG.debug("Starting unassignment of region " +
       region.getRegionNameAsString() + " (offlining)");
 
-    // Check if this region is currently assigned
-    if (!regionStates.isRegionAssigned(region)) {
-      LOG.debug("Attempted to unassign region " +
-        region.getRegionNameAsString() + " but it is not " +
-        "currently assigned anywhere");
-      return;
-    }
     String encodedName = region.getEncodedName();
     // Grab the state of this region and synchronize on it
     int versionOfClosingNode = -1;
@@ -1812,8 +1788,15 @@ public class AssignmentManager extends Z
       if (state == null) {
         // Create the znode in CLOSING state
         try {
+          state = regionStates.getRegionState(region);
+          if (state == null || state.getServerName() == null) {
+            // We don't know where the region is, so just offline it;
+            // there is no server to send a CLOSE RPC to.
+            regionOffline(region);
+            return;
+          }
           versionOfClosingNode = ZKAssign.createNodeClosing(
-            watcher, region, server.getServerName());
+            watcher, region, state.getServerName());
           if (versionOfClosingNode == -1) {
             LOG.debug("Attempting to unassign region " +
                 region.getRegionNameAsString() + " but ZK closing node "
@@ -1862,57 +1845,11 @@ public class AssignmentManager extends Z
           "already in transition (" + state.getState() + ", force=" + force + ")");
         return;
       }
+
+      unassign(region, state, versionOfClosingNode, dest, true);
     } finally {
       lock.unlock();
     }
-
-    // Send CLOSE RPC
-    ServerName server = state.getServerName();
-    // ClosedRegionhandler can remove the server from this.regions
-    if (server == null) {
-      // delete the node. if no node exists need not bother.
-      deleteClosingOrClosedNode(region);
-      return;
-    }
-
-    try {
-      // TODO: We should consider making this look more like it does for the
-      // region open where we catch all throwables and never abort
-      if (serverManager.sendRegionClose(server, state.getRegion(),
-        versionOfClosingNode, dest)) {
-        LOG.debug("Sent CLOSE to " + server + " for region " +
-          region.getRegionNameAsString());
-        return;
-      }
-      // This never happens. Currently regionserver close always return true.
-      LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
-        region.getRegionNameAsString());
-    } catch (Throwable t) {
-      if (t instanceof RemoteException) {
-        t = ((RemoteException)t).unwrapRemoteException();
-      }
-      if (t instanceof NotServingRegionException) {
-        // Presume that master has stale data.  Presume remote side just split.
-        // Presume that the split message when it comes in will fix up the master's
-        // in memory cluster state.
-        if (getZKTable().isDisablingTable(region.getTableNameAsString())) {
-          // Remove from the regionsinTransition map
-          LOG.info("While trying to recover the table "
-              + region.getTableNameAsString()
-              + " to DISABLED state the region " + region
-              + " was offlined but the table was in DISABLING state");
-          regionStates.regionOffline(region);
-          deleteClosingOrClosedNode(region);
-        }
-      } else if (t instanceof RegionAlreadyInTransitionException) {
-        // RS is already processing this region, only need to update the timestamp
-        LOG.debug("update " + state + " the timestamp.");
-        state.updateTimestampToNow();
-      }
-      LOG.info("Server " + server + " returned " + t + " for " +
-        region.getRegionNameAsString(), t);
-      // Presume retry or server will expire.
-    }
   }
 
   public void unassign(HRegionInfo region, boolean force){
@@ -1954,7 +1891,7 @@ public class AssignmentManager extends Z
    * @throws DeserializationException
    */
   private boolean isSplitOrSplitting(final String path)
-  throws KeeperException, DeserializationException {
+      throws KeeperException, DeserializationException {
     boolean result = false;
     // This may fail if the SPLIT or SPLITTING znode gets cleaned up before we
     // can get data from it.
@@ -1981,7 +1918,7 @@ public class AssignmentManager extends Z
    * @throws InterruptedException
    */
   public void waitForAssignment(HRegionInfo regionInfo)
-  throws InterruptedException {
+      throws InterruptedException {
     while(!this.server.isStopped() &&
         !regionStates.isRegionAssigned(regionInfo)) {
       // We should receive a notification, but it's
@@ -2020,6 +1957,35 @@ public class AssignmentManager extends Z
   }
 
   /**
+   * Assigns specified regions retaining assignments, if any.
+   * <p>
+   * This is a synchronous call and will return once every region has been
+   * assigned.  If anything fails, an exception is thrown.
+   * @throws InterruptedException if the bulk assignment is interrupted
+   * @throws IOException if no destination server is found to assign the region(s)
+   */
+  public void assign(Map<HRegionInfo, ServerName> regions)
+        throws IOException, InterruptedException {
+    if (regions == null || regions.isEmpty()) {
+      return;
+    }
+    List<ServerName> servers = serverManager.createDestinationServersList();
+    if (servers == null || servers.isEmpty()) {
+      throw new IOException("Found no destination server to assign region(s)");
+    }
+
+    // Reuse existing assignment info
+    Map<ServerName, List<HRegionInfo>> bulkPlan =
+      balancer.retainAssignment(regions, servers);
+
+    LOG.info("Bulk assigning " + regions.size() + " region(s) across " +
+      servers.size() + " server(s), retainAssignment=true");
+    BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
+    ba.bulkAssign();
+    LOG.info("Bulk assigning done");
+  }
+
+  /**
    * Assigns specified regions round robin, if any.
    * <p>
    * This is a synchronous call and will return once every region has been
@@ -2050,18 +2016,6 @@ public class AssignmentManager extends Z
     LOG.info("Bulk assigning done");
   }
 
-  // TODO: This method seems way wrong.  Why would we mark a table enabled based
-  // off a single region?  We seem to call this on bulk assign on startup which
-  // isn't too bad but then its also called in assign.  It makes the enabled
-  // flag up in zk meaningless.  St.Ack
-  private void setEnabledTable(HRegionInfo hri) {
-    String tableName = hri.getTableNameAsString();
-    boolean isTableEnabled = this.zkTable.isEnabledTable(tableName);
-    if (!isTableEnabled) {
-      setEnabledTable(tableName);
-    }
-  }
-
   /**
    * Assigns all user regions, if any exist.  Used during cluster startup.
    * <p>
@@ -2095,27 +2049,17 @@ public class AssignmentManager extends Z
       getBoolean("hbase.master.startup.retainassign", true);
 
     if (retainAssignment) {
-      List<ServerName> servers = serverManager.createDestinationServersList();
-      if (servers == null || servers.isEmpty()) {
-        throw new IOException("Found no destination server to assign region(s)");
-      }
-
-      // Reuse existing assignment info
-      Map<ServerName, List<HRegionInfo>> bulkPlan =
-        balancer.retainAssignment(allRegions, servers);
-
-      LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
-        servers.size() + " server(s), retainAssignment=true");
-      BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
-      ba.bulkAssign();
-      LOG.info("Bulk assigning done");
+      assign(allRegions);
     } else {
       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
       assign(regions);
     }
 
     for (HRegionInfo hri : allRegions.keySet()) {
-      setEnabledTable(hri);
+      String tableName = hri.getTableNameAsString();
+      if (!zkTable.isEnabledTable(tableName)) {
+        setEnabledTable(tableName);
+      }
     }
   }
 
@@ -2126,7 +2070,7 @@ public class AssignmentManager extends Z
    * @throws InterruptedException
    */
   boolean waitUntilNoRegionsInTransition(final long timeout)
-  throws InterruptedException {
+      throws InterruptedException {
     // Blocks until there are no regions in transition. It is possible that
     // there
     // are regions in transition immediately after this returns but guarantees
@@ -2301,7 +2245,7 @@ public class AssignmentManager extends Z
    */
   private void processDeadServersAndRecoverLostRegions(
       Map<ServerName, List<HRegionInfo>> deadServers, List<String> nodes)
-  throws IOException, KeeperException {
+          throws IOException, KeeperException {
     if (deadServers != null) {
       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
         ServerName serverName = server.getKey();
@@ -2314,7 +2258,7 @@ public class AssignmentManager extends Z
       this.watcher, this.watcher.assignmentZNode);
     if (!nodes.isEmpty()) {
       for (String encodedRegionName : nodes) {
-        processRegionInTransition(encodedRegionName, null, deadServers);
+        processRegionInTransition(encodedRegionName, null);
       }
     }
 
@@ -2419,10 +2363,9 @@ public class AssignmentManager extends Z
    * Monitor to check for time outs on region transition operations
    */
   public class TimeoutMonitor extends Chore {
-    private final int timeout;
-    private boolean bulkAssign = false;
     private boolean allRegionServersOffline = false;
     private ServerManager serverManager;
+    private final int timeout;
 
     /**
      * Creates a periodic monitor to check for time outs on region transition
@@ -2441,17 +2384,6 @@ public class AssignmentManager extends Z
       this.serverManager = serverManager;
     }
 
-    /**
-     * @param bulkAssign If true, we'll suspend checking regions in transition
-     * up in zookeeper.  If false, will reenable check.
-     * @return Old setting for bulkAssign.
-     */
-    public boolean bulkAssign(final boolean bulkAssign) {
-      boolean result = this.bulkAssign;
-      this.bulkAssign = bulkAssign;
-      return result;
-    }
-
     private synchronized void setAllRegionServersOffline(
       boolean allRegionServersOffline) {
       this.allRegionServersOffline = allRegionServersOffline;
@@ -2459,21 +2391,21 @@ public class AssignmentManager extends Z
 
     @Override
     protected void chore() {
-      // If bulkAssign in progress, suspend checks
-      if (this.bulkAssign) return;
       boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
 
       // Iterate all regions in transition checking for time outs
       long now = System.currentTimeMillis();
       // no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
       //  a copy while another thread is adding/removing items
-      for (RegionState regionState : regionStates.getRegionsInTransition().values()) {
+      for (String regionName : regionStates.getRegionsInTransition().keySet()) {
+        RegionState regionState = regionStates.getRegionTransitionState(regionName);
+        if (regionState == null) continue;
+
         if (regionState.getStamp() + timeout <= now) {
           // decide on action upon timeout
           actOnTimeOut(regionState);
         } else if (this.allRegionServersOffline && !noRSAvailable) {
-          RegionPlan existingPlan = regionPlans.get(regionState.getRegion()
-              .getEncodedName());
+          RegionPlan existingPlan = regionPlans.get(regionName);
           if (existingPlan == null
               || !this.serverManager.isServerOnline(existingPlan
                   .getDestination())) {
@@ -2541,7 +2473,7 @@ public class AssignmentManager extends Z
   }
 
   private void processOpeningState(HRegionInfo regionInfo) {
-    LOG.info("Region has been OPENING for too " + "long, reassigning region="
+    LOG.info("Region has been OPENING for too long, reassigning region="
         + regionInfo.getRegionNameAsString());
     // Should have a ZK node in OPENING state
     try {
@@ -2573,7 +2505,7 @@ public class AssignmentManager extends Z
     return;
   }
 
-  private void invokeAssign(HRegionInfo regionInfo) {
+  void invokeAssign(HRegionInfo regionInfo) {
     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
   }
 
@@ -2581,11 +2513,11 @@ public class AssignmentManager extends Z
     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
   }
 
-  public boolean isCarryingRoot(ServerName serverName) {
+  boolean isCarryingRoot(ServerName serverName) {
     return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
   }
 
-  public boolean isCarryingMeta(ServerName serverName) {
+  boolean isCarryingMeta(ServerName serverName) {
     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
   }
 
@@ -2599,7 +2531,7 @@ public class AssignmentManager extends Z
    * processing hasn't finished yet when server shutdown occurs.
    * @return whether the serverName currently hosts the region
    */
-  public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
+  private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
     RegionTransition rt = null;
     try {
       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
@@ -2713,4 +2645,34 @@ public class AssignmentManager extends Z
       this.server.abort(errorMsg, e);
     }
   }
+
+  /**
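+   * Marks the region OFFLINE in memory and asynchronously creates its OFFLINE znode in ZooKeeper.
+   * @param state region state to offline; must be CLOSED or OFFLINE, else the master aborts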
+   * @return True if we succeeded, false otherwise (State was incorrect or failed
+   * updating zk).
+   */
+  private boolean asyncSetOfflineInZooKeeper(final RegionState state,
+      final ZooKeeperWatcher zkw, final AsyncCallback.StringCallback cb,
+      final ServerName destination) {
+    if (!state.isClosed() && !state.isOffline()) {
+      this.server.abort("Unexpected state trying to OFFLINE; " + state,
+        new IllegalStateException());
+      return false;
+    }
+    regionStates.updateRegionState(
+      state.getRegion(), RegionState.State.OFFLINE);
+    try {
+      ZKAssign.asyncCreateNodeOffline(zkw, state.getRegion(),
+        destination, cb, state);
+    } catch (KeeperException e) {
+      if (e instanceof NodeExistsException) {
+        LOG.warn("Node for " + state.getRegion() + " already exists");
+      } else {
+        server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
+      }
+      return false;
+    }
+    return true;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Sat Oct 20 03:57:49 2012
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -61,26 +60,13 @@ public class GeneralBulkAssigner extends
   }
 
   @Override
-  public boolean bulkAssign(boolean sync) throws InterruptedException,
-      IOException {
-    // Disable timing out regions in transition up in zk while bulk assigning.
-    this.assignmentManager.timeoutMonitor.bulkAssign(true);
-    try {
-      return super.bulkAssign(sync);
-    } finally {
-      // Re-enable timing out regions in transition up in zk.
-      this.assignmentManager.timeoutMonitor.bulkAssign(false);
-    }
- }
-
-  @Override
   protected String getThreadNamePrefix() {
     return this.server.getServerName() + "-GeneralBulkAssigner";
   }
 
   @Override
   protected void populatePool(ExecutorService pool) {
-    this.pool = pool; // shut it down later in case some assigner hangs 
+    this.pool = pool; // shut it down later in case some assigner hangs
     for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
       pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
         this.assignmentManager, this.failedPlans));
@@ -204,7 +190,7 @@ public class GeneralBulkAssigner extends
       reassigningRegions.addAll(failedPlans.remove(e.getKey()));
     }
     for (HRegionInfo region : reassigningRegions) {
-      assignmentManager.assign(region, true, true);
+      assignmentManager.invokeAssign(region);
     }
     return reassigningRegions.size();
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Sat Oct 20 03:57:49 2012
@@ -673,6 +673,7 @@ Server {
     if (!masterRecovery) {
       this.assignmentManager.startTimeOutMonitor();
     }
+
     // TODO: Should do this in background rather than block master startup
     status.setStatus("Splitting logs after master startup");
     splitLogAfterStartup(this.fileSystemManager);
@@ -2136,12 +2137,12 @@ Server {
           return arr;
         }
       }
-      assignRegion(regionInfo);
-     if (cpHost != null) {
-       cpHost.postAssign(regionInfo);
-     }
+      assignmentManager.assign(regionInfo, true, true);
+      if (cpHost != null) {
+        cpHost.postAssign(regionInfo);
+      }
 
-     return arr;
+      return arr;
     } catch (IOException ioe) {
       throw new ServiceException(ioe);
     }