Posted to commits@hbase.apache.org by jx...@apache.org on 2014/06/14 17:40:38 UTC

[3/5] HBASE-11059 ZK-less region assignment

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
index a3b4d74..fbd6e05 100644
--- a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
@@ -78,6 +78,47 @@ message GetLastFlushedSequenceIdResponse {
   required uint64 last_flushed_sequence_id = 1;
 }
 
+message RegionTransition {
+  required TransitionCode transition_code = 1;
+
+  /** Multiple regions are involved during merging/splitting */
+  repeated RegionInfo region_info = 2;
+
+  /** For newly opened region, the open seq num is needed */
+  optional uint64 open_seq_num = 3;
+
+  enum TransitionCode {
+    OPENED = 0;
+    FAILED_OPEN = 1;
+    /** No failed_close; if a close fails, the region server will abort */
+    CLOSED = 2;
+
+    /** Ask master for ok to split/merge region(s) */
+    READY_TO_SPLIT = 3;
+    READY_TO_MERGE = 4;
+
+    SPLIT_PONR = 5;
+    MERGE_PONR = 6;
+
+    SPLIT = 7;
+    MERGED = 8;
+    SPLIT_REVERTED = 9;
+    MERGE_REVERTED = 10;
+  }
+}
+
+message ReportRegionTransitionRequest {
+  /** This region server's server name */
+  required ServerName server = 1;
+
+  repeated RegionTransition transition = 2;
+}
+
+message ReportRegionTransitionResponse {
+  /** Error message if failed to update the region state */
+  optional string error_message = 1;
+}
+
 service RegionServerStatusService {
   /** Called when a region server first starts. */
   rpc RegionServerStartup(RegionServerStartupRequest)
@@ -99,4 +140,12 @@ service RegionServerStatusService {
    * log splitting. */
   rpc GetLastFlushedSequenceId(GetLastFlushedSequenceIdRequest)
     returns(GetLastFlushedSequenceIdResponse);
+
+  /**
+   * Called by a region server to report the progress of a region
+   * transition. If the request fails, the transition should
+   * be aborted.
+   */
+  rpc ReportRegionTransition(ReportRegionTransitionRequest)
+    returns(ReportRegionTransitionResponse);
 }
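
For reference, here is a minimal sketch of how a region server might build and send one of the reports defined above, using the classes protoc generates from this file. Only the message, enum and service names come from the patch; the helper method, its parameters (rss, serverNameProto, hri, openSeqNum) and any surrounding class are illustrative.

  import com.google.protobuf.ServiceException;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition;
  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionRequest;
  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionResponse;

  /** Illustrative only: report a newly opened region to the master. */
  static boolean reportOpened(RegionServerStatusService.BlockingInterface rss,
      HBaseProtos.ServerName serverNameProto, HRegionInfo hri, long openSeqNum)
      throws ServiceException {
    RegionTransition transition = RegionTransition.newBuilder()
      .setTransitionCode(TransitionCode.OPENED)
      .addRegionInfo(HRegionInfo.convert(hri)) // the region that was just opened
      .setOpenSeqNum(openSeqNum)               // open seq num is required for OPENED
      .build();
    ReportRegionTransitionRequest request = ReportRegionTransitionRequest.newBuilder()
      .setServer(serverNameProto)              // this region server's name
      .addTransition(transition)
      .build();
    ReportRegionTransitionResponse response = rss.reportRegionTransition(null, request);
    // A non-empty error_message means the master rejected the transition.
    return !response.hasErrorMessage();
  }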

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
index adeeed4..ddab430 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
@@ -34,15 +34,18 @@ public class AssignCallable implements Callable<Object> {
   private AssignmentManager assignmentManager;
 
   private HRegionInfo hri;
+  private boolean newPlan;
 
-  public AssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
+  public AssignCallable(
+      AssignmentManager assignmentManager, HRegionInfo hri, boolean newPlan) {
     this.assignmentManager = assignmentManager;
+    this.newPlan = newPlan;
     this.hri = hri;
   }
 
   @Override
   public Object call() throws Exception {
-    assignmentManager.assign(hri, true, true);
+    assignmentManager.assign(hri, true, newPlan);
     return null;
   }
 }
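
A quick usage sketch for the widened constructor; the executor variable (pool) and the other variable names are illustrative. As the AssignmentManager diff below shows, newPlan is passed straight through as the third argument of assign(hri, true, newPlan), the force-new-plan flag: false keeps any existing region plan, true asks the balancer for a fresh one.

  // Retry an assignment without discarding the current region plan.
  pool.submit(new AssignCallable(assignmentManager, hri, false));
  // Force the balancer to come up with a new plan instead.
  pool.submit(new AssignCallable(assignmentManager, hri, true));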

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 597ddf9..73b06b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -44,6 +44,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -58,10 +60,10 @@ import org.apache.hadoop.hbase.TableStateManager;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
-import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
+import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -76,13 +78,18 @@ import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
@@ -99,6 +106,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.data.Stat;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.LinkedHashMultimap;
 
 /**
@@ -221,6 +229,13 @@ public class AssignmentManager extends ZooKeeperListener {
   private final ConcurrentHashMap<String, AtomicInteger>
     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
 
+  // A flag to indicate if we are using ZK for region assignment
+  private final boolean useZKForAssignment;
+
+  // When ZK is not used for region assignment, region states
+  // are persisted in meta via a state store
+  private final RegionStateStore regionStateStore;
+
   /**
    * For testing only!  Set to true to skip handling of split.
    */
@@ -250,6 +265,7 @@ public class AssignmentManager extends ZooKeeperListener {
     this.serverManager = serverManager;
     this.catalogTracker = catalogTracker;
     this.executorService = service;
+    this.regionStateStore = new RegionStateStore(server);
     this.regionsToReopen = Collections.synchronizedMap
                            (new HashMap<String, HRegionInfo> ());
     Configuration conf = server.getConfiguration();
@@ -275,7 +291,7 @@ public class AssignmentManager extends ZooKeeperListener {
     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
-    this.regionStates = new RegionStates(server, serverManager);
+    this.regionStates = new RegionStates(server, serverManager, regionStateStore);
 
     this.bulkAssignWaitTillAllAssigned =
       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
@@ -289,6 +305,7 @@ public class AssignmentManager extends ZooKeeperListener {
     this.tableLockManager = tableLockManager;
 
     this.metricsAssignmentManager = new MetricsAssignmentManager();
+    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
   }
 
   /**
@@ -326,6 +343,14 @@ public class AssignmentManager extends ZooKeeperListener {
     return regionStates;
   }
 
+  /**
+   * Used in some tests to mock up region state in meta
+   */
+  @VisibleForTesting
+  RegionStateStore getRegionStateStore() {
+    return regionStateStore;
+  }
+
   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
   }
@@ -422,6 +447,7 @@ public class AssignmentManager extends ZooKeeperListener {
    */
   void joinCluster() throws IOException,
       KeeperException, InterruptedException, CoordinatedStateException {
+    long startTime = System.currentTimeMillis();
     // Concurrency note: In the below the accesses on regionsInTransition are
     // outside of a synchronization block where usually all accesses to RIT are
     // synchronized.  The presumption is that in this case it is safe since this
@@ -438,10 +464,16 @@ public class AssignmentManager extends ZooKeeperListener {
     // This method will assign all user regions if a clean server startup or
     // it will reconstruct master state and cleanup any leftovers from
     // previous master process.
-    processDeadServersAndRegionsInTransition(deadServers);
+    boolean failover = processDeadServersAndRegionsInTransition(deadServers);
 
+    if (!useZKForAssignment) {
+      // Not use ZK for assignment any more, remove the ZNode
+      ZKUtil.deleteNodeFailSilent(watcher, watcher.assignmentZNode);
+    }
     recoverTableInDisablingState();
     recoverTableInEnablingState();
+    LOG.info("Joined the cluster in " + (System.currentTimeMillis()
+      - startTime) + "ms, failover=" + failover);
   }
 
   /**
@@ -455,43 +487,102 @@ public class AssignmentManager extends ZooKeeperListener {
    * @throws IOException
    * @throws InterruptedException
    */
-  void processDeadServersAndRegionsInTransition(
+  boolean processDeadServersAndRegionsInTransition(
       final Map<ServerName, List<HRegionInfo>> deadServers)
     throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
       watcher.assignmentZNode);
 
-    if (nodes == null) {
+    if (useZKForAssignment && nodes == null) {
       String errorMessage = "Failed to get the children from ZK";
       server.abort(errorMessage, new IOException(errorMessage));
-      return;
+      return true; // Doesn't matter in this case
     }
 
-    boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
-        .getRequeuedDeadServers().isEmpty());
-
-    if (!failover) {
+    boolean failover = !serverManager.getDeadServers().isEmpty();
+    if (failover) {
+      // This may not be a failover actually, especially if meta is on this master.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
+      }
+    } else {
       // If any one region except meta is assigned, it's a failover.
-      Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
-      for (HRegionInfo hri: regions.keySet()) {
+      for (HRegionInfo hri: regionStates.getRegionAssignments().keySet()) {
         if (!hri.isMetaTable()) {
           LOG.debug("Found " + hri + " out on cluster");
           failover = true;
           break;
         }
       }
-      if (!failover) {
-        // If any one region except meta is in transition, it's a failover.
-        for (String encodedName: nodes) {
-          RegionState state = regionStates.getRegionState(encodedName);
-          if (state != null && !state.getRegion().isMetaRegion()) {
-            LOG.debug("Found " + state.getRegion().getRegionNameAsString() + " in RITs");
+    }
+    if (!failover && nodes != null) {
+      // If any one region except meta is in transition, it's a failover.
+      for (String encodedName: nodes) {
+        RegionState regionState = regionStates.getRegionState(encodedName);
+        if (regionState != null && !regionState.getRegion().isMetaRegion()) {
+          LOG.debug("Found " + regionState + " in RITs");
+          failover = true;
+          break;
+        }
+      }
+    }
+    if (!failover && !useZKForAssignment) {
+      // If any region except meta is in transition on a live server, it's a failover.
+      Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
+      if (!regionsInTransition.isEmpty()) {
+        Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+        for (RegionState regionState: regionsInTransition.values()) {
+          if (!regionState.getRegion().isMetaRegion()
+              && onlineServers.contains(regionState.getServerName())) {
+            LOG.debug("Found " + regionState + " in RITs");
             failover = true;
             break;
           }
         }
       }
     }
+    if (!failover) {
+      // If we get here, we have a full cluster restart. It is a failover only
+      // if some HLogs have not been split yet. Meta HLogs, if any, should have
+      // been split already. We can walk through the queued dead servers:
+      // if none of them has any HLogs, this restart can be considered a clean one
+      Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
+      if (!queuedDeadServers.isEmpty()) {
+        Configuration conf = server.getConfiguration();
+        Path rootdir = FSUtils.getRootDir(conf);
+        FileSystem fs = rootdir.getFileSystem(conf);
+        for (ServerName serverName: queuedDeadServers) {
+          Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
+          Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
+          if (fs.exists(logDir) || fs.exists(splitDir)) {
+            LOG.debug("Found queued dead server " + serverName);
+            failover = true;
+            break;
+          }
+        }
+        if (!failover) {
+          // We figured that it's not a failover, so no need to
+          // work on these re-queued dead servers any more.
+          LOG.info("AM figured that it's not a failover and cleaned up "
+            + queuedDeadServers.size() + " queued dead servers");
+          serverManager.removeRequeuedDeadServers();
+        }
+      }
+    }
+
+    Set<TableName> disabledOrDisablingOrEnabling = null;
+
+    if (!failover) {
+      disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
+        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
+        ZooKeeperProtos.Table.State.ENABLING);
+
+      // Clean re/start, mark all user regions closed before reassignment
+      regionStates.closeAllUserRegions(disabledOrDisablingOrEnabling);
+    }
+
+    // Now region states are restored
+    regionStateStore.start();
 
     // If we found user regions out on cluster, its a failover.
     if (failover) {
@@ -499,11 +590,26 @@ public class AssignmentManager extends ZooKeeperListener {
       // Process list of dead servers and regions in RIT.
       // See HBASE-4580 for more information.
       processDeadServersAndRecoverLostRegions(deadServers);
-    } else {
+    }
+
+    if (!failover && useZKForAssignment) {
+      // Cleanup any existing ZK nodes and start watching
+      ZKAssign.deleteAllNodes(watcher);
+      ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
+        this.watcher.assignmentZNode);
+    }
+
+    // Now we can safely claim failover cleanup completed and enable
+    // ServerShutdownHandler for further processing. The nodes (below)
+    // in transition, if any, are for regions not related to those
+    // dead servers at all, and can be done in parallel to SSH.
+    failoverCleanupDone();
+    if (!failover) {
       // Fresh cluster startup.
-      LOG.info("Clean cluster startup. Assigning userregions");
-      assignAllUserRegions();
+      LOG.info("Clean cluster startup. Assigning user regions");
+      assignAllUserRegions(disabledOrDisablingOrEnabling);
     }
+    return failover;
   }
 
   /**
@@ -670,7 +776,7 @@ public class AssignmentManager extends ZooKeeperListener {
               try {
                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
                   .getVersion();
-                unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
+                unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
                 if (regionStates.isRegionOffline(regionInfo)) {
                   assign(regionInfo, true);
                 }
@@ -1273,6 +1379,7 @@ public class AssignmentManager extends ZooKeeperListener {
     }
   }
 
+  
   /**
    * Marks the region as online.  Removes it from regions in transition and
    * updates the in-memory assignment information.
@@ -1282,8 +1389,12 @@ public class AssignmentManager extends ZooKeeperListener {
    * @param sn
    */
   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
+    regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
+  }
+
+  void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
     numRegionsOpened.incrementAndGet();
-    regionStates.regionOnline(regionInfo, sn);
+    regionStates.regionOnline(regionInfo, sn, openSeqNum);
 
     // Remove plan if one.
     clearRegionPlan(regionInfo);
@@ -1354,13 +1465,15 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   public void offlineDisabledRegion(HRegionInfo regionInfo) {
-    // Disabling so should not be reassigned, just delete the CLOSED node
-    LOG.debug("Table being disabled so deleting ZK node and removing from " +
-      "regions in transition, skipping assignment of region " +
-        regionInfo.getRegionNameAsString());
-    String encodedName = regionInfo.getEncodedName();
-    deleteNodeInStates(encodedName, "closed", null,
-      EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
+    if (useZKForAssignment) {
+      // Disabling so should not be reassigned, just delete the CLOSED node
+      LOG.debug("Table being disabled so deleting ZK node and removing from " +
+        "regions in transition, skipping assignment of region " +
+          regionInfo.getRegionNameAsString());
+      String encodedName = regionInfo.getEncodedName();
+      deleteNodeInStates(encodedName, "closed", null,
+        EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
+    }
     regionOffline(regionInfo);
   }
 
@@ -1412,7 +1525,7 @@ public class AssignmentManager extends ZooKeeperListener {
             + " is dead but not processed yet");
           return;
         }
-        assign(state, setOfflineInZK, forceNewPlan);
+        assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
       }
     } finally {
       lock.unlock();
@@ -1459,7 +1572,8 @@ public class AssignmentManager extends ZooKeeperListener {
                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
                   + " is dead but not processed yet");
                 onDeadServer = true;
-              } else if (asyncSetOfflineInZooKeeper(state, cb, destination)) {
+              } else if (!useZKForAssignment
+                  || asyncSetOfflineInZooKeeper(state, cb, destination)) {
                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
                 plans.put(encodedName, plan);
                 states.add(state);
@@ -1479,17 +1593,19 @@ public class AssignmentManager extends ZooKeeperListener {
           lock.unlock();
         }
 
-        // Wait until all unassigned nodes have been put up and watchers set.
-        int total = states.size();
-        for (int oldCounter = 0; !server.isStopped();) {
-          int count = counter.get();
-          if (oldCounter != count) {
-            LOG.debug(destination.toString() + " unassigned znodes=" + count +
-              " of total=" + total + "; oldCounter=" + oldCounter);
-            oldCounter = count;
+        if (useZKForAssignment) {
+          // Wait until all unassigned nodes have been put up and watchers set.
+          int total = states.size();
+          for (int oldCounter = 0; !server.isStopped();) {
+            int count = counter.get();
+            if (oldCounter != count) {
+              LOG.debug(destination.toString() + " unassigned znodes=" + count +
+                " of total=" + total + "; oldCounter=" + oldCounter);
+              oldCounter = count;
+            }
+            if (count >= total) break;
+            Thread.sleep(5);
           }
-          if (count >= total) break;
-          Thread.sleep(5);
         }
 
         if (server.isStopped()) {
@@ -1506,7 +1622,7 @@ public class AssignmentManager extends ZooKeeperListener {
           HRegionInfo region = state.getRegion();
           String encodedRegionName = region.getEncodedName();
           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
-          if (nodeVersion == null || nodeVersion == -1) {
+          if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
             LOG.warn("failed to offline in zookeeper: " + region);
             failedToOpenRegions.add(region); // assign individually later
             Lock lock = locks.remove(encodedRegionName);
@@ -1659,11 +1775,11 @@ public class AssignmentManager extends ZooKeeperListener {
           versionOfClosingNode, dest, transitionInZK)) {
           LOG.debug("Sent CLOSE to " + server + " for region " +
             region.getRegionNameAsString());
-          if (!transitionInZK && state != null) {
+          if (useZKForAssignment && !transitionInZK && state != null) {
             // Retry to make sure the region is
             // closed so as to avoid double assignment.
             unassign(region, state, versionOfClosingNode,
-              dest, transitionInZK,src);
+              dest, transitionInZK, src);
           }
           return;
         }
@@ -1787,10 +1903,14 @@ public class AssignmentManager extends ZooKeeperListener {
       // yet, we can move on only if the meta shows the
       // region is not on this server actually, or on a server
       // not dead, or dead and processed already.
-      if (regionStates.isServerDeadAndNotProcessed(sn)
+      // When not using ZK, we don't need this check because
+      // we have the latest info in memory, and the caller
+      // will do another round of checking anyway.
+      if (useZKForAssignment
+          && regionStates.isServerDeadAndNotProcessed(sn)
           && wasRegionOnDeadServerByMeta(region, sn)) {
         LOG.info("Skip assigning " + region.getRegionNameAsString()
-          + ", it is on a dead but not processed yet server");
+          + ", it is on a dead but not processed yet server: " + sn);
         return null;
       }
     case CLOSED:
@@ -1813,9 +1933,9 @@ public class AssignmentManager extends ZooKeeperListener {
       while (!server.isStopped()) {
         try {
           catalogTracker.waitForMeta();
-          Pair<HRegionInfo, ServerName> r =
-            MetaReader.getRegion(catalogTracker, region.getRegionName());
-          ServerName server = r == null ? null : r.getSecond();
+          Result r = MetaReader.getRegionResult(catalogTracker, region.getRegionName());
+          if (r == null || r.isEmpty()) return false;
+          ServerName server = HRegionInfo.getServerName(r);
           return regionStates.isServerDeadAndNotProcessed(server);
         } catch (IOException ioe) {
           LOG.info("Received exception accessing hbase:meta during force assign "
@@ -2263,14 +2383,16 @@ public class AssignmentManager extends ZooKeeperListener {
             regionOffline(region);
             return;
           }
-          versionOfClosingNode = ZKAssign.createNodeClosing(
-            watcher, region, state.getServerName());
-          if (versionOfClosingNode == -1) {
-            LOG.info("Attempting to unassign " +
-              region.getRegionNameAsString() + " but ZK closing node "
-              + "can't be created.");
-            reassign = false; // not unassigned at all
-            return;
+          if (useZKForAssignment) {
+            versionOfClosingNode = ZKAssign.createNodeClosing(
+              watcher, region, state.getServerName());
+            if (versionOfClosingNode == -1) {
+              LOG.info("Attempting to unassign " +
+                region.getRegionNameAsString() + " but ZK closing node "
+                + "can't be created.");
+              reassign = false; // not unassigned at all
+              return;
+            }
           }
         } catch (KeeperException e) {
           if (e instanceof NodeExistsException) {
@@ -2323,7 +2445,7 @@ public class AssignmentManager extends ZooKeeperListener {
         return;
       }
 
-      unassign(region, state, versionOfClosingNode, dest, true, null);
+      unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
     } finally {
       lock.unlock();
 
@@ -2527,29 +2649,20 @@ public class AssignmentManager extends ZooKeeperListener {
    * @throws IOException
    * @throws KeeperException
    */
-  private void assignAllUserRegions()
+  private void assignAllUserRegions(Set<TableName> disabledOrDisablingOrEnabling)
       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
-    // Cleanup any existing ZK nodes and start watching
-    ZKAssign.deleteAllNodes(watcher);
-    ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
-      this.watcher.assignmentZNode);
-    failoverCleanupDone();
-
     // Skip assignment for regions of tables in DISABLING state because during clean cluster startup
     // no RS is alive and regions map also doesn't have any information about the regions.
     // See HBASE-6281.
-    Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
-      ZooKeeperProtos.Table.State.DISABLED,
-      ZooKeeperProtos.Table.State.DISABLING,
-      ZooKeeperProtos.Table.State.ENABLING);
-
     // Scan hbase:meta for all user regions, skipping any disabled tables
     Map<HRegionInfo, ServerName> allRegions;
     SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
        new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
     snapshotOfRegionAssignment.initialize();
     allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
-    if (allRegions == null || allRegions.isEmpty()) return;
+    if (allRegions == null || allRegions.isEmpty()) {
+      return;
+    }
 
     // Determine what type of assignment to do on startup
     boolean retainAssignment = server.getConfiguration().
@@ -2606,8 +2719,6 @@ public class AssignmentManager extends ZooKeeperListener {
    */
   Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws
       IOException, KeeperException, CoordinatedStateException {
-    Set<TableName> enablingTables = tableStateManager.getTablesInStates(
-      ZooKeeperProtos.Table.State.ENABLING);
     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
 
@@ -2625,66 +2736,39 @@ public class AssignmentManager extends ZooKeeperListener {
       new TreeMap<ServerName, List<HRegionInfo>>();
     // Iterate regions in META
     for (Result result : results) {
-      Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
-      if (region == null) continue;
-      HRegionInfo regionInfo = region.getFirst();
-      ServerName regionLocation = region.getSecond();
+      HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result);
       if (regionInfo == null) continue;
-      regionStates.createRegionState(regionInfo);
-      if (regionStates.isRegionInState(regionInfo, State.SPLIT)) {
-        // Split is considered to be completed. If the split znode still
-        // exists, the region will be put back to SPLITTING state later
-        LOG.debug("Region " + regionInfo.getRegionNameAsString()
-           + " split is completed. Hence need not add to regions list");
+      State state = RegionStateStore.getRegionState(result);
+      ServerName regionLocation = RegionStateStore.getRegionServer(result);
+      regionStates.createRegionState(regionInfo, state, regionLocation);
+      if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
+        // Region is not open (either offline or in transition), skip
         continue;
       }
       TableName tableName = regionInfo.getTable();
-      if (regionLocation == null) {
-        // regionLocation could be null if createTable didn't finish properly.
-        // When createTable is in progress, HMaster restarts.
-        // Some regions have been added to hbase:meta, but have not been assigned.
-        // When this happens, the region's table must be in ENABLING state.
-        // It can't be in ENABLED state as that is set when all regions are
-        // assigned.
-        // It can't be in DISABLING state, because DISABLING state transitions
-        // from ENABLED state when application calls disableTable.
-        // It can't be in DISABLED state, because DISABLED states transitions
-        // from DISABLING state.
-        if (!enablingTables.contains(tableName)) {
-          LOG.warn("Region " + regionInfo.getEncodedName() +
-            " has null regionLocation." + " But its table " + tableName +
-            " isn't in ENABLING state.");
-        }
-      } else if (!onlineServers.contains(regionLocation)) {
+      if (!onlineServers.contains(regionLocation)) {
         // Region is located on a server that isn't online
         List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
         if (offlineRegions == null) {
           offlineRegions = new ArrayList<HRegionInfo>(1);
           offlineServers.put(regionLocation, offlineRegions);
         }
+        regionStates.regionOffline(regionInfo);
         offlineRegions.add(regionInfo);
-        // need to enable the table if not disabled or disabling or enabling
-        // this will be used in rolling restarts
-        if (!disabledOrDisablingOrEnabling.contains(tableName)
-            && !getTableStateManager().isTableState(tableName,
-            ZooKeeperProtos.Table.State.ENABLED)) {
-          setEnabledTable(tableName);
-        }
-      } else {
+      } else if (!disabledOrEnablingTables.contains(tableName)) {
         // Region is being served and on an active server
         // add only if region not in disabled or enabling table
-        if (!disabledOrEnablingTables.contains(tableName)) {
-          regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
-          regionStates.regionOnline(regionInfo, regionLocation);
-          balancer.regionOnline(regionInfo, regionLocation);
-        }
-        // need to enable the table if not disabled or disabling or enabling
-        // this will be used in rolling restarts
-        if (!disabledOrDisablingOrEnabling.contains(tableName)
-          && !getTableStateManager().isTableState(tableName,
-            ZooKeeperProtos.Table.State.ENABLED)) {
-          setEnabledTable(tableName);
-        }
+        regionStates.regionOnline(regionInfo, regionLocation);
+        balancer.regionOnline(regionInfo, regionLocation);
+      } else if (useZKForAssignment) {
+        regionStates.regionOffline(regionInfo);
+      }
+      // need to enable the table if not disabled or disabling or enabling
+      // this will be used in rolling restarts
+      if (!disabledOrDisablingOrEnabling.contains(tableName)
+        && !getTableStateManager().isTableState(tableName,
+          ZooKeeperProtos.Table.State.ENABLED)) {
+        setEnabledTable(tableName);
       }
     }
     return offlineServers;
@@ -2775,19 +2859,152 @@ public class AssignmentManager extends ZooKeeperListener {
         }
       }
     }
-    List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
-      this.watcher, this.watcher.assignmentZNode);
-    if (!nodes.isEmpty()) {
+
+    List<String> nodes = useZKForAssignment ?
+      ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
+      : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
+    if (nodes != null && !nodes.isEmpty()) {
       for (String encodedRegionName : nodes) {
         processRegionInTransition(encodedRegionName, null);
       }
+    } else if (!useZKForAssignment) {
+      // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
+      // in case the RPC call was not sent out before the master was shut down,
+      // since we update the state before we send the RPC call. We can't update
+      // the state after the RPC call; otherwise, we wouldn't know what happened
+      // to the region if the master died right after the RPC call went out.
+      Map<String, RegionState> rits = regionStates.getRegionsInTransition();
+      for (RegionState regionState: rits.values()) {
+        if (!serverManager.isServerOnline(regionState.getServerName())) {
+          continue; // SSH will handle it
+        }
+        State state = regionState.getState();
+        LOG.info("Processing " + regionState);
+        switch (state) {
+        case PENDING_OPEN:
+          retrySendRegionOpen(regionState);
+          break;
+        case PENDING_CLOSE:
+          retrySendRegionClose(regionState);
+          break;
+        default:
+          // No process for other states
+        }
+      }
     }
+  }
 
-    // Now we can safely claim failover cleanup completed and enable
-    // ServerShutdownHandler for further processing. The nodes (below)
-    // in transition, if any, are for regions not related to those
-    // dead servers at all, and can be done in parallel to SSH.
-    failoverCleanupDone();
+  /**
+   * At master failover, for pending_open region, make sure
+   * sendRegionOpen RPC call is sent to the target regionserver
+   */
+  private void retrySendRegionOpen(final RegionState regionState) {
+    this.executorService.submit(
+      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+        @Override
+        public void process() throws IOException {
+          HRegionInfo hri = regionState.getRegion();
+          ServerName serverName = regionState.getServerName();
+          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+          try {
+            if (!regionState.equals(regionStates.getRegionState(hri))) {
+              return; // Region is not in the expected state any more
+            }
+            while (serverManager.isServerOnline(serverName)
+                && !server.isStopped() && !server.isAborted()) {
+              try {
+                List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+                if (shouldAssignRegionsWithFavoredNodes) {
+                  favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
+                }
+                RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
+                  serverName, hri, -1, favoredNodes);
+
+                if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
+                  // Failed opening this region, this means the target server didn't get
+                  // the original region open RPC, so re-assign it with a new plan
+                  LOG.debug("Got failed_opening in retry sendRegionOpen for "
+                    + regionState + ", re-assign it");
+                  invokeAssign(hri, true);
+                }
+                return; // Done.
+              } catch (Throwable t) {
+                if (t instanceof RemoteException) {
+                  t = ((RemoteException) t).unwrapRemoteException();
+                }
+                // In case SocketTimeoutException/FailedServerException, retry
+                if (t instanceof java.net.SocketTimeoutException
+                    || t instanceof FailedServerException) {
+                  Threads.sleep(100);
+                  continue;
+                }
+                // For other exceptions, re-assign it
+                LOG.debug("Got exception in retry sendRegionOpen for "
+                  + regionState + ", re-assign it", t);
+                invokeAssign(hri);
+                return; // Done.
+              }
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      });
+  }
+
+  /**
+   * At master failover, for pending_close region, make sure
+   * sendRegionClose RPC call is sent to the target regionserver
+   */
+  private void retrySendRegionClose(final RegionState regionState) {
+    this.executorService.submit(
+      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+        @Override
+        public void process() throws IOException {
+          HRegionInfo hri = regionState.getRegion();
+          ServerName serverName = regionState.getServerName();
+          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+          try {
+            if (!regionState.equals(regionStates.getRegionState(hri))) {
+              return; // Region is not in the expected state any more
+            }
+            while (serverManager.isServerOnline(serverName)
+                && !server.isStopped() && !server.isAborted()) {
+              try {
+                if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
+                  // This means the region is still on the target server
+                  LOG.debug("Got false in retry sendRegionClose for "
+                    + regionState + ", re-close it");
+                  invokeUnAssign(hri);
+                }
+                return; // Done.
+              } catch (Throwable t) {
+                if (t instanceof RemoteException) {
+                  t = ((RemoteException) t).unwrapRemoteException();
+                }
+                // In case SocketTimeoutException/FailedServerException, retry
+                if (t instanceof java.net.SocketTimeoutException
+                    || t instanceof FailedServerException) {
+                  Threads.sleep(100);
+                  continue;
+                }
+                if (!(t instanceof NotServingRegionException
+                    || t instanceof RegionAlreadyInTransitionException)) {
+                  // NotServingRegionException/RegionAlreadyInTransitionException
+                  // means the target server got the original region close request.
+                  // For other exceptions, re-close it
+                  LOG.debug("Got exception in retry sendRegionClose for "
+                    + regionState + ", re-close it", t);
+                  invokeUnAssign(hri);
+                }
+                return; // Done.
+              }
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      });
   }
 
   /**
@@ -2871,7 +3088,15 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   void invokeAssign(HRegionInfo regionInfo) {
-    threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
+    invokeAssign(regionInfo, true);
+  }
+
+  void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
+    threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
+  }
+
+  void invokeUnAssign(HRegionInfo regionInfo) {
+    threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
   }
 
   public boolean isCarryingMeta(ServerName serverName) {
@@ -3024,6 +3249,7 @@ public class AssignmentManager extends ZooKeeperListener {
     }
     threadPoolExecutorService.shutdownNow();
     zkEventWorkers.shutdownNow();
+    regionStateStore.stop();
   }
 
   protected void setEnabledTable(TableName tableName) {
@@ -3098,6 +3324,180 @@ public class AssignmentManager extends ZooKeeperListener {
       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
   }
 
+  private void onRegionFailedOpen(
+      final HRegionInfo hri, final ServerName sn) {
+    String encodedName = hri.getEncodedName();
+    AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
+    if (failedOpenCount == null) {
+      failedOpenCount = new AtomicInteger();
+      // No need to use putIfAbsent, or extra synchronization since
+      // this whole handleRegion block is locked on the encoded region
+      // name, and failedOpenTracker is updated only in this block
+      failedOpenTracker.put(encodedName, failedOpenCount);
+    }
+    if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
+      regionStates.updateRegionState(hri, State.FAILED_OPEN);
+      // remove the tracking info to save memory, also reset
+      // the count for next open initiative
+      failedOpenTracker.remove(encodedName);
+    } else {
+      // Handle this the same as if it were opened and then closed.
+      RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
+      if (regionState != null) {
+        // When there is more than one region server, a new RS is selected as the
+        // destination and the same is updated in the region plan. (HBASE-5546)
+        if (getTableStateManager().isTableState(hri.getTable(),
+            ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+          offlineDisabledRegion(hri);
+          return;
+        }
+        // ZK Node is in CLOSED state, assign it.
+         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
+        // This below has to do w/ online enable/disable of a table
+        removeClosedRegion(hri);
+        try {
+          getRegionPlan(hri, sn, true);
+        } catch (HBaseIOException e) {
+          LOG.warn("Failed to get region plan", e);
+        }
+        invokeAssign(hri, false);
+      }
+    }
+  }
+
+  private void onRegionOpen(
+      final HRegionInfo hri, final ServerName sn, long openSeqNum) {
+    regionOnline(hri, sn, openSeqNum);
+    if (useZKForAssignment) {
+      try {
+        // Delete the ZNode if exists
+        ZKAssign.deleteNodeFailSilent(watcher, hri);
+      } catch (KeeperException ke) {
+        server.abort("Unexpected ZK exception deleting node " + hri, ke);
+      }
+    }
+
+    // reset the count, if any
+    failedOpenTracker.remove(hri.getEncodedName());
+    if (getTableStateManager().isTableState(hri.getTable(),
+        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+      invokeUnAssign(hri);
+    }
+  }
+
+  private void onRegionClosed(final HRegionInfo hri) {
+    if (getTableStateManager().isTableState(hri.getTable(),
+        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+      offlineDisabledRegion(hri);
+      return;
+    }
+    regionStates.updateRegionState(hri, RegionState.State.CLOSED);
+    sendRegionClosedNotification(hri);
+    // This below has to do w/ online enable/disable of a table
+    removeClosedRegion(hri);
+    invokeAssign(hri, false);
+  }
+
+  private String onRegionSplit(ServerName sn, TransitionCode code,
+      HRegionInfo p, HRegionInfo a, HRegionInfo b) {
+    RegionState rs_p = regionStates.getRegionState(p);
+    RegionState rs_a = regionStates.getRegionState(a);
+    RegionState rs_b = regionStates.getRegionState(b);
+    if (!(rs_p.isOpenOrSplittingOnServer(sn)
+        && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
+        && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
+      return "Not in state good for split";
+    }
+
+    regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
+    regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
+    regionStates.updateRegionState(p, State.SPLITTING);
+
+    if (code == TransitionCode.SPLIT) {
+      if (TEST_SKIP_SPLIT_HANDLING) {
+        return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
+      }
+      regionOffline(p, State.SPLIT);
+      regionOnline(a, sn, 1);
+      regionOnline(b, sn, 1);
+
+      // User could disable the table before master knows the new region.
+      if (getTableStateManager().isTableState(p.getTable(),
+          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+        invokeUnAssign(a);
+        invokeUnAssign(b);
+      }
+    } else if (code == TransitionCode.SPLIT_PONR) {
+      try {
+        regionStateStore.splitRegion(p, a, b, sn);
+      } catch (IOException ioe) {
+        LOG.info("Failed to record split region " + p.getShortNameToLog());
+        return "Failed to record the splitting in meta";
+      }
+    } else if (code == TransitionCode.SPLIT_REVERTED) {
+      regionOnline(p, sn);
+      regionOffline(a);
+      regionOffline(b);
+
+      if (getTableStateManager().isTableState(p.getTable(),
+          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+        invokeUnAssign(p);
+      }
+    }
+    return null;
+  }
+
+  private String onRegionMerge(ServerName sn, TransitionCode code,
+      HRegionInfo p, HRegionInfo a, HRegionInfo b) {
+    RegionState rs_p = regionStates.getRegionState(p);
+    RegionState rs_a = regionStates.getRegionState(a);
+    RegionState rs_b = regionStates.getRegionState(b);
+    if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
+        && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
+      return "Not in state good for merge";
+    }
+
+    regionStates.updateRegionState(a, State.MERGING);
+    regionStates.updateRegionState(b, State.MERGING);
+    regionStates.updateRegionState(p, State.MERGING_NEW, sn);
+
+    String encodedName = p.getEncodedName();
+    if (code == TransitionCode.READY_TO_MERGE) {
+      mergingRegions.put(encodedName,
+        new PairOfSameType<HRegionInfo>(a, b));
+    } else if (code == TransitionCode.MERGED) {
+      mergingRegions.remove(encodedName);
+      regionOffline(a, State.MERGED);
+      regionOffline(b, State.MERGED);
+      regionOnline(p, sn, 1);
+
+      // User could disable the table before master knows the new region.
+      if (getTableStateManager().isTableState(p.getTable(),
+          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+        invokeUnAssign(p);
+      }
+    } else if (code == TransitionCode.MERGE_PONR) {
+      try {
+        regionStateStore.mergeRegions(p, a, b, sn);
+      } catch (IOException ioe) {
+        LOG.info("Failed to record merged region " + p.getShortNameToLog());
+        return "Failed to record the merging in meta";
+      }
+    } else {
+      mergingRegions.remove(encodedName);
+      regionOnline(a, sn);
+      regionOnline(b, sn);
+      regionOffline(p);
+
+      if (getTableStateManager().isTableState(p.getTable(),
+          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+        invokeUnAssign(a);
+        invokeUnAssign(b);
+      }
+    }
+    return null;
+  }
+
   /**
    * A helper to handle region merging transition event.
    * It transitions merging regions to MERGING state.
@@ -3167,7 +3567,6 @@ public class AssignmentManager extends ZooKeeperListener {
       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
 
       if (et != EventType.RS_ZK_REGION_MERGED) {
-        regionStates.regionOffline(p, State.MERGING_NEW);
         this.mergingRegions.put(encodedName,
           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
       } else {
@@ -3288,8 +3687,6 @@ public class AssignmentManager extends ZooKeeperListener {
     synchronized (regionStates) {
       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
-      regionStates.regionOffline(hri_a, State.SPLITTING_NEW);
-      regionStates.regionOffline(hri_b, State.SPLITTING_NEW);
       regionStates.updateRegionState(rt, State.SPLITTING);
 
       // The below is for testing ONLY!  We can't do fault injection easily, so
@@ -3373,6 +3770,121 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
+   * Try to update some region states. If the state machine prevents
+   * such update, an error message is returned to explain the reason.
+   *
+   * It's expected that each transition involves just one
+   * region for opening/closing, or 3 regions for splitting/merging.
+   * These regions should be on the server that requested the change.
+   *
+   * Region state machine. Only these transitions
+   * are expected to be triggered by a region server.
+   *
+   * On the state transition:
+   *  (1) Open/Close should be initiated by master
+   *      (a) Master sets the region to pending_open/pending_close
+   *        in memory and hbase:meta after sending the request
+   *        to the region server
+   *      (b) Region server reports back to the master
+   *        after open/close is done (either success/failure)
+   *      (c) If region server has problem to report the status
+   *        to master, it must be because the master is down or some
+   *        temporary network issue. Otherwise, the region server should
+   *        abort since it must be a bug. If the master is not accessible,
+   *        the region server should keep trying until the server is
+   *        stopped or till the status is reported to the (new) master
+   *      (d) If region server dies in the middle of opening/closing
+   *        a region, SSH picks it up and finishes it
+   *      (e) If master dies in the middle, the new master recovers
+   *        the state during initialization from hbase:meta. Region server
+   *        can report any transition that has not been reported to
+   *        the previous active master yet
+   *  (2) Split/merge is initiated by region servers
+   *      (a) To split a region, a region server sends a request
+   *        to master to try to set a region to splitting, together with
+   *        two daughters (to be created) to splitting new. If approved
+   *        by the master, the splitting can then move ahead
+   *      (b) To merge two regions, a region server sends a request to
+   *        master to try to set the new merged region (to be created) to
+   *        merging_new, together with two regions (to be merged) to merging.
+   *        If it is ok with the master, the merge can then move ahead
+   *      (c) Once the splitting/merging is done, the region server
+   *        reports the status back to the master either success/failure.
+   *      (d) Other scenarios should be handled similarly as for
+   *        region open/close
+   */
+  protected String onRegionTransition(final ServerName serverName,
+      final RegionServerStatusProtos.RegionTransition transition) {
+    TransitionCode code = transition.getTransitionCode();
+    HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
+    RegionState current = regionStates.getRegionState(hri);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got transition " + code + " for "
+        + (current != null ? current.toString() : hri.getShortNameToLog())
+        + " from " + serverName);
+    }
+    String errorMsg = null;
+    switch (code) {
+    case OPENED:
+    case FAILED_OPEN:
+      if (current == null
+          || !current.isPendingOpenOrOpeningOnServer(serverName)) {
+        errorMsg = hri.getShortNameToLog()
+          + " is not pending open on " + serverName;
+      } else if (code == TransitionCode.FAILED_OPEN) {
+        onRegionFailedOpen(hri, serverName);
+      } else {
+        long openSeqNum = HConstants.NO_SEQNUM;
+        if (transition.hasOpenSeqNum()) {
+          openSeqNum = transition.getOpenSeqNum();
+        }
+        if (openSeqNum < 0) {
+          errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
+        } else {
+          onRegionOpen(hri, serverName, openSeqNum);
+        }
+      }
+      break;
+
+    case CLOSED:
+      if (current == null
+          || !current.isPendingCloseOrClosingOnServer(serverName)) {
+        errorMsg = hri.getShortNameToLog()
+          + " is not pending close on " + serverName;
+      } else {
+        onRegionClosed(hri);
+      }
+      break;
+
+    case READY_TO_SPLIT:
+    case SPLIT_PONR:
+    case SPLIT:
+    case SPLIT_REVERTED:
+      errorMsg = onRegionSplit(serverName, code, hri,
+        HRegionInfo.convert(transition.getRegionInfo(1)),
+        HRegionInfo.convert(transition.getRegionInfo(2)));
+      break;
+
+    case READY_TO_MERGE:
+    case MERGE_PONR:
+    case MERGED:
+    case MERGE_REVERTED:
+      errorMsg = onRegionMerge(serverName, code, hri,
+        HRegionInfo.convert(transition.getRegionInfo(1)),
+        HRegionInfo.convert(transition.getRegionInfo(2)));
+      break;
+
+    default:
+      errorMsg = "Unexpected transition code " + code;
+    }
+    if (errorMsg != null) {
+      LOG.error("Failed to transtion region from " + current + " to "
+        + code + " by " + serverName + ": " + errorMsg);
+    }
+    return errorMsg;
+  }
+
+  /**
    * @return Instance of load balancer
    */
   public LoadBalancer getBalancer() {
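
The onRegionTransition javadoc above expects a region server to keep retrying the report while the master is unreachable and to treat an explicit rejection as fatal for the transition. A minimal sketch of such a retry loop, using the same generated classes as the sketch after the proto file; isStopped() and sleepBeforeRetry() are assumed helpers on the region-server side, which is not part of this hunk.

  static boolean reportTransition(RegionServerStatusService.BlockingInterface rss,
      ReportRegionTransitionRequest request) {
    while (!isStopped()) {                       // keep trying until this server is stopping
      try {
        ReportRegionTransitionResponse response = rss.reportRegionTransition(null, request);
        if (response.hasErrorMessage()) {
          // The master's state machine rejected the update: abort the transition.
          return false;
        }
        return true;                             // reported successfully
      } catch (ServiceException se) {
        // Master down or a temporary network problem: back off and retry,
        // possibly against a newly elected master once the stub is refreshed.
        sleepBeforeRetry();
      }
    }
    return false;
  }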

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 1f43208..4de9d33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -46,9 +47,7 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.*;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -143,8 +142,11 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionResponse;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -1240,4 +1242,31 @@ public class MasterRpcServices extends RSRpcServices
       throw new ServiceException(ioe);
     }
   }
+
+  @Override
+  public ReportRegionTransitionResponse reportRegionTransition(RpcController controller,
+      ReportRegionTransitionRequest req) throws ServiceException {
+    try {
+      master.checkServiceStarted();
+      RegionTransition rt = req.getTransition(0);
+      TableName tableName = ProtobufUtil.toTableName(
+        rt.getRegionInfo(0).getTableName());
+      if (!TableName.META_TABLE_NAME.equals(tableName)
+          && !master.assignmentManager.isFailoverCleanupDone()) {
+        // The meta region is assigned before the master finishes
+        // failover cleanup, so this check is not needed for it
+        throw new PleaseHoldException("Master is rebuilding user regions");
+      }
+      ServerName sn = ProtobufUtil.toServerName(req.getServer());
+      String error = master.assignmentManager.onRegionTransition(sn, rt);
+      ReportRegionTransitionResponse.Builder rrtr =
+        ReportRegionTransitionResponse.newBuilder();
+      if (error != null) {
+        rrtr.setErrorMessage(error);
+      }
+      return rrtr.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
 }
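
The new reportRegionTransition endpoint replaces the ZooKeeper znode handshake: a region server batches one or more RegionTransition messages into a ReportRegionTransitionRequest, and an error_message in the response means the master rejected the change, in which case the transition should be aborted. A rough sketch of the region-server side of the call against a RegionServerStatusService stub; the stub and surrounding variables are assumptions for illustration, not code from this patch:

    // Report a successful region open to the master (sketch only).
    ReportRegionTransitionRequest request = ReportRegionTransitionRequest.newBuilder()
      .setServer(ProtobufUtil.toServerName(serverName))      // this region server
      .addTransition(RegionTransition.newBuilder()
        .setTransitionCode(RegionTransition.TransitionCode.OPENED)
        .setOpenSeqNum(openSeqNum)
        .addRegionInfo(HRegionInfo.convert(regionInfo)))
      .build();
    ReportRegionTransitionResponse response =
      rssStub.reportRegionTransition(null, request);
    if (response.hasErrorMessage()) {
      // The master refused the transition, e.g. PleaseHoldException while it
      // is still rebuilding user regions; abort the transition locally.
    }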

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
new file mode 100644
index 0000000..e0f07ff
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A helper to persist region state in meta. We may change this class
+ * to StateStore later if we also use it to store other states in meta
+ */
+@InterfaceAudience.Private
+public class RegionStateStore {
+  private static final Log LOG = LogFactory.getLog(RegionStateStore.class);
+
+  private volatile HRegion metaRegion;
+  private volatile HTableInterface metaTable;
+  private volatile boolean initialized;
+
+  private final boolean noPersistence;
+  private final CatalogTracker catalogTracker;
+  private final Server server;
+
+  /**
+   * Returns the {@link ServerName} from catalog table {@link Result}
+   * where the region is transitioning. It should be the same as
+   * {@link HRegionInfo#getServerName(Result)} if the region is in OPEN state.
+   * @param r Result to pull the transitioning server name from
+   * @return A ServerName instance or {@link HRegionInfo#getServerName(Result)}
+   * if the necessary fields are not found or empty.
+   */
+  static ServerName getRegionServer(final Result r) {
+    Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, HConstants.SERVERNAME_QUALIFIER);
+    if (cell == null || cell.getValueLength() == 0) return HRegionInfo.getServerName(r);
+    return ServerName.parseServerName(Bytes.toString(cell.getValueArray(),
+      cell.getValueOffset(), cell.getValueLength()));
+  }
+
+  /**
+   * Pull the region state from a catalog table {@link Result}.
+   * @param r Result to pull the region state from
+   * @return the region state, or OPEN if there's no value written.
+   */
+  static State getRegionState(final Result r) {
+    Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER);
+    if (cell == null || cell.getValueLength() == 0) return State.OPEN;
+    return State.valueOf(Bytes.toString(cell.getValueArray(),
+      cell.getValueOffset(), cell.getValueLength()));
+  }
+
+  /**
+   * Check if we should persist a state change in meta. Generally it's
+   * better to persist all state changes. However, we should not do that
+   * if the region is not in meta at all. Based on the state and the
+   * previous state, we can identify if a user region has an entry
+   * in meta. For example, merged regions are deleted from meta;
+   * new merging parents and splitting daughters are
+   * not created in meta yet.
+   */
+  private boolean shouldPersistStateChange(
+      HRegionInfo hri, RegionState state, RegionState oldState) {
+    return !hri.isMetaRegion() && !RegionStates.isOneOfStates(
+      state, State.MERGING_NEW, State.SPLITTING_NEW, State.MERGED)
+      && !(RegionStates.isOneOfStates(state, State.OFFLINE)
+        && RegionStates.isOneOfStates(oldState, State.MERGING_NEW,
+          State.SPLITTING_NEW, State.MERGED));
+  }
+
+  RegionStateStore(final Server server) {
+    Configuration conf = server.getConfiguration();
+    // No need to persist if using ZK but not migrating
+    noPersistence = ConfigUtil.useZKForAssignment(conf)
+      && !conf.getBoolean("hbase.assignment.usezk.migrating", false);
+    catalogTracker = server.getCatalogTracker();
+    this.server = server;
+    initialized = false;
+  }
+
+  @SuppressWarnings("deprecation")
+  void start() throws IOException {
+    if (!noPersistence) {
+      if (server instanceof RegionServerServices) {
+        metaRegion = ((RegionServerServices)server).getFromOnlineRegions(
+          HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+      }
+      if (metaRegion == null) {
+        metaTable = new HTable(TableName.META_TABLE_NAME,
+          catalogTracker.getConnection());
+      }
+    }
+    initialized = true;
+  }
+
+  void stop() {
+    initialized = false;
+    if (metaTable != null) {
+      try {
+        metaTable.close();
+      } catch (IOException e) {
+        LOG.info("Got exception in closing meta table", e);
+      } finally {
+        metaTable = null;
+      }
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  void updateRegionState(long openSeqNum,
+      RegionState newState, RegionState oldState) {
+    if (noPersistence || !initialized) {
+      return;
+    }
+
+    HRegionInfo hri = newState.getRegion();
+    if (!shouldPersistStateChange(hri, newState, oldState)) {
+      return;
+    }
+
+    ServerName oldServer = oldState != null ? oldState.getServerName() : null;
+    ServerName serverName = newState.getServerName();
+    State state = newState.getState();
+
+    try {
+      Put put = new Put(hri.getRegionName());
+      StringBuilder info = new StringBuilder("Updating row ");
+      info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
+      if (serverName != null && !serverName.equals(oldServer)) {
+        put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVERNAME_QUALIFIER,
+          Bytes.toBytes(serverName.getServerName()));
+        info.append("&sn=").append(serverName);
+      }
+      if (openSeqNum >= 0) {
+        Preconditions.checkArgument(state == State.OPEN
+          && serverName != null, "Open region should be on a server");
+        put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+          Bytes.toBytes(serverName.getHostAndPort()));
+        put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+          Bytes.toBytes(serverName.getStartcode()));
+        put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
+          Bytes.toBytes(openSeqNum));
+        info.append("&openSeqNum=").append(openSeqNum);
+        info.append("&server=").append(serverName);
+      }
+      put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+        Bytes.toBytes(state.name()));
+      LOG.info(info);
+
+      // Persist the state change to meta
+      if (metaRegion != null) {
+        try {
+          // Assume meta is pinned to master.
+          // At least, that's what we want.
+          metaRegion.put(put);
+          return; // Done here
+        } catch (Throwable t) {
+          // In unit tests, meta could be moved away intentionally,
+          // so the shortcut is gone. We won't try to re-establish the
+          // shortcut any more since we prefer meta to be pinned
+          // to the master
+          synchronized (this) {
+            if (metaRegion != null) {
+              LOG.info("Meta region shortcut failed", t);
+              metaTable = new HTable(TableName.META_TABLE_NAME,
+                catalogTracker.getConnection());
+              metaRegion = null;
+            }
+          }
+        }
+      }
+      synchronized(metaTable) {
+        metaTable.put(put);
+      }
+    } catch (IOException ioe) {
+      LOG.error("Failed to persist region state " + newState, ioe);
+      server.abort("Failed to update region location", ioe);
+    }
+  }
+
+  void splitRegion(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+    MetaEditor.splitRegion(catalogTracker, p, a, b, sn);
+  }
+
+  void mergeRegions(HRegionInfo p,
+      HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+    MetaEditor.mergeRegions(catalogTracker, p, a, b, sn);
+  }
+}
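
With this class the master persists a region's assignment state in hbase:meta instead of ZooKeeper: updateRegionState() writes the state name under HConstants.STATE_QUALIFIER in the catalog family, the transitioning server under HConstants.SERVERNAME_QUALIFIER, and, once a region is open, the regular server/startcode columns plus the open sequence number. As a hedged illustration, the persisted values can be read back from a meta row with the two static helpers above; metaTable and regionInfo are assumed to exist in the caller:

    // Sketch: read the persisted assignment state back from hbase:meta,
    // the same way getRegionState()/getRegionServer() above do.
    Result r = metaTable.get(new Get(regionInfo.getRegionName()));
    State state = RegionStateStore.getRegionState(r);        // defaults to OPEN
    ServerName transitioningServer = RegionStateStore.getRegionServer(r);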

http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index 5f96a22..39c42b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
@@ -109,6 +110,7 @@ public class RegionStates {
   private final HashMap<ServerName, Long> processedServers;
   private long lastProcessedServerCleanTime;
 
+  private final RegionStateStore regionStateStore;
   private final ServerManager serverManager;
   private final Server server;
 
@@ -116,7 +118,8 @@ public class RegionStates {
   static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
   static final long DEFAULT_LOG_SPLIT_TIME = 7200000L; // 2 hours
 
-  RegionStates(final Server master, final ServerManager serverManager) {
+  RegionStates(final Server master,
+      final ServerManager serverManager, final RegionStateStore regionStateStore) {
     regionStates = new HashMap<String, RegionState>();
     regionsInTransition = new HashMap<String, RegionState>();
     serverHoldings = new HashMap<ServerName, Set<HRegionInfo>>();
@@ -124,6 +127,7 @@ public class RegionStates {
     lastAssignments = new HashMap<String, ServerName>();
     processedServers = new HashMap<ServerName, Long>();
     deadServers = new HashMap<String, Long>();
+    this.regionStateStore = regionStateStore;
     this.serverManager = serverManager;
     this.server = master;
   }
@@ -188,7 +192,7 @@ public class RegionStates {
   /**
    * @return True if specified region is in one of the specified states.
    */
-  public synchronized boolean isRegionInState(
+  public boolean isRegionInState(
       final HRegionInfo hri, final State... states) {
     return isRegionInState(hri.getEncodedName(), states);
   }
@@ -196,14 +200,10 @@ public class RegionStates {
   /**
    * @return True if specified region is in one of the specified states.
    */
-  public synchronized boolean isRegionInState(
+  public boolean isRegionInState(
       final String encodedName, final State... states) {
     RegionState regionState = getRegionState(encodedName);
-    State s = regionState != null ? regionState.getState() : null;
-    for (State state: states) {
-      if (s == state) return true;
-    }
-    return false;
+    return isOneOfStates(regionState, states);
   }
 
   /**
@@ -217,9 +217,8 @@ public class RegionStates {
   /**
    * Get region transition state
    */
-  public synchronized RegionState
-      getRegionTransitionState(final HRegionInfo hri) {
-    return regionsInTransition.get(hri.getEncodedName());
+  public RegionState getRegionTransitionState(final HRegionInfo hri) {
+    return getRegionTransitionState(hri.getEncodedName());
   }
 
   /**
@@ -235,7 +234,7 @@ public class RegionStates {
    * and offline, its state will be SPLIT. Otherwise, its state will
    * be OFFLINE. Region already in RegionStates will be skipped.
    */
-  public synchronized void createRegionStates(
+  public void createRegionStates(
       final List<HRegionInfo> hris) {
     for (HRegionInfo hri: hris) {
       createRegionState(hri);
@@ -248,16 +247,44 @@ public class RegionStates {
    * be OFFLINE. If it is already in RegionStates, this call has
    * no effect, and the original state is returned.
    */
-  public synchronized RegionState createRegionState(final HRegionInfo hri) {
-    State newState = (hri.isOffline() && hri.isSplit()) ? State.SPLIT : State.OFFLINE;
+  public RegionState createRegionState(final HRegionInfo hri) {
+    return createRegionState(hri, null, null);
+  }
+
+  /**
+   * Add a region to RegionStates with the specified state.
+   * If the region is already in RegionStates, this call has
+   * no effect, and the original state is returned.
+   */
+  public synchronized RegionState createRegionState(
+      final HRegionInfo hri, State newState, ServerName serverName) {
+    if (newState == null || (newState == State.OPEN && serverName == null)) {
+      newState = State.OFFLINE;
+    }
+    if (hri.isOffline() && hri.isSplit()) {
+      newState = State.SPLIT;
+      serverName = null;
+    }
     String encodedName = hri.getEncodedName();
     RegionState regionState = regionStates.get(encodedName);
     if (regionState != null) {
       LOG.warn("Tried to create a state for a region already in RegionStates, "
         + "used existing: " + regionState + ", ignored new: " + newState);
     } else {
-      regionState = new RegionState(hri, newState);
+      regionState = new RegionState(hri, newState, serverName);
       regionStates.put(encodedName, regionState);
+      if (newState == State.OPEN) {
+        regionAssignments.put(hri, serverName);
+        lastAssignments.put(encodedName, serverName);
+        Set<HRegionInfo> regions = serverHoldings.get(serverName);
+        if (regions == null) {
+          regions = new HashSet<HRegionInfo>();
+          serverHoldings.put(serverName, regions);
+        }
+        regions.add(hri);
+      } else if (!regionState.isUnassignable()) {
+        regionsInTransition.put(encodedName, regionState);
+      }
     }
     return regionState;
   }
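
The three-argument createRegionState() added here lets the master seed RegionStates from whatever was persisted in meta: a region recorded as OPEN on a known server is registered as assigned right away, an OPEN record without a server falls back to OFFLINE, and any other assignable state goes straight into the in-transition map. A hedged sketch of turning one meta row into an initial RegionState; the rebuild path itself is not part of this excerpt and the variable names are illustrative:

    // Sketch: rebuild one region's state from its hbase:meta row.
    HRegionInfo hri = HRegionInfo.getHRegionInfo(result);
    State state = RegionStateStore.getRegionState(result);
    ServerName lastHost = RegionStateStore.getRegionServer(result);
    regionStates.createRegionState(hri, state, lastHost);
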
@@ -265,9 +292,9 @@ public class RegionStates {
   /**
    * Update a region state. It will be put in transition if not already there.
    */
-  public synchronized RegionState updateRegionState(
+  public RegionState updateRegionState(
       final HRegionInfo hri, final State state) {
-    RegionState regionState = regionStates.get(hri.getEncodedName());
+    RegionState regionState = getRegionState(hri.getEncodedName());
     return updateRegionState(hri, state,
       regionState == null ? null : regionState.getServerName());
   }
@@ -278,7 +305,7 @@ public class RegionStates {
    * If we can't find the region info based on the region name in
    * the transition, log a warning and return null.
    */
-  public synchronized RegionState updateRegionState(
+  public RegionState updateRegionState(
       final RegionTransition transition, final State state) {
     byte [] regionName = transition.getRegionName();
     HRegionInfo regionInfo = getRegionInfo(regionName);
@@ -297,54 +324,14 @@ public class RegionStates {
   /**
    * Update a region state. It will be put in transition if not already there.
    */
-  public synchronized RegionState updateRegionState(
+  public RegionState updateRegionState(
       final HRegionInfo hri, final State state, final ServerName serverName) {
-    if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
-      LOG.warn("Failed to open/close " + hri.getShortNameToLog()
-        + " on " + serverName + ", set to " + state);
-    }
-
-    String encodedName = hri.getEncodedName();
-    RegionState regionState = new RegionState(
-      hri, state, System.currentTimeMillis(), serverName);
-    regionsInTransition.put(encodedName, regionState);
-    RegionState oldState = regionStates.put(encodedName, regionState);
-    ServerName oldServerName = oldState == null ? null : oldState.getServerName();
-    if (oldState == null || oldState.getState() != regionState.getState()
-        || (oldServerName == null && serverName != null)
-        || (oldServerName != null && !oldServerName.equals(serverName))) {
-      LOG.info("Transitioned " + oldState + " to " + regionState);
-    }
-
-    // For these states, region should be properly closed.
-    // There should be no log splitting issue.
-    if ((state == State.CLOSED || state == State.MERGED
-        || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
-      ServerName last = lastAssignments.get(encodedName);
-      if (last.equals(serverName)) {
-        lastAssignments.remove(encodedName);
-      } else {
-        LOG.warn(encodedName + " moved to " + state + " on "
-          + serverName + ", expected " + last);
-      }
-    }
-
-    // Once a region is opened, record its last assignment right away.
-    if (serverName != null && state == State.OPEN) {
-      ServerName last = lastAssignments.get(encodedName);
-      if (!serverName.equals(last)) {
-        lastAssignments.put(encodedName, serverName);
-        if (last != null && isServerDeadAndNotProcessed(last)) {
-          LOG.warn(encodedName + " moved to " + serverName
-            + ", while it's previous host " + last
-            + " is dead but not processed yet");
-        }
-      }
-    }
+    return updateRegionState(hri, state, serverName, HConstants.NO_SEQNUM);
+  }
 
-    // notify the change
-    this.notifyAll();
-    return regionState;
+  public void regionOnline(
+      final HRegionInfo hri, final ServerName serverName) {
+    regionOnline(hri, serverName, HConstants.NO_SEQNUM);
   }
 
   /**
@@ -352,8 +339,8 @@ public class RegionStates {
    * We can't confirm it is really online on specified region server
    * because it hasn't been put in region server's online region list yet.
    */
-  public synchronized void regionOnline(
-      final HRegionInfo hri, final ServerName serverName) {
+  public void regionOnline(final HRegionInfo hri,
+      final ServerName serverName, long openSeqNum) {
     if (!serverManager.isServerOnline(serverName)) {
       // This is possible if the region server dies before master gets a
       // chance to handle ZK event in time. At this time, if the dead server
@@ -363,30 +350,26 @@ public class RegionStates {
         + " was opened on a dead server: " + serverName);
       return;
     }
+    updateRegionState(hri, State.OPEN, serverName, openSeqNum);
 
-    String encodedName = hri.getEncodedName();
-    RegionState oldState = regionStates.get(encodedName);
-    if (oldState == null) {
-      LOG.warn("Online region not in RegionStates: " + hri.getShortNameToLog());
-    }
-    updateRegionState(hri, State.OPEN, serverName);
-    regionsInTransition.remove(encodedName);
-
-    ServerName oldServerName = regionAssignments.put(hri, serverName);
-    if (!serverName.equals(oldServerName)) {
-      LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);
-      Set<HRegionInfo> regions = serverHoldings.get(serverName);
-      if (regions == null) {
-        regions = new HashSet<HRegionInfo>();
-        serverHoldings.put(serverName, regions);
-      }
-      regions.add(hri);
-      if (oldServerName != null) {
-        LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
-        Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
-        oldRegions.remove(hri);
-        if (oldRegions.isEmpty()) {
-          serverHoldings.remove(oldServerName);
+    synchronized (this) {
+      regionsInTransition.remove(hri.getEncodedName());
+      ServerName oldServerName = regionAssignments.put(hri, serverName);
+      if (!serverName.equals(oldServerName)) {
+        LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);
+        Set<HRegionInfo> regions = serverHoldings.get(serverName);
+        if (regions == null) {
+          regions = new HashSet<HRegionInfo>();
+          serverHoldings.put(serverName, regions);
+        }
+        regions.add(hri);
+        if (oldServerName != null) {
+          LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
+          Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
+          oldRegions.remove(hri);
+          if (oldRegions.isEmpty()) {
+            serverHoldings.remove(oldServerName);
+          }
         }
       }
     }
@@ -405,6 +388,9 @@ public class RegionStates {
       }
     }
     long now = System.currentTimeMillis();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding to processed servers " + serverName);
+    }
     processedServers.put(serverName, Long.valueOf(now));
     Configuration conf = server.getConfiguration();
     long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME);
@@ -416,6 +402,9 @@ public class RegionStates {
           = processedServers.entrySet().iterator(); it.hasNext();) {
         Map.Entry<ServerName, Long> e = it.next();
         if (e.getValue().longValue() < cutoff) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removed from processed servers " + e.getKey());
+          }
           it.remove();
         }
       }
@@ -425,7 +414,7 @@ public class RegionStates {
   /**
    * Log split is done for a given region, so it is assignable now.
    */
-  public synchronized void logSplit(final HRegionInfo region) {
+  public void logSplit(final HRegionInfo region) {
     clearLastAssignment(region);
   }
 
@@ -445,24 +434,30 @@ public class RegionStates {
    * should be the specified expected state, which can only be
    * Split/Merged/Offline/null(=Offline)/SplittingNew/MergingNew.
    */
-  public synchronized void regionOffline(
+  public void regionOffline(
       final HRegionInfo hri, final State expectedState) {
     Preconditions.checkArgument(expectedState == null
       || RegionState.isUnassignable(expectedState),
         "Offlined region should not be " + expectedState);
-    String encodedName = hri.getEncodedName();
+    if (isRegionInState(hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
+      // Remove it from all region maps
+      deleteRegion(hri);
+      return;
+    }
     State newState =
       expectedState == null ? State.OFFLINE : expectedState;
     updateRegionState(hri, newState);
-    regionsInTransition.remove(encodedName);
 
-    ServerName oldServerName = regionAssignments.remove(hri);
-    if (oldServerName != null) {
-      LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
-      Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
-      oldRegions.remove(hri);
-      if (oldRegions.isEmpty()) {
-        serverHoldings.remove(oldServerName);
+    synchronized (this) {
+      regionsInTransition.remove(hri.getEncodedName());
+      ServerName oldServerName = regionAssignments.remove(hri);
+      if (oldServerName != null && serverHoldings.containsKey(oldServerName)) {
+        LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
+        Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
+        oldRegions.remove(hri);
+        if (oldRegions.isEmpty()) {
+          serverHoldings.remove(oldServerName);
+        }
       }
     }
   }
@@ -580,12 +575,14 @@ public class RegionStates {
    * A table is deleted. Remove its regions from all internal maps.
    * We loop through all regions assuming we don't delete tables too much.
    */
-  public synchronized void tableDeleted(final TableName tableName) {
+  public void tableDeleted(final TableName tableName) {
     Set<HRegionInfo> regionsToDelete = new HashSet<HRegionInfo>();
-    for (RegionState state: regionStates.values()) {
-      HRegionInfo region = state.getRegion();
-      if (region.getTable().equals(tableName)) {
-        regionsToDelete.add(region);
+    synchronized (this) {
+      for (RegionState state: regionStates.values()) {
+        HRegionInfo region = state.getRegion();
+        if (region.getTable().equals(tableName)) {
+          regionsToDelete.add(region);
+        }
       }
     }
     for (HRegionInfo region: regionsToDelete) {
@@ -654,6 +651,22 @@ public class RegionStates {
     lastAssignments.put(encodedName, serverName);
   }
 
+  synchronized void closeAllUserRegions(Set<TableName> excludedTables) {
+    boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty();
+    Set<HRegionInfo> toBeClosed = new HashSet<HRegionInfo>(regionStates.size());
+    for(RegionState state: regionStates.values()) {
+      HRegionInfo hri = state.getRegion();
+      TableName tableName = hri.getTable();
+      if (!TableName.META_TABLE_NAME.equals(tableName) && !hri.isSplit()
+          && (noExcludeTables || !excludedTables.contains(tableName))) {
+        toBeClosed.add(hri);
+      }
+    }
+    for (HRegionInfo hri: toBeClosed) {
+      updateRegionState(hri, State.CLOSED);
+    }
+  }
+
   /**
    * Compute the average load across all region servers.
    * Currently, this uses a very naive computation - just uses the number of
@@ -739,8 +752,8 @@ public class RegionStates {
     return result;
   }
 
-  protected synchronized RegionState getRegionState(final HRegionInfo hri) {
-    return regionStates.get(hri.getEncodedName());
+  protected RegionState getRegionState(final HRegionInfo hri) {
+    return getRegionState(hri.getEncodedName());
   }
 
   protected synchronized RegionState getRegionState(final String encodedName) {
@@ -754,7 +767,7 @@ public class RegionStates {
    */
   protected HRegionInfo getRegionInfo(final byte [] regionName) {
     String encodedName = HRegionInfo.encodeRegionName(regionName);
-    RegionState regionState = regionStates.get(encodedName);
+    RegionState regionState = getRegionState(encodedName);
     if (regionState != null) {
       return regionState.getRegion();
     }
@@ -774,10 +787,74 @@ public class RegionStates {
     }
   }
 
+  static boolean isOneOfStates(RegionState regionState, State... states) {
+    State s = regionState != null ? regionState.getState() : null;
+    for (State state: states) {
+      if (s == state) return true;
+    }
+    return false;
+  }
+
+  /**
+   * Update a region state. It will be put in transition if not already there.
+   */
+  private RegionState updateRegionState(final HRegionInfo hri,
+      final State state, final ServerName serverName, long openSeqNum) {
+    if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
+      LOG.warn("Failed to open/close " + hri.getShortNameToLog()
+        + " on " + serverName + ", set to " + state);
+    }
+
+    String encodedName = hri.getEncodedName();
+    RegionState regionState = new RegionState(
+      hri, state, System.currentTimeMillis(), serverName);
+    RegionState oldState = getRegionState(encodedName);
+    if (!regionState.equals(oldState)) {
+      LOG.info("Transition " + oldState + " to " + regionState);
+      // Persist region state before updating in-memory info, if needed
+      regionStateStore.updateRegionState(openSeqNum, regionState, oldState);
+    }
+
+    synchronized (this) {
+      regionsInTransition.put(encodedName, regionState);
+      regionStates.put(encodedName, regionState);
+
+      // For these states, region should be properly closed.
+      // There should be no log splitting issue.
+      if ((state == State.CLOSED || state == State.MERGED
+          || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
+        ServerName last = lastAssignments.get(encodedName);
+        if (last.equals(serverName)) {
+          lastAssignments.remove(encodedName);
+        } else {
+          LOG.warn(encodedName + " moved to " + state + " on "
+            + serverName + ", expected " + last);
+        }
+      }
+
+      // Once a region is opened, record its last assignment right away.
+      if (serverName != null && state == State.OPEN) {
+        ServerName last = lastAssignments.get(encodedName);
+        if (!serverName.equals(last)) {
+          lastAssignments.put(encodedName, serverName);
+          if (last != null && isServerDeadAndNotProcessed(last)) {
+            LOG.warn(encodedName + " moved to " + serverName
+              + ", while it's previous host " + last
+              + " is dead but not processed yet");
+          }
+        }
+      }
+
+      // notify the change
+      this.notifyAll();
+    }
+    return regionState;
+  }
+
   /**
    * Remove a region from all state maps.
    */
-  private void deleteRegion(final HRegionInfo hri) {
+  private synchronized void deleteRegion(final HRegionInfo hri) {
     String encodedName = hri.getEncodedName();
     regionsInTransition.remove(encodedName);
     regionStates.remove(encodedName);