You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 07:36:21 UTC

[09/50] [abbrv] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility.

http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
deleted file mode 100644
index 3600fe0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
+++ /dev/null
@@ -1,906 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaMutationAnnotation;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.UnknownRegionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.exceptions.MergeRegionException;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.CatalogJanitor;
-import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * The procedure to Merge a region in a table.
- */
-@InterfaceAudience.Private
-public class MergeTableRegionsProcedure
-    extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
-  private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class);
-
-  private Boolean traceEnabled;
-  private AssignmentManager assignmentManager;
-  private int timeout;
-  private ServerName regionLocation;
-  private String regionsToMergeListFullName;
-  private String regionsToMergeListEncodedName;
-
-  private HRegionInfo [] regionsToMerge;
-  private HRegionInfo mergedRegionInfo;
-  private boolean forcible;
-
-  /**
-   * No-argument constructor. It only resets the helper fields; the regions to merge and the
-   * merged region info are left unset.
-   * NOTE(review): presumably used by the procedure framework before
-   * {@link #deserializeStateData(InputStream)} repopulates the state -- confirm.
-   */
-  public MergeTableRegionsProcedure() {
-    this.traceEnabled = isTraceEnabled();
-    this.assignmentManager = null;
-    this.timeout = -1;
-    this.regionLocation = null;
-    this.regionsToMergeListFullName = null;
-    this.regionsToMergeListEncodedName = null;
-  }
-
-  /**
-   * Creates a procedure that merges exactly two regions of the same table.
-   * The regions are validated up front (see {@link #checkDaughterRegions()}) and the
-   * descriptor of the resulting region is computed (see {@link #setupMergedRegionInfo()}).
-   * @param env MasterProcedureEnv
-   * @param regionsToMerge the two regions to merge; the array is stored, not copied
-   * @param forcible whether non-adjacent regions may be merged
-   * @throws IOException if the regions fail validation
-   */
-  public MergeTableRegionsProcedure(
-      final MasterProcedureEnv env,
-      final HRegionInfo[] regionsToMerge,
-      final boolean forcible) throws IOException {
-    super(env);
-    this.traceEnabled = isTraceEnabled();
-    this.assignmentManager = getAssignmentManager(env);
-    // For now, we only merge 2 regions.  It could be extended to more than 2 regions in
-    // the future.
-    assert(regionsToMerge.length == 2);
-    // Compare table names by value; reference equality would only hold when both regions
-    // happen to share the very same TableName instance.
-    assert(regionsToMerge[0].getTable().equals(regionsToMerge[1].getTable()));
-    this.regionsToMerge = regionsToMerge;
-    this.forcible = forcible;
-
-    this.timeout = -1;
-    this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
-    this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
-
-    // Check daughter regions and make sure that we have valid daughter regions before
-    // doing the real work.
-    checkDaughterRegions();
-    // WARN: make sure there is no parent region of the two merging regions in
-    // hbase:meta If exists, fixing up daughters would cause daughter regions(we
-    // have merged one) online again when we restart master, so we should clear
-    // the parent region to prevent the above case
-    // Since HBASE-7721, we don't need fix up daughters any more. so here do
-    // nothing
-    setupMergedRegionInfo();
-  }
-
-  /**
-   * Runs the work for a single state of the merge state machine and selects the next state.
-   * An IOException raised by a step is logged and recorded through
-   * {@code setFailure("master-merge-regions", e)} instead of being propagated.
-   * @param env MasterProcedureEnv
-   * @param state the state to execute
-   * @return {@code Flow.NO_MORE_STATE} once MERGE_TABLE_REGIONS_POST_OPERATION has run,
-   *         {@code Flow.HAS_MORE_STATE} otherwise (including after a recorded failure)
-   * @throws InterruptedException declared by the overridden signature
-   */
-  @Override
-  protected Flow executeFromState(
-      final MasterProcedureEnv env,
-      final MergeTableRegionsState state) throws InterruptedException {
-    if (isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-
-    try {
-      switch (state) {
-      case MERGE_TABLE_REGIONS_PREPARE:
-        prepareMergeRegion(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS);
-        break;
-      case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
-        if (MoveRegionsToSameRS(env)) {
-          setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION);
-        } else {
-          // The merge is abandoned: jump straight to the final state, skipping every
-          // intermediate step.
-          LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString()
-            + ", because can't move them to the same RS");
-          setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
-        }
-        break;
-      case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
-        preMergeRegions(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE);
-        break;
-      case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
-        setRegionStateToMerging(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
-        break;
-      case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
-        closeRegionsForMerge(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
-        break;
-      case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
-        createMergedRegion(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION);
-        break;
-      case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
-        preMergeRegionsCommit(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META);
-        break;
-      case MERGE_TABLE_REGIONS_UPDATE_META:
-        // From this state on, rollbackState() refuses to roll back.
-        updateMetaForMergedRegions(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION);
-        break;
-      case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
-        postMergeRegionsCommit(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
-        break;
-      case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
-        openMergedRegions(env);
-        setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
-        break;
-      case MERGE_TABLE_REGIONS_POST_OPERATION:
-        postCompletedMergeRegions(env);
-        return Flow.NO_MORE_STATE;
-      default:
-        throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      }
-    } catch (IOException e) {
-      LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() +
-        " in the table " + getTableName() + " (in state=" + state + ")", e);
-
-      setFailure("master-merge-regions", e);
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  /**
-   * Undoes the work of a single state. States from MERGE_TABLE_REGIONS_UPDATE_META onwards
-   * cannot be rolled back and raise an UnsupportedOperationException.
-   * Any exception raised while rolling back is logged and rethrown.
-   * @param env MasterProcedureEnv
-   * @param state the state whose effects should be reverted
-   * @throws IOException if a rollback step fails
-   * @throws InterruptedException declared by the overridden signature
-   */
-  @Override
-  protected void rollbackState(
-      final MasterProcedureEnv env,
-      final MergeTableRegionsState state) throws IOException, InterruptedException {
-    if (isTraceEnabled()) {
-      LOG.trace(this + " rollback state=" + state);
-    }
-
-    try {
-      switch (state) {
-      case MERGE_TABLE_REGIONS_POST_OPERATION:
-      case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
-      case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
-      case MERGE_TABLE_REGIONS_UPDATE_META:
-        String msg = this + " We are in the " + state + " state."
-            + " It is complicated to rollback the merge operation that region server is working on."
-            + " Rollback is not supported and we should let the merge operation to complete";
-        LOG.warn(msg);
-        // PONR
-        // NOTE(review): the exception text says "unhandled state" while the logged msg
-        // explains that rollback is unsupported; consider reusing msg here.
-        throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION:
-        break;
-      case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
-        cleanupMergedRegion(env);
-        break;
-      case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
-        rollbackCloseRegionsForMerge(env);
-        break;
-      case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE:
-        setRegionStateToRevertMerging(env);
-        break;
-      case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION:
-        postRollBackMergeRegions(env);
-        break;
-      case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS:
-        break; // nothing to rollback
-      case MERGE_TABLE_REGIONS_PREPARE:
-        break; // nothing to rollback
-      default:
-        throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      }
-    } catch (Exception e) {
-      // This will be retried. Unless there is a bug in the code,
-      // this should be just a "temporary error" (e.g. network down)
-      LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
-          + getRegionsToMergeListFullNameString() + " in table " + getTableName(), e);
-      throw e;
-    }
-  }
-
-  /**
-   * Tells whether the procedure may still be rolled back from the given state.
-   * @param state the state to check
-   * @return false for MERGE_TABLE_REGIONS_UPDATE_META and every later state, true otherwise
-   */
-  @Override
-  protected boolean isRollbackSupported(final MergeTableRegionsState state) {
-    switch (state) {
-    case MERGE_TABLE_REGIONS_UPDATE_META:
-    case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION:
-    case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
-    case MERGE_TABLE_REGIONS_POST_OPERATION:
-      // Rolling back is unsafe once one of these states has been reached.
-      return false;
-    default:
-      return true;
-    }
-  }
-
-  /**
-   * Maps a numeric state id to its {@link MergeTableRegionsState} via {@code forNumber}.
-   * @param stateId numeric id of the state
-   * @return the state returned by {@code MergeTableRegionsState.forNumber(stateId)}
-   */
-  @Override
-  protected MergeTableRegionsState getState(final int stateId) {
-    return MergeTableRegionsState.forNumber(stateId);
-  }
-
-  /**
-   * Maps a {@link MergeTableRegionsState} to its numeric id.
-   * @param state the state to convert
-   * @return the value of {@code state.getNumber()}
-   */
-  @Override
-  protected int getStateId(final MergeTableRegionsState state) {
-    return state.getNumber();
-  }
-
-  /**
-   * @return the first state of the merge state machine, MERGE_TABLE_REGIONS_PREPARE
-   */
-  @Override
-  protected MergeTableRegionsState getInitialState() {
-    return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE;
-  }
-
-  /**
-   * Writes the parent state followed by a delimited MergeTableRegionsStateData message
-   * holding the user info, the merged region, the forcible flag and the regions to merge.
-   * @param stream the stream to write to
-   * @throws IOException if writing fails
-   */
-  @Override
-  public void serializeStateData(final OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-
-    final MasterProcedureProtos.MergeTableRegionsStateData.Builder builder =
-        MasterProcedureProtos.MergeTableRegionsStateData.newBuilder();
-    builder.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()));
-    builder.setMergedRegionInfo(HRegionInfo.convert(mergedRegionInfo));
-    builder.setForcible(forcible);
-    for (int i = 0; i < regionsToMerge.length; i++) {
-      builder.addRegionInfo(HRegionInfo.convert(regionsToMerge[i]));
-    }
-    builder.build().writeDelimitedTo(stream);
-  }
-
-  /**
-   * Reads the parent state followed by the delimited MergeTableRegionsStateData message
-   * written by {@link #serializeStateData(OutputStream)} and restores the user, the regions
-   * to merge, the merged region and the forcible flag.
-   * @param stream the stream to read from
-   * @throws IOException if reading fails
-   */
-  @Override
-  public void deserializeStateData(final InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-
-    MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg =
-        MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream);
-    setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo()));
-
-    assert(mergeTableRegionsMsg.getRegionInfoCount() == 2);
-    regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()];
-    for (int i = 0; i < regionsToMerge.length; i++) {
-      regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i));
-    }
-
-    mergedRegionInfo = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo());
-    // The flag is written by serializeStateData(); restore it so that a reloaded procedure
-    // does not silently fall back to forcible=false.
-    forcible = mergeTableRegionsMsg.getForcible();
-  }
-
-  /**
-   * Appends the class name, table, regions to merge and forcible flag to the given builder.
-   * @param sb the builder to append to
-   */
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    sb.append(getClass().getSimpleName())
-        .append(" (table=").append(getTableName())
-        .append(" regions=").append(getRegionsToMergeListFullNameString())
-        .append(" forcible=").append(forcible)
-        .append(")");
-  }
-
-  /**
-   * Tries to take the lock on the two regions to merge.
-   * @param env MasterProcedureEnv
-   * @return LOCK_EVENT_WAIT when {@code env.waitInitialized} or the scheduler's
-   *         {@code waitRegions} returns true, LOCK_ACQUIRED otherwise
-   */
-  @Override
-  protected LockState acquireLock(final MasterProcedureEnv env) {
-    if (env.waitInitialized(this)) {
-      return LockState.LOCK_EVENT_WAIT;
-    }
-    final boolean mustWait = env.getProcedureScheduler().waitRegions(
-        this, getTableName(), regionsToMerge[0], regionsToMerge[1]);
-    if (mustWait) {
-      return LockState.LOCK_EVENT_WAIT;
-    }
-    return LockState.LOCK_ACQUIRED;
-  }
-
-  /**
-   * Calls the scheduler's {@code wakeRegions} for the table and the two regions to merge,
-   * the counterpart of the {@code waitRegions} call made in
-   * {@link #acquireLock(MasterProcedureEnv)}.
-   * @param env MasterProcedureEnv
-   */
-  @Override
-  protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureScheduler().wakeRegions(this, getTableName(),
-        regionsToMerge[0], regionsToMerge[1]);
-  }
-
-  /**
-   * @return the table of the first region to merge
-   */
-  @Override
-  public TableName getTableName() {
-    return regionsToMerge[0].getTable();
-  }
-
-  /**
-   * @return {@code TableOperationType.MERGE}
-   */
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.MERGE;
-  }
-
-  /**
-   * Validates the two regions to merge: both must be default replicas, and they must be
-   * adjacent unless the merge is forcible.
-   * @throws IOException MergeRegionException for a non-default replica,
-   *         DoNotRetryIOException for non-adjacent regions when the merge is not forcible
-   */
-  private void checkDaughterRegions() throws IOException {
-    // Note: this logic only handles two regions; merging more would need changes here.
-    final boolean allDefaultReplicas =
-        regionsToMerge[0].getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID
-        && regionsToMerge[1].getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
-    if (!allDefaultReplicas) {
-      throw new MergeRegionException("Can't merge non-default replicas");
-    }
-
-    if (HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) {
-      return;
-    }
-
-    // Non-adjacent regions: always warn, but only reject when the merge is not forced.
-    final String warning = "Trying to merge non-adjacent regions "
-        + getRegionsToMergeListFullNameString() + " where forcible = " + forcible;
-    LOG.warn(warning);
-    if (!forcible) {
-      throw new DoNotRetryIOException(warning);
-    }
-  }
-
-  /**
-   * Checks that the merge can start: neither region may still carry a merge qualifier, and
-   * both regions must be known to the region states and be opened.
-   * @param env MasterProcedureEnv
-   * @throws IOException MergeRegionException if a region has a merge qualifier or is not
-   *         opened, UnknownRegionException if a region has no known state
-   */
-  private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
-    // Note: the following logic assumes that we only have 2 regions to merge.  In the future,
-    // if we want to extend to more than 2 regions, the code needs to modify a little bit.
-    //
-    CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
-    boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
-    // Short-circuit: cleanMergeQualifier is only called for the second region when the
-    // first one came back clean.
-    if (regionAHasMergeQualifier
-        || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
-      String msg = "Skip merging regions " + getRegionsToMergeListFullNameString()
-        + ", because region "
-        + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
-              .getEncodedName()) + " has merge qualifier";
-      LOG.warn(msg);
-      throw new MergeRegionException(msg);
-    }
-
-    RegionStates regionStates = getAssignmentManager(env).getRegionStates();
-    RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
-    RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
-    if (regionStateA == null || regionStateB == null) {
-      // Report the first region whose state is missing.
-      throw new UnknownRegionException(
-        regionStateA == null ?
-            regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
-    }
-
-    if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
-      throw new MergeRegionException(
-        "Unable to merge regions not online " + regionStateA + ", " + regionStateB);
-    }
-  }
-
-  /**
-   * Builds the HRegionInfo of the merged region from the two regions to merge: the region
-   * id is the current time (bumped past both source ids on clock skew), the start key comes
-   * from the region that sorts first and the end key is the larger end key, where an empty
-   * end key counts as the largest.
-   */
-  private void setupMergedRegionInfo() {
-    final HRegionInfo regionA = regionsToMerge[0];
-    final HRegionInfo regionB = regionsToMerge[1];
-
-    // The region id is a timestamp. It must not be lower than the ids of the merging
-    // regions, otherwise the merged region would sort at the wrong place in hbase:meta.
-    long rid = EnvironmentEdgeManager.currentTime();
-    final long newestSourceId = Math.max(regionA.getRegionId(), regionB.getRegionId());
-    if (rid < newestSourceId) {
-      LOG.warn("Clock skew; merging regions id are " + regionA.getRegionId()
-          + " and " + regionB.getRegionId() + ", but current time here is " + rid);
-      rid = newestSourceId + 1;
-    }
-
-    // Start key: taken from whichever region sorts first.
-    final byte[] startKey =
-        regionA.compareTo(regionB) <= 0 ? regionA.getStartKey() : regionB.getStartKey();
-
-    // End key: the larger of the two.
-    final byte[] endKeyA = regionA.getEndKey();
-    final byte[] endKeyB = regionB.getEndKey();
-    final boolean regionAEndsLast = Bytes.equals(endKeyA, HConstants.EMPTY_BYTE_ARRAY)
-        || (!Bytes.equals(endKeyB, HConstants.EMPTY_BYTE_ARRAY)
-            && Bytes.compareTo(endKeyA, endKeyB) > 0);
-    final byte[] endKey = regionAEndsLast ? endKeyA : endKeyB;
-
-    // Merged region is sorted between two merging regions in META
-    mergedRegionInfo = new HRegionInfo(getTableName(), startKey, endKey, false, rid);
-  }
-
-  /**
-   * Makes sure both regions are hosted by the same region server, moving one of them if
-   * needed. When the regions are on different servers, a RegionPlan moving
-   * {@code regionsToMerge[1]} to {@code regionLocation} is submitted and the region states
-   * are polled until the regions are co-located, the moved region has left transition, or
-   * the timeout elapses. The two regions (and their locations) are swapped beforehand when
-   * the first region has fewer requests than the second.
-   * @param env MasterProcedureEnv
-   * @return whether target regions hosted by the same RS
-   * @throws IOException InterruptedIOException if the polling sleep is interrupted
-   */
-  private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException {
-    // Make sure regions are on the same regionserver before send merge
-    // regions request to region server.
-    //
-    boolean onSameRS = isRegionsOnTheSameServer(env);
-    if (!onSameRS) {
-      // Note: the following logic assumes that we only have 2 regions to merge.  In the future,
-      // if we want to extend to more than 2 regions, the code needs to modify a little bit.
-      //
-      RegionStates regionStates = getAssignmentManager(env).getRegionStates();
-      ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
-
-      // NOTE(review): regionLocation is read here without being assigned in this method;
-      // presumably isRegionsOnTheSameServer() populates it -- confirm.
-      RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]);
-      RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]);
-      if (loadOfRegionA != null && loadOfRegionB != null
-          && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) {
-        // switch regionsToMerge[0] and regionsToMerge[1]
-        // so that the region with fewer requests is the one that gets moved
-        HRegionInfo tmpRegion = this.regionsToMerge[0];
-        this.regionsToMerge[0] = this.regionsToMerge[1];
-        this.regionsToMerge[1] = tmpRegion;
-        ServerName tmpLocation = regionLocation;
-        regionLocation = regionLocation2;
-        regionLocation2 = tmpLocation;
-      }
-
-      long startTime = EnvironmentEdgeManager.currentTime();
-
-      RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation);
-      LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
-      getAssignmentManager(env).balance(regionPlan);
-      do {
-        try {
-          Thread.sleep(20);
-          // Make sure check RIT first, then get region location, otherwise
-          // we would make a wrong result if region is online between getting
-          // region location and checking RIT
-          boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]);
-          regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
-          onSameRS = regionLocation.equals(regionLocation2);
-          if (onSameRS || !isRIT) {
-            // Regions are on the same RS, or regionsToMerge[1] is not in
-            // RegionInTransition any more
-            break;
-          }
-        } catch (InterruptedException e) {
-          // Surface the interruption as an IOException, keeping the original as its cause.
-          InterruptedIOException iioe = new InterruptedIOException();
-          iioe.initCause(e);
-          throw iioe;
-        }
-      } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
-    }
-    return onSameRS;
-  }
-
-  /**
-   * Invokes the coprocessor pre-merge hook, if a coprocessor host is present.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the hook returns true, i.e. a coprocessor bypasses the merge
-   **/
-  private void preMergeRegions(final MasterProcedureEnv env) throws IOException {
-    final MasterCoprocessorHost host = env.getMasterCoprocessorHost();
-    if (host == null) {
-      return;
-    }
-    final boolean bypassed = host.preMergeRegionsAction(regionsToMerge, getUser());
-    if (bypassed) {
-      throw new IOException(
-        "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
-    }
-  }
-
-  /**
-   * Invokes the coprocessor hook that runs after a merge has been rolled back, if a
-   * coprocessor host is present.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the hook fails
-   */
-  private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException {
-    final MasterCoprocessorHost host = env.getMasterCoprocessorHost();
-    if (host == null) {
-      return;
-    }
-    host.postRollBackMergeRegionsAction(regionsToMerge, getUser());
-  }
-
-  /**
-   * Reports a READY_TO_MERGE transition (merged region first, then the two source regions)
-   * to the assignment manager.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the assignment manager answers with an error message
-   */
-  public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException {
-    final RegionStateTransition.Builder readyToMerge = RegionStateTransition.newBuilder()
-        .setTransitionCode(TransitionCode.READY_TO_MERGE)
-        .addRegionInfo(HRegionInfo.convert(mergedRegionInfo))
-        .addRegionInfo(HRegionInfo.convert(regionsToMerge[0]))
-        .addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
-    final String error = env.getMasterServices().getAssignmentManager().onRegionTransition(
-      getServerName(env), readyToMerge.build());
-    if (error != null) {
-      throw new IOException("Failed to update region state to MERGING for "
-          + getRegionsToMergeListFullNameString());
-    }
-  }
-
-  /**
-   * Reports a MERGE_REVERTED transition to the assignment manager in order to undo the
-   * MERGING state. An error answer is ignored when both source regions are online.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the transition is rejected and a source region is not online
-   */
-  private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException {
-    final RegionStateTransition.Builder reverted = RegionStateTransition.newBuilder()
-        .setTransitionCode(TransitionCode.MERGE_REVERTED)
-        .addRegionInfo(HRegionInfo.convert(mergedRegionInfo))
-        .addRegionInfo(HRegionInfo.convert(regionsToMerge[0]))
-        .addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
-    final String error = env.getMasterServices().getAssignmentManager().onRegionTransition(
-      getServerName(env), reverted.build());
-    if (error == null) {
-      return;
-    }
-
-    // If daughter regions are online, the msg is coming from RPC retry.  Ignore it.
-    final RegionStates states = getAssignmentManager(env).getRegionStates();
-    final boolean bothOnline =
-        states.isRegionOnline(regionsToMerge[0]) && states.isRegionOnline(regionsToMerge[1]);
-    if (!bothOnline) {
-      throw new IOException("Failed to update region state for "
-        + getRegionsToMergeListFullNameString()
-        + " as part of operation for reverting merge.  Error message: " + error);
-    }
-  }
-
-  /**
-   * Creates the merged region on the file system: creates the merges directory under the
-   * first region, adds the store files of both regions to it through
-   * {@link #mergeStoreFiles}, then commits the merged region.
-   * @param env MasterProcedureEnv
-   * @throws IOException if a file system operation fails
-   */
-  private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
-    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
-    final FileSystem fs = mfs.getFileSystem();
-    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
-      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
-    regionFs.createMergesDir();
-
-    mergeStoreFiles(env, regionFs, regionFs.getMergesDir());
-    // The second region's files also go into the FIRST region's merges directory.
-    HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem(
-      env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false);
-    mergeStoreFiles(env, regionFs2, regionFs.getMergesDir());
-
-    regionFs.commitMergedRegion(mergedRegionInfo);
-  }
-
-  /**
-   * Create reference file(s) of merging regions under the merges directory
-   * @param env MasterProcedureEnv
-   * @param regionFs region file system
-   * @param mergedDir the temp directory of merged region
-   * @throws IOException if the table descriptor or a store file cannot be accessed
-   */
-  private void mergeStoreFiles(
-      final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
-      throws IOException {
-    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    final Configuration conf = env.getMasterConfiguration();
-    final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
-
-    for (String family: regionFs.getFamilies()) {
-      // Bytes.toBytes avoids String.getBytes(), whose result depends on the platform's
-      // default charset.
-      final HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(family));
-      final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
-
-      if (storeFiles != null && storeFiles.size() > 0) {
-        final CacheConfig cacheConf = new CacheConfig(conf, hcd);
-        for (StoreFileInfo storeFileInfo: storeFiles) {
-          // Create reference file(s) of the region in mergedDir
-          regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(),
-              storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true),
-            mergedDir);
-        }
-      }
-    }
-  }
-
-  /**
-   * Removes what {@link #createMergedRegion} produced by asking the first region's file
-   * system to clean up the merged region.
-   * @param env MasterProcedureEnv
-   * @throws IOException if a file system operation fails
-   */
-  private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
-    final MasterFileSystem masterFs = env.getMasterServices().getMasterFileSystem();
-    final HRegionInfo firstRegion = regionsToMerge[0];
-    final Path tableDir = FSUtils.getTableDir(masterFs.getRootDir(), firstRegion.getTable());
-    final FileSystem fileSystem = masterFs.getFileSystem();
-    final HRegionFileSystem firstRegionFs = HRegionFileSystem.openRegionFromFileSystem(
-      env.getMasterConfiguration(), fileSystem, tableDir, firstRegion, false);
-    firstRegionFs.cleanupMergedRegion(mergedRegionInfo);
-  }
-
-  /**
-   * Asks the region server hosting the regions to merge to close both of them.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the server manager reports that the close request failed
-   */
-  private void closeRegionsForMerge(final MasterProcedureEnv env) throws IOException {
-    final ServerManager serverManager = env.getMasterServices().getServerManager();
-    final boolean closed = serverManager.sendRegionCloseForSplitOrMerge(
-      getServerName(env), regionsToMerge[0], regionsToMerge[1]);
-    if (closed) {
-      return;
-    }
-    throw new IOException("Close regions " + getRegionsToMergeListFullNameString()
-        + " for merging failed. Check region server log for more details.");
-  }
-
-  /**
-   * Rollback close regions: every region to merge that is closing or closed is asked to
-   * open again on the server returned by {@code getServerName(env)}.
-   * @param env MasterProcedureEnv
-   * @throws IOException if sending the open request fails
-   **/
-  private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
-    // Check whether the region is closed; if so, open it in the same server
-    RegionStates regionStates = getAssignmentManager(env).getRegionStates();
-    // closeRegionsForMerge() closes BOTH regions, so both must be considered here; the
-    // loop used to start at index 1 and never reopened regionsToMerge[0].
-    for (HRegionInfo region : regionsToMerge) {
-      RegionState state = regionStates.getRegionState(region);
-      if (state != null && (state.isClosing() || state.isClosed())) {
-        env.getMasterServices().getServerManager().sendRegionOpen(
-          getServerName(env),
-          region,
-          ServerName.EMPTY_SERVER_LIST);
-      }
-    }
-  }
-
-  /**
-   * Pre merge-commit action: invokes the coprocessor pre-commit hook, if a coprocessor host
-   * is present, and checks that the row key of every mutation it returned parses as a
-   * region name.
-   * @param env MasterProcedureEnv
-   * @throws IOException if a coprocessor bypasses the merge or returns a mutation whose
-   *         row key is not a region name
-   **/
-  private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
-    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
-    if (cpHost != null) {
-      @MetaMutationAnnotation
-      final List<Mutation> metaEntries = new ArrayList<>();
-      boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser());
-
-      if (ret) {
-        throw new IOException(
-          "Coprocessor bypassing regions " + getRegionsToMergeListFullNameString() + " merge.");
-      }
-      try {
-        // The parsed result is discarded; parsing only serves as validation.
-        for (Mutation p : metaEntries) {
-          HRegionInfo.parseRegionName(p.getRow());
-        }
-      } catch (IOException e) {
-        // The two sentences used to be concatenated without a separating space.
-        LOG.error("Row key of mutation from coprocessor is not parsable as region name. "
-          + "Mutations from coprocessor should only be for hbase:meta table.", e);
-        throw e;
-      }
-    }
-  }
-
-  /**
-   * Add merged region to META and delete original regions, by reporting a MERGE_PONR
-   * transition to the assignment manager.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the assignment manager answers with an error message
-   */
-  private void updateMetaForMergedRegions(final MasterProcedureEnv env) throws IOException {
-    RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
-    transition.setTransitionCode(TransitionCode.MERGE_PONR);
-    transition.addRegionInfo(HRegionInfo.convert(mergedRegionInfo));
-    transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[0]));
-    transition.addRegionInfo(HRegionInfo.convert(regionsToMerge[1]));
-    // Add merged region and delete original regions
-    // as an atomic update. See HBASE-7721. This update to hbase:meta
-    // will determine whether the region is merged or not in case of failures.
-    if (env.getMasterServices().getAssignmentManager().onRegionTransition(
-      getServerName(env), transition.build()) != null) {
-      throw new IOException("Failed to update meta to add merged region that merges "
-          + getRegionsToMergeListFullNameString());
-    }
-  }
-
-  /**
-   * Invokes the coprocessor post merge-commit hook, if a coprocessor host is present.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the hook fails
-   **/
-  private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException {
-    final MasterCoprocessorHost host = env.getMasterCoprocessorHost();
-    if (host == null) {
-      return;
-    }
-    host.postMergeRegionsCommit(regionsToMerge, mergedRegionInfo, getUser());
-  }
-
-  /**
-   * Assigns the merged region, unless the region states already report it as opened.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the assignment fails
-   * @throws InterruptedException if the assignment is interrupted
-   **/
-  private void openMergedRegions(final MasterProcedureEnv env)
-      throws IOException, InterruptedException {
-    final RegionState mergedState =
-        getAssignmentManager(env).getRegionStates().getRegionState(mergedRegionInfo);
-    final boolean alreadyOpened = mergedState != null && mergedState.isOpened();
-    if (alreadyOpened) {
-      // An opened merged region means this call is a retry; there is nothing left to do.
-      LOG.info("Skip opening merged region " + mergedRegionInfo.getRegionNameAsString()
-        + " as it is already opened.");
-      return;
-    }
-
-    // TODO: The new AM should provide an API to force assign the merged region to the same RS
-    // as daughter regions; if the RS is unavailable, then assign to a different RS.
-    final AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    am.assignMergedRegion(mergedRegionInfo, regionsToMerge[0], regionsToMerge[1]);
-  }
-
-  /**
-   * Notify the master coprocessor host, when one is configured, that the merge regions
-   * action has completed.
-   * @param env MasterProcedureEnv
-   * @throws IOException if the coprocessor hook fails
-   **/
-  private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException {
-    final MasterCoprocessorHost coprocessorHost = env.getMasterCoprocessorHost();
-    if (coprocessorHost == null) {
-      // No coprocessor host available: nothing to notify.
-      return;
-    }
-    coprocessorHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegionInfo, getUser());
-  }
-
-  /**
-   * Look up the load reported for a region by the given server.
-   * @param env MasterProcedureEnv
-   * @param sn server whose load report is consulted
-   * @param hri region to look up
-   * @return the region load, or null when the server has no load report, the report has no
-   *         per-region map, or the map has no entry for the region
-   */
-  private RegionLoad getRegionLoad(
-      final MasterProcedureEnv env,
-      final ServerName sn,
-      final HRegionInfo hri) {
-    final ServerLoad serverLoad = env.getMasterServices().getServerManager().getLoad(sn);
-    if (serverLoad == null) {
-      return null;
-    }
-    final Map<byte[], RegionLoad> loadByRegion = serverLoad.getRegionsLoad();
-    if (loadByRegion == null) {
-      return null;
-    }
-    return loadByRegion.get(hri.getRegionName());
-  }
-
-  /**
-   * Check whether all the regions to merge are hosted by the same region server.
-   * <p>
-   * As a side effect, {@code regionLocation} is set to the server hosting the first region to
-   * merge (or to null when that region has no known server).
-   * @param env MasterProcedureEnv
-   * @return whether target regions hosted by the same RS
-   * @throws IOException if any of the regions to merge has no region server, i.e. is not online
-   */
-  private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{
-    Boolean onSameRS = true;
-    // i is declared outside the loop on purpose: it is read after the loop both to tell
-    // whether every region was checked and to name the offline region in the error message.
-    int i = 0;
-    RegionStates regionStates = getAssignmentManager(env).getRegionStates();
-    regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
-    if (regionLocation != null) {
-      for(i = 1; i < regionsToMerge.length; i++) {
-        ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
-        if (regionLocation2 != null) {
-          // Once a mismatch has been seen the answer stays false; keep iterating only to
-          // verify that the remaining regions are online.
-          if (onSameRS) {
-            onSameRS = regionLocation.equals(regionLocation2);
-          }
-        } else {
-          // At least one region is not online, merge will fail, no need to continue.
-          break;
-        }
-      }
-      if (i == regionsToMerge.length) {
-        // Finish checking all regions, return the result;
-        return onSameRS;
-      }
-    }
-
-    // If reaching here, at least one region is not online; i is the index of that region.
-    String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() +
-        ", because region " + regionsToMerge[i].getEncodedName() + " is not online now.";
-    LOG.warn(msg);
-    throw new IOException(msg);
-  }
-
-  /**
-   * Return the AssignmentManager, fetching it from the master services and caching it in
-   * {@code assignmentManager} the first time it is needed (the field may be null, e.g. when
-   * the procedure is restarted from a different machine).
-   * @param env MasterProcedureEnv
-   * @return assignmentManager
-   */
-  private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) {
-    if (assignmentManager != null) {
-      return assignmentManager;
-    }
-    assignmentManager = env.getMasterServices().getAssignmentManager();
-    return assignmentManager;
-  }
-
-  /**
-   * Return the merge timeout, reading it from the master configuration key
-   * {@code hbase.master.regionmerge.timeout} the first time it is needed ({@code timeout} is
-   * still -1). The default is 60 * 1000 multiplied by the number of regions to merge.
-   * @param env MasterProcedureEnv
-   * @return timeout value
-   */
-  private int getTimeout(final MasterProcedureEnv env) {
-    if (timeout != -1) {
-      return timeout;
-    }
-    final int defaultTimeout = regionsToMerge.length * 60 * 1000;
-    timeout = env.getMasterConfiguration().getInt(
-      "hbase.master.regionmerge.timeout", defaultTimeout);
-    return timeout;
-  }
-
-  /**
-   * Return the server hosting the first region to merge, looking it up in the region states
-   * and caching it in {@code regionLocation} when the field is still null (e.g. when the
-   * procedure is restarted from a different machine).
-   * @param env MasterProcedureEnv
-   * @return serverName
-   */
-  private ServerName getServerName(final MasterProcedureEnv env) {
-    if (regionLocation != null) {
-      return regionLocation;
-    }
-    regionLocation =
-        getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]);
-    return regionLocation;
-  }
-
-  /**
-   * Return the full names of the regions to merge formatted as {@code [name1, name2 ]}.
-   * The string is built on first use and cached in {@code regionsToMergeListFullName}.
-   * @return region names in a list
-   */
-  private String getRegionsToMergeListFullNameString() {
-    if (regionsToMergeListFullName != null) {
-      return regionsToMergeListFullName;
-    }
-    final StringBuilder names = new StringBuilder("[");
-    // idx outlives the loop so that the last region is appended with the closing bracket.
-    int idx = 0;
-    for (; idx < regionsToMerge.length - 1; idx++) {
-      names.append(regionsToMerge[idx].getRegionNameAsString()).append(", ");
-    }
-    names.append(regionsToMerge[idx].getRegionNameAsString()).append(" ]");
-    regionsToMergeListFullName = names.toString();
-    return regionsToMergeListFullName;
-  }
-
-  /**
-   * Return the encoded names of the regions to merge formatted as {@code [name1, name2 ]}.
-   * The string is built on first use and cached in {@code regionsToMergeListEncodedName}.
-   * @return encoded region names
-   */
-  private String getRegionsToMergeListEncodedNameString() {
-    if (regionsToMergeListEncodedName != null) {
-      return regionsToMergeListEncodedName;
-    }
-    final StringBuilder names = new StringBuilder("[");
-    // idx outlives the loop so that the last region is appended with the closing bracket.
-    int idx = 0;
-    for (; idx < regionsToMerge.length - 1; idx++) {
-      names.append(regionsToMerge[idx].getEncodedName()).append(", ");
-    }
-    names.append(regionsToMerge[idx].getEncodedName()).append(" ]");
-    regionsToMergeListEncodedName = names.toString();
-    return regionsToMergeListEncodedName;
-  }
-
-  /**
-   * Return whether trace logging is enabled, asking the logger only the first time and
-   * caching the answer in {@code traceEnabled}.
-   * @return traceEnabled
-   */
-  private Boolean isTraceEnabled() {
-    if (traceEnabled != null) {
-      return traceEnabled;
-    }
-    traceEnabled = LOG.isTraceEnabled();
-    return traceEnabled;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index 52bb4d5..622c19f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -21,17 +21,14 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -97,7 +94,9 @@ public class ModifyColumnFamilyProcedure
         setNextState(ModifyColumnFamilyState.MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS);
         break;
       case MODIFY_COLUMN_FAMILY_REOPEN_ALL_REGIONS:
-        reOpenAllRegionsIfTableIsOnline(env);
+        if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+          addChildProcedure(env.getAssignmentManager().createReopenProcedures(getTableName()));
+        }
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException(this + " unhandled state=" + state);
@@ -265,7 +264,8 @@ public class ModifyColumnFamilyProcedure
     env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
 
     // Make sure regions are opened after table descriptor is updated.
-    reOpenAllRegionsIfTableIsOnline(env);
+    //reOpenAllRegionsIfTableIsOnline(env);
+    // TODO: NUKE ROLLBACK!!!!
   }
 
   /**
@@ -281,26 +281,6 @@ public class ModifyColumnFamilyProcedure
   }
 
   /**
-   * Last action from the procedure - executed when online schema change is supported.
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
-    // This operation only run when the table is enabled.
-    if (!env.getMasterServices().getTableStateManager()
-        .isTableState(getTableName(), TableState.State.ENABLED)) {
-      return;
-    }
-
-    List<HRegionInfo> regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
-    if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), regionInfoList)) {
-      LOG.info("Completed add column family operation on table " + getTableName());
-    } else {
-      LOG.warn("Error on reopening the regions on table " + getTableName());
-    }
-  }
-
-  /**
    * The procedure could be restarted from a different machine. If the variable is null, we need to
    * retrieve it.
    * @return traceEnabled

http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 6a70f62..20a6a03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -120,7 +120,10 @@ public class ModifyTableProcedure
         setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
         break;
       case MODIFY_TABLE_REOPEN_ALL_REGIONS:
-        reOpenAllRegionsIfTableIsOnline(env);
+        if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+          addChildProcedure(env.getAssignmentManager()
+            .createReopenProcedures(getRegionInfoList(env)));
+        }
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
@@ -299,7 +302,8 @@ public class ModifyTableProcedure
     deleteFromFs(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
 
     // Make sure regions are opened after table descriptor is updated.
-    reOpenAllRegionsIfTableIsOnline(env);
+    //reOpenAllRegionsIfTableIsOnline(env);
+    // TODO: NUKE ROLLBACK!!!!
   }
 
   /**
@@ -374,25 +378,6 @@ public class ModifyTableProcedure
   }
 
   /**
-   * Last action from the procedure - executed when online schema change is supported.
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
-    // This operation only run when the table is enabled.
-    if (!env.getMasterServices().getTableStateManager()
-        .isTableState(getTableName(), TableState.State.ENABLED)) {
-      return;
-    }
-
-    if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
-      LOG.info("Completed modify table operation on table " + getTableName());
-    } else {
-      LOG.warn("Error on reopening the regions on table " + getTableName());
-    }
-  }
-
-  /**
    * The procedure could be restarted from a different machine. If the variable is null, we need to
    * retrieve it.
    * @return traceEnabled whether the trace is enabled
@@ -430,7 +415,8 @@ public class ModifyTableProcedure
 
   private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
     if (regionInfoList == null) {
-      regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+      regionInfoList = env.getAssignmentManager().getRegionStates()
+          .getRegionsOfTable(getTableName());
     }
     return regionInfoList;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 3777c79..21bd6c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -21,30 +21,26 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoordinatedStateException;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 
 /**
  * Helper to synchronously wait on conditions.
@@ -64,19 +60,93 @@ public final class ProcedureSyncWait {
     T evaluate() throws IOException;
   }
 
+  /**
+   * {@link Future} view over a procedure identified by its procId on a
+   * {@link ProcedureExecutor}. Both {@code get} variants delegate to
+   * {@code waitForProcedureToComplete} and remember the result, so that later calls return
+   * immediately and {@link #isDone()} reports completion. Cancellation is not supported.
+   * The cached state is held in plain fields with no synchronization.
+   */
+  private static class ProcedureFuture implements Future<byte[]> {
+    private final ProcedureExecutor<MasterProcedureEnv> procExec;
+    private final long procId;
+
+    private boolean hasResult = false;
+    private byte[] result = null;
+
+    public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, long procId) {
+      this.procExec = procExec;
+      this.procId = procId;
+    }
+
+    /** Cancellation is not supported; always returns false. */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) { return false; }
+
+    /** Always false, since {@link #cancel(boolean)} never cancels. */
+    @Override
+    public boolean isCancelled() { return false; }
+
+    /** @return true once a {@code get} call has successfully obtained the result */
+    @Override
+    public boolean isDone() { return hasResult; }
+
+    /**
+     * Wait for the procedure result, passing {@code Long.MAX_VALUE} as the timeout.
+     * @return the procedure result
+     * @throws ExecutionException wrapping any exception raised while waiting
+     */
+    @Override
+    public byte[] get() throws InterruptedException, ExecutionException {
+      if (hasResult) {
+        return result;
+      }
+      try {
+        // Record the outcome exactly like get(timeout, unit) does; otherwise isDone() would
+        // stay false and every later call would wait on the executor again.
+        result = waitForProcedureToComplete(procExec, procId, Long.MAX_VALUE);
+        hasResult = true;
+        return result;
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+    }
+
+    /**
+     * Wait for the procedure result, passing the given timeout converted to milliseconds.
+     * @return the procedure result
+     * @throws TimeoutException when the wait fails with a {@link TimeoutIOException}
+     * @throws ExecutionException wrapping any other exception raised while waiting
+     */
+    @Override
+    public byte[] get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if (hasResult) {
+        return result;
+      }
+      try {
+        result = waitForProcedureToComplete(procExec, procId, unit.toMillis(timeout));
+        hasResult = true;
+        return result;
+      } catch (TimeoutIOException e) {
+        // TimeoutException has no cause-taking constructor; attach the cause explicitly so
+        // the original stack trace is not lost.
+        final TimeoutException timeoutException = new TimeoutException(e.getMessage());
+        timeoutException.initCause(e);
+        throw timeoutException;
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+    }
+  }
+
+  /**
+   * Submit the procedure to the executor if it is still initializing, and return a
+   * {@link Future} that can be used to wait for its result.
+   * @param procExec executor to submit to and wait on
+   * @param proc procedure to submit
+   * @return a future bound to the procId of {@code proc}
+   */
+  public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
+      final Procedure proc) {
+    final boolean needsSubmit = proc.isInitializing();
+    if (needsSubmit) {
+      procExec.submitProcedure(proc);
+    }
+    final long procId = proc.getProcId();
+    return new ProcedureFuture(procExec, procId);
+  }
+
   public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
       final Procedure proc) throws IOException {
-    long procId = procExec.submitProcedure(proc);
-    return waitForProcedureToComplete(procExec, procId);
+    if (proc.isInitializing()) {
+      procExec.submitProcedure(proc);
+    }
+    return waitForProcedureToCompleteIOE(procExec, proc.getProcId(), Long.MAX_VALUE);
   }
 
-  private static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
-      final long procId) throws IOException {
-    while (!procExec.isFinished(procId) && procExec.isRunning()) {
-      // TODO: add a config to make it tunable
-      // Dev Consideration: are we waiting forever, or we can set up some timeout value?
-      Threads.sleepWithoutInterrupt(250);
+  public static byte[] waitForProcedureToCompleteIOE(
+      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
+  throws IOException {
+    try {
+      return waitForProcedureToComplete(procExec, procId, timeout);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
     }
+  }
+
+  public static byte[] waitForProcedureToComplete(
+      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
+      throws IOException {
+    waitFor(procExec.getEnvironment(), "procId=" + procId,
+      new ProcedureSyncWait.Predicate<Boolean>() {
+        @Override
+        public Boolean evaluate() throws IOException {
+          return !procExec.isRunning() || procExec.isFinished(procId);
+        }
+      }
+    );
+
     ProcedureInfo result = procExec.getResult(procId);
     if (result != null) {
       if (result.isFailed()) {
@@ -104,6 +174,7 @@ public final class ProcedureSyncWait {
   public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
       String purpose, Predicate<T> predicate) throws IOException {
     final long done = EnvironmentEdgeManager.currentTime() + waitTime;
+    boolean logged = false;
     do {
       T result = predicate.evaluate();
       if (result != null && !result.equals(Boolean.FALSE)) {
@@ -115,7 +186,12 @@ public final class ProcedureSyncWait {
         LOG.warn("Interrupted while sleeping, waiting on " + purpose);
         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
       }
-      LOG.debug("Waiting on " + purpose);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("waitFor " + purpose);
+      } else {
+        if (!logged) LOG.debug("waitFor " + purpose);
+      }
+      logged = true;
     } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
 
     throw new TimeoutIOException("Timed out while waiting on " + purpose);
@@ -133,44 +209,14 @@ public final class ProcedureSyncWait {
     }
   }
 
-  /**
-   * Wait, through {@code waitFor}, until the ServerManager returns a non-empty list of
-   * destination servers.
-   * @param env MasterProcedureEnv
-   * @throws IOException propagated from {@code waitFor}
-   */
-  protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
-    final ServerManager sm = env.getMasterServices().getServerManager();
-    ProcedureSyncWait.waitFor(env, "server to assign region(s)",
-        new ProcedureSyncWait.Predicate<Boolean>() {
-      @Override
-      public Boolean evaluate() throws IOException {
-        // Satisfied as soon as at least one destination server is available.
-        List<ServerName> servers = sm.createDestinationServersList();
-        return servers != null && !servers.isEmpty();
-      }
-    });
-  }
-
-  /**
-   * Fetch the regions of a table through {@code waitFor}. For hbase:meta itself the regions
-   * come from {@code MetaTableLocator} via ZooKeeper; for any other table they are read with
-   * {@code MetaTableAccessor.getTableRegions}.
-   * @param env MasterProcedureEnv
-   * @param tableName table whose regions are requested
-   * @return the value produced by the predicate once {@code waitFor} accepts it
-   * @throws IOException propagated from {@code waitFor} or from the lookups
-   */
-  protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
-      final TableName tableName) throws IOException {
-    return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
-        new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
-      @Override
-      public List<HRegionInfo> evaluate() throws IOException {
-        // The regions of hbase:meta are located through ZooKeeper, not through a meta scan.
-        if (TableName.META_TABLE_NAME.equals(tableName)) {
-          return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
-        }
-        return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(),tableName);
-      }
-    });
-  }
-
   protected static void waitRegionInTransition(final MasterProcedureEnv env,
       final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
-    final AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    final RegionStates states = am.getRegionStates();
+    final RegionStates states = env.getAssignmentManager().getRegionStates();
     for (final HRegionInfo region : regions) {
       ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
           new ProcedureSyncWait.Predicate<Boolean>() {
         @Override
         public Boolean evaluate() throws IOException {
-          if (states.isRegionInState(region, State.FAILED_OPEN)) {
-            am.regionOffline(region);
-          }
           return !states.isRegionInTransition(region);
         }
       });

http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
new file mode 100644
index 0000000..887e272
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import com.google.common.collect.ArrayListMultimap;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * A remote procedure dispatcher for regionservers.
+ */
+public class RSProcedureDispatcher
+    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
+    implements ServerListener {
+  private static final Log LOG = LogFactory.getLog(RSProcedureDispatcher.class);
+
+  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
+      "hbase.regionserver.rpc.startup.waittime";
+  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
+
+  private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0201000; // 2.1
+
+  protected final MasterServices master;
+  protected final long rsStartupWaitTime;
+
+  /**
+   * Create a dispatcher bound to the given master. The region server startup wait time is
+   * read from {@link #RS_RPC_STARTUP_WAIT_TIME_CONF_KEY} in the master configuration,
+   * falling back to {@code DEFAULT_RS_RPC_STARTUP_WAIT_TIME}.
+   * @param master master services used to reach the configuration and the server manager
+   */
+  public RSProcedureDispatcher(final MasterServices master) {
+    super(master.getConfiguration());
+
+    final long startupWaitTime = master.getConfiguration().getLong(
+      RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
+    this.master = master;
+    this.rsStartupWaitTime = startupWaitTime;
+  }
+
+  /**
+   * Start the dispatcher. When the superclass start succeeds, registers this dispatcher as a
+   * server listener and adds a node for every region server currently online.
+   * @return false if the superclass start returned false, true otherwise
+   */
+  @Override
+  public boolean start() {
+    final boolean started = super.start();
+    if (started) {
+      master.getServerManager().registerListener(this);
+      for (ServerName onlineServer: master.getServerManager().getOnlineServersList()) {
+        addNode(onlineServer);
+      }
+    }
+    return started;
+  }
+
+  /**
+   * Stop the dispatcher. When the superclass stop succeeds, unregisters this dispatcher from
+   * the server manager listeners.
+   * @return false if the superclass stop returned false, true otherwise
+   */
+  @Override
+  public boolean stop() {
+    final boolean stopped = super.stop();
+    if (stopped) {
+      master.getServerManager().unregisterListener(this);
+    }
+    return stopped;
+  }
+
+  /**
+   * Submit the pending operations for a region server. Servers whose version is at least
+   * {@code RS_VERSION_WITH_EXEC_PROCS} get an {@code ExecuteProceduresRemoteCall}; older
+   * servers get a {@code CompatRemoteProcedureResolver}.
+   * @param serverName target region server
+   * @param operations operations queued for that server
+   */
+  @Override
+  protected void remoteDispatch(final ServerName serverName,
+      final Set<RemoteProcedure> operations) {
+    final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
+    final boolean supportsExecProcs = rsVersion >= RS_VERSION_WITH_EXEC_PROCS;
+    if (!supportsExecProcs) {
+      LOG.info(String.format(
+        "Fallback to compat rpc execution for serverName=%s version=%s",
+        serverName, rsVersion));
+      submitTask(new CompatRemoteProcedureResolver(serverName, operations));
+      return;
+    }
+
+    LOG.info(String.format(
+      "Using procedure batch rpc execution for serverName=%s version=%s",
+      serverName, rsVersion));
+    submitTask(new ExecuteProceduresRemoteCall(serverName, operations));
+  }
+
+  /**
+   * Fail every pending operation of a server by reporting a {@link DoNotRetryIOException}
+   * ("server not online") through {@code remoteCallFailed}.
+   * @param serverName server the operations were queued for
+   * @param operations operations to fail
+   */
+  protected void abortPendingOperations(final ServerName serverName,
+      final Set<RemoteProcedure> operations) {
+    // TODO: Replace with a ServerNotOnlineException()
+    final IOException notOnline = new DoNotRetryIOException("server not online " + serverName);
+    final MasterProcedureEnv procEnv = master.getMasterProcedureExecutor().getEnvironment();
+    for (RemoteProcedure pending: operations) {
+      pending.remoteCallFailed(procEnv, serverName, notOnline);
+    }
+  }
+
+  /**
+   * {@link ServerListener} callback: adds a dispatch node for the region server.
+   * @param serverName the region server that was added
+   */
+  @Override
+  public void serverAdded(final ServerName serverName) {
+    addNode(serverName);
+  }
+
+  /**
+   * {@link ServerListener} callback: removes the dispatch node of the region server.
+   * @param serverName the region server that was removed
+   */
+  @Override
+  public void serverRemoved(final ServerName serverName) {
+    removeNode(serverName);
+  }
+
+  /**
+   * Base remote call. Holds the target region server and the retry bookkeeping shared by the
+   * concrete remote calls: the number of failed attempts seen so far and the deadline used
+   * while waiting for a region server that is still starting.
+   */
+  protected abstract class AbstractRSRemoteCall implements Callable<Void> {
+    private final ServerName serverName;
+
+    // Number of failures handed to scheduleForRetry() so far; used in log messages only.
+    private int numberOfAttemptsSoFar = 0;
+    // Deadline (epoch millis) for ServerNotRunningYetException retries; -1 until first needed.
+    private long maxWaitTime = -1;
+
+    public AbstractRSRemoteCall(final ServerName serverName) {
+      this.serverName = serverName;
+    }
+
+    @Override
+    public abstract Void call();
+
+    /**
+     * @return the admin RPC interface for the target server
+     * @throws IOException if the server manager has no RPC connection to the server
+     */
+    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
+      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
+      if (admin == null) {
+        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
+          " failed because no RPC connection found to this server");
+      }
+      return admin;
+    }
+
+    protected ServerName getServerName() {
+      return serverName;
+    }
+
+    /**
+     * Decide whether a failed call should be retried against the same server and, if so,
+     * resubmit this task.
+     * @param e the (unwrapped) failure of the last attempt
+     * @return true if the call was resubmitted to the same server, false if the caller
+     *         should give up on this server
+     */
+    protected boolean scheduleForRetry(final IOException e) {
+      // Count this failed attempt so that the "try=" value in the logs is meaningful.
+      numberOfAttemptsSoFar++;
+
+      // Should we wait a little before retrying? If the server is starting it's yes.
+      if (e instanceof ServerNotRunningYetException) {
+        LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
+            serverName, numberOfAttemptsSoFar), e);
+        long now = EnvironmentEdgeManager.currentTime();
+        if (now < getMaxWaitTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("server is not yet up; waiting up to %dms",
+              (getMaxWaitTime() - now)), e);
+          }
+          submitTask(this, 100, TimeUnit.MILLISECONDS);
+          return true;
+        }
+
+        LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
+        return false;
+      }
+
+      // In case socket is timed out and the region server is still online,
+      // the openRegion RPC could have been accepted by the server and
+      // just the response didn't go through.  So we will retry to
+      // open the region on the same server.
+      final boolean retry = e instanceof SocketTimeoutException
+          && master.getServerManager().isServerOnline(serverName);
+      if (retry) {
+        // we want to retry as many times as needed as long as the RS is not dead.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
+              serverName, e.getMessage()), e);
+        }
+        submitTask(this);
+        return true;
+      }
+
+      // trying to send the request elsewhere instead
+      LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
+                  serverName, numberOfAttemptsSoFar), e);
+      return false;
+    }
+
+    /**
+     * @return the deadline for startup retries, computed on first use as the current time
+     *         plus {@code rsStartupWaitTime}
+     */
+    private long getMaxWaitTime() {
+      if (this.maxWaitTime < 0) {
+        // Computed lazily so that the wait window starts at the first startup failure.
+        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
+      }
+      return this.maxWaitTime;
+    }
+
+    /**
+     * @return the exception carried by a {@link RemoteException}, or {@code e} itself when
+     *         it is not a RemoteException
+     */
+    protected IOException unwrapException(IOException e) {
+      if (e instanceof RemoteException) {
+        e = ((RemoteException)e).unwrapRemoteException();
+      }
+      return e;
+    }
+  }
+
+  /**
+   * Callback used by {@code splitAndResolveOperation}: receives the pending operations of a
+   * server grouped by type, open requests first and close requests second.
+   */
+  private interface RemoteProcedureResolver {
+    /** Handle the region open operations; only called with a non-empty list. */
+    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
+    /** Handle the region close operations; only called with a non-empty list. */
+    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
+  }
+
+  /**
+   * Group the pending operations of a server by request type and hand the open and close
+   * groups, in that order, to the resolver. Empty groups are not dispatched; anything left in
+   * the grouping afterwards is logged as an unknown request type.
+   * @param serverName server the operations are destined for
+   * @param operations pending remote procedures for that server
+   * @param resolver callback receiving the open and close operation lists
+   */
+  public void splitAndResolveOperation(final ServerName serverName,
+      final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) {
+    final MasterProcedureEnv procEnv = master.getMasterProcedureExecutor().getEnvironment();
+    final ArrayListMultimap<Class<?>, RemoteOperation> pendingByType =
+      buildAndGroupRequestByType(procEnv, serverName, operations);
+
+    final List<RegionOpenOperation> toOpen = fetchType(pendingByType, RegionOpenOperation.class);
+    if (!toOpen.isEmpty()) {
+      resolver.dispatchOpenRequests(procEnv, toOpen);
+    }
+
+    final List<RegionCloseOperation> toClose =
+      fetchType(pendingByType, RegionCloseOperation.class);
+    if (!toClose.isEmpty()) {
+      resolver.dispatchCloseRequests(procEnv, toClose);
+    }
+
+    if (pendingByType.isEmpty()) {
+      return;
+    }
+    LOG.warn("unknown request type in the queue: " + pendingByType);
+  }
+
+  // ==========================================================================
+  //  Compatibility calls
+  // ==========================================================================
+  protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
+      implements RemoteProcedureResolver {
+    private final Set<RemoteProcedure> operations;
+
+    private ExecuteProceduresRequest.Builder request = null;
+
+    public ExecuteProceduresRemoteCall(final ServerName serverName,
+        final Set<RemoteProcedure> operations) {
+      super(serverName);
+      this.operations = operations;
+    }
+
+    public Void call() {
+      final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+
+      request = ExecuteProceduresRequest.newBuilder();
+      splitAndResolveOperation(getServerName(), operations, this);
+
+      try {
+        final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
+        remoteCallCompleted(env, response);
+      } catch (IOException e) {
+        e = unwrapException(e);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        if (!scheduleForRetry(e)) {
+          remoteCallFailed(env, e);
+        }
+      }
+      return null;
+    }
+
+    public void dispatchOpenRequests(final MasterProcedureEnv env,
+        final List<RegionOpenOperation> operations) {
+      request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
+    }
+
+    public void dispatchCloseRequests(final MasterProcedureEnv env,
+        final List<RegionCloseOperation> operations) {
+      for (RegionCloseOperation op: operations) {
+        request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
+      }
+    }
+
+    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
+        final ExecuteProceduresRequest request) throws IOException {
+      try {
+        return getRsAdmin().executeProcedures(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final ExecuteProceduresResponse response) {
+      /* NOTE(review): completion callback is disabled below, so this method is a no-op:
+      for (RemoteProcedure proc: operations) {
+        proc.remoteCallCompleted(env, getServerName(), response);
+      }*/
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      for (RemoteProcedure proc: operations) {
+        proc.remoteCallFailed(env, getServerName(), e);
+      }
+    }
+  }
+
+  // ==========================================================================
+  //  Compatibility calls
+  //  Since we don't have a "batch proc-exec" request on the target RS
+  //  we have to chunk the requests by type and dispatch the specific request.
+  // ==========================================================================
+  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
+      final ServerName serverName, final List<RegionOpenOperation> operations) {
+    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
+    builder.setServerStartCode(serverName.getStartcode());
+    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
+    for (RegionOpenOperation op: operations) {
+      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
+    }
+    return builder.build();
+  }
+
+  private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
+    private final List<RegionOpenOperation> operations;
+
+    public OpenRegionRemoteCall(final ServerName serverName,
+        final List<RegionOpenOperation> operations) {
+      super(serverName);
+      this.operations = operations;
+    }
+
+    @Override
+    public Void call() {
+      final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+      final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations);
+
+      try {
+        OpenRegionResponse response = sendRequest(getServerName(), request);
+        remoteCallCompleted(env, response);
+      } catch (IOException e) {
+        e = unwrapException(e);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        if (!scheduleForRetry(e)) {
+          remoteCallFailed(env, e);
+        }
+      }
+      return null;
+    }
+
+    private OpenRegionResponse sendRequest(final ServerName serverName,
+        final OpenRegionRequest request) throws IOException {
+      try {
+        return getRsAdmin().openRegion(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final OpenRegionResponse response) {
+      int index = 0;
+      for (RegionOpenOperation op: operations) {
+        OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
+        op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
+        op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
+      }
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      for (RegionOpenOperation op: operations) {
+        op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+      }
+    }
+  }
+
+  private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
+    private final RegionCloseOperation operation;
+
+    public CloseRegionRemoteCall(final ServerName serverName,
+        final RegionCloseOperation operation) {
+      super(serverName);
+      this.operation = operation;
+    }
+
+    @Override
+    public Void call() {
+      final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+      final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
+      try {
+        CloseRegionResponse response = sendRequest(getServerName(), request);
+        remoteCallCompleted(env, response);
+      } catch (IOException e) {
+        e = unwrapException(e);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        if (!scheduleForRetry(e)) {
+          remoteCallFailed(env, e);
+        }
+      }
+      return null;
+    }
+
+    private CloseRegionResponse sendRequest(final ServerName serverName,
+        final CloseRegionRequest request) throws IOException {
+      try {
+        return getRsAdmin().closeRegion(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final CloseRegionResponse response) {
+      operation.setClosed(response.getClosed());
+      operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+    }
+  }
+
+  protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
+    private final Set<RemoteProcedure> operations;
+    private final ServerName serverName;
+
+    public CompatRemoteProcedureResolver(final ServerName serverName,
+        final Set<RemoteProcedure> operations) {
+      this.serverName = serverName;
+      this.operations = operations;
+    }
+
+    @Override
+    public Void call() {
+      splitAndResolveOperation(serverName, operations, this);
+      return null;
+    }
+
+    public void dispatchOpenRequests(final MasterProcedureEnv env,
+        final List<RegionOpenOperation> operations) {
+      submitTask(new OpenRegionRemoteCall(serverName, operations));
+    }
+
+    public void dispatchCloseRequests(final MasterProcedureEnv env,
+        final List<RegionCloseOperation> operations) {
+      for (RegionCloseOperation op: operations) {
+        submitTask(new CloseRegionRemoteCall(serverName, op));
+      }
+    }
+  }
+
+  // ==========================================================================
+  //  RPC Messages
+  //  - ServerOperation: refreshConfig, grant, revoke, ...
+  //  - RegionOperation: open, close, flush, snapshot, ...
+  // ==========================================================================
+  public static abstract class ServerOperation extends RemoteOperation {
+    protected ServerOperation(final RemoteProcedure remoteProcedure) {
+      super(remoteProcedure);
+    }
+  }
+
+  public static abstract class RegionOperation extends RemoteOperation {
+    private final HRegionInfo regionInfo;
+
+    protected RegionOperation(final RemoteProcedure remoteProcedure,
+        final HRegionInfo regionInfo) {
+      super(remoteProcedure);
+      this.regionInfo = regionInfo;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return this.regionInfo;
+    }
+  }
+
+  public static class RegionOpenOperation extends RegionOperation {
+    private final List<ServerName> favoredNodes;
+    private final boolean openForReplay;
+    private boolean failedOpen;
+
+    public RegionOpenOperation(final RemoteProcedure remoteProcedure,
+        final HRegionInfo regionInfo, final List<ServerName> favoredNodes,
+        final boolean openForReplay) {
+      super(remoteProcedure, regionInfo);
+      this.favoredNodes = favoredNodes;
+      this.openForReplay = openForReplay;
+    }
+
+    protected void setFailedOpen(final boolean failedOpen) {
+      this.failedOpen = failedOpen;
+    }
+
+    public boolean isFailedOpen() {
+      return failedOpen;
+    }
+
+    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
+        final MasterProcedureEnv env) {
+      return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
+        env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
+    }
+  }
+
+  public static class RegionCloseOperation extends RegionOperation {
+    private final ServerName destinationServer;
+    private boolean closed = false;
+
+    public RegionCloseOperation(final RemoteProcedure remoteProcedure,
+        final HRegionInfo regionInfo, final ServerName destinationServer) {
+      super(remoteProcedure, regionInfo);
+      this.destinationServer = destinationServer;
+    }
+
+    public ServerName getDestinationServer() {
+      return destinationServer;
+    }
+
+    protected void setClosed(final boolean closed) {
+      this.closed = closed;
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
+      return ProtobufUtil.buildCloseRegionRequest(serverName,
+        getRegionInfo().getRegionName(), getDestinationServer());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
index 21709f8..cfd9df9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MetricsSnapshot;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -416,17 +415,7 @@ public class RestoreSnapshotProcedure
     try {
       Connection conn = env.getMasterServices().getConnection();
 
-      // 1. Forces all the RegionStates to be offline
-      //
-      // The AssignmentManager keeps all the region states around
-      // with no possibility to remove them, until the master is restarted.
-      // This means that a region marked as SPLIT before the restore will never be assigned again.
-      // To avoid having all states around all the regions are switched to the OFFLINE state,
-      // which is the same state that the regions will be after a delete table.
-      forceRegionsOffline(env, regionsToAdd);
-      forceRegionsOffline(env, regionsToRestore);
-      forceRegionsOffline(env, regionsToRemove);
-
+      // 1. Prepare to restore
       getMonitorStatus().setStatus("Preparing to restore each region");
 
       // 2. Applies changes to hbase:meta
@@ -496,20 +485,6 @@ public class RestoreSnapshotProcedure
   }
 
   /**
-   * Make sure that region states of the region list is in OFFLINE state.
-   * @param env MasterProcedureEnv
-   * @param hris region info list
-   **/
-  private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) {
-    RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates();
-    if (hris != null) {
-      for (HRegionInfo hri: hris) {
-        states.regionOffline(hri);
-      }
-    }
-  }
-
-  /**
    * The procedure could be restarted from a different machine. If the variable is null, we need to
    * retrieve it.
    * @return traceEnabled

http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
new file mode 100644
index 0000000..dd1874b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Passed as Exception by {@link ServerCrashProcedure}
+ * notifying on-going RIT that server has failed.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("serial")
+public class ServerCrashException extends HBaseIOException {
+  private final long procId;
+
+  /**
+   * @param procId id of the ServerCrashProcedure; reported by {@link #getMessage()}.
+   */
+  public ServerCrashException(long procId) {
+    this.procId = procId;
+  }
+
+  @Override
+  public String getMessage() {
+    return "Caused by ServerCrashProcedure pid=" + this.procId;
+  }
+}
\ No newline at end of file