You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/24 07:20:33 UTC

[10/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility.

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
new file mode 100644
index 0000000..887e272
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import com.google.common.collect.ArrayListMultimap;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * A remote procedure dispatcher for regionservers.
+ */
+public class RSProcedureDispatcher
+    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
+    implements ServerListener {
+  private static final Log LOG = LogFactory.getLog(RSProcedureDispatcher.class);
+
+  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
+      "hbase.regionserver.rpc.startup.waittime";
+  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
+
+  private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0201000; // 2.1
+
+  protected final MasterServices master;
+  protected final long rsStartupWaitTime;
+
+  public RSProcedureDispatcher(final MasterServices master) {
+    super(master.getConfiguration());
+
+    this.master = master;
+    this.rsStartupWaitTime = master.getConfiguration().getLong(
+      RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
+  }
+
+  @Override
+  public boolean start() {
+    if (!super.start()) {
+      return false;
+    }
+
+    master.getServerManager().registerListener(this);
+    for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
+      addNode(serverName);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean stop() {
+    if (!super.stop()) {
+      return false;
+    }
+
+    master.getServerManager().unregisterListener(this);
+    return true;
+  }
+
+  @Override
+  protected void remoteDispatch(final ServerName serverName,
+      final Set<RemoteProcedure> operations) {
+    final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
+    if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
+      LOG.info(String.format(
+        "Using procedure batch rpc execution for serverName=%s version=%s",
+        serverName, rsVersion));
+      submitTask(new ExecuteProceduresRemoteCall(serverName, operations));
+    } else {
+      LOG.info(String.format(
+        "Fallback to compat rpc execution for serverName=%s version=%s",
+        serverName, rsVersion));
+      submitTask(new CompatRemoteProcedureResolver(serverName, operations));
+    }
+  }
+
+  protected void abortPendingOperations(final ServerName serverName,
+      final Set<RemoteProcedure> operations) {
+    // TODO: Replace with a ServerNotOnlineException()
+    final IOException e = new DoNotRetryIOException("server not online " + serverName);
+    final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+    for (RemoteProcedure proc: operations) {
+      proc.remoteCallFailed(env, serverName, e);
+    }
+  }
+
+  public void serverAdded(final ServerName serverName) {
+    addNode(serverName);
+  }
+
+  public void serverRemoved(final ServerName serverName) {
+    removeNode(serverName);
+  }
+
+  /**
+   * Base remote call
+   */
+  protected abstract class AbstractRSRemoteCall implements Callable<Void> {
+    private final ServerName serverName;
+
+    private int numberOfAttemptsSoFar = 0;
+    private long maxWaitTime = -1;
+
+    public AbstractRSRemoteCall(final ServerName serverName) {
+      this.serverName = serverName;
+    }
+
+    public abstract Void call();
+
+    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
+      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
+      if (admin == null) {
+        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
+          " failed because no RPC connection found to this server");
+      }
+      return admin;
+    }
+
+    protected ServerName getServerName() {
+      return serverName;
+    }
+
+    protected boolean scheduleForRetry(final IOException e) {
+      // Should we wait a little before retrying? If the server is starting it's yes.
+      final boolean hold = (e instanceof ServerNotRunningYetException);
+      if (hold) {
+        LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
+            serverName, numberOfAttemptsSoFar), e);
+        long now = EnvironmentEdgeManager.currentTime();
+        if (now < getMaxWaitTime()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("server is not yet up; waiting up to %dms",
+              (getMaxWaitTime() - now)), e);
+          }
+          submitTask(this, 100, TimeUnit.MILLISECONDS);
+          return true;
+        }
+
+        LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
+        return false;
+      }
+
+      // In case socket is timed out and the region server is still online,
+      // the openRegion RPC could have been accepted by the server and
+      // just the response didn't go through.  So we will retry to
+      // open the region on the same server.
+      final boolean retry = !hold && (e instanceof SocketTimeoutException
+          && master.getServerManager().isServerOnline(serverName));
+      if (retry) {
+        // we want to retry as many times as needed as long as the RS is not dead.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
+              serverName, e.getMessage()), e);
+        }
+        submitTask(this);
+        return true;
+      }
+
+      // trying to send the request elsewhere instead
+      LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
+                  serverName, numberOfAttemptsSoFar), e);
+      return false;
+    }
+
+    private long getMaxWaitTime() {
+      if (this.maxWaitTime < 0) {
+        // Lazily set the retry deadline: time of the first call plus the RS startup wait time.
+        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
+      }
+      return this.maxWaitTime;
+    }
+
+    protected IOException unwrapException(IOException e) {
+      if (e instanceof RemoteException) {
+        e = ((RemoteException)e).unwrapRemoteException();
+      }
+      return e;
+    }
+  }
+
+  private interface RemoteProcedureResolver {
+    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
+    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
+  }
+
+  public void splitAndResolveOperation(final ServerName serverName,
+      final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) {
+    final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+    final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
+      buildAndGroupRequestByType(env, serverName, operations);
+
+    final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
+    if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps);
+
+    final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
+    if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps);
+
+    if (!reqsByType.isEmpty()) {
+      LOG.warn("unknown request type in the queue: " + reqsByType);
+    }
+  }
+
+  // ==========================================================================
+  //  Batch procedure execution call (used when the RS supports ExecuteProcedures)
+  // ==========================================================================
+  protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
+      implements RemoteProcedureResolver {
+    private final Set<RemoteProcedure> operations;
+
+    private ExecuteProceduresRequest.Builder request = null;
+
+    public ExecuteProceduresRemoteCall(final ServerName serverName,
+        final Set<RemoteProcedure> operations) {
+      super(serverName);
+      this.operations = operations;
+    }
+
+    public Void call() {
+      final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+
+      request = ExecuteProceduresRequest.newBuilder();
+      splitAndResolveOperation(getServerName(), operations, this);
+
+      try {
+        final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
+        remoteCallCompleted(env, response);
+      } catch (IOException e) {
+        e = unwrapException(e);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        if (!scheduleForRetry(e)) {
+          remoteCallFailed(env, e);
+        }
+      }
+      return null;
+    }
+
+    public void dispatchOpenRequests(final MasterProcedureEnv env,
+        final List<RegionOpenOperation> operations) {
+      request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
+    }
+
+    public void dispatchCloseRequests(final MasterProcedureEnv env,
+        final List<RegionCloseOperation> operations) {
+      for (RegionCloseOperation op: operations) {
+        request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
+      }
+    }
+
+    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
+        final ExecuteProceduresRequest request) throws IOException {
+      try {
+        return getRsAdmin().executeProcedures(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final ExecuteProceduresResponse response) {
+      /*
+      for (RemoteProcedure proc: operations) {
+        proc.remoteCallCompleted(env, getServerName(), response);
+      }*/
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      for (RemoteProcedure proc: operations) {
+        proc.remoteCallFailed(env, getServerName(), e);
+      }
+    }
+  }
+
+  // ==========================================================================
+  //  Compatibility calls
+  //  Since we don't have a "batch proc-exec" request on the target RS
+  //  we have to chunk the requests by type and dispatch the specific request.
+  // ==========================================================================
+  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
+      final ServerName serverName, final List<RegionOpenOperation> operations) {
+    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
+    builder.setServerStartCode(serverName.getStartcode());
+    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
+    for (RegionOpenOperation op: operations) {
+      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
+    }
+    return builder.build();
+  }
+
+  private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
+    private final List<RegionOpenOperation> operations;
+
+    public OpenRegionRemoteCall(final ServerName serverName,
+        final List<RegionOpenOperation> operations) {
+      super(serverName);
+      this.operations = operations;
+    }
+
+    @Override
+    public Void call() {
+      final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+      final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations);
+
+      try {
+        OpenRegionResponse response = sendRequest(getServerName(), request);
+        remoteCallCompleted(env, response);
+      } catch (IOException e) {
+        e = unwrapException(e);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        if (!scheduleForRetry(e)) {
+          remoteCallFailed(env, e);
+        }
+      }
+      return null;
+    }
+
+    private OpenRegionResponse sendRequest(final ServerName serverName,
+        final OpenRegionRequest request) throws IOException {
+      try {
+        return getRsAdmin().openRegion(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final OpenRegionResponse response) {
+      int index = 0;
+      for (RegionOpenOperation op: operations) {
+        OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
+        op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
+        op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
+      }
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      for (RegionOpenOperation op: operations) {
+        op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+      }
+    }
+  }
+
+  private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
+    private final RegionCloseOperation operation;
+
+    public CloseRegionRemoteCall(final ServerName serverName,
+        final RegionCloseOperation operation) {
+      super(serverName);
+      this.operation = operation;
+    }
+
+    @Override
+    public Void call() {
+      final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+      final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
+      try {
+        CloseRegionResponse response = sendRequest(getServerName(), request);
+        remoteCallCompleted(env, response);
+      } catch (IOException e) {
+        e = unwrapException(e);
+        // TODO: In the future some operation may want to bail out early.
+        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
+        if (!scheduleForRetry(e)) {
+          remoteCallFailed(env, e);
+        }
+      }
+      return null;
+    }
+
+    private CloseRegionResponse sendRequest(final ServerName serverName,
+        final CloseRegionRequest request) throws IOException {
+      try {
+        return getRsAdmin().closeRegion(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    private void remoteCallCompleted(final MasterProcedureEnv env,
+        final CloseRegionResponse response) {
+      operation.setClosed(response.getClosed());
+      operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
+    }
+
+    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+      operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+    }
+  }
+
+  protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
+    private final Set<RemoteProcedure> operations;
+    private final ServerName serverName;
+
+    public CompatRemoteProcedureResolver(final ServerName serverName,
+        final Set<RemoteProcedure> operations) {
+      this.serverName = serverName;
+      this.operations = operations;
+    }
+
+    @Override
+    public Void call() {
+      splitAndResolveOperation(serverName, operations, this);
+      return null;
+    }
+
+    public void dispatchOpenRequests(final MasterProcedureEnv env,
+        final List<RegionOpenOperation> operations) {
+      submitTask(new OpenRegionRemoteCall(serverName, operations));
+    }
+
+    public void dispatchCloseRequests(final MasterProcedureEnv env,
+        final List<RegionCloseOperation> operations) {
+      for (RegionCloseOperation op: operations) {
+        submitTask(new CloseRegionRemoteCall(serverName, op));
+      }
+    }
+  }
+
+  // ==========================================================================
+  //  RPC Messages
+  //  - ServerOperation: refreshConfig, grant, revoke, ...
+  //  - RegionOperation: open, close, flush, snapshot, ...
+  // ==========================================================================
+  public static abstract class ServerOperation extends RemoteOperation {
+    protected ServerOperation(final RemoteProcedure remoteProcedure) {
+      super(remoteProcedure);
+    }
+  }
+
+  public static abstract class RegionOperation extends RemoteOperation {
+    private final HRegionInfo regionInfo;
+
+    protected RegionOperation(final RemoteProcedure remoteProcedure,
+        final HRegionInfo regionInfo) {
+      super(remoteProcedure);
+      this.regionInfo = regionInfo;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return this.regionInfo;
+    }
+  }
+
+  public static class RegionOpenOperation extends RegionOperation {
+    private final List<ServerName> favoredNodes;
+    private final boolean openForReplay;
+    private boolean failedOpen;
+
+    public RegionOpenOperation(final RemoteProcedure remoteProcedure,
+        final HRegionInfo regionInfo, final List<ServerName> favoredNodes,
+        final boolean openForReplay) {
+      super(remoteProcedure, regionInfo);
+      this.favoredNodes = favoredNodes;
+      this.openForReplay = openForReplay;
+    }
+
+    protected void setFailedOpen(final boolean failedOpen) {
+      this.failedOpen = failedOpen;
+    }
+
+    public boolean isFailedOpen() {
+      return failedOpen;
+    }
+
+    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
+        final MasterProcedureEnv env) {
+      return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
+        env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
+    }
+  }
+
+  public static class RegionCloseOperation extends RegionOperation {
+    private final ServerName destinationServer;
+    private boolean closed = false;
+
+    public RegionCloseOperation(final RemoteProcedure remoteProcedure,
+        final HRegionInfo regionInfo, final ServerName destinationServer) {
+      super(remoteProcedure, regionInfo);
+      this.destinationServer = destinationServer;
+    }
+
+    public ServerName getDestinationServer() {
+      return destinationServer;
+    }
+
+    protected void setClosed(final boolean closed) {
+      this.closed = closed;
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
+      return ProtobufUtil.buildCloseRegionRequest(serverName,
+        getRegionInfo().getRegionName(), getDestinationServer());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
index 21709f8..cfd9df9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MetricsSnapshot;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -416,17 +415,7 @@ public class RestoreSnapshotProcedure
     try {
       Connection conn = env.getMasterServices().getConnection();
 
-      // 1. Forces all the RegionStates to be offline
-      //
-      // The AssignmentManager keeps all the region states around
-      // with no possibility to remove them, until the master is restarted.
-      // This means that a region marked as SPLIT before the restore will never be assigned again.
-      // To avoid having all states around all the regions are switched to the OFFLINE state,
-      // which is the same state that the regions will be after a delete table.
-      forceRegionsOffline(env, regionsToAdd);
-      forceRegionsOffline(env, regionsToRestore);
-      forceRegionsOffline(env, regionsToRemove);
-
+      // 1. Prepare to restore
       getMonitorStatus().setStatus("Preparing to restore each region");
 
       // 2. Applies changes to hbase:meta
@@ -496,20 +485,6 @@ public class RestoreSnapshotProcedure
   }
 
   /**
-   * Make sure that region states of the region list is in OFFLINE state.
-   * @param env MasterProcedureEnv
-   * @param hris region info list
-   **/
-  private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) {
-    RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates();
-    if (hris != null) {
-      for (HRegionInfo hri: hris) {
-        states.regionOffline(hri);
-      }
-    }
-  }
-
-  /**
    * The procedure could be restarted from a different machine. If the variable is null, we need to
    * retrieve it.
    * @return traceEnabled

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
new file mode 100644
index 0000000..ca351f69
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Passed as Exception by {@link ServerCrashProcedure}
+ * notifying on-going RIT that server has failed.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("serial")
+public class ServerCrashException extends HBaseIOException {
+  private final long procId;
+  private final ServerName serverName;
+
+  /**
+   * @param serverName The server that crashed.
+   */
+  public ServerCrashException(long procId, ServerName serverName) {
+    this.procId = procId;
+    this.serverName = serverName;
+  }
+
+  @Override
+  public String getMessage() {
+    return "ServerCrashProcedure pid=" + this.procId + ", server=" + this.serverName;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 2703947..71c6b89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -19,55 +19,40 @@ package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MasterWalManager;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionTransitionProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
  * ServerShutdownHandler.
  *
- * <p>The procedure flow varies dependent on whether meta is assigned, if we are
- * doing distributed log replay versus distributed log splitting, and if we are to split logs at
- * all.
- *
- * <p>This procedure asks that all crashed servers get processed equally; we yield after the
- * completion of each successful flow step. We do this so that we do not 'deadlock' waiting on
- * a region assignment so we can replay edits which could happen if a region moved there are edits
- * on two servers for replay.
+ * <p>The procedure flow varies dependent on whether meta is assigned and if we are to split logs.
  *
- * <p>TODO: ASSIGN and WAIT_ON_ASSIGN (at least) are not idempotent. Revisit when assign is pv2.
- * TODO: We do not have special handling for system tables.
+ * <p>We come in here after ServerManager has noticed a server has expired. Procedures
+ * queued on the rpc should have been notified about the failure and should be concurrently
+ * getting themselves ready to assign elsewhere.
  */
 public class ServerCrashProcedure
 extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState>
@@ -75,36 +60,6 @@ implements ServerProcedureInterface {
   private static final Log LOG = LogFactory.getLog(ServerCrashProcedure.class);
 
   /**
-   * Configuration key to set how long to wait in ms doing a quick check on meta state.
-   */
-  public static final String KEY_SHORT_WAIT_ON_META =
-      "hbase.master.servercrash.short.wait.on.meta.ms";
-
-  public static final int DEFAULT_SHORT_WAIT_ON_META = 1000;
-
-  /**
-   * Configuration key to set how many retries to cycle before we give up on meta.
-   * Each attempt will wait at least {@link #KEY_SHORT_WAIT_ON_META} milliseconds.
-   */
-  public static final String KEY_RETRIES_ON_META =
-      "hbase.master.servercrash.meta.retries";
-
-  public static final int DEFAULT_RETRIES_ON_META = 10;
-
-  /**
-   * Configuration key to set how long to wait in ms on regions in transition.
-   */
-  public static final String KEY_WAIT_ON_RIT =
-      "hbase.master.servercrash.wait.on.rit.ms";
-
-  public static final int DEFAULT_WAIT_ON_RIT = 30000;
-
-  private static final Set<HRegionInfo> META_REGION_SET = new HashSet<>();
-  static {
-    META_REGION_SET.add(HRegionInfo.FIRST_META_REGIONINFO);
-  }
-
-  /**
    * Name of the crashed server to process.
    */
   private ServerName serverName;
@@ -117,14 +72,8 @@ implements ServerProcedureInterface {
   /**
    * Regions that were on the crashed server.
    */
-  private Set<HRegionInfo> regionsOnCrashedServer;
+  private List<HRegionInfo> regionsOnCrashedServer;
 
-  /**
-   * Regions assigned. Usually some subset of {@link #regionsOnCrashedServer}.
-   */
-  private List<HRegionInfo> regionsAssigned;
-
-  private boolean distributedLogReplay = false;
   private boolean carryingMeta = false;
   private boolean shouldSplitWal;
 
@@ -164,20 +113,11 @@ implements ServerProcedureInterface {
     super();
   }
 
-  private void throwProcedureYieldException(final String msg) throws ProcedureYieldException {
-    String logMsg = msg + "; cycle=" + this.cycles + ", running for " +
-        StringUtils.formatTimeDiff(System.currentTimeMillis(), getSubmittedTime());
-    // The procedure executor logs ProcedureYieldException at trace level. For now, log these
-    // yields for server crash processing at DEBUG. Revisit when stable.
-    if (LOG.isDebugEnabled()) LOG.debug(logMsg);
-    throw new ProcedureYieldException(logMsg);
-  }
-
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
-      throws ProcedureYieldException {
+      throws ProcedureSuspendedException, ProcedureYieldException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace(state);
+      LOG.trace(state  + " " + this + "; cycles=" + this.cycles);
     }
     // Keep running count of cycles
     if (state.ordinal() != this.previousState) {
@@ -186,11 +126,7 @@ implements ServerProcedureInterface {
     } else {
       this.cycles++;
     }
-    MasterServices services = env.getMasterServices();
-    // Is master fully online? If not, yield. No processing of servers unless master is up
-    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
-      throwProcedureYieldException("Waiting on master failover to complete");
-    }
+    final MasterServices services = env.getMasterServices();
     // HBASE-14802
     // If we have not yet notified that we are processing a dead server, we should do now.
     if (!notifiedDeadServer) {
@@ -201,102 +137,61 @@ implements ServerProcedureInterface {
     try {
       switch (state) {
       case SERVER_CRASH_START:
-        LOG.info("Start processing crashed " + this.serverName);
+        LOG.info("Start " + this);
         start(env);
         // If carrying meta, process it first. Else, get list of regions on crashed server.
-        if (this.carryingMeta) setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
-        else setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+        if (this.carryingMeta) {
+          setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
+        } else {
+          setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+        }
         break;
 
       case SERVER_CRASH_GET_REGIONS:
         // If hbase:meta is not assigned, yield.
-        if (!isMetaAssignedQuickTest(env)) {
-          // isMetaAssignedQuickTest does not really wait. Let's delay a little before
-          // another round of execution.
-          long wait =
-              env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META,
-                DEFAULT_SHORT_WAIT_ON_META);
-          wait = wait / 10;
-          Thread.sleep(wait);
-          throwProcedureYieldException("Waiting on hbase:meta assignment");
+        if (env.getAssignmentManager().waitMetaInitialized(this)) {
+          throw new ProcedureSuspendedException();
         }
-        this.regionsOnCrashedServer =
-            services.getAssignmentManager().getRegionStates().getServerRegions(this.serverName);
-        // Where to go next? Depends on whether we should split logs at all or if we should do
-        // distributed log splitting (DLS) vs distributed log replay (DLR).
+
+        this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates()
+          .getServerRegionInfoSet(serverName);
+        // Where to go next? Depends on whether we should split logs at all or
+        // if we should do distributed log splitting.
         if (!this.shouldSplitWal) {
           setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
-        } else if (this.distributedLogReplay) {
-          setNextState(ServerCrashState.SERVER_CRASH_PREPARE_LOG_REPLAY);
         } else {
           setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
         }
         break;
 
       case SERVER_CRASH_PROCESS_META:
-        // If we fail processing hbase:meta, yield.
-        if (!processMeta(env)) {
-          throwProcedureYieldException("Waiting on regions-in-transition to clear");
-        }
+        processMeta(env);
         setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
         break;
 
-      case SERVER_CRASH_PREPARE_LOG_REPLAY:
-        prepareLogReplay(env, this.regionsOnCrashedServer);
-        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
-        break;
-
       case SERVER_CRASH_SPLIT_LOGS:
         splitLogs(env);
-        // If DLR, go to FINISH. Otherwise, if DLS, go to SERVER_CRASH_CALC_REGIONS_TO_ASSIGN
-        if (this.distributedLogReplay) setNextState(ServerCrashState.SERVER_CRASH_FINISH);
-        else setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
         break;
 
       case SERVER_CRASH_ASSIGN:
-        List<HRegionInfo> regionsToAssign = calcRegionsToAssign(env);
-
-        // Assign may not be idempotent. SSH used to requeue the SSH if we got an IOE assigning
-        // which is what we are mimicing here but it looks prone to double assignment if assign
-        // fails midway. TODO: Test.
-
         // If no regions to assign, skip assign and skip to the finish.
-        boolean regions = regionsToAssign != null && !regionsToAssign.isEmpty();
-        if (regions) {
-          this.regionsAssigned = regionsToAssign;
-          if (!assign(env, regionsToAssign)) {
-            throwProcedureYieldException("Failed assign; will retry");
+        // Filter out meta regions. Those are handled elsewhere in this procedure.
+        // Filter changes this.regionsOnCrashedServer.
+        if (filterDefaultMetaRegions(regionsOnCrashedServer)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Assigning regions " +
+              HRegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + this +
+              "; cycles=" + this.cycles);
           }
+          handleRIT(env, regionsOnCrashedServer);
+          addChildProcedure(env.getAssignmentManager().
+              createAssignProcedures(regionsOnCrashedServer, true));
         }
-        if (this.shouldSplitWal && distributedLogReplay) {
-          // Take this route even if there are apparently no regions assigned. This may be our
-          // second time through here; i.e. we assigned and crashed just about here. On second
-          // time through, there will be no regions because we assigned them in the previous step.
-          // Even though no regions, we need to go through here to clean up the DLR zk markers.
-          setNextState(ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN);
-        } else {
-          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
-        }
-        break;
-
-      case SERVER_CRASH_WAIT_ON_ASSIGN:
-        // TODO: The list of regionsAssigned may be more than we actually assigned. See down in
-        // AM #1629 around 'if (regionStates.wasRegionOnDeadServer(encodedName)) {' where where we
-        // will skip assigning a region because it is/was on a dead server. Should never happen!
-        // It was on this server. Worst comes to worst, we'll still wait here till other server is
-        // processed.
-
-        // If the wait on assign failed, yield -- if we have regions to assign.
-        if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) {
-          if (!waitOnAssign(env, this.regionsAssigned)) {
-            throwProcedureYieldException("Waiting on region assign");
-          }
-        }
-        setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
+        setNextState(ServerCrashState.SERVER_CRASH_FINISH);
         break;
 
       case SERVER_CRASH_FINISH:
-        LOG.info("Finished processing of crashed " + serverName);
         services.getServerManager().getDeadServers().finish(serverName);
         return Flow.NO_MORE_STATE;
 
@@ -304,11 +199,7 @@ implements ServerProcedureInterface {
         throw new UnsupportedOperationException("unhandled state=" + state);
       }
     } catch (IOException e) {
-      LOG.warn("Failed serverName=" + this.serverName + ", state=" + state + "; retry", e);
-    } catch (InterruptedException e) {
-      // TODO: Make executor allow IEs coming up out of execute.
-      LOG.warn("Interrupted serverName=" + this.serverName + ", state=" + state + "; retry", e);
-      Thread.currentThread().interrupt();
+      LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + this.cycles, e);
     }
     return Flow.HAS_MORE_STATE;
   }
@@ -318,96 +209,60 @@ implements ServerProcedureInterface {
    * @param env
    * @throws IOException
    */
-  private void start(final MasterProcedureEnv env) throws IOException {
-    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
-    // Set recovery mode late. This is what the old ServerShutdownHandler used do.
-    mwm.setLogRecoveryMode();
-    this.distributedLogReplay = mwm.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY;
-  }
+  private void start(final MasterProcedureEnv env) throws IOException {}
 
   /**
    * @param env
-   * @return False if we fail to assign and split logs on meta ('process').
    * @throws IOException
    * @throws InterruptedException
    */
-  private boolean processMeta(final MasterProcedureEnv env)
-  throws IOException {
+  private void processMeta(final MasterProcedureEnv env) throws IOException {
     if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + this.serverName);
-    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
-    AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO;
+
     if (this.shouldSplitWal) {
-      if (this.distributedLogReplay) {
-        prepareLogReplay(env, META_REGION_SET);
-      } else {
-        // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
-        mwm.splitMetaLog(serverName);
-        am.getRegionStates().logSplit(metaHRI);
-      }
+      // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
+      env.getMasterServices().getMasterWalManager().splitMetaLog(serverName);
     }
 
     // Assign meta if still carrying it. Check again: region may be assigned because of RIT timeout
-    boolean processed = true;
-    if (am.isCarryingMeta(serverName)) {
-      // TODO: May block here if hard time figuring state of meta.
-      am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
-      verifyAndAssignMetaWithRetries(env);
-      if (this.shouldSplitWal && distributedLogReplay) {
-        int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
-        if (!waitOnRegionToClearRegionsInTransition(am, metaHRI, timeout)) {
-          processed = false;
-        } else {
-          // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
-          mwm.splitMetaLog(serverName);
-        }
-      }
+    final AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    for (HRegionInfo hri: am.getRegionStates().getServerRegionInfoSet(serverName)) {
+      if (!isDefaultMetaRegion(hri)) continue;
+
+      am.offlineRegion(hri);
+      addChildProcedure(am.createAssignProcedure(hri, true));
     }
-    return processed;
   }
 
-  /**
-   * @return True if region cleared RIT, else false if we timed out waiting.
-   * @throws InterruptedIOException
-   */
-  private boolean waitOnRegionToClearRegionsInTransition(AssignmentManager am,
-      final HRegionInfo hri, final int timeout)
-  throws InterruptedIOException {
-    try {
-      if (!am.waitOnRegionToClearRegionsInTransition(hri, timeout)) {
-        // Wait here is to avoid log replay hits current dead server and incur a RPC timeout
-        // when replay happens before region assignment completes.
-        LOG.warn("Region " + hri.getEncodedName() + " didn't complete assignment in time");
-        return false;
+  private boolean filterDefaultMetaRegions(final List<HRegionInfo> regions) {
+    if (regions == null) return false;
+    final Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      final HRegionInfo hri = it.next();
+      if (isDefaultMetaRegion(hri)) {
+        it.remove();
       }
-    } catch (InterruptedException ie) {
-      throw new InterruptedIOException("Caught " + ie +
-        " during waitOnRegionToClearRegionsInTransition for " + hri);
     }
-    return true;
+    return !regions.isEmpty();
   }
 
-  private void prepareLogReplay(final MasterProcedureEnv env, final Set<HRegionInfo> regions)
-  throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Mark " + size(this.regionsOnCrashedServer) + " regions-in-recovery from " +
-        this.serverName);
-    }
-    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
-    AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    mwm.prepareLogReplay(this.serverName, regions);
-    am.getRegionStates().logSplit(this.serverName);
+  private boolean isDefaultMetaRegion(final HRegionInfo hri) {
+    return hri.getTable().equals(TableName.META_TABLE_NAME) &&
+      RegionReplicaUtil.isDefaultReplica(hri);
   }
 
   private void splitLogs(final MasterProcedureEnv env) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Splitting logs from " + serverName + "; region count=" +
-        size(this.regionsOnCrashedServer));
+      LOG.debug("Splitting WALs " + this);
     }
     MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
     // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
+    // PROBLEM!!! WE BLOCK HERE.
     mwm.splitLog(this.serverName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Done splitting WALs " + this);
+    }
     am.getRegionStates().logSplit(this.serverName);
   }
 
@@ -415,124 +270,6 @@ implements ServerProcedureInterface {
     return hris == null? 0: hris.size();
   }
 
-  /**
-   * Figure out what we need to assign. Should be idempotent.
-   * @param env
-   * @return List of calculated regions to assign; may be empty or null.
-   * @throws IOException
-   */
-  private List<HRegionInfo> calcRegionsToAssign(final MasterProcedureEnv env)
-  throws IOException {
-    AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    List<HRegionInfo> regionsToAssignAggregator = new ArrayList<>();
-    int replicaCount = env.getMasterConfiguration().getInt(HConstants.META_REPLICAS_NUM,
-      HConstants.DEFAULT_META_REPLICA_NUM);
-    for (int i = 1; i < replicaCount; i++) {
-      HRegionInfo metaHri =
-          RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i);
-      if (am.isCarryingMetaReplica(this.serverName, metaHri)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Reassigning meta replica" + metaHri + " that was on " + this.serverName);
-        }
-        regionsToAssignAggregator.add(metaHri);
-      }
-    }
-    // Clean out anything in regions in transition.
-    List<HRegionInfo> regionsInTransition = am.cleanOutCrashedServerReferences(serverName);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Reassigning " + size(this.regionsOnCrashedServer) +
-        " region(s) that " + (serverName == null? "null": serverName)  +
-        " was carrying (and " + regionsInTransition.size() +
-        " regions(s) that were opening on this server)");
-    }
-    regionsToAssignAggregator.addAll(regionsInTransition);
-
-    // Iterate regions that were on this server and figure which of these we need to reassign
-    if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
-      RegionStates regionStates = am.getRegionStates();
-      for (HRegionInfo hri: this.regionsOnCrashedServer) {
-        if (regionsInTransition.contains(hri)) continue;
-        String encodedName = hri.getEncodedName();
-        Lock lock = am.acquireRegionLock(encodedName);
-        try {
-          RegionState rit = regionStates.getRegionTransitionState(hri);
-          if (processDeadRegion(hri, am)) {
-            ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
-            if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
-              // If this region is in transition on the dead server, it must be
-              // opening or pending_open, which should have been covered by
-              // AM#cleanOutCrashedServerReferences
-              LOG.info("Skip assigning " + hri.getRegionNameAsString()
-                + " because opened on " + addressFromAM.getServerName());
-              continue;
-            }
-            if (rit != null) {
-              if (rit.getServerName() != null && !rit.isOnServer(this.serverName)) {
-                // Skip regions that are in transition on other server
-                LOG.info("Skip assigning region in transition on other server" + rit);
-                continue;
-              }
-              LOG.info("Reassigning region " + rit + " and clearing zknode if exists");
-              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
-            } else if (regionStates.isRegionInState(
-                hri, RegionState.State.SPLITTING_NEW, RegionState.State.MERGING_NEW)) {
-              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
-            }
-            regionsToAssignAggregator.add(hri);
-          // TODO: The below else if is different in branch-1 from master branch.
-          } else if (rit != null) {
-            if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline())
-                && am.getTableStateManager().isTableState(hri.getTable(),
-                TableState.State.DISABLED, TableState.State.DISABLING) ||
-                am.getReplicasToClose().contains(hri)) {
-              // If the table was partially disabled and the RS went down, we should clear the
-              // RIT and remove the node for the region.
-              // The rit that we use may be stale in case the table was in DISABLING state
-              // but though we did assign we will not be clearing the znode in CLOSING state.
-              // Doing this will have no harm. See HBASE-5927
-              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
-              am.offlineDisabledRegion(hri);
-            } else {
-              LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
-                + rit + " not to be assigned by SSH of server " + serverName);
-            }
-          }
-        } finally {
-          lock.unlock();
-        }
-      }
-    }
-    return regionsToAssignAggregator;
-  }
-
-  private boolean assign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
-  throws InterruptedIOException {
-    AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    try {
-      am.assign(hris);
-    } catch (InterruptedException ie) {
-      LOG.error("Caught " + ie + " during round-robin assignment");
-      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
-    } catch (IOException ioe) {
-      LOG.info("Caught " + ioe + " during region assignment, will retry");
-      return false;
-    }
-    return true;
-  }
-
-  private boolean waitOnAssign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
-  throws InterruptedIOException {
-    int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
-    for (HRegionInfo hri: hris) {
-      // TODO: Blocks here.
-      if (!waitOnRegionToClearRegionsInTransition(env.getMasterServices().getAssignmentManager(),
-          hri, timeout)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   @Override
   protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
   throws IOException {
@@ -580,11 +317,11 @@ implements ServerProcedureInterface {
   @Override
   public void toStringClassDetails(StringBuilder sb) {
     sb.append(getClass().getSimpleName());
-    sb.append(" serverName=");
-    sb.append(this.serverName);
-    sb.append(", shouldSplitWal=");
+    sb.append(" server=");
+    sb.append(serverName);
+    sb.append(", splitWal=");
     sb.append(shouldSplitWal);
-    sb.append(", carryingMeta=");
+    sb.append(", meta=");
     sb.append(carryingMeta);
   }
 
@@ -595,7 +332,6 @@ implements ServerProcedureInterface {
     MasterProcedureProtos.ServerCrashStateData.Builder state =
       MasterProcedureProtos.ServerCrashStateData.newBuilder().
       setServerName(ProtobufUtil.toServerName(this.serverName)).
-      setDistributedLogReplay(this.distributedLogReplay).
       setCarryingMeta(this.carryingMeta).
       setShouldSplitWal(this.shouldSplitWal);
     if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
@@ -603,11 +339,6 @@ implements ServerProcedureInterface {
         state.addRegionsOnCrashedServer(HRegionInfo.convert(hri));
       }
     }
-    if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) {
-      for (HRegionInfo hri: this.regionsAssigned) {
-        state.addRegionsAssigned(HRegionInfo.convert(hri));
-      }
-    }
     state.build().writeDelimitedTo(stream);
   }
 
@@ -618,142 +349,16 @@ implements ServerProcedureInterface {
     MasterProcedureProtos.ServerCrashStateData state =
       MasterProcedureProtos.ServerCrashStateData.parseDelimitedFrom(stream);
     this.serverName = ProtobufUtil.toServerName(state.getServerName());
-    this.distributedLogReplay = state.hasDistributedLogReplay()?
-      state.getDistributedLogReplay(): false;
     this.carryingMeta = state.hasCarryingMeta()? state.getCarryingMeta(): false;
     // shouldSplitWAL has a default over in pb so this invocation will always work.
     this.shouldSplitWal = state.getShouldSplitWal();
     int size = state.getRegionsOnCrashedServerCount();
     if (size > 0) {
-      this.regionsOnCrashedServer = new HashSet<>(size);
+      this.regionsOnCrashedServer = new ArrayList<HRegionInfo>(size);
       for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
         this.regionsOnCrashedServer.add(HRegionInfo.convert(ri));
       }
     }
-    size = state.getRegionsAssignedCount();
-    if (size > 0) {
-      this.regionsAssigned = new ArrayList<>(size);
-      for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
-        this.regionsAssigned.add(HRegionInfo.convert(ri));
-      }
-    }
-  }
-
-  /**
-   * Process a dead region from a dead RS. Checks if the region is disabled or
-   * disabling or if the region has a partially completed split.
-   * @param hri
-   * @param assignmentManager
-   * @return Returns true if specified region should be assigned, false if not.
-   * @throws IOException
-   */
-  private static boolean processDeadRegion(HRegionInfo hri, AssignmentManager assignmentManager)
-  throws IOException {
-    boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable());
-    if (!tablePresent) {
-      LOG.info("The table " + hri.getTable() + " was deleted.  Hence not proceeding.");
-      return false;
-    }
-    // If table is not disabled but the region is offlined,
-    boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
-      TableState.State.DISABLED);
-    if (disabled){
-      LOG.info("The table " + hri.getTable() + " was disabled.  Hence not proceeding.");
-      return false;
-    }
-    if (hri.isOffline() && hri.isSplit()) {
-      // HBASE-7721: Split parent and daughters are inserted into hbase:meta as an atomic operation.
-      // If the meta scanner saw the parent split, then it should see the daughters as assigned
-      // to the dead server. We don't have to do anything.
-      return false;
-    }
-    boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
-        TableState.State.DISABLING);
-    if (disabling) {
-      LOG.info("The table " + hri.getTable() + " is disabled.  Hence not assigning region" +
-        hri.getEncodedName());
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * If hbase:meta is not assigned already, assign.
-   * @throws IOException
-   */
-  private void verifyAndAssignMetaWithRetries(final MasterProcedureEnv env) throws IOException {
-    MasterServices services = env.getMasterServices();
-    int iTimes = services.getConfiguration().getInt(KEY_RETRIES_ON_META, DEFAULT_RETRIES_ON_META);
-    // Just reuse same time as we have for short wait on meta. Adding another config is overkill.
-    long waitTime =
-      services.getConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
-    int iFlag = 0;
-    while (true) {
-      try {
-        verifyAndAssignMeta(env);
-        break;
-      } catch (KeeperException e) {
-        services.abort("In server shutdown processing, assigning meta", e);
-        throw new IOException("Aborting", e);
-      } catch (Exception e) {
-        if (iFlag >= iTimes) {
-          services.abort("verifyAndAssignMeta failed after" + iTimes + " retries, aborting", e);
-          throw new IOException("Aborting", e);
-        }
-        try {
-          Thread.sleep(waitTime);
-        } catch (InterruptedException e1) {
-          LOG.warn("Interrupted when is the thread sleep", e1);
-          Thread.currentThread().interrupt();
-          throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
-        }
-        iFlag++;
-      }
-    }
-  }
-
-  /**
-   * If hbase:meta is not assigned already, assign.
-   * @throws InterruptedException
-   * @throws IOException
-   * @throws KeeperException
-   */
-  private void verifyAndAssignMeta(final MasterProcedureEnv env)
-      throws InterruptedException, IOException, KeeperException {
-    MasterServices services = env.getMasterServices();
-    if (!isMetaAssignedQuickTest(env)) {
-      services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
-    } else if (serverName.equals(services.getMetaTableLocator().
-        getMetaRegionLocation(services.getZooKeeper()))) {
-      throw new IOException("hbase:meta is onlined on the dead server " + this.serverName);
-    } else {
-      LOG.info("Skip assigning hbase:meta because it is online at "
-          + services.getMetaTableLocator().getMetaRegionLocation(services.getZooKeeper()));
-    }
-  }
-
-  /**
-   * A quick test that hbase:meta is assigned; blocks for short time only.
-   * @return True if hbase:meta location is available and verified as good.
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  private boolean isMetaAssignedQuickTest(final MasterProcedureEnv env)
-  throws InterruptedException, IOException {
-    ZooKeeperWatcher zkw = env.getMasterServices().getZooKeeper();
-    MetaTableLocator mtl = env.getMasterServices().getMetaTableLocator();
-    boolean metaAssigned = false;
-    // Is hbase:meta location available yet?
-    if (mtl.isLocationAvailable(zkw)) {
-      ClusterConnection connection = env.getMasterServices().getClusterConnection();
-      // Is hbase:meta location good yet?
-      long timeout =
-        env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
-      if (mtl.verifyMetaRegionLocation(connection, zkw, timeout)) {
-        metaAssigned = true;
-      }
-    }
-    return metaAssigned;
   }
 
   @Override
@@ -789,4 +394,46 @@ implements ServerProcedureInterface {
     // the client does not know about this procedure.
     return false;
   }
-}
+
+  /**
+   * Handle any outstanding RIT that are up against this.serverName, the crashed server.
+   * Notify them of crash. Remove assign entries from the passed in <code>regions</code>
+   * otherwise we have two assigns going on and they will fight over who has lock.
+   * Notify Unassigns also.
+   * @param env Master procedure environment; used to look up RITs and to notify them.
+   * @param regions Regions that were on crashed server; modified in place, entries whose RIT
+   *   is an AssignProcedure against the crashed server are removed. Nothing is returned.
+   */
+  private void handleRIT(final MasterProcedureEnv env, final List<HRegionInfo> regions) {
+    if (regions == null) return;
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    final Iterator<HRegionInfo> it = regions.iterator();
+    ServerCrashException sce = null;
+    while (it.hasNext()) {
+      final HRegionInfo hri = it.next();
+      RegionTransitionProcedure rtp = am.getRegionStates().getRegionTransitionProcedure(hri);
+      if (rtp == null) continue;
+      // Make sure the RIT is against this crashed server. In the case where there are many
+      // processings of a crashed server -- backed up for whatever reason (slow WAL split) --
+      // then a previous SCP may have already failed an assign, etc., and it may have a new
+      // location target; DO NOT fail these else we make for assign flux.
+      ServerName rtpServerName = rtp.getServer(env);
+      if (rtpServerName == null) {
+        LOG.warn("RIT with ServerName null! " + rtp);
+        continue;
+      }
+      if (!rtpServerName.equals(this.serverName)) continue;
+      LOG.info("pid=" + getProcId() + " found RIT " + rtp + "; " +
+        rtp.getRegionState(env).toShortString());
+      // Notify RIT on server crash.
+      if (sce == null) {
+        sce = new ServerCrashException(getProcId(), getServerName());
+      }
+      rtp.remoteCallFailed(env, this.serverName, sce);
+      if (rtp instanceof AssignProcedure) {
+        // If an assign, remove from passed-in list of regions so we do not assign it twice.
+        it.remove();
+      }
+    }
+  }
+}
\ No newline at end of file