Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/24 07:20:35 UTC

[12/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of the ProcedureV2 facility.

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
new file mode 100644
index 0000000..126718a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
@@ -0,0 +1,247 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+
+
+/**
+ * Procedure that describes the unassignment of a single region.
+ * There can only be one RegionTransitionProcedure per region running at a time,
+ * since each procedure takes a lock on the region.
+ *
+ * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
+ * queue, and the procedure will then go into a "waiting" state.
+ * The Remote Dispatcher will batch the various requests for that server and
+ * send them to the RS for execution.
+ * The RS will complete the close operation by calling master.reportRegionStateTransition().
+ * The AM will intercept the transition report and notify the procedure.
+ * The procedure will finish the unassign by publishing its new state to meta,
+ * or it will retry the unassign.
+ */
+@InterfaceAudience.Private
+public class UnassignProcedure extends RegionTransitionProcedure {
+  private static final Log LOG = LogFactory.getLog(UnassignProcedure.class);
+
+  /**
+   * Where to send the unassign RPC.
+   */
+  protected volatile ServerName destinationServer;
+
+  private final AtomicBoolean serverCrashed = new AtomicBoolean(false);
+
+  // TODO: should this be in a reassign procedure?
+  //       ...and keep unassign for 'disable' case?
+  private boolean force;
+
+  public UnassignProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+    super();
+  }
+
+  public UnassignProcedure(final HRegionInfo regionInfo,
+      final ServerName destinationServer, final boolean force) {
+    super(regionInfo);
+    this.destinationServer = destinationServer;
+    this.force = force;
+
+    // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request
+    setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH);
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.REGION_UNASSIGN;
+  }
+
+  @Override
+  protected boolean isRollbackSupported(final RegionTransitionState state) {
+    switch (state) {
+      case REGION_TRANSITION_QUEUE:
+      case REGION_TRANSITION_DISPATCH:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder()
+        .setTransitionState(getTransitionState())
+        .setDestinationServer(ProtobufUtil.toServerName(destinationServer))
+        .setRegionInfo(HRegionInfo.convert(getRegionInfo()));
+    if (force) {
+      state.setForce(true);
+    }
+    state.build().writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    final UnassignRegionStateData state = UnassignRegionStateData.parseDelimitedFrom(stream);
+    setTransitionState(state.getTransitionState());
+    setRegionInfo(HRegionInfo.convert(state.getRegionInfo()));
+    force = state.getForce();
+    if (state.hasDestinationServer()) {
+      this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
+    }
+  }
+
+  @Override
+  protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) {
+    // nothing to do here. we skip the step in the constructor
+    // by jumping to REGION_TRANSITION_DISPATCH
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
+        throws IOException {
+    // if the region is already closed or offline we can't do much...
+    if (regionNode.isInState(State.CLOSED, State.OFFLINE)) {
+      LOG.info("Not unassigned " + this + "; " + regionNode.toShortString());
+      return false;
+    }
+
+    // if the server is down, mark the operation as complete
+    if (serverCrashed.get() || !isServerOnline(env, regionNode)) {
+      LOG.info("Server already down: " + this + "; " + regionNode.toShortString());
+      return false;
+    }
+
+    // if we haven't started the operation yet, we can abort
+    if (aborted.get() && regionNode.isInState(State.OPEN)) {
+      setAbortFailure(getClass().getSimpleName(), "abort requested");
+      return false;
+    }
+
+    // Mark the region as CLOSING.
+    env.getAssignmentManager().markRegionAsClosing(regionNode);
+
+    // Add the close region operation to the server dispatch queue.
+    if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
+      // If addToRemoteDispatcher fails, it calls #remoteCallFailed which
+      // does all cleanup.
+    }
+
+    // We always return true, even if the dispatch fails, because addToRemoteDispatcher
+    // failure processing sets the state back to REGION_TRANSITION_QUEUE so we try again;
+    // i.e. return true to keep the Procedure running; it has been reset to start over.
+    return true;
+  }
+
+  @Override
+  protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
+      throws IOException {
+    env.getAssignmentManager().markRegionAsClosed(regionNode);
+  }
+
+  @Override
+  public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
+    assert serverName.equals(getRegionState(env).getRegionLocation());
+    return new RegionCloseOperation(this, getRegionInfo(), destinationServer);
+  }
+
+  @Override
+  protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
+      final TransitionCode code, final long seqId) throws UnexpectedStateException {
+    switch (code) {
+      case CLOSED:
+        setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
+        break;
+      default:
+        throw new UnexpectedStateException(String.format(
+          "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.",
+          code, regionNode.getRegionInfo(), regionNode.getRegionLocation()));
+    }
+  }
+
+  @Override
+  protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
+      final IOException exception) {
+    // TODO: Is there on-going rpc to cleanup?
+    if (exception instanceof ServerCrashException) {
+      // This exception comes from ServerCrashProcedure after log splitting.
+      // It is ok to let this procedure go on and complete the close now.
+      // This will release the lock on this region so a subsequent assign can succeed.
+      try {
+        reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
+      } catch (UnexpectedStateException e) {
+        // Should never happen.
+        throw new RuntimeException(e);
+      }
+    } else if (exception instanceof RegionServerAbortedException ||
+        exception instanceof RegionServerStoppedException ||
+        exception instanceof ServerNotRunningYetException) {
+      // TODO
+      // RS is aborting, we cannot offline the region since the region may need to do WAL
+      // recovery. Until we see the RS expiration, we should retry.
+      LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
+      // serverCrashed.set(true);
+    } else if (exception instanceof NotServingRegionException) {
+      LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode, exception);
+      setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
+    } else {
+      // TODO: kill the server in case we get an exception we are not able to handle
+      LOG.warn("Killing server; unexpected exception; " +
+          this + "; " + regionNode.toShortString() +
+        " exception=" + exception);
+      env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation());
+      serverCrashed.set(true);
+    }
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    super.toStringClassDetails(sb);
+    sb.append(", server=").append(this.destinationServer);
+  }
+
+  @Override
+  public ServerName getServer(final MasterProcedureEnv env) {
+    return this.destinationServer;
+  }
+}
\ No newline at end of file
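
The class javadoc above describes the dispatch-and-wait flow. For illustration only (not part of the patch), a caller on the master side could queue an unassign roughly as below; the UnassignProcedure constructor is the one added in this file, while the helper method, the HMaster accessor and the executor wiring are assumptions.

    // Hypothetical helper, sketching how an unassign might be submitted.
    static long submitUnassign(HMaster master, HRegionInfo hri, ServerName currentServer) {
      // force=false: a plain unassign, not the 'disable'/force variant noted in the TODO above.
      UnassignProcedure proc = new UnassignProcedure(hri, currentServer, false);
      // The procedure starts in REGION_TRANSITION_DISPATCH, parks while the close request is
      // remoted to the RS, and finishes when reportRegionStateTransition(CLOSED) comes back.
      return master.getMasterProcedureExecutor().submitProcedure(proc);
    }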

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java
new file mode 100644
index 0000000..cb3861a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+
+/**
+ * Utility for this assignment package only.
+ */
+@InterfaceAudience.Private
+class Util {
+  private Util() {}
+
+  /**
+   * Raw call to remote regionserver to get info on a particular region.
+   * @throws IOException Let it out so the caller can report this IOE as the reason for failure
+   */
+  static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
+      final ServerName regionLocation, final HRegionInfo hri)
+  throws IOException {
+    // TODO: There is no timeout on this controller. Set one!
+    HBaseRpcController controller = env.getMasterServices().getClusterConnection().
+        getRpcControllerFactory().newController();
+    final AdminService.BlockingInterface admin =
+        env.getMasterServices().getClusterConnection().getAdmin(regionLocation);
+    GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName());
+    try {
+      return admin.getRegionInfo(controller, request);
+    } catch (ServiceException e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+}
\ No newline at end of file
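
For context, the helper above issues a blocking AdminService.getRegionInfo call against the hosting regionserver. A minimal usage sketch, assuming a caller inside this package with an env, the region's current location and its HRegionInfo in hand (variable names hypothetical):

    GetRegionInfoResponse response = Util.getRegionInfoResponse(env, regionLocation, hri);
    // The response carries the regionserver's own view of the region.
    LOG.debug("RS view of " + hri.getShortNameToLog() + ": " + response.getRegionInfo());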

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 6410375..a494ecc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -1,4 +1,4 @@
-/**
+ /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -62,9 +62,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
- * The base class for load balancers. It provides functions used by
- * {@link org.apache.hadoop.hbase.master.AssignmentManager} to assign regions in the edge cases.
- * It doesn't provide an implementation of the actual balancing algorithm.
+ * The base class for load balancers. It provides the functions used by
+ * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
+ * in the edge cases. It doesn't provide an implementation of the
+ * actual balancing algorithm.
+ *
  */
 public abstract class BaseLoadBalancer implements LoadBalancer {
   protected static final int MIN_SERVER_BALANCE = 2;
@@ -202,7 +204,15 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       // Use servername and port as there can be dead servers in this list. We want everything with
       // a matching hostname and port to have the same index.
       for (ServerName sn : clusterState.keySet()) {
-        if (serversToIndex.get(sn.getHostAndPort()) == null) {
+        if (sn == null) {
+          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
+              "skipping; unassigned regions?");
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
+          }
+          continue;
+        }
+        if (serversToIndex.get(sn.getAddress().toString()) == null) {
           serversToIndex.put(sn.getHostAndPort(), numServers++);
         }
         if (!hostsToIndex.containsKey(sn.getHostname())) {
@@ -257,6 +267,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
 
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+        if (entry.getKey() == null) {
+          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
+          continue;
+        }
         int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
 
         // keep the servername if this is the first server name for this hostname
@@ -585,8 +599,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     /**
      * Return true if the placement of region on server would lower the availability
      * of the region in question
-     * @param server
-     * @param region
      * @return true or false
      */
     boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) {
@@ -899,8 +911,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
           }
         }
         if (leastLoadedServerIndex != -1) {
-          LOG.debug("Pick the least loaded server " + servers[leastLoadedServerIndex].getHostname()
-            + " with better locality for region " + regions[region]);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Pick the least loaded server " +
+                servers[leastLoadedServerIndex].getHostname() +
+                " with better locality for region " + regions[region].getShortNameToLog());
+          }
         }
         return leastLoadedServerIndex;
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
index fd98c9c..a8e22ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
@@ -469,6 +469,10 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
     }
   }
 
+  public synchronized List<ServerName> getFavoredNodes(HRegionInfo regionInfo) {
+    return this.fnm.getFavoredNodes(regionInfo);
+  }
+
   /*
    * Generate Favored Nodes for daughters during region split.
    *
@@ -709,7 +713,12 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
             // No favored nodes, lets unassign.
             LOG.warn("Region not on favored nodes, unassign. Region: " + hri
               + " current: " + current + " favored nodes: " + favoredNodes);
-            this.services.getAssignmentManager().unassign(hri);
+            try {
+              this.services.getAssignmentManager().unassign(hri);
+            } catch (IOException e) {
+              LOG.warn("Failed unassign", e);
+              continue;
+            }
             RegionPlan rp = new RegionPlan(hri, null, null);
             regionPlans.add(rp);
             misplacedRegions++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index f7e166d..907e745 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -39,9 +38,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -149,19 +147,15 @@ class RegionLocationFinder {
     if (services == null) {
       return false;
     }
-    AssignmentManager am = services.getAssignmentManager();
 
+    final AssignmentManager am = services.getAssignmentManager();
     if (am == null) {
       return false;
     }
-    RegionStates regionStates = am.getRegionStates();
-    if (regionStates == null) {
-      return false;
-    }
 
-    Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
+    // TODO: Should this refresh all the regions or only the ones assigned?
     boolean includesUserTables = false;
-    for (final HRegionInfo hri : regions) {
+    for (final HRegionInfo hri : am.getAssignedRegions()) {
       cache.refresh(hri);
       includesUserTables = includesUserTables || !hri.isSystemTable();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index 7e8d696..818156d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -20,28 +20,27 @@ package org.apache.hadoop.hbase.master.balancer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.TreeMap;
-import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.collect.MinMaxPriorityQueue;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Makes decisions about the placement and movement of Regions across
@@ -54,7 +53,7 @@ import org.apache.hadoop.hbase.util.Pair;
  * locations for all Regions in a cluster.
  *
  * <p>This classes produces plans for the
- * {@link org.apache.hadoop.hbase.master.AssignmentManager} to execute.
+ * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class SimpleLoadBalancer extends BaseLoadBalancer {
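
As the updated javadoc says, balancers only produce plans; the AssignmentManager executes them. For illustration (variable names assumed), a single move is expressed with the same three-argument RegionPlan constructor used elsewhere in this patch:

    // Move 'hri' from 'sourceServer' to 'destServer'; the AssignmentManager carries it out.
    RegionPlan plan = new RegionPlan(hri, sourceServer, destServer);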

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 53db1f2..4b96bc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -293,9 +293,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
 
     if (total <= 0 || sumMultiplier <= 0
         || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
-      LOG.info("Skipping load balancing because balanced cluster; " + "total cost is " + total
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total
           + ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
           + minCostNeedBalance);
+      }
       return false;
     }
     return true;
@@ -1153,11 +1155,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         stats = new double[cluster.numServers];
       }
 
-      for (int i =0; i < cluster.numServers; i++) {
+      for (int i = 0; i < cluster.numServers; i++) {
         stats[i] = 0;
         for (int regionIdx : cluster.regionsPerServer[i]) {
           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
-            stats[i] ++;
+            stats[i]++;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
index 512f7e2..edbba83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
@@ -232,7 +232,8 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
   }
 
   @Override
-  protected Procedure<?>[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException {
+  protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env)
+  throws ProcedureSuspendedException {
     // Local master locks don't store any state, so on recovery, simply finish this procedure
     // immediately.
     if (recoveredMasterLock) return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
index 03fdaef..6ebadb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
@@ -52,9 +52,8 @@ public abstract class AbstractStateMachineNamespaceProcedure<TState>
   @Override
   public void toStringClassDetails(final StringBuilder sb) {
     sb.append(getClass().getSimpleName());
-    sb.append(" (namespace=");
+    sb.append(", namespace=");
     sb.append(getNamespaceName());
-    sb.append(")");
   }
 
   @Override
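
The effect of the toStringClassDetails() change above, shown on a hypothetical subclass and namespace name:

    // before: "CreateNamespaceProcedure (namespace=ns1)"
    // after:  "CreateNamespaceProcedure, namespace=ns1"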

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java
new file mode 100644
index 0000000..41502d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * Base class for all the Region procedures that want to use a StateMachine.
+ * It provides basic helpers such as locking, a sync latch, and toStringClassDetails().
+ * Defaults to holding the lock for the life of the procedure.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractStateMachineRegionProcedure<TState>
+    extends AbstractStateMachineTableProcedure<TState> {
+  private HRegionInfo hri;
+  private volatile boolean lock = false;
+
+  public AbstractStateMachineRegionProcedure(final MasterProcedureEnv env,
+      final HRegionInfo hri) {
+    super(env);
+    this.hri = hri;
+  }
+
+  public AbstractStateMachineRegionProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+    super();
+  }
+
+  /**
+   * @return The HRegionInfo of the region we are operating on.
+   */
+  protected HRegionInfo getRegion() {
+    return this.hri;
+  }
+
+  /**
+   * Used when deserializing. Otherwise, DON'T TOUCH IT!
+   */
+  protected void setRegion(final HRegionInfo hri) {
+    this.hri = hri;
+  }
+
+  @Override
+  public TableName getTableName() {
+    return getRegion().getTable();
+  }
+
+  @Override
+  public abstract TableOperationType getTableOperationType();
+
+  @Override
+  public void toStringClassDetails(final StringBuilder sb) {
+    super.toStringClassDetails(sb);
+    sb.append(", region=").append(getRegion().getShortNameToLog());
+  }
+
+  /**
+   * Check whether a table is modifiable - exists and either offline or online with config set
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  protected void checkTableModifiable(final MasterProcedureEnv env) throws IOException {
+    // Checks whether the table exists
+    if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), getTableName())) {
+      throw new TableNotFoundException(getTableName());
+    }
+  }
+
+  @Override
+  protected boolean holdLock(MasterProcedureEnv env) {
+    return true;
+  }
+
+  protected LockState acquireLock(final MasterProcedureEnv env) {
+    if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
+    if (env.getProcedureScheduler().waitRegions(this, getTableName(), getRegion())) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    this.lock = true;
+    return LockState.LOCK_ACQUIRED;
+  }
+
+  protected void releaseLock(final MasterProcedureEnv env) {
+    this.lock = false;
+    env.getProcedureScheduler().wakeRegions(this, getTableName(), getRegion());
+  }
+
+  @Override
+  protected boolean hasLock(final MasterProcedureEnv env) {
+    return this.lock;
+  }
+
+  protected void setFailure(Throwable cause) {
+    super.setFailure(getClass().getSimpleName(), cause);
+  }
+
+  @Override
+  protected void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+    HRegionInfo.convert(getRegion()).writeDelimitedTo(stream);
+  }
+
+  @Override
+  protected void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+    this.hri = HRegionInfo.convert(HBaseProtos.RegionInfo.parseDelimitedFrom(stream));
+  }
+}
\ No newline at end of file
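
For reference, the serializeStateData/deserializeStateData pair above boils down to a length-delimited protobuf round trip of the region info. A standalone sketch (the helper itself is hypothetical; the HRegionInfo.convert overloads and the shaded HBaseProtos type are the ones imported in this file):

    // Not part of the patch: round-trips an HRegionInfo the same way the procedure does.
    static HRegionInfo roundTrip(final HRegionInfo hri) throws java.io.IOException {
      java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
      HRegionInfo.convert(hri).writeDelimitedTo(out);                // serializeStateData side
      java.io.ByteArrayInputStream in = new java.io.ByteArrayInputStream(out.toByteArray());
      return HRegionInfo.convert(                                    // deserializeStateData side
          HBaseProtos.RegionInfo.parseDelimitedFrom(in));
    }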

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
index 9f23848..1417159 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.security.User;
 
 /**
  * Base class for all the Table procedures that want to use a StateMachineProcedure.
- * It provide some basic helpers like basic locking, sync latch, and basic toStringClassDetails().
+ * It provides helpers like basic locking, sync latch, and toStringClassDetails().
  */
 @InterfaceAudience.Private
 public abstract class AbstractStateMachineTableProcedure<TState>
@@ -50,11 +50,15 @@ public abstract class AbstractStateMachineTableProcedure<TState>
     this(env, null);
   }
 
+  /**
+   * @param env Used, at a minimum, to set the Procedure owner.
+   */
   protected AbstractStateMachineTableProcedure(final MasterProcedureEnv env,
       final ProcedurePrepareLatch latch) {
-    this.user = env.getRequestUser();
-    this.setOwner(user);
-
+    if (env != null) {
+      this.user = env.getRequestUser();
+      this.setOwner(user);
+    }
     // used for compatibility with clients without procedures
     // they need a sync TableExistsException, TableNotFoundException, TableNotDisabledException, ...
     this.syncLatch = latch;
@@ -110,4 +114,4 @@ public abstract class AbstractStateMachineTableProcedure<TState>
       throw new TableNotFoundException(getTableName());
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
index 7bb2887..34c1853 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -100,7 +99,10 @@ public class AddColumnFamilyProcedure
         setNextState(AddColumnFamilyState.ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
         break;
       case ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
-        reOpenAllRegionsIfTableIsOnline(env);
+        if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+          addChildProcedure(env.getAssignmentManager()
+            .createReopenProcedures(getRegionInfoList(env)));
+        }
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -285,7 +287,8 @@ public class AddColumnFamilyProcedure
       env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
 
       // Make sure regions are opened after table descriptor is updated.
-      reOpenAllRegionsIfTableIsOnline(env);
+      //reOpenAllRegionsIfTableIsOnline(env);
+      // TODO: NUKE ROLLBACK!!!!
     }
   }
 
@@ -302,25 +305,6 @@ public class AddColumnFamilyProcedure
   }
 
   /**
-   * Last action from the procedure - executed when online schema change is supported.
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
-    // This operation only run when the table is enabled.
-    if (!env.getMasterServices().getTableStateManager()
-        .isTableState(getTableName(), TableState.State.ENABLED)) {
-      return;
-    }
-
-    if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
-      LOG.info("Completed add column family operation on table " + getTableName());
-    } else {
-      LOG.warn("Error on reopening the regions on table " + getTableName());
-    }
-  }
-
-  /**
    * The procedure could be restarted from a different machine. If the variable is null, we need to
    * retrieve it.
    * @return traceEnabled
@@ -362,7 +346,8 @@ public class AddColumnFamilyProcedure
 
   private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
     if (regionInfoList == null) {
-      regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+      regionInfoList = env.getAssignmentManager().getRegionStates()
+          .getRegionsOfTable(getTableName());
     }
     return regionInfoList;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
index 683d840..c1d0326 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java
@@ -149,10 +149,12 @@ public class CloneSnapshotProcedure
           setNextState(CloneSnapshotState.CLONE_SNAPSHOT_ASSIGN_REGIONS);
           break;
         case CLONE_SNAPSHOT_ASSIGN_REGIONS:
-          CreateTableProcedure.assignRegions(env, getTableName(), newRegions);
+          CreateTableProcedure.setEnablingState(env, getTableName());
+          addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions));
           setNextState(CloneSnapshotState.CLONE_SNAPSHOT_UPDATE_DESC_CACHE);
           break;
         case CLONE_SNAPSHOT_UPDATE_DESC_CACHE:
+          CreateTableProcedure.setEnabledState(env, getTableName());
           CreateTableProcedure.updateTableDescCache(env, getTableName());
           setNextState(CloneSnapshotState.CLONE_SNAPHOST_RESTORE_ACL);
           break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index ced7abc..c3900dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -107,10 +106,12 @@ public class CreateTableProcedure
           setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS);
           break;
         case CREATE_TABLE_ASSIGN_REGIONS:
-          assignRegions(env, getTableName(), newRegions);
+          setEnablingState(env, getTableName());
+          addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions));
           setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE);
           break;
         case CREATE_TABLE_UPDATE_DESC_CACHE:
+          setEnabledState(env, getTableName());
           updateTableDescCache(env, getTableName());
           setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION);
           break;
@@ -333,21 +334,21 @@ public class CreateTableProcedure
   protected static List<HRegionInfo> addTableToMeta(final MasterProcedureEnv env,
       final HTableDescriptor hTableDescriptor,
       final List<HRegionInfo> regions) throws IOException {
-    if (regions != null && regions.size() > 0) {
-      ProcedureSyncWait.waitMetaRegions(env);
+    assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions;
 
-      // Add regions to META
-      addRegionsToMeta(env, hTableDescriptor, regions);
-      // Add replicas if needed
-      List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions);
+    ProcedureSyncWait.waitMetaRegions(env);
 
-      // Setup replication for region replicas if needed
-      if (hTableDescriptor.getRegionReplication() > 1) {
-        ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
-      }
-      return newRegions;
+    // Add replicas if needed
+    List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions);
+
+    // Add regions to META
+    addRegionsToMeta(env, hTableDescriptor, newRegions);
+
+    // Setup replication for region replicas if needed
+    if (hTableDescriptor.getRegionReplication() > 1) {
+      ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
     }
-    return regions;
+    return newRegions;
   }
 
   /**
@@ -374,18 +375,16 @@ public class CreateTableProcedure
     return hRegionInfos;
   }
 
-  protected static void assignRegions(final MasterProcedureEnv env,
-      final TableName tableName, final List<HRegionInfo> regions) throws IOException {
-    ProcedureSyncWait.waitRegionServers(env);
 
+  protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName)
+      throws IOException {
     // Mark the table as Enabling
     env.getMasterServices().getTableStateManager()
       .setTableState(tableName, TableState.State.ENABLING);
+  }
 
-    // Trigger immediate assignment of the regions in round-robin fashion
-    final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager();
-    ModifyRegionUtils.assignRegions(assignmentManager, regions);
-
+  protected static void setEnabledState(final MasterProcedureEnv env, final TableName tableName)
+      throws IOException {
     // Enable table
     env.getMasterServices().getTableStateManager()
       .setTableState(tableName, TableState.State.ENABLED);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
index 096172a..78bd715 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -106,7 +105,10 @@ public class DeleteColumnFamilyProcedure
         setNextState(DeleteColumnFamilyState.DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
         break;
       case DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
-        reOpenAllRegionsIfTableIsOnline(env);
+        if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+          addChildProcedure(env.getAssignmentManager()
+            .createReopenProcedures(getRegionInfoList(env)));
+        }
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -292,7 +294,8 @@ public class DeleteColumnFamilyProcedure
     env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
 
     // Make sure regions are opened after table descriptor is updated.
-    reOpenAllRegionsIfTableIsOnline(env);
+    //reOpenAllRegionsIfTableIsOnline(env);
+    // TODO: NUKE ROLLBACK!!!!
   }
 
   /**
@@ -316,25 +319,6 @@ public class DeleteColumnFamilyProcedure
   }
 
   /**
-   * Last action from the procedure - executed when online schema change is supported.
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
-    // This operation only run when the table is enabled.
-    if (!env.getMasterServices().getTableStateManager()
-        .isTableState(getTableName(), TableState.State.ENABLED)) {
-      return;
-    }
-
-    if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
-      LOG.info("Completed delete column family operation on table " + getTableName());
-    } else {
-      LOG.warn("Error on reopening the regions on table " + getTableName());
-    }
-  }
-
-  /**
    * The procedure could be restarted from a different machine. If the variable is null, we need to
    * retrieve it.
    * @return traceEnabled
@@ -376,7 +360,8 @@ public class DeleteColumnFamilyProcedure
 
   private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
     if (regionInfoList == null) {
-      regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+      regionInfoList = env.getAssignmentManager().getRegionStates()
+          .getRegionsOfTable(getTableName());
     }
     return regionInfoList;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index bda68eb..04dfc60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.exceptions.HBaseException;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
-import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.mob.MobConstants;
@@ -97,8 +96,8 @@ public class DeleteTableProcedure
           }
 
           // TODO: Move out... in the acquireLock()
-          LOG.debug("waiting for '" + getTableName() + "' regions in transition");
-          regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+          LOG.debug("Waiting for '" + getTableName() + "' regions in transition");
+          regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName());
           assert regions != null && !regions.isEmpty() : "unexpected 0 regions";
           ProcedureSyncWait.waitRegionInTransition(env, regions);
 
@@ -350,8 +349,7 @@ public class DeleteTableProcedure
       final TableName tableName) throws IOException {
     Connection connection = env.getMasterServices().getConnection();
     Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName);
-    try (Table metaTable =
-        connection.getTable(TableName.META_TABLE_NAME)) {
+    try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
       List<Delete> deletes = new ArrayList<>();
       try (ResultScanner resScanner = metaTable.getScanner(tableScan)) {
         for (Result result : resScanner) {
@@ -385,11 +383,9 @@ public class DeleteTableProcedure
 
   protected static void deleteAssignmentState(final MasterProcedureEnv env,
       final TableName tableName) throws IOException {
-    final AssignmentManager am = env.getMasterServices().getAssignmentManager();
-
     // Clean up regions of the table in RegionStates.
     LOG.debug("Removing '" + tableName + "' from region states.");
-    am.getRegionStates().tableDeleted(tableName);
+    env.getMasterServices().getAssignmentManager().deleteTable(tableName);
 
     // If entry for this table states, remove it.
     LOG.debug("Marking '" + tableName + "' as deleted.");

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index b53ce45..409ca26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -21,12 +21,9 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -34,17 +31,11 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.constraint.ConstraintException;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.BulkAssigner;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisableTableState;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.htrace.Trace;
 
 @InterfaceAudience.Private
 public class DisableTableProcedure
@@ -116,12 +107,8 @@ public class DisableTableProcedure
         setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE);
         break;
       case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
-        if (markRegionsOffline(env, tableName, true) ==
-            MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) {
-          setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
-        } else {
-          LOG.trace("Retrying later to disable the missing regions");
-        }
+        addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
+        setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
         break;
       case DISABLE_TABLE_SET_DISABLED_TABLE_STATE:
         setTableStateToDisabled(env, tableName);
@@ -249,7 +236,7 @@ public class DisableTableProcedure
       // set the state later on). A quick state check should be enough for us to move forward.
       TableStateManager tsm = env.getMasterServices().getTableStateManager();
       TableState.State state = tsm.getTableState(tableName);
-      if(!state.equals(TableState.State.ENABLED)){
+      if (!state.equals(TableState.State.ENABLED)){
         LOG.info("Table " + tableName + " isn't enabled;is "+state.name()+"; skipping disable");
         setFailure("master-disable-table", new TableNotEnabledException(
                 tableName+" state is "+state.name()));
@@ -290,83 +277,6 @@ public class DisableTableProcedure
   }
 
   /**
-   * Mark regions of the table offline with retries
-   * @param env MasterProcedureEnv
-   * @param tableName the target table
-   * @param retryRequired whether to retry if the first run failed
-   * @return whether the operation is fully completed or being interrupted.
-   * @throws IOException
-   */
-  protected static MarkRegionOfflineOpResult markRegionsOffline(
-      final MasterProcedureEnv env,
-      final TableName tableName,
-      final Boolean retryRequired) throws IOException {
-    // Dev consideration: add a config to control max number of retry. For now, it is hard coded.
-    int maxTry = (retryRequired ? 10 : 1);
-    MarkRegionOfflineOpResult operationResult =
-        MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED;
-    do {
-      try {
-        operationResult = markRegionsOffline(env, tableName);
-        if (operationResult == MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) {
-          break;
-        }
-        maxTry--;
-      } catch (Exception e) {
-        LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e);
-        maxTry--;
-        if (maxTry > 0) {
-          continue; // we still have some retry left, try again.
-        }
-        throw e;
-      }
-    } while (maxTry > 0);
-
-    if (operationResult != MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) {
-      LOG.warn("Some or all regions of the Table '" + tableName + "' were still online");
-    }
-
-    return operationResult;
-  }
-
-  /**
-   * Mark regions of the table offline
-   * @param env MasterProcedureEnv
-   * @param tableName the target table
-   * @return whether the operation is fully completed or being interrupted.
-   * @throws IOException
-   */
-  private static MarkRegionOfflineOpResult markRegionsOffline(
-      final MasterProcedureEnv env,
-      final TableName tableName) throws IOException {
-    // Get list of online regions that are of this table.  Regions that are
-    // already closed will not be included in this list; i.e. the returned
-    // list is not ALL regions in a table, its all online regions according
-    // to the in-memory state on this master.
-    MarkRegionOfflineOpResult operationResult =
-        MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL;
-    final List<HRegionInfo> regions =
-        env.getMasterServices().getAssignmentManager().getRegionStates()
-            .getRegionsOfTable(tableName);
-    if (regions.size() > 0) {
-      LOG.info("Offlining " + regions.size() + " regions.");
-
-      BulkDisabler bd = new BulkDisabler(env, tableName, regions);
-      try {
-        if (!bd.bulkAssign()) {
-          operationResult = MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED;
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("Disable was interrupted");
-        // Preserve the interrupt.
-        Thread.currentThread().interrupt();
-        operationResult = MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_INTERRUPTED;
-      }
-    }
-    return operationResult;
-  }
-
-  /**
    * Mark table state to Disabled
    * @param env MasterProcedureEnv
    * @throws IOException
@@ -428,64 +338,4 @@ public class DisableTableProcedure
       }
     }
   }
-
-  /**
-   * Run bulk disable.
-   */
-  private static class BulkDisabler extends BulkAssigner {
-    private final AssignmentManager assignmentManager;
-    private final List<HRegionInfo> regions;
-    private final TableName tableName;
-    private final int waitingTimeForEvents;
-
-    public BulkDisabler(final MasterProcedureEnv env, final TableName tableName,
-        final List<HRegionInfo> regions) {
-      super(env.getMasterServices());
-      this.assignmentManager = env.getMasterServices().getAssignmentManager();
-      this.tableName = tableName;
-      this.regions = regions;
-      this.waitingTimeForEvents =
-          env.getMasterServices().getConfiguration()
-              .getInt("hbase.master.event.waiting.time", 1000);
-    }
-
-    @Override
-    protected void populatePool(ExecutorService pool) {
-      RegionStates regionStates = assignmentManager.getRegionStates();
-      for (final HRegionInfo region : regions) {
-        if (regionStates.isRegionInTransition(region)
-            && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) {
-          continue;
-        }
-        pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler", new Runnable() {
-          @Override
-          public void run() {
-            assignmentManager.unassign(region);
-          }
-        }));
-      }
-    }
-
-    @Override
-    protected boolean waitUntilDone(long timeout) throws InterruptedException {
-      long startTime = EnvironmentEdgeManager.currentTime();
-      long remaining = timeout;
-      List<HRegionInfo> regions = null;
-      long lastLogTime = startTime;
-      while (!server.isStopped() && remaining > 0) {
-        Thread.sleep(waitingTimeForEvents);
-        regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName);
-        long now = EnvironmentEdgeManager.currentTime();
-        // Don't log more than once every ten seconds. Its obnoxious. And only log table regions
-        // if we are waiting a while for them to go down...
-        if (LOG.isDebugEnabled() && ((now - lastLogTime) > 10000)) {
-          lastLogTime = now;
-          LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions);
-        }
-        if (regions.isEmpty()) break;
-        remaining = timeout - (now - startTime);
-      }
-      return regions != null && regions.isEmpty();
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java
new file mode 100644
index 0000000..15ed429
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
+import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.CatalogJanitor;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DispatchMergingRegionsState;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * The procedure to merge regions in a table.
+ */
+@InterfaceAudience.Private
+public class DispatchMergingRegionsProcedure
+    extends AbstractStateMachineTableProcedure<DispatchMergingRegionsState> {
+  private static final Log LOG = LogFactory.getLog(DispatchMergingRegionsProcedure.class);
+
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
+  private Boolean traceEnabled;
+  private AssignmentManager assignmentManager;
+  private int timeout;
+  private ServerName regionLocation;
+  private String regionsToMergeListFullName;
+  private String regionsToMergeListEncodedName;
+
+  private TableName tableName;
+  private HRegionInfo [] regionsToMerge;
+  private boolean forcible;
+
+  public DispatchMergingRegionsProcedure() {
+    this.traceEnabled = isTraceEnabled();
+    this.assignmentManager = null;
+    this.timeout = -1;
+    this.regionLocation = null;
+    this.regionsToMergeListFullName = null;
+    this.regionsToMergeListEncodedName = null;
+  }
+
+  public DispatchMergingRegionsProcedure(
+      final MasterProcedureEnv env,
+      final TableName tableName,
+      final HRegionInfo [] regionsToMerge,
+      final boolean forcible) {
+    super(env);
+    this.traceEnabled = isTraceEnabled();
+    this.assignmentManager = getAssignmentManager(env);
+    this.tableName = tableName;
+    // For now, we only merge 2 regions.  It could be extended to more than 2 regions in
+    // the future.
+    assert(regionsToMerge.length == 2);
+    this.regionsToMerge = regionsToMerge;
+    this.forcible = forcible;
+
+    this.timeout = -1;
+    this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
+    this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
+  }
+
+  @Override
+  protected Flow executeFromState(
+      final MasterProcedureEnv env,
+      final DispatchMergingRegionsState state) throws InterruptedException {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " execute state=" + state);
+    }
+
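+    // State flow: PREPARE -> PRE_OPERATION -> MOVE_REGION_TO_SAME_RS -> DO_MERGE_IN_RS
+    // -> POST_OPERATION. Any IOException marks the procedure as failed.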
+    try {
+      switch (state) {
+      case DISPATCH_MERGING_REGIONS_PREPARE:
+        prepareMergeRegion(env);
+        setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PRE_OPERATION);
+        break;
+      case DISPATCH_MERGING_REGIONS_PRE_OPERATION:
+        // Unused for now - reserved for adding a preMerge coprocessor in the future
+        setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS);
+        break;
+      case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS:
+        if (moveRegionsToSameRS(env)) {
+          setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS);
+        } else {
+          LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString()
+            + ", because can't move them to the same RS");
+          setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION);
+        }
+        break;
+      case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
+        doMergeInRS(env);
+        setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION);
+        break;
+      case DISPATCH_MERGING_REGIONS_POST_OPERATION:
+        // Unused for now - reserved for adding a postCompletedMerge coprocessor in the future
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() +
+        " in the table " + tableName + " (in state=" + state + ")", e);
+
+      setFailure("master-merge-regions", e);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(
+      final MasterProcedureEnv env,
+      final DispatchMergingRegionsState state) throws IOException, InterruptedException {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " rollback state=" + state);
+    }
+
+    try {
+      switch (state) {
+      case DISPATCH_MERGING_REGIONS_POST_OPERATION:
+      case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
+        String msg = this + " We are in the " + state + " state."
+            + " It is complicated to rollback the merge operation that region server is working on."
+            + " Rollback is not supported and we should let the merge operation to complete";
+        LOG.warn(msg);
+        // PONR
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS:
+        break; // nothing to rollback
+      case DISPATCH_MERGING_REGIONS_PRE_OPERATION:
+        break; // nothing to rollback
+      case DISPATCH_MERGING_REGIONS_PREPARE:
+        break; // nothing to rollback
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      }
+    } catch (Exception e) {
+      // This will be retried. Unless there is a bug in the code,
+      // this should be just a "temporary error" (e.g. network down)
+      LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
+          + getRegionsToMergeListFullNameString() + " in table " + tableName, e);
+      throw e;
+    }
+  }
+
+  @Override
+  protected DispatchMergingRegionsState getState(final int stateId) {
+    return DispatchMergingRegionsState.valueOf(stateId);
+  }
+
+  @Override
+  protected int getStateId(final DispatchMergingRegionsState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected DispatchMergingRegionsState getInitialState() {
+    return DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PREPARE;
+  }
+
+  /*
+   * Check whether we are in a state that can be rolled back
+   */
+  @Override
+  protected boolean isRollbackSupported(final DispatchMergingRegionsState state) {
+    switch (state) {
+    case DISPATCH_MERGING_REGIONS_POST_OPERATION:
+    case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
+      // It is not safe to roll back once we reach these states.
+      return false;
+    default:
+      break;
+    }
+    return true;
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    MasterProcedureProtos.DispatchMergingRegionsStateData.Builder dispatchMergingRegionsMsg =
+        MasterProcedureProtos.DispatchMergingRegionsStateData.newBuilder()
+        .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
+        .setTableName(ProtobufUtil.toProtoTableName(tableName))
+        .setForcible(forcible);
+    for (HRegionInfo hri: regionsToMerge) {
+      dispatchMergingRegionsMsg.addRegionInfo(HRegionInfo.convert(hri));
+    }
+    dispatchMergingRegionsMsg.build().writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    MasterProcedureProtos.DispatchMergingRegionsStateData dispatchMergingRegionsMsg =
+        MasterProcedureProtos.DispatchMergingRegionsStateData.parseDelimitedFrom(stream);
+    setUser(MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo()));
+    tableName = ProtobufUtil.toTableName(dispatchMergingRegionsMsg.getTableName());
+
+    assert(dispatchMergingRegionsMsg.getRegionInfoCount() == 2);
+    regionsToMerge = new HRegionInfo[dispatchMergingRegionsMsg.getRegionInfoCount()];
+    for (int i = 0; i < regionsToMerge.length; i++) {
+      regionsToMerge[i] = HRegionInfo.convert(dispatchMergingRegionsMsg.getRegionInfo(i));
+    }
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" (table=");
+    sb.append(tableName);
+    sb.append(" regions=");
+    sb.append(getRegionsToMergeListFullNameString());
+    sb.append(" forcible=");
+    sb.append(forcible);
+    sb.append(")");
+  }
+
+  @Override
+  protected LockState acquireLock(final MasterProcedureEnv env) {
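+    // For non-system tables, wait until the master is initialized; then take the
+    // region locks on the regions to merge (or suspend until they become available).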
+    if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    if (env.getProcedureScheduler().waitRegions(this, getTableName(), regionsToMerge)) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    return LockState.LOCK_ACQUIRED;
+  }
+
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureScheduler().wakeRegions(this, getTableName(),
+      regionsToMerge[0], regionsToMerge[1]);
+  }
+
+  @Override
+  public TableName getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.REGION_MERGE;
+  }
+
+  /**
+   * Prepare the merge and do some sanity checks
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
+    // Note: the following logic assumes that we only have 2 regions to merge.  In the future,
+    // if we want to extend to more than 2 regions, the code needs to be modified a bit.
+    //
+    CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
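+    // cleanMergeQualifier() returns false if the region still has merge qualifiers left over
+    // from a previous merge that has not been cleaned up yet; such a region cannot be merged again.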
+    boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
+    if (regionAHasMergeQualifier
+        || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
+      String msg = "Skip merging regions " + regionsToMerge[0].getRegionNameAsString()
+          + ", " + regionsToMerge[1].getRegionNameAsString() + ", because region "
+          + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
+              .getEncodedName()) + " has merge qualifier";
+      LOG.info(msg);
+      throw new MergeRegionException(msg);
+    }
+
+    RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+    RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
+    RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
+    if (regionStateA == null || regionStateB == null) {
+      throw new UnknownRegionException(
+        regionStateA == null ?
+            regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
+    }
+
+    if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
+      throw new MergeRegionException(
+        "Unable to merge regions that are not online: " + regionStateA + ", " + regionStateB);
+    }
+
+    if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+        regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+      throw new MergeRegionException("Can't merge non-default replicas");
+    }
+
+    if (!forcible && !HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) {
+      throw new MergeRegionException(
+        "Unable to merge non-adjacent regions "
+          + regionsToMerge[0].getRegionNameAsString() + ", "
+          + regionsToMerge[1].getRegionNameAsString()
+          + " where forcible = " + forcible);
+    }
+  }
+
+  /**
+   * Move all regions to the same region server
+   * @param env MasterProcedureEnv
+   * @return whether the target regions are hosted by the same RS
+   * @throws IOException
+   */
+  private boolean moveRegionsToSameRS(final MasterProcedureEnv env) throws IOException {
+    // Make sure the regions are on the same region server before sending the merge
+    // request to that region server.
+    //
+    boolean onSameRS = isRegionsOnTheSameServer(env);
+    if (!onSameRS) {
+      // Note: the following logic assumes that we only have 2 regions to merge.  In the future,
+      // if we want to extend to more than 2 regions, the code needs to be modified a bit.
+      //
+      RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+      ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
+
+      RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]);
+      RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]);
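+      // Prefer moving the less busy region: if regionsToMerge[0] serves fewer requests,
+      // swap the pair so that regionsToMerge[1] (the lighter one) is the region that gets moved.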
+      if (loadOfRegionA != null && loadOfRegionB != null
+          && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) {
+        // switch regionsToMerge[0] and regionsToMerge[1]
+        HRegionInfo tmpRegion = this.regionsToMerge[0];
+        this.regionsToMerge[0] = this.regionsToMerge[1];
+        this.regionsToMerge[1] = tmpRegion;
+        ServerName tmpLocation = regionLocation;
+        regionLocation = regionLocation2;
+        regionLocation2 = tmpLocation;
+      }
+
+      long startTime = EnvironmentEdgeManager.currentTime();
+
+      RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation);
+      LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
+      getAssignmentManager(env).moveAsync(regionPlan);
+      do {
+        try {
+          Thread.sleep(20);
+          // Make sure to check RIT first, then get the region location; otherwise
+          // we could get a wrong result if the region comes online between getting
+          // the region location and checking RIT
+          boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]);
+          regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
+          onSameRS = regionLocation.equals(regionLocation2);
+          if (onSameRS || !isRIT) {
+            // Regions are on the same RS, or regionsToMerge[1] is not in
+            // RegionInTransition any more
+            break;
+          }
+        } catch (InterruptedException e) {
+          InterruptedIOException iioe = new InterruptedIOException();
+          iioe.initCause(e);
+          throw iioe;
+        }
+      } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
+    }
+    return onSameRS;
+  }
+
+  /**
+   * Do the real merge operation in the region server that hosts the regions
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void doMergeInRS(final MasterProcedureEnv env) throws IOException {
+    long duration = 0;
+    long startTime = EnvironmentEdgeManager.currentTime();
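+    // Keep retrying the merge request until it is accepted by the region server or the
+    // configured timeout (see getTimeout()) elapses.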
+    do {
+      try {
+        if (getServerName(env) == null) {
+          // The merge probably already happened. Check.
+          RegionState regionState = getAssignmentManager(env).getRegionStates().getRegionState(
+            regionsToMerge[0].getEncodedName());
+          if (regionState.isMerging() || regionState.isMerged()) {
+            LOG.info("Merge regions " +  getRegionsToMergeListEncodedNameString() +
+              " is in progress or completed.  No need to send a new request.");
+          } else {
+            LOG.warn("Cannot sending merge to hosting server of the regions " +
+              getRegionsToMergeListEncodedNameString() + " as the server is unknown");
+          }
+          return;
+        }
+        // TODO: the following RPC call is not idempotent.  Multiple calls (eg. after master
+        // failover, re-execute this step) could result in some exception thrown that does not
+        // paint the correct picture.  This behavior is on-par with old releases.  Improvement
+        // could happen in the future.
+        env.getMasterServices().getServerManager().sendRegionsMerge(
+          getServerName(env),
+          regionsToMerge[0],
+          regionsToMerge[1],
+          forcible,
+          getUser());
+        LOG.info("Sent merge to server " + getServerName(env) + " for region " +
+            getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible);
+        return;
+      } catch (RegionOpeningException roe) {
+        // Do a retry since region should be online on RS immediately
+        LOG.warn("Failed mergering regions in " + getServerName(env) + ", retrying...", roe);
+      } catch (Exception ie) {
+        LOG.warn("Failed sending merge to " + getServerName(env) + " for regions " +
+            getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible, ie);
+        return;
+      }
+    } while ((duration = EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
+
+    // If we reach here, it means we timed out.
+    String msg = "Failed sending merge to " + getServerName(env) + " after " + duration + "ms";
+    LOG.warn(msg);
+    throw new IOException(msg);
+  }
+
+  private RegionLoad getRegionLoad(
+      final MasterProcedureEnv env,
+      final ServerName sn,
+      final HRegionInfo hri) {
+    ServerManager serverManager =  env.getMasterServices().getServerManager();
+    ServerLoad load = serverManager.getLoad(sn);
+    if (load != null) {
+      Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad();
+      if (regionsLoad != null) {
+        return regionsLoad.get(hri.getRegionName());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @param env MasterProcedureEnv
+   * @return whether the target regions are hosted by the same RS
+   */
+  private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException {
+    boolean onSameRS = true;
+    int i = 0;
+    RegionStates regionStates = getAssignmentManager(env).getRegionStates();
+    regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
+    if (regionLocation != null) {
+      for(i = 1; i < regionsToMerge.length; i++) {
+        ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
+        if (regionLocation2 != null) {
+          if (onSameRS) {
+            onSameRS = regionLocation.equals(regionLocation2);
+          }
+        } else {
+          // At least one region is not online, merge will fail, no need to continue.
+          break;
+        }
+      }
+      if (i == regionsToMerge.length) {
+        // Finished checking all regions; return the result.
+        return onSameRS;
+      }
+    }
+
+    // If reaching here, at least one region is not online.
+    String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() +
+        ", because region " + regionsToMerge[i].getEncodedName() + " is not online now.";
+    LOG.warn(msg);
+    throw new IOException(msg);
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @param env MasterProcedureEnv
+   * @return assignmentManager
+   */
+  private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) {
+    if (assignmentManager == null) {
+      assignmentManager = env.getMasterServices().getAssignmentManager();
+    }
+    return assignmentManager;
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @param env MasterProcedureEnv
+   * @return timeout value
+   */
+  private int getTimeout(final MasterProcedureEnv env) {
+    if (timeout == -1) {
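+      // Defaults to one minute per region being merged.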
+      timeout = env.getMasterConfiguration().getInt(
+        "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000);
+    }
+    return timeout;
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @param env MasterProcedureEnv
+   * @return serverName
+   */
+  private ServerName getServerName(final MasterProcedureEnv env) {
+    if (regionLocation == null) {
+      regionLocation =
+          getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]);
+    }
+    return regionLocation;
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @param fullName whether return only encoded name
+   * @return region names in a list
+   */
+  private String getRegionsToMergeListFullNameString() {
+    if (regionsToMergeListFullName == null) {
+      StringBuilder sb = new StringBuilder("[");
+      int i = 0;
+      while (i < regionsToMerge.length - 1) {
+        sb.append(regionsToMerge[i].getRegionNameAsString() + ", ");
+        i++;
+      }
+      sb.append(regionsToMerge[i].getRegionNameAsString() + " ]");
+      regionsToMergeListFullName = sb.toString();
+    }
+    return regionsToMergeListFullName;
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @return encoded region names
+   */
+  private String getRegionsToMergeListEncodedNameString() {
+    if (regionsToMergeListEncodedName == null) {
+      StringBuilder sb = new StringBuilder("[");
+      int i = 0;
+      while (i < regionsToMerge.length - 1) {
+        sb.append(regionsToMerge[i].getEncodedName() + ", ");
+        i++;
+      }
+      sb.append(regionsToMerge[i].getEncodedName() + " ]");
+      regionsToMergeListEncodedName = sb.toString();
+    }
+    return regionsToMergeListEncodedName;
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @return traceEnabled
+   */
+  private Boolean isTraceEnabled() {
+    if (traceEnabled == null) {
+      traceEnabled = LOG.isTraceEnabled();
+    }
+    return traceEnabled;
+  }
+}