You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/08/20 22:16:27 UTC

[6/7] hbase git commit: HBASE-20881 Introduce a region transition procedure to handle all the state transition for a region

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 70a9680..9b020c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -19,14 +19,12 @@ package org.apache.hadoop.hbase.master.assignment;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,22 +32,23 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.RegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
-import org.apache.hadoop.hbase.master.AssignmentListener;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
@@ -59,14 +58,10 @@ import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
 import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
-import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -90,7 +85,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -149,10 +143,6 @@ public class AssignmentManager implements ServerListener {
   private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
   private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
 
-  /** Listeners that are called on assignment events. */
-  private final CopyOnWriteArrayList<AssignmentListener> listeners =
-      new CopyOnWriteArrayList<AssignmentListener>();
-
   private final MetricsAssignmentManager metrics;
   private final RegionInTransitionChore ritChore;
   private final MasterServices master;
@@ -216,16 +206,55 @@ public class AssignmentManager implements ServerListener {
     // it could be null in some tests
     if (zkw != null) {
       RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
-      RegionStateNode regionStateNode =
+      RegionStateNode regionNode =
         regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
-      synchronized (regionStateNode) {
-        regionStateNode.setRegionLocation(regionState.getServerName());
-        regionStateNode.setState(regionState.getState());
+      regionNode.lock();
+      try {
+        regionNode.setRegionLocation(regionState.getServerName());
+        regionNode.setState(regionState.getState());
         setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
+      } finally {
+        regionNode.unlock();
       }
     }
   }
 
+  /**
+   * Create a RegionStateNode for each TRSP in the list, and attach the TRSP to that node.
+   * <p>
+   * This restores the RIT region list, so it does not need to be restored in the loadingMeta
+   * method below. It is also important because a TRSP must now be attached to its
+   * RegionStateNode, which acts as a guard, before it is submitted; this information therefore
+   * has to be restored at the very beginning, before we start processing any procedures.
+   */
+  public void setupRIT(List<TransitRegionStateProcedure> procs) {
+    procs.forEach(proc -> {
+      RegionInfo regionInfo = proc.getRegion();
+      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
+      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
+      if (existingProc != null) {
+        // This is possible because we detach the procedure from the RSN before the procedure
+        // itself is actually finished. The region state is updated directly in the
+        // reportTransition method for TRSP; at that point the region transition is logically
+        // done, so the procedure is detached from the RSN. However, the procedure has not yet
+        // been marked as done in the pv2 framework, so a new TRSP may be scheduled immediately,
+        // and when we arrive here we may find multiple TRSPs for the same region.
+        // Only the last one can be in charge; all the previous ones should have finished
+        // already.
+        // So here we compare the proc ids, and the procedure with the greater id wins.
+        if (existingProc.getProcId() < proc.getProcId()) {
+          // the new one wins, unset and set it to the new one below
+          regionNode.unsetProcedure(existingProc);
+        } else {
+          // the old one wins, skip
+          return;
+        }
+      }
+      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
+      regionNode.setProcedure(proc);
+    });
+  }
+
   public void stop() {
     if (!running.compareAndSet(true, false)) {
       return;
@@ -288,22 +317,6 @@ public class AssignmentManager implements ServerListener {
     return assignMaxAttempts;
   }
 
-  /**
-   * Add the listener to the notification list.
-   * @param listener The AssignmentListener to register
-   */
-  public void registerListener(final AssignmentListener listener) {
-    this.listeners.add(listener);
-  }
-
-  /**
-   * Remove the listener from the notification list.
-   * @param listener The AssignmentListener to unregister
-   */
-  public boolean unregisterListener(final AssignmentListener listener) {
-    return this.listeners.remove(listener);
-  }
-
   public RegionStates getRegionStates() {
     return regionStates;
   }
@@ -513,51 +526,89 @@ public class AssignmentManager implements ServerListener {
   private List<RegionInfo> getSystemTables(ServerName serverName) {
     Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
     if (regions == null) {
-      return new ArrayList<>();
+      return Collections.emptyList();
     }
-    return regions.stream()
-        .map(RegionStateNode::getRegionInfo)
-        .filter(r -> r.getTable().isSystemTable())
-        .collect(Collectors.toList());
+    return regions.stream().map(RegionStateNode::getRegionInfo)
+      .filter(r -> r.getTable().isSystemTable()).collect(Collectors.toList());
   }
 
-  public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException {
-    AssignProcedure proc = createAssignProcedure(regionInfo, sn);
-    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
+  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
+      throws HBaseIOException {
+    if (regionNode.getProcedure() != null) {
+      throw new HBaseIOException(regionNode + " is currently in transition");
+    }
+    if (!regionNode.isInState(expectedStates)) {
+      throw new DoNotRetryRegionException("Unexpected state for " + regionNode);
+    }
+    if (getTableStateManager().isTableState(regionNode.getTable(), TableState.State.DISABLING,
+      TableState.State.DISABLED)) {
+      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
+    }
   }
 
-  public void assign(final RegionInfo regionInfo) throws IOException {
-    AssignProcedure proc = createAssignProcedure(regionInfo);
+  public void assign(RegionInfo regionInfo, ServerName sn) throws IOException {
+    // TODO: should we use getRegionStateNode?
+    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
+    TransitRegionStateProcedure proc;
+    regionNode.lock();
+    try {
+      preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_OPEN);
+      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn);
+      regionNode.setProcedure(proc);
+    } finally {
+      regionNode.unlock();
+    }
     ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
   }
 
-  public void unassign(final RegionInfo regionInfo) throws IOException {
-    unassign(regionInfo, false);
+  public void assign(RegionInfo regionInfo) throws IOException {
+    assign(regionInfo, null);
   }
 
-  public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
-  throws IOException {
-    // TODO: rename this reassign
-    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
-    ServerName destinationServer = node.getRegionLocation();
-    if (destinationServer == null) {
-      throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
+  public void unassign(RegionInfo regionInfo) throws IOException {
+    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
+    if (regionNode == null) {
+      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
+    }
+    TransitRegionStateProcedure proc;
+    regionNode.lock();
+    try {
+      preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_CLOSE);
+      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
+      regionNode.setProcedure(proc);
+    } finally {
+      regionNode.unlock();
     }
-    assert destinationServer != null; node.toString();
-    UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan);
     ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
   }
 
-  public void move(final RegionInfo regionInfo) throws IOException {
-    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
-    ServerName sourceServer = node.getRegionLocation();
-    RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
-    MoveRegionProcedure proc = createMoveRegionProcedure(plan);
+  private TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
+      ServerName targetServer) throws HBaseIOException {
+    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
+    if (regionNode == null) {
+      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
+    }
+    TransitRegionStateProcedure proc;
+    regionNode.lock();
+    try {
+      preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_CLOSE);
+      regionNode.checkOnline();
+      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
+      regionNode.setProcedure(proc);
+    } finally {
+      regionNode.unlock();
+    }
+    return proc;
+  }
+
+  public void move(RegionInfo regionInfo) throws IOException {
+    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
     ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
   }
 
-  public Future<byte[]> moveAsync(final RegionPlan regionPlan) throws HBaseIOException {
-    MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan);
+  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
+    TransitRegionStateProcedure proc =
+      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
     return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
   }
 
@@ -569,7 +620,7 @@ public class AssignmentManager implements ServerListener {
   @VisibleForTesting
   // TODO: Remove this?
   public boolean waitForAssignment(final RegionInfo regionInfo, final long timeout)
-  throws IOException {
+      throws IOException {
     RegionStateNode node = null;
     // This method can be called before the regionInfo has made it into the regionStateMap
     // so wait around here a while.
@@ -577,24 +628,27 @@ public class AssignmentManager implements ServerListener {
     // Something badly wrong if takes ten seconds to register a region.
     long endTime = startTime + 10000;
     while ((node = regionStates.getRegionStateNode(regionInfo)) == null && isRunning() &&
-        System.currentTimeMillis() < endTime) {
+      System.currentTimeMillis() < endTime) {
       // Presume it not yet added but will be added soon. Let it spew a lot so we can tell if
       // we are waiting here alot.
       LOG.debug("Waiting on " + regionInfo + " to be added to regionStateMap");
       Threads.sleep(10);
     }
     if (node == null) {
-      if (!isRunning()) return false;
-      throw new RegionException(regionInfo.getRegionNameAsString() + " never registered with Assigment.");
+      if (!isRunning()) {
+        return false;
+      }
+      throw new RegionException(
+        regionInfo.getRegionNameAsString() + " never registered with Assigment.");
     }
 
-    RegionTransitionProcedure proc = node.getProcedure();
+    TransitRegionStateProcedure proc = node.getProcedure();
     if (proc == null) {
       throw new NoSuchProcedureException(node.toString());
     }
 
-    ProcedureSyncWait.waitForProcedureToCompleteIOE(
-      master.getMasterProcedureExecutor(), proc, timeout);
+    ProcedureSyncWait.waitForProcedureToCompleteIOE(master.getMasterProcedureExecutor(), proc,
+      timeout);
     return true;
   }
 
@@ -604,22 +658,23 @@ public class AssignmentManager implements ServerListener {
 
   /**
    * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
-   * @return AssignProcedures made out of the passed in <code>hris</code> and a call
-   * to the balancer to populate the assigns with targets chosen using round-robin (default
-   * balancer scheme). If at assign-time, the target chosen is no longer up, thats fine,
-   * the AssignProcedure will ask the balancer for a new target, and so on.
+   * @return TransitRegionStateProcedures made out of the passed in <code>hris</code> and a call to
+   *         the balancer to populate the assigns with targets chosen using round-robin (default
+   *         balancer scheme). If at assign-time, the target chosen is no longer up, that's fine,
+   *         the procedure will ask the balancer for a new target, and so on.
    */
-  public AssignProcedure[] createRoundRobinAssignProcedures(final List<RegionInfo> hris) {
+  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(
+      List<RegionInfo> hris) {
     if (hris.isEmpty()) {
-      return null;
+      return new TransitRegionStateProcedure[0];
     }
     try {
       // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
       // a better job if it has all the assignments in the one lump.
       Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
-          this.master.getServerManager().createDestinationServersList(null));
+        this.master.getServerManager().createDestinationServersList(null));
       // Return mid-method!
-      return createAssignProcedures(assignments, hris.size());
+      return createAssignProcedures(assignments);
     } catch (HBaseIOException hioe) {
       LOG.warn("Failed roundRobinAssignment", hioe);
     }
@@ -627,130 +682,97 @@ public class AssignmentManager implements ServerListener {
     return createAssignProcedures(hris);
   }
 
-  /**
-   * Create an array of AssignProcedures w/o specifying a target server.
-   * If no target server, at assign time, we will try to use the former location of the region
-   * if one exists. This is how we 'retain' the old location across a server restart.
-   * Used by {@link ServerCrashProcedure} assigning regions on a server that has crashed (SCP is
-   * also used across a cluster-restart just-in-case to ensure we do cleanup of any old WALs or
-   * server processes).
-   */
-  public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) {
-    if (hris.isEmpty()) {
-      return null;
-    }
-    int index = 0;
-    AssignProcedure [] procedures = new AssignProcedure[hris.size()];
-    for (RegionInfo hri : hris) {
-      // Sort the procedures so meta and system regions are first in the returned array.
-      procedures[index++] = createAssignProcedure(hri);
-    }
-    if (procedures.length > 1) {
-      // Sort the procedures so meta and system regions are first in the returned array.
-      Arrays.sort(procedures, AssignProcedure.COMPARATOR);
-    }
-    return procedures;
-  }
-
-  // Make this static for the method below where we use it typing the AssignProcedure array we
-  // return as result.
-  private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {};
-
-  /**
-   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
-   * @param size Count of assignments to make (the caller may know the total count)
-   * @return Assignments made from the passed in <code>assignments</code>
-   */
-  private AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments,
-      int size) {
-    List<AssignProcedure> procedures = new ArrayList<>(size > 0? size: 8/*Arbitrary*/);
-    for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) {
-      for (RegionInfo ri: e.getValue()) {
-        AssignProcedure ap = createAssignProcedure(ri, e.getKey());
-        ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
-        procedures.add(ap);
+  @VisibleForTesting
+  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
+    if (left.getRegion().isMetaRegion()) {
+      if (right.getRegion().isMetaRegion()) {
+        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
       }
+      return -1;
+    } else if (right.getRegion().isMetaRegion()) {
+      return +1;
     }
-    if (procedures.size() > 1) {
-      // Sort the procedures so meta and system regions are first in the returned array.
-      procedures.sort(AssignProcedure.COMPARATOR);
+    if (left.getRegion().getTable().isSystemTable()) {
+      if (right.getRegion().getTable().isSystemTable()) {
+        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
+      }
+      return -1;
+    } else if (right.getRegion().getTable().isSystemTable()) {
+      return +1;
     }
-    return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE);
+    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
   }
 
-  // Needed for the following method so it can type the created Array we retur n
-  private static final UnassignProcedure [] UNASSIGN_PROCEDURE_ARRAY_TYPE =
-      new UnassignProcedure[0];
-
-  UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) {
-    if (nodes.isEmpty()) return null;
-    final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size());
-    for (RegionStateNode node: nodes) {
-      if (!this.regionStates.include(node, false)) continue;
-      // Look for regions that are offline/closed; i.e. already unassigned.
-      if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue;
-      assert node.getRegionLocation() != null: node.toString();
-      procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false));
+  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
+      ServerName targetServer) {
+    TransitRegionStateProcedure proc;
+    regionNode.lock();
+    try {
+      assert regionNode.getProcedure() == null;
+      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(),
+        regionNode.getRegionInfo(), targetServer);
+      regionNode.setProcedure(proc);
+    } finally {
+      regionNode.unlock();
     }
-    return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE);
+    return proc;
   }
 
   /**
-   * Called by things like DisableTableProcedure to get a list of UnassignProcedure
-   * to unassign the regions of the table.
+   * Create an array of TransitRegionStateProcedure w/o specifying a target server.
+   * <p>
+   * If no target server, at assign time, we will try to use the former location of the region if
+   * one exists. This is how we 'retain' the old location across a server restart.
+   * <p>
+   * Should only be called when you can make sure that no one can touch these regions other than
+   * you. For example, when you are creating a table.
    */
-  public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
-    return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
+  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
+    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
+      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
+      .toArray(TransitRegionStateProcedure[]::new);
   }
 
-  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
-    AssignProcedure proc = new AssignProcedure(regionInfo);
-    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
-    return proc;
-  }
-
-  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
-      final ServerName targetServer) {
-    AssignProcedure proc = new AssignProcedure(regionInfo, targetServer);
-    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
-    return proc;
-  }
-
-  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
-      final ServerName destinationServer, final boolean force) {
-    return createUnassignProcedure(regionInfo, destinationServer, force, false);
-  }
-
-  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
-      final ServerName destinationServer, final boolean force,
-      final boolean removeAfterUnassigning) {
-    // If destinationServer is null, figure it.
-    ServerName sn = destinationServer != null? destinationServer:
-        getRegionStates().getRegionState(regionInfo).getServerName();
-    assert sn != null;
-    UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force, removeAfterUnassigning);
-    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
-    return proc;
+  /**
+   * @param assignments Map of assignments from which we produce an array of procedures.
+   * @return TransitRegionStateProcedures made from the passed in <code>assignments</code>
+   */
+  private TransitRegionStateProcedure[] createAssignProcedures(
+      Map<ServerName, List<RegionInfo>> assignments) {
+    return assignments.entrySet().stream()
+      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
+        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
+      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
   }
 
-  private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException {
-    if (plan.getRegionInfo().getTable().isSystemTable()) {
-      List<ServerName> exclude = getExcludedServersForSystemTable();
-      if (plan.getDestination() != null && exclude.contains(plan.getDestination())) {
-        try {
-          LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() +
-            " because the server is not with highest version");
-          plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(),
-            this.master.getServerManager().createDestinationServersList(exclude)));
-        } catch (HBaseIOException e) {
-          LOG.warn(e.toString(), e);
+  /**
+   * Called by DisableTableProcedure to unassign all the regions for a table.
+   */
+  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
+    return regionStates.getTableRegionStateNodes(tableName).stream().map(regionNode -> {
+      regionNode.lock();
+      try {
+        if (!regionStates.include(regionNode, false) ||
+          regionStates.isRegionOffline(regionNode.getRegionInfo())) {
+          return null;
+        }
+        // DisableTableProcedure holds the exclusive lock for the table, and a TRSP holds the
+        // shared lock for the table the whole time it runs, so we can be sure that this
+        // procedure has not been executed yet. So here we unset it; when it is actually
+        // executed, it will find that the attached procedure is not itself and quit immediately.
+        if (regionNode.getProcedure() != null) {
+          regionNode.unsetProcedure(regionNode.getProcedure());
         }
+        TransitRegionStateProcedure proc = TransitRegionStateProcedure
+          .unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
+        regionNode.setProcedure(proc);
+        return proc;
+      } finally {
+        regionNode.unlock();
       }
-    }
-    return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
+    }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
   }
 
-
   public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
       final byte[] splitKey) throws IOException {
     return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
@@ -780,8 +802,7 @@ public class AssignmentManager implements ServerListener {
   // ============================================================================================
   // TODO: Move this code in MasterRpcServices and call on specific event?
   public ReportRegionStateTransitionResponse reportRegionStateTransition(
-      final ReportRegionStateTransitionRequest req)
-  throws PleaseHoldException {
+      final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
     final ReportRegionStateTransitionResponse.Builder builder =
         ReportRegionStateTransitionResponse.newBuilder();
     final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
@@ -819,7 +840,7 @@ public class AssignmentManager implements ServerListener {
         }
       }
     } catch (PleaseHoldException e) {
-      if (LOG.isTraceEnabled()) LOG.trace("Failed transition " + e.getMessage());
+      LOG.trace("Failed transition ", e);
       throw e;
     } catch (UnsupportedOperationException|IOException e) {
       // TODO: at the moment we have a single error message and the RS will abort
@@ -830,56 +851,53 @@ public class AssignmentManager implements ServerListener {
     return builder.build();
   }
 
-  private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
-      final RegionInfo regionInfo, final long seqId)
-      throws PleaseHoldException, UnexpectedStateException {
+  private void updateRegionTransition(ServerName serverName, TransitionCode state,
+      RegionInfo regionInfo, long seqId) throws IOException {
     checkMetaLoaded(regionInfo);
 
-    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
+    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
     if (regionNode == null) {
       // the table/region is gone. maybe a delete, split, merge
       throw new UnexpectedStateException(String.format(
         "Server %s was trying to transition region %s to %s. but the region was removed.",
         serverName, regionInfo, state));
     }
+    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
+      regionNode, state);
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(String.format("Update region transition serverName=%s region=%s regionState=%s",
-        serverName, regionNode, state));
-    }
-
-    final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
-    if (!reportTransition(regionNode, serverNode, state, seqId)) {
-      // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
-      // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
-      //   rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
-      //   table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
-      //   to CLOSED
-      // These happen because on cluster shutdown, we currently let the RegionServers close
-      // regions. This is the only time that region close is not run by the Master (so cluster
-      // goes down fast). Consider changing it so Master runs all shutdowns.
-      if (this.master.getServerManager().isClusterShutdown() &&
+    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+    regionNode.lock();
+    try {
+      if (!reportTransition(regionNode, serverNode, state, seqId)) {
+        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
+        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
+        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
+        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
+        // to CLOSED
+        // These happen because on cluster shutdown, we currently let the RegionServers close
+        // regions. This is the only time that region close is not run by the Master (so cluster
+        // goes down fast). Consider changing it so Master runs all shutdowns.
+        if (this.master.getServerManager().isClusterShutdown() &&
           state.equals(TransitionCode.CLOSED)) {
-        LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
-      } else {
-        LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
+          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
+        } else {
+          LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
+        }
       }
+    } finally {
+      regionNode.unlock();
     }
   }
 
-  // FYI: regionNode is sometimes synchronized by the caller but not always.
-  private boolean reportTransition(final RegionStateNode regionNode,
-      final ServerStateNode serverNode, final TransitionCode state, final long seqId)
-      throws UnexpectedStateException {
-    final ServerName serverName = serverNode.getServerName();
-    synchronized (regionNode) {
-      final RegionTransitionProcedure proc = regionNode.getProcedure();
-      if (proc == null) return false;
-
-      // serverNode.getReportEvent().removeProcedure(proc);
-      proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(),
-        serverName, state, seqId);
+  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
+      TransitionCode state, long seqId) throws IOException {
+    ServerName serverName = serverNode.getServerName();
+    TransitRegionStateProcedure proc = regionNode.getProcedure();
+    if (proc == null) {
+      return false;
     }
+    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
+      serverName, state, seqId);
     return true;
   }
 
@@ -984,10 +1002,9 @@ public class AssignmentManager implements ServerListener {
     wakeServerReportEvent(serverNode);
   }
 
-  void checkOnlineRegionsReportForMeta(final ServerStateNode serverNode,
-      final Set<byte[]> regionNames) {
+  void checkOnlineRegionsReportForMeta(ServerStateNode serverNode, Set<byte[]> regionNames) {
     try {
-      for (byte[] regionName: regionNames) {
+      for (byte[] regionName : regionNames) {
         final RegionInfo hri = getMetaRegionFromName(regionName);
         if (hri == null) {
           if (LOG.isTraceEnabled()) {
@@ -997,18 +1014,23 @@ public class AssignmentManager implements ServerListener {
           continue;
         }
 
-        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
+        RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
         LOG.info("META REPORTED: " + regionNode);
-        if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
-          LOG.warn("META REPORTED but no procedure found (complete?); set location=" +
+        regionNode.lock();
+        try {
+          if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
+            LOG.warn("META REPORTED but no procedure found (complete?); set location=" +
               serverNode.getServerName());
-          regionNode.setRegionLocation(serverNode.getServerName());
-        } else if (LOG.isTraceEnabled()) {
-          LOG.trace("META REPORTED: " + regionNode);
+            regionNode.setRegionLocation(serverNode.getServerName());
+          } else if (LOG.isTraceEnabled()) {
+            LOG.trace("META REPORTED: " + regionNode);
+          }
+        } finally {
+          regionNode.unlock();
         }
       }
-    } catch (UnexpectedStateException e) {
-      final ServerName serverName = serverNode.getServerName();
+    } catch (IOException e) {
+      ServerName serverName = serverNode.getServerName();
       LOG.warn("KILLING " + serverName + ": " + e.getMessage());
       killRegionServer(serverNode);
     }
@@ -1019,12 +1041,15 @@ public class AssignmentManager implements ServerListener {
     final ServerName serverName = serverNode.getServerName();
     try {
       for (byte[] regionName: regionNames) {
-        if (!isRunning()) return;
+        if (!isRunning()) {
+          return;
+        }
         final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
         if (regionNode == null) {
           throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
         }
-        synchronized (regionNode) {
+        regionNode.lock();
+        try {
           if (regionNode.isInState(State.OPENING, State.OPEN)) {
             if (!regionNode.getRegionLocation().equals(serverName)) {
               throw new UnexpectedStateException(regionNode.toString() +
@@ -1050,9 +1075,11 @@ public class AssignmentManager implements ServerListener {
                 " reported an unexpected OPEN; time since last update=" + diff);
             }
           }
+        } finally {
+          regionNode.unlock();
         }
       }
-    } catch (UnexpectedStateException e) {
+    } catch (IOException e) {
       LOG.warn("Killing " + serverName + ": " + e.getMessage());
       killRegionServer(serverNode);
       throw (YouAreDeadException)new YouAreDeadException(e.getMessage()).initCause(e);
@@ -1267,29 +1294,20 @@ public class AssignmentManager implements ServerListener {
 
           localState = State.OFFLINE;
         }
-        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
-        synchronized (regionNode) {
-          if (!regionNode.isInTransition()) {
-            regionNode.setState(localState);
-            regionNode.setLastHost(lastHost);
-            regionNode.setRegionLocation(regionLocation);
-            regionNode.setOpenSeqNum(openSeqNum);
-
-            if (localState == State.OPEN) {
-              assert regionLocation != null : "found null region location for " + regionNode;
-              regionStates.addRegionToServer(regionNode);
-            } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
-              regionStates.addToOfflineRegions(regionNode);
-            } else if (localState == State.CLOSED && getTableStateManager().
-                isTableState(regionNode.getTable(), TableState.State.DISABLED,
-                TableState.State.DISABLING)) {
-              // The region is CLOSED and the table is DISABLED/ DISABLING, there is nothing to
-              // schedule; the region is inert.
-            } else {
-              // These regions should have a procedure in replay
-              regionStates.addRegionInTransition(regionNode, null);
-            }
-          }
+        RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
+        // Do not need to lock on regionNode, as we can make sure that before we finish loading
+        // meta, all the related procedures can not be executed. The only exception is for meta
+        // region related operations, but here we do not load the information for meta region.
+        regionNode.setState(localState);
+        regionNode.setLastHost(lastHost);
+        regionNode.setRegionLocation(regionLocation);
+        regionNode.setOpenSeqNum(openSeqNum);
+
+        if (localState == State.OPEN) {
+          assert regionLocation != null : "found null region location for " + regionNode;
+          regionStates.addRegionToServer(regionNode);
+        } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
+          regionStates.addToOfflineRegions(regionNode);
         }
       }
     });
@@ -1335,9 +1353,11 @@ public class AssignmentManager implements ServerListener {
   }
 
   public void offlineRegion(final RegionInfo regionInfo) {
-    // TODO used by MasterRpcServices ServerCrashProcedure
-    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
-    if (node != null) node.offline();
+    // TODO used by MasterRpcServices
+    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
+    if (node != null) {
+      node.offline();
+    }
   }
 
   public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
@@ -1373,16 +1393,6 @@ public class AssignmentManager implements ServerListener {
   // ============================================================================================
   //  TODO: Region State In Transition
   // ============================================================================================
-  protected boolean addRegionInTransition(final RegionStateNode regionNode,
-      final RegionTransitionProcedure procedure) {
-    return regionStates.addRegionInTransition(regionNode, procedure);
-  }
-
-  protected void removeRegionInTransition(final RegionStateNode regionNode,
-      final RegionTransitionProcedure procedure) {
-    regionStates.removeRegionInTransition(regionNode, procedure);
-  }
-
   public boolean hasRegionsInTransition() {
     return regionStates.hasRegionsInTransition();
   }
@@ -1401,102 +1411,82 @@ public class AssignmentManager implements ServerListener {
   }
 
   // ============================================================================================
-  //  TODO: Region Status update
+  //  Region Status update
+  //  Should only be called in TransitRegionStateProcedure
   // ============================================================================================
-  private void sendRegionOpenedNotification(final RegionInfo regionInfo,
-      final ServerName serverName) {
-    getBalancer().regionOnline(regionInfo, serverName);
-    if (!this.listeners.isEmpty()) {
-      for (AssignmentListener listener : this.listeners) {
-        listener.regionOpened(regionInfo, serverName);
-      }
-    }
-  }
-
-  private void sendRegionClosedNotification(final RegionInfo regionInfo) {
-    getBalancer().regionOffline(regionInfo);
-    if (!this.listeners.isEmpty()) {
-      for (AssignmentListener listener : this.listeners) {
-        listener.regionClosed(regionInfo);
-      }
-    }
-  }
 
-  public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
-    synchronized (regionNode) {
-      regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
-      regionStates.addRegionToServer(regionNode);
-      regionStateStore.updateRegionLocation(regionNode);
-    }
+  // should be called within the synchronized block of RegionStateNode
+  void regionOpening(RegionStateNode regionNode) throws IOException {
+    regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
+    regionStateStore.updateRegionLocation(regionNode);
 
+    regionStates.addRegionToServer(regionNode);
     // update the operation count metrics
     metrics.incrementOperationCounter();
   }
 
-  public void undoRegionAsOpening(final RegionStateNode regionNode) {
-    boolean opening = false;
-    synchronized (regionNode) {
-      if (regionNode.isInState(State.OPENING)) {
-        opening = true;
-        regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
-      }
-      // Should we update hbase:meta?
+  // should be called within the synchronized block of RegionStateNode.
+  // The parameter 'giveUp' means whether we will stop trying to open the region. If it is true,
+  // then we will not retry and will persist the FAILED_OPEN state into hbase:meta.
+  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
+    if (regionNode.getRegionLocation() != null) {
+      regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
     }
-    if (opening) {
-      // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
+    if (giveUp) {
+      regionNode.setState(State.FAILED_OPEN);
+      regionNode.setRegionLocation(null);
+      regionStateStore.updateRegionLocation(regionNode);
     }
   }
 
-  public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
-    final RegionInfo hri = regionNode.getRegionInfo();
-    synchronized (regionNode) {
-      regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
-      if (isMetaRegion(hri)) {
-        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
-        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
-        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
-        // on table that contains state.
-        setMetaAssigned(hri, true);
-      }
-      regionStates.addRegionToServer(regionNode);
-      // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
-      // That is a lot of hbase:meta writing.
-      regionStateStore.updateRegionLocation(regionNode);
-      sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
+  // should be called within the synchronized block of RegionStateNode
+  void regionOpened(RegionStateNode regionNode) throws IOException {
+    regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
+    // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
+    // That is a lot of hbase:meta writing.
+    regionStateStore.updateRegionLocation(regionNode);
+
+    RegionInfo hri = regionNode.getRegionInfo();
+    if (isMetaRegion(hri)) {
+      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
+      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
+      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
+      // on table that contains state.
+      setMetaAssigned(hri, true);
     }
+    regionStates.addRegionToServer(regionNode);
+    regionStates.removeFromFailedOpen(hri);
   }
 
-  public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
-    final RegionInfo hri = regionNode.getRegionInfo();
-    synchronized (regionNode) {
-      regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
-      // Set meta has not initialized early. so people trying to create/edit tables will wait
-      if (isMetaRegion(hri)) {
-        setMetaAssigned(hri, false);
-      }
-      regionStates.addRegionToServer(regionNode);
-      regionStateStore.updateRegionLocation(regionNode);
-    }
+  // should be called within the synchronized block of RegionStateNode
+  void regionClosing(RegionStateNode regionNode) throws IOException {
+    regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
+    regionStateStore.updateRegionLocation(regionNode);
 
+    RegionInfo hri = regionNode.getRegionInfo();
+    // Mark meta as not assigned early, so people trying to create/edit tables will wait
+    if (isMetaRegion(hri)) {
+      setMetaAssigned(hri, false);
+    }
+    regionStates.addRegionToServer(regionNode);
     // update the operation count metrics
     metrics.incrementOperationCounter();
   }
 
-  public void undoRegionAsClosing(final RegionStateNode regionNode) {
-    // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
-    // There is nothing to undo?
-  }
-
-  public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
-    final RegionInfo hri = regionNode.getRegionInfo();
-    synchronized (regionNode) {
-      regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
-      regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
-      regionNode.setLastHost(regionNode.getRegionLocation());
+  // should be called within the synchronized block of RegionStateNode
+  // The parameter 'normally' means whether we are closed cleanly. If it is false, then it means
+  // that we are closed abnormally, e.g. due to a RS crash, and the state is ABNORMALLY_CLOSED.
+  void regionClosed(RegionStateNode regionNode, boolean normally) throws IOException {
+    regionNode.transitionState(normally ? State.CLOSED : State.ABNORMALLY_CLOSED,
+      RegionStates.STATES_EXPECTED_ON_CLOSE);
+    ServerName loc = regionNode.getRegionLocation();
+    if (loc != null) {
+      // could be a retry so add a check here to avoid setting the lastHost to null.
+      regionNode.setLastHost(loc);
       regionNode.setRegionLocation(null);
-      regionStateStore.updateRegionLocation(regionNode);
-      sendRegionClosedNotification(hri);
+      regionStates.removeRegionFromServer(loc, regionNode);
     }
+    regionStateStore.updateRegionLocation(regionNode);
   }
 
   public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
@@ -1829,76 +1819,4 @@ public class AssignmentManager implements ServerListener {
   private void killRegionServer(final ServerStateNode serverNode) {
     master.getServerManager().expireServer(serverNode.getServerName());
   }
-
-  /**
-   * <p>
-   * This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is
-   * where you go to check on state of 'Servers', what Servers are online, etc.
-   * </p>
-   * <p>
-   * Here we are checking the state of a server that is post expiration, a ServerManager function
-   * that moves a server from online to dead. Here we are seeing if the server has moved beyond a
-   * particular point in the recovery process such that it is safe to move on with assigns; etc.
-   * </p>
-   * <p>
-   * For now it is only used in
-   * {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to
-   * see whether we can safely quit without losing data.
-   * </p>
-   * @param meta whether to check for meta log splitting
-   * @return {@code true} if the server does not exist or the log splitting is done, i.e, the server
-   *         is in OFFLINE state, or for meta log, is in SPLITTING_META_DONE state. If null,
-   *         presumes the ServerStateNode was cleaned up by SCP.
-   * @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)
-   */
-  boolean isLogSplittingDone(ServerName serverName, boolean meta) {
-    ServerStateNode ssn = this.regionStates.getServerNode(serverName);
-    if (ssn == null) {
-      return true;
-    }
-    ServerState[] inState =
-      meta
-        ? new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING,
-          ServerState.OFFLINE }
-        : new ServerState[] { ServerState.OFFLINE };
-    synchronized (ssn) {
-      return ssn.isInState(inState);
-    }
-  }
-
-  /**
-   * Handle RIT of meta region against crashed server.
-   * Only used when ServerCrashProcedure is not enabled.
-   * See handleRIT in ServerCrashProcedure for similar function.
-   *
-   * @param serverName Server that has already crashed
-   */
-  public void handleMetaRITOnCrashedServer(ServerName serverName) {
-    RegionInfo hri = RegionReplicaUtil
-        .getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO,
-            RegionInfo.DEFAULT_REPLICA_ID);
-    RegionState regionStateNode = getRegionStates().getRegionState(hri);
-    if (regionStateNode == null) {
-      LOG.warn("RegionStateNode is null for " + hri);
-      return;
-    }
-    ServerName rsnServerName = regionStateNode.getServerName();
-    if (rsnServerName != null && !rsnServerName.equals(serverName)) {
-      return;
-    } else if (rsnServerName == null) {
-      LOG.warn("Empty ServerName in RegionStateNode; proceeding anyways in case latched " +
-          "RecoverMetaProcedure so meta latch gets cleaned up.");
-    }
-    // meta has been assigned to crashed server.
-    LOG.info("Meta assigned to crashed " + serverName + "; reassigning...");
-    // Handle failure and wake event
-    RegionTransitionProcedure rtp = getRegionStates().getRegionTransitionProcedure(hri);
-    // Do not need to consider for REGION_TRANSITION_QUEUE step
-    if (rtp != null && rtp.isMeta() &&
-        rtp.getTransitionState() == RegionTransitionState.REGION_TRANSITION_DISPATCH) {
-      LOG.debug("Failing " + rtp.toString());
-      rtp.remoteCallFailed(master.getMasterProcedureExecutor().getEnvironment(), serverName,
-          new ServerCrashException(rtp.getProcId(), serverName));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
new file mode 100644
index 0000000..d0dca09
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+
+/**
+ * Utility for this assignment package only.
+ */
+@InterfaceAudience.Private
+final class AssignmentManagerUtil {
+  private AssignmentManagerUtil() {
+  }
+
+  /**
+   * Raw call to remote regionserver to get info on a particular region.
+   * @throws IOException Let it out so can report this IOE as reason for failure
+   */
+  static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
+      final ServerName regionLocation, final RegionInfo hri) throws IOException {
+    return getRegionInfoResponse(env, regionLocation, hri, false);
+  }
+
+  static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
+      final ServerName regionLocation, final RegionInfo hri, boolean includeBestSplitRow)
+      throws IOException {
+    // TODO: There is no timeout on this controller. Set one!
+    HBaseRpcController controller =
+      env.getMasterServices().getClusterConnection().getRpcControllerFactory().newController();
+    final AdminService.BlockingInterface admin =
+      env.getMasterServices().getClusterConnection().getAdmin(regionLocation);
+    GetRegionInfoRequest request = null;
+    if (includeBestSplitRow) {
+      request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName(), false, true);
+    } else {
+      request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName());
+    }
+    try {
+      return admin.getRegionInfo(controller, request);
+    } catch (ServiceException e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  private static void lock(List<RegionStateNode> regionNodes) {
+    regionNodes.iterator().forEachRemaining(RegionStateNode::lock);
+  }
+
+  private static void unlock(List<RegionStateNode> regionNodes) {
+    for (ListIterator<RegionStateNode> iter = regionNodes.listIterator(regionNodes.size()); iter
+      .hasPrevious();) {
+      iter.previous().unlock();
+    }
+  }
+
+  static TransitRegionStateProcedure[] createUnassignProceduresForSplitOrMerge(
+      MasterProcedureEnv env, Stream<RegionInfo> regions, int regionReplication)
+      throws IOException {
+    List<RegionStateNode> regionNodes = regions
+      .flatMap(hri -> IntStream.range(0, regionReplication)
+        .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i)))
+      .map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
+      .collect(Collectors.toList());
+    TransitRegionStateProcedure[] procs = new TransitRegionStateProcedure[regionNodes.size()];
+    boolean rollback = true;
+    int i = 0;
+    // take all the locks up front, and release them in finally. This is important as SCP may jump
+    // in if we release the locks in the middle when we want to do rollback, and cause problems.
+    lock(regionNodes);
+    try {
+      for (; i < procs.length; i++) {
+        RegionStateNode regionNode = regionNodes.get(i);
+        TransitRegionStateProcedure proc =
+          TransitRegionStateProcedure.unassign(env, regionNode.getRegionInfo());
+        if (regionNode.getProcedure() != null) {
+          throw new HBaseIOException(
+            "The parent region " + regionNode + " is currently in transition, give up");
+        }
+        regionNode.setProcedure(proc);
+        procs[i] = proc;
+      }
+      // all succeeded, set rollback to false
+      rollback = false;
+    } finally {
+      if (rollback) {
+        for (;;) {
+          i--;
+          if (i < 0) {
+            break;
+          }
+          RegionStateNode regionNode = regionNodes.get(i);
+          regionNode.unsetProcedure(procs[i]);
+        }
+      }
+      unlock(regionNodes);
+    }
+    return procs;
+  }
+
+  private static TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env,
+      Stream<RegionInfo> regions, int regionReplication, ServerName targetServer) {
+    return regions
+      .flatMap(hri -> IntStream.range(0, regionReplication)
+        .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i)))
+      .map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
+      .map(regionNode -> {
+        TransitRegionStateProcedure proc =
+          TransitRegionStateProcedure.assign(env, regionNode.getRegionInfo(), targetServer);
+        regionNode.lock();
+        try {
+          // should never fail, as we have the exclusive region lock, and the region is newly
+          // created, or has been successfully closed so should not be on any servers, so SCP will
+          // not process it either.
+          assert !regionNode.isInTransition();
+          regionNode.setProcedure(proc);
+        } finally {
+          regionNode.unlock();
+        }
+        return proc;
+      }).toArray(TransitRegionStateProcedure[]::new);
+  }
+
+  static TransitRegionStateProcedure[] createAssignProceduresForOpeningNewRegions(
+      MasterProcedureEnv env, Stream<RegionInfo> regions, int regionReplication,
+      ServerName targetServer) {
+    return createAssignProcedures(env, regions, regionReplication, targetServer);
+  }
+
+  static void reopenRegionsForRollback(MasterProcedureEnv env, Stream<RegionInfo> regions,
+      int regionReplication, ServerName targetServer) {
+    TransitRegionStateProcedure[] procs =
+      createAssignProcedures(env, regions, regionReplication, targetServer);
+    env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
+  }
+
+  static void removeNonDefaultReplicas(MasterProcedureEnv env, Stream<RegionInfo> regions,
+      int regionReplication) {
+    // Remove from in-memory states
+    regions.flatMap(hri -> IntStream.range(1, regionReplication)
+      .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i))).forEach(hri -> {
+        env.getAssignmentManager().getRegionStates().deleteRegion(hri);
+        env.getMasterServices().getServerManager().removeRegion(hri);
+        FavoredNodesManager fnm = env.getMasterServices().getFavoredNodesManager();
+        if (fnm != null) {
+          fnm.deleteFavoredNodesForRegions(Collections.singletonList(hri));
+        }
+      });
+  }
+
+  static void checkClosedRegion(MasterProcedureEnv env, RegionInfo regionInfo) throws IOException {
+    if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getFileSystem(),
+      env.getMasterConfiguration(), regionInfo)) {
+      throw new IOException("Recovered.edits are found in Region: " + regionInfo +
+        ", abort split/merge to prevent data loss");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
new file mode 100644
index 0000000..e446e17
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseRegionProcedureStateData;
+
+/**
+ * The remote procedure used to close a region.
+ * <p/>
+ * Builds a {@link RegionCloseOperation} for the region, and persists the optional assign
+ * candidate as part of the procedure state.
+ */
+@InterfaceAudience.Private
+public class CloseRegionProcedure extends RegionRemoteProcedureBase {
+
+  // For a region move operation, we assign the region again after we unassign it; this is the
+  // target server for that subsequent assign. We send this value to the RS, and the RS records it
+  // in a Map so that it can tell clients where the region has been moved to. Can be null. It can
+  // also be wrong (but do not make it wrong intentionally); the client can handle this error.
+  private ServerName assignCandidate;
+
+  /**
+   * No-arg constructor. Presumably needed so that the procedure can be instantiated first and
+   * then filled in through {@link #deserializeStateData(ProcedureStateSerializer)} (confirm).
+   */
+  public CloseRegionProcedure() {
+    super();
+  }
+
+  /**
+   * @param region the region to close
+   * @param targetServer handed to the base class constructor together with {@code region}
+   * @param assignCandidate target server for the subsequent assign when moving, can be null
+   */
+  public CloseRegionProcedure(RegionInfo region, ServerName targetServer,
+      ServerName assignCandidate) {
+    super(region, targetServer);
+    this.assignCandidate = assignCandidate;
+  }
+
+  /**
+   * Closing a region is reported as a {@code REGION_UNASSIGN} operation.
+   */
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.REGION_UNASSIGN;
+  }
+
+  /**
+   * Creates the close operation, which carries the region and the assign candidate. The
+   * {@code env} and {@code remote} arguments are not used here.
+   */
+  @Override
+  public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+    return new RegionCloseOperation(this, region, assignCandidate);
+  }
+
+  /**
+   * Writes the state of the base class first, followed by our own state message.
+   */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    CloseRegionProcedureStateData.Builder builder = CloseRegionProcedureStateData.newBuilder();
+    // assignCandidate is optional, only set the field when we actually have a value
+    if (assignCandidate != null) {
+      builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
+    }
+    serializer.serialize(builder.build());
+  }
+
+  /**
+   * Reads the state of the base class first, followed by our own state message, i.e. in the
+   * same order as {@link #serializeStateData(ProcedureStateSerializer)} wrote them.
+   */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    CloseRegionProcedureStateData data =
+      serializer.deserialize(CloseRegionProcedureStateData.class);
+    // the field is absent if there was no candidate when serializing, leave ours untouched then
+    if (data.hasAssignCandidate()) {
+      assignCandidate = ProtobufUtil.toServerName(data.getAssignCandidate());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 20ae444..ff2ba5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.master.assignment;
 
 import java.io.IOException;
@@ -23,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -64,7 +62,9 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -72,10 +72,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.M
 
 /**
  * The procedure to Merge a region in a table.
+ * <p/>
  * This procedure takes an exclusive table lock since it is working over multiple regions.
+ * <p/>
  * It holds the lock for the life of the procedure.
- * <p>Throws exception on construction if determines context hostile to merge (cluster going
- * down or master is shutting down or table is disabled).</p>
+ * <p/>
+ * Throws exception on construction if determines context hostile to merge (cluster going down or
+ * master is shutting down or table is disabled).
  */
 @InterfaceAudience.Private
 public class MergeTableRegionsProcedure
@@ -216,6 +219,20 @@ public class MergeTableRegionsProcedure
     return rid;
   }
 
+
+  private void removeNonDefaultReplicas(MasterProcedureEnv env) throws IOException {
+    AssignmentManagerUtil.removeNonDefaultReplicas(env, Stream.of(regionsToMerge),
+      getRegionReplication(env));
+  }
+
+  private void checkClosedRegions(MasterProcedureEnv env) throws IOException {
+    // Theoretically recovered edits should no longer be found here now that we use TRSP
+    // (TransitRegionStateProcedure), but check anyway to be safe.
+    for (RegionInfo region : regionsToMerge) {
+      AssignmentManagerUtil.checkClosedRegion(env, region);
+    }
+  }
+
   @Override
   protected Flow executeFromState(final MasterProcedureEnv env,
       MergeTableRegionsState state) {
@@ -234,27 +251,15 @@ public class MergeTableRegionsProcedure
           setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
           break;
         case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
-          addChildProcedure(createUnassignProcedures(env, getRegionReplication(env)));
+          addChildProcedure(createUnassignProcedures(env));
           setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CHECK_CLOSED_REGIONS);
           break;
         case MERGE_TABLE_REGIONS_CHECK_CLOSED_REGIONS:
-          List<RegionInfo> ris = hasRecoveredEdits(env);
-          if (ris.isEmpty()) {
-            setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
-          } else {
-            // Need to reopen parent regions to pickup missed recovered.edits. Do it by creating
-            // child assigns and then stepping back to MERGE_TABLE_REGIONS_CLOSE_REGIONS.
-            // Just assign the primary regions recovering the missed recovered.edits -- no replicas.
-            // May need to cycle here a few times if heavy writes.
-            // TODO: Add an assign read-only.
-            for (RegionInfo ri: ris) {
-              LOG.info("Found recovered.edits under {}, reopen to pickup missed edits!", ri);
-              addChildProcedure(env.getAssignmentManager().createAssignProcedure(ri));
-            }
-            setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
-          }
+          checkClosedRegions(env);
+          setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
           break;
         case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
+          removeNonDefaultReplicas(env);
           createMergedRegion(env);
           setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE);
           break;
@@ -275,7 +280,7 @@ public class MergeTableRegionsProcedure
           setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
           break;
         case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
-          addChildProcedure(createAssignProcedures(env, getRegionReplication(env)));
+          addChildProcedure(createAssignProcedures(env));
           setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
           break;
         case MERGE_TABLE_REGIONS_POST_OPERATION:
@@ -471,25 +476,7 @@ public class MergeTableRegionsProcedure
   }
 
   /**
-   * Return list of regions that have recovered.edits... usually its an empty list.
-   * @param env the master env
-   * @throws IOException IOException
-   */
-  private List<RegionInfo> hasRecoveredEdits(final MasterProcedureEnv env) throws IOException {
-    List<RegionInfo> ris =  new ArrayList<RegionInfo>(regionsToMerge.length);
-    for (int i = 0; i < regionsToMerge.length; i++) {
-      RegionInfo ri = regionsToMerge[i];
-      if (SplitTableRegionProcedure.hasRecoveredEdits(env, ri)) {
-        ris.add(ri);
-      }
-    }
-    return ris;
-  }
-
-  /**
    * Prepare merge and do some check
-   * @param env MasterProcedureEnv
-   * @throws IOException
    */
   private boolean prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
     // Note: the following logic assumes that we only have 2 regions to merge.  In the future,
@@ -559,9 +546,9 @@ public class MergeTableRegionsProcedure
   }
 
   private boolean isMergeable(final MasterProcedureEnv env, final RegionState rs)
-  throws IOException {
+      throws IOException {
     GetRegionInfoResponse response =
-      Util.getRegionInfoResponse(env, rs.getServerName(), rs.getRegion());
+      AssignmentManagerUtil.getRegionInfoResponse(env, rs.getServerName(), rs.getRegion());
     return response.hasMergeable() && response.getMergeable();
   }
 
@@ -598,9 +585,8 @@ public class MergeTableRegionsProcedure
 
   /**
    * Set the region states to MERGING state
-   * @param env MasterProcedureEnv
    */
-  public void setRegionStateToMerging(final MasterProcedureEnv env) {
+  private void setRegionStateToMerging(final MasterProcedureEnv env) {
     // Set State.MERGING to regions to be merged
     RegionStates regionStates = env.getAssignmentManager().getRegionStates();
     regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.MERGING);
@@ -675,49 +661,22 @@ public class MergeTableRegionsProcedure
 
   /**
    * Rollback close regions
-   * @param env MasterProcedureEnv
    **/
-  private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
-    // Check whether the region is closed; if so, open it in the same server
-    final int regionReplication = getRegionReplication(env);
-    final ServerName serverName = getServerName(env);
-
-    final AssignProcedure[] procs =
-        new AssignProcedure[regionsToMerge.length * regionReplication];
-    int procsIdx = 0;
-    for (int i = 0; i < regionsToMerge.length; ++i) {
-      for (int j = 0; j < regionReplication; ++j) {
-        final RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j);
-        procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, serverName);
-      }
-    }
-    env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
+  private void rollbackCloseRegionsForMerge(MasterProcedureEnv env) throws IOException {
+    AssignmentManagerUtil.reopenRegionsForRollback(env, Stream.of(regionsToMerge),
+      getRegionReplication(env), getServerName(env));
   }
 
-  private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env,
-      final int regionReplication) {
-    final UnassignProcedure[] procs =
-        new UnassignProcedure[regionsToMerge.length * regionReplication];
-    int procsIdx = 0;
-    for (int i = 0; i < regionsToMerge.length; ++i) {
-      for (int j = 0; j < regionReplication; ++j) {
-        final RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j);
-        procs[procsIdx++] = env.getAssignmentManager().
-            createUnassignProcedure(hri, null, true, !RegionReplicaUtil.isDefaultReplica(hri));
-      }
-    }
-    return procs;
+  private TransitRegionStateProcedure[] createUnassignProcedures(MasterProcedureEnv env)
+      throws IOException {
+    return AssignmentManagerUtil.createUnassignProceduresForSplitOrMerge(env,
+      Stream.of(regionsToMerge), getRegionReplication(env));
   }
 
-  private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env,
-      final int regionReplication) {
-    final ServerName targetServer = getServerName(env);
-    final AssignProcedure[] procs = new AssignProcedure[regionReplication];
-    for (int i = 0; i < procs.length; ++i) {
-      final RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(mergedRegion, i);
-      procs[i] = env.getAssignmentManager().createAssignProcedure(hri, targetServer);
-    }
-    return procs;
+  private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env)
+      throws IOException {
+    return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env,
+      Stream.of(mergedRegion), getRegionReplication(env), getServerName(env));
   }
 
   private int getRegionReplication(final MasterProcedureEnv env) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
index 6135ce1..3aadb92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +19,6 @@
 package org.apache.hadoop.hbase.master.assignment;
 
 import java.io.IOException;
-
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -30,81 +27,29 @@ import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProced
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionStateData;
 
 /**
- * Procedure that implements a RegionPlan.
- * It first runs an unassign subprocedure followed
- * by an assign subprocedure. It takes a lock on the region being moved.
- * It holds the lock for the life of the procedure.
- *
- * <p>Throws exception on construction if determines context hostile to move (cluster going
- * down or master is shutting down or table is disabled).</p>
+ * Left here only for checking if we can successfully start the master.
+ * @deprecated Do not use any more.
+ * @see TransitRegionStateProcedure
  */
+@Deprecated
 @InterfaceAudience.Private
 public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure<MoveRegionState> {
-  private static final Logger LOG = LoggerFactory.getLogger(MoveRegionProcedure.class);
   private RegionPlan plan;
 
   public MoveRegionProcedure() {
-    // Required by the Procedure framework to create the procedure on replay
     super();
   }
 
-  /**
-   * @param check whether we should do some checks in the constructor. We will skip the checks if we
-   *          are reopening a region as this may fail the whole procedure and cause stuck. We will
-   *          do the check later when actually executing the procedure so not a big problem.
-   * @throws IOException If the cluster is offline or master is stopping or if table is disabled or
-   *           non-existent.
-   */
-  public MoveRegionProcedure(MasterProcedureEnv env, RegionPlan plan, boolean check)
-      throws HBaseIOException {
-    super(env, plan.getRegionInfo());
-    this.plan = plan;
-    if (check) {
-      preflightChecks(env, true);
-      checkOnline(env, plan.getRegionInfo());
-    }
-  }
-
   @Override
   protected Flow executeFromState(final MasterProcedureEnv env, final MoveRegionState state)
       throws InterruptedException {
-    LOG.trace("{} execute state={}", this, state);
-    switch (state) {
-      case MOVE_REGION_PREPARE:
-        // Check context again and that region is online; do it here after we have lock on region.
-        try {
-          preflightChecks(env, true);
-          checkOnline(env, this.plan.getRegionInfo());
-          if (!env.getMasterServices().getServerManager().isServerOnline(this.plan.getSource())) {
-            throw new HBaseIOException(this.plan.getSource() + " not online");
-          }
-        } catch (HBaseIOException e) {
-          LOG.warn(this.toString() + " FAILED because " + e.toString());
-          return Flow.NO_MORE_STATE;
-        }
-        break;
-      case MOVE_REGION_UNASSIGN:
-        addChildProcedure(new UnassignProcedure(plan.getRegionInfo(), plan.getSource(),
-            plan.getDestination(), true));
-        setNextState(MoveRegionState.MOVE_REGION_ASSIGN);
-        break;
-      case MOVE_REGION_ASSIGN:
-        AssignProcedure assignProcedure = plan.getDestination() == null ?
-            new AssignProcedure(plan.getRegionInfo()):
-            new AssignProcedure(plan.getRegionInfo(), plan.getDestination());
-        addChildProcedure(assignProcedure);
-        return Flow.NO_MORE_STATE;
-      default:
-        throw new UnsupportedOperationException("unhandled state=" + state);
-    }
-    return Flow.HAS_MORE_STATE;
+    return Flow.NO_MORE_STATE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
new file mode 100644
index 0000000..1a79697
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.OpenRegionProcedureStateData;
+
+/**
+ * The remote procedure used to open a region.
+ * <p/>
+ * Builds a {@link RegionOpenOperation} for the region. It has no fields of its own, and writes
+ * the default instance of {@link OpenRegionProcedureStateData} as its state message.
+ */
+@InterfaceAudience.Private
+public class OpenRegionProcedure extends RegionRemoteProcedureBase {
+
+  /**
+   * No-arg constructor. Presumably needed so that the procedure can be instantiated first and
+   * then filled in through {@link #deserializeStateData(ProcedureStateSerializer)} (confirm).
+   */
+  public OpenRegionProcedure() {
+    super();
+  }
+
+  /**
+   * @param region the region to open
+   * @param targetServer handed to the base class constructor together with {@code region}
+   */
+  public OpenRegionProcedure(RegionInfo region, ServerName targetServer) {
+    super(region, targetServer);
+  }
+
+  /**
+   * Opening a region is reported as a {@code REGION_ASSIGN} operation.
+   */
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.REGION_ASSIGN;
+  }
+
+  /**
+   * Creates the open operation, passing along the favored nodes which the assignment manager
+   * returns for the region. The {@code remote} argument is not used here.
+   */
+  @Override
+  public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+    // NOTE(review): the meaning of the trailing false is defined by RegionOpenOperation, confirm
+    return new RegionOpenOperation(this, region, env.getAssignmentManager().getFavoredNodes(region),
+      false);
+  }
+
+  /**
+   * Writes the state of the base class first, followed by a default-instance state message.
+   */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    serializer.serialize(OpenRegionProcedureStateData.getDefaultInstance());
+  }
+
+  /**
+   * Reads the state of the base class first, then consumes our own state message. The content of
+   * the message is ignored.
+   */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    serializer.deserialize(OpenRegionProcedureStateData.class);
+  }
+}