You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/26 23:47:26 UTC

[18/59] [abbrv] hbase git commit: Revert "HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)" Revert a mistaken commit!!!

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
new file mode 100644
index 0000000..69ebd97
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -0,0 +1,3053 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.RegionStateListener;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.QuotaExceededException;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.KeyLocker;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Manages and performs region assignment.
+ * Related communications with regionserver are all done over RPC.
+ */
+@InterfaceAudience.Private
+public class AssignmentManager {
+  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
+
+  protected final MasterServices server;
+
+  private ServerManager serverManager;
+
+  private boolean shouldAssignRegionsWithFavoredNodes;
+
+  private LoadBalancer balancer;
+
+  private final MetricsAssignmentManager metricsAssignmentManager;
+
+  private AtomicInteger numRegionsOpened = new AtomicInteger(0);
+
+  final private KeyLocker<String> locker = new KeyLocker<>();
+
+  Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
+
+  /**
+   * Map of regions to reopen after the schema of a table is changed. Key -
+   * encoded region name, value - HRegionInfo
+   */
+  private final Map <String, HRegionInfo> regionsToReopen;
+
+  /*
+   * Maximum times we recurse an assignment/unassignment.
+   * See below in {@link #assign()} and {@link #unassign()}.
+   */
+  private final int maximumAttempts;
+
+  /**
+   * The sleep time for which the assignment will wait before retrying in case of
+   * hbase:meta assignment failure due to lack of availability of region plan or bad region plan
+   */
+  private final long sleepTimeBeforeRetryingMetaAssignment;
+
+  /** Plans for region movement. Key is the encoded version of a region name*/
+  // TODO: When do plans get cleaned out?  Ever? In server open and in server
+  // shutdown processing -- St.Ack
+  // All access to this Map must be synchronized.
+  final NavigableMap<String, RegionPlan> regionPlans = new TreeMap<>();
+
+  private final TableStateManager tableStateManager;
+
+  private final ExecutorService executorService;
+
+  private java.util.concurrent.ExecutorService threadPoolExecutorService;
+  private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+  private final RegionStates regionStates;
+
+  // The threshold to use bulk assigning. Using bulk assignment
+  // only if assigning at least this many regions to at least this
+  // many servers. If assigning fewer regions to fewer servers,
+  // bulk assigning may be not as efficient.
+  private final int bulkAssignThresholdRegions;
+  private final int bulkAssignThresholdServers;
+  private final int bulkPerRegionOpenTimeGuesstimate;
+
+  // Should bulk assignment wait till all regions are assigned,
+  // or it is timed out?  This is useful to measure bulk assignment
+  // performance, but not needed in most use cases.
+  private final boolean bulkAssignWaitTillAllAssigned;
+
+  /**
+   * Indicator that AssignmentManager has recovered the region states so
+   * that ServerShutdownHandler can be fully enabled and re-assign regions
+   * of dead servers. So that when re-assignment happens, AssignmentManager
+   * has proper region states.
+   *
+   * Protected to ease testing.
+   */
+  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
+
+  /**
+   * A map to track the count a region fails to open in a row.
+   * So that we don't try to open a region forever if the failure is
+   * unrecoverable.  We don't put this information in region states
+   * because we don't expect this to happen frequently; we don't
+   * want to copy this information over during each state transition either.
+   */
+  private final ConcurrentHashMap<String, AtomicInteger> failedOpenTracker = new ConcurrentHashMap<>();
+
+  // In case not using ZK for region assignment, region states
+  // are persisted in meta with a state store
+  private final RegionStateStore regionStateStore;
+
+  /**
+   * For testing only!  Set to true to skip handling of split.
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
+  public static boolean TEST_SKIP_SPLIT_HANDLING = false;
+
+  /** Listeners that are called on assignment events. */
+  private List<AssignmentListener> listeners = new CopyOnWriteArrayList<>();
+
+  private RegionStateListener regionStateListener;
+
+  private RetryCounter.BackoffPolicy backoffPolicy;
+  private RetryCounter.RetryConfig retryConfig;
+  /**
+   * Constructs a new assignment manager.
+   *
+   * @param server instance of HMaster this AM running inside
+   * @param serverManager serverManager for associated HMaster
+   * @param balancer implementation of {@link LoadBalancer}
+   * @param service Executor service
+   * @param metricsMaster metrics manager
+   * @throws IOException
+   */
+  public AssignmentManager(MasterServices server, ServerManager serverManager,
+      final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster,
+      final TableStateManager tableStateManager)
+          throws IOException {
+    this.server = server;
+    this.serverManager = serverManager;
+    this.executorService = service;
+    this.regionStateStore = new RegionStateStore(server);
+    this.regionsToReopen = Collections.synchronizedMap
+                           (new HashMap<String, HRegionInfo> ());
+    Configuration conf = server.getConfiguration();
+
+    this.tableStateManager = tableStateManager;
+
+    // This is the max attempts, not retries, so it should be at least 1.
+    this.maximumAttempts = Math.max(1,
+      this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
+    this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
+        "hbase.meta.assignment.retry.sleeptime", 1000l);
+    this.balancer = balancer;
+    // Only read favored nodes if using the favored nodes load balancer.
+    this.shouldAssignRegionsWithFavoredNodes = this.balancer instanceof FavoredNodesPromoter;
+    int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
+
+    this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
+        maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
+
+    this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
+        Threads.newDaemonThreadFactory("AM.Scheduler"));
+
+    this.regionStates = new RegionStates(
+      server, tableStateManager, serverManager, regionStateStore);
+
+    this.bulkAssignWaitTillAllAssigned =
+      conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
+    this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
+    this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+    this.bulkPerRegionOpenTimeGuesstimate =
+      conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
+
+    this.metricsAssignmentManager = new MetricsAssignmentManager();
+
+    // Configurations for retrying opening a region on receiving a FAILED_OPEN
+    this.retryConfig = new RetryCounter.RetryConfig();
+    this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l));
+    // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy
+    // if the user does not set a max sleep limit
+    this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max",
+        retryConfig.getSleepInterval()));
+    this.backoffPolicy = getBackoffPolicy();
+  }
+
+  /**
+   * Returns the backoff policy used for Failed Region Open retries
+   * @return the backoff policy used for Failed Region Open retries
+   */
+  RetryCounter.BackoffPolicy getBackoffPolicy() {
+    return new RetryCounter.ExponentialBackoffPolicyWithLimit();
+  }
+
+  MetricsAssignmentManager getAssignmentManagerMetrics() {
+    return this.metricsAssignmentManager;
+  }
+
+  /**
+   * Add the listener to the notification list.
+   * @param listener The AssignmentListener to register
+   */
+  public void registerListener(final AssignmentListener listener) {
+    this.listeners.add(listener);
+  }
+
+  /**
+   * Remove the listener from the notification list.
+   * @param listener The AssignmentListener to unregister
+   */
+  public boolean unregisterListener(final AssignmentListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  /**
+   * @return Instance of ZKTableStateManager.
+   */
+  public TableStateManager getTableStateManager() {
+    // These are 'expensive' to make involving trip to zk ensemble so allow
+    // sharing.
+    return this.tableStateManager;
+  }
+
+  /**
+   * This SHOULD not be public. It is public now
+   * because of some unit tests.
+   *
+   * TODO: make it package private and keep RegionStates in the master package
+   */
+  public RegionStates getRegionStates() {
+    return regionStates;
+  }
+
+  /**
+   * Used in some tests to mock up region state in meta
+   */
+  @VisibleForTesting
+  RegionStateStore getRegionStateStore() {
+    return regionStateStore;
+  }
+
+  public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
+    return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
+  }
+
+  /**
+   * Add a regionPlan for the specified region.
+   * @param encodedName
+   * @param plan
+   */
+  public void addPlan(String encodedName, RegionPlan plan) {
+    synchronized (regionPlans) {
+      regionPlans.put(encodedName, plan);
+    }
+  }
+
+  /**
+   * Add a map of region plans.
+   */
+  public void addPlans(Map<String, RegionPlan> plans) {
+    synchronized (regionPlans) {
+      regionPlans.putAll(plans);
+    }
+  }
+
+  /**
+   * Set the list of regions that will be reopened
+   * because of an update in table schema
+   *
+   * @param regions
+   *          list of regions that should be tracked for reopen
+   */
+  public void setRegionsToReopen(List <HRegionInfo> regions) {
+    for(HRegionInfo hri : regions) {
+      regionsToReopen.put(hri.getEncodedName(), hri);
+    }
+  }
+
+  /**
+   * Used by the client to identify if all regions have the schema updates
+   *
+   * @param tableName
+   * @return Pair indicating the status of the alter command
+   * @throws IOException
+   */
+  public Pair<Integer, Integer> getReopenStatus(TableName tableName)
+      throws IOException {
+    List<HRegionInfo> hris;
+    if (TableName.META_TABLE_NAME.equals(tableName)) {
+      hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
+    } else {
+      hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
+    }
+
+    Integer pending = 0;
+    for (HRegionInfo hri : hris) {
+      String name = hri.getEncodedName();
+      // no lock concurrent access ok: sequential consistency respected.
+      if (regionsToReopen.containsKey(name)
+          || regionStates.isRegionInTransition(name)) {
+        pending++;
+      }
+    }
+    return new Pair<>(pending, hris.size());
+  }
+
+  /**
+   * Used by ServerShutdownHandler to make sure AssignmentManager has completed
+   * the failover cleanup before re-assigning regions of dead servers. So that
+   * when re-assignment happens, AssignmentManager has proper region states.
+   */
+  public boolean isFailoverCleanupDone() {
+    return failoverCleanupDone.get();
+  }
+
+  /**
+   * To avoid racing with AM, external entities may need to lock a region,
+   * for example, when SSH checks what regions to skip re-assigning.
+   */
+  public Lock acquireRegionLock(final String encodedName) {
+    return locker.acquireLock(encodedName);
+  }
+
+  /**
+   * Now, failover cleanup is completed. Notify server manager to
+   * process queued up dead servers processing, if any.
+   */
+  void failoverCleanupDone() {
+    failoverCleanupDone.set(true);
+    serverManager.processQueuedDeadServers();
+  }
+
+  /**
+   * Called on startup.
+   * Figures whether a fresh cluster start of we are joining extant running cluster.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   * @throws CoordinatedStateException
+   */
+  void joinCluster()
+  throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
+    long startTime = System.currentTimeMillis();
+    // Concurrency note: In the below the accesses on regionsInTransition are
+    // outside of a synchronization block where usually all accesses to RIT are
+    // synchronized.  The presumption is that in this case it is safe since this
+    // method is being played by a single thread on startup.
+
+    // TODO: Regions that have a null location and are not in regionsInTransitions
+    // need to be handled.
+
+    // Scan hbase:meta to build list of existing regions, servers, and assignment
+    // Returns servers who have not checked in (assumed dead) that some regions
+    // were assigned to (according to the meta)
+    Set<ServerName> deadServers = rebuildUserRegions();
+
+    // This method will assign all user regions if a clean server startup or
+    // it will reconstruct master state and cleanup any leftovers from previous master process.
+    boolean failover = processDeadServersAndRegionsInTransition(deadServers);
+
+    LOG.info("Joined the cluster in " + (System.currentTimeMillis()
+      - startTime) + "ms, failover=" + failover);
+  }
+
+  /**
+   * Process all regions that are in transition in zookeeper and also
+   * processes the list of dead servers.
+   * Used by master joining an cluster.  If we figure this is a clean cluster
+   * startup, will assign all user regions.
+   * @param deadServers Set of servers that are offline probably legitimately that were carrying
+   * regions according to a scan of hbase:meta. Can be null.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
+  throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
+    // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
+    boolean failover = !serverManager.getDeadServers().isEmpty();
+    if (failover) {
+      // This may not be a failover actually, especially if meta is on this master.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
+      }
+      // Check if there are any regions on these servers
+      failover = false;
+      for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) {
+        if (regionStates.getRegionAssignments().values().contains(serverName)) {
+          LOG.debug("Found regions on dead server: " + serverName);
+          failover = true;
+          break;
+        }
+      }
+    }
+    Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+    if (!failover) {
+      // If any one region except meta is assigned, it's a failover.
+      for (Map.Entry<HRegionInfo, ServerName> en:
+          regionStates.getRegionAssignments().entrySet()) {
+        HRegionInfo hri = en.getKey();
+        if (!hri.isMetaTable()
+            && onlineServers.contains(en.getValue())) {
+          LOG.debug("Found region " + hri + " out on cluster");
+          failover = true;
+          break;
+        }
+      }
+    }
+    if (!failover) {
+      // If any region except meta is in transition on a live server, it's a failover.
+      Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition();
+      if (!regionsInTransition.isEmpty()) {
+        for (RegionState regionState: regionsInTransition) {
+          ServerName serverName = regionState.getServerName();
+          if (!regionState.getRegion().isMetaRegion()
+              && serverName != null && onlineServers.contains(serverName)) {
+            LOG.debug("Found " + regionState + " for region " +
+              regionState.getRegion().getRegionNameAsString() + " for server " +
+                serverName + "in RITs");
+            failover = true;
+            break;
+          }
+        }
+      }
+    }
+    if (!failover) {
+      // If we get here, we have a full cluster restart. It is a failover only
+      // if there are some WALs are not split yet. For meta WALs, they should have
+      // been split already, if any. We can walk through those queued dead servers,
+      // if they don't have any WALs, this restart should be considered as a clean one
+      Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
+      if (!queuedDeadServers.isEmpty()) {
+        Configuration conf = server.getConfiguration();
+        Path walRootDir = FSUtils.getWALRootDir(conf);
+        FileSystem walFs = FSUtils.getWALFileSystem(conf);
+        for (ServerName serverName: queuedDeadServers) {
+          // In the case of a clean exit, the shutdown handler would have presplit any WALs and
+          // removed empty directories.
+          Path walDir = new Path(walRootDir,
+            AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+          Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
+          if (checkWals(walFs, walDir) || checkWals(walFs, splitDir)) {
+            LOG.debug("Found queued dead server " + serverName);
+            failover = true;
+            break;
+          }
+        }
+        if (!failover) {
+          // We figured that it's not a failover, so no need to
+          // work on these re-queued dead servers any more.
+          LOG.info("AM figured that it's not a failover and cleaned up "
+            + queuedDeadServers.size() + " queued dead servers");
+          serverManager.removeRequeuedDeadServers();
+        }
+      }
+    }
+
+    Set<TableName> disabledOrDisablingOrEnabling = null;
+    Map<HRegionInfo, ServerName> allRegions = null;
+
+    if (!failover) {
+      disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
+        TableState.State.DISABLED, TableState.State.DISABLING,
+        TableState.State.ENABLING);
+
+      // Clean re/start, mark all user regions closed before reassignment
+      allRegions = regionStates.closeAllUserRegions(
+        disabledOrDisablingOrEnabling);
+    }
+
+    // Now region states are restored
+    regionStateStore.start();
+
+    if (failover) {
+      if (deadServers != null && !deadServers.isEmpty()) {
+        for (ServerName serverName: deadServers) {
+          if (!serverManager.isServerDead(serverName)) {
+            serverManager.expireServer(serverName); // Let SSH do region re-assign
+          }
+        }
+      }
+      processRegionsInTransition(regionStates.getRegionsInTransition());
+    }
+
+    // Now we can safely claim failover cleanup completed and enable
+    // ServerShutdownHandler for further processing. The nodes (below)
+    // in transition, if any, are for regions not related to those
+    // dead servers at all, and can be done in parallel to SSH.
+    failoverCleanupDone();
+    if (!failover) {
+      // Fresh cluster startup.
+      LOG.info("Clean cluster startup. Don't reassign user regions");
+      assignAllUserRegions(allRegions);
+    } else {
+      LOG.info("Failover! Reassign user regions");
+    }
+    // unassign replicas of the split parents and the merged regions
+    // the daughter replicas are opened in assignAllUserRegions if it was
+    // not already opened.
+    for (HRegionInfo h : replicasToClose) {
+      unassign(h);
+    }
+    replicasToClose.clear();
+    return failover;
+  }
+
+  private boolean checkWals(FileSystem fs, Path dir) throws IOException {
+    if (!fs.exists(dir)) {
+      LOG.debug(dir + " doesn't exist");
+      return false;
+    }
+    if (!fs.getFileStatus(dir).isDirectory()) {
+      LOG.warn(dir + " is not a directory");
+      return false;
+    }
+    FileStatus[] files = FSUtils.listStatus(fs, dir);
+    if (files == null || files.length == 0) {
+      LOG.debug(dir + " has no files");
+      return false;
+    }
+    for (int i = 0; i < files.length; i++) {
+      if (files[i].isFile() && files[i].getLen() > 0) {
+        LOG.debug(dir + " has a non-empty file: " + files[i].getPath());
+        return true;
+      } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) {
+        LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath());
+        return true;
+      }
+    }
+    LOG.debug("Found 0 non-empty wal files for :" + dir);
+    return false;
+  }
+
+  /**
+   * When a region is closed, it should be removed from the regionsToReopen
+   * @param hri HRegionInfo of the region which was closed
+   */
+  public void removeClosedRegion(HRegionInfo hri) {
+    if (regionsToReopen.remove(hri.getEncodedName()) != null) {
+      LOG.debug("Removed region from reopening regions because it was closed");
+    }
+  }
+
+  void processFavoredNodesForDaughters(HRegionInfo parent,
+    HRegionInfo regionA, HRegionInfo regionB) throws IOException {
+    if (shouldAssignFavoredNodes(parent)) {
+      List<ServerName> onlineServers = this.serverManager.getOnlineServersList();
+      ((FavoredNodesPromoter) this.balancer).
+          generateFavoredNodesForDaughter(onlineServers, parent, regionA, regionB);
+    }
+  }
+
+  void processFavoredNodesForMerge(HRegionInfo merged, HRegionInfo regionA, HRegionInfo regionB)
+    throws IOException {
+    if (shouldAssignFavoredNodes(merged)) {
+      ((FavoredNodesPromoter)this.balancer).
+        generateFavoredNodesForMergedRegion(merged, regionA, regionB);
+    }
+  }
+
+  /*
+   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
+   * belongs to a non-system table.
+   */
+  private boolean shouldAssignFavoredNodes(HRegionInfo region) {
+    return this.shouldAssignRegionsWithFavoredNodes
+        && FavoredNodesManager.isFavoredNodeApplicable(region);
+  }
+
+  /**
+   * Marks the region as online.  Removes it from regions in transition and
+   * updates the in-memory assignment information.
+   * <p>
+   * Used when a region has been successfully opened on a region server.
+   * @param regionInfo
+   * @param sn
+   */
+  void regionOnline(HRegionInfo regionInfo, ServerName sn) {
+    regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
+  }
+
+  void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
+    numRegionsOpened.incrementAndGet();
+    regionStates.regionOnline(regionInfo, sn, openSeqNum);
+
+    // Remove plan if one.
+    clearRegionPlan(regionInfo);
+    balancer.regionOnline(regionInfo, sn);
+
+    // Tell our listeners that a region was opened
+    sendRegionOpenedNotification(regionInfo, sn);
+  }
+
+  /**
+   * Marks the region as offline.  Removes it from regions in transition and
+   * removes in-memory assignment information.
+   * <p>
+   * Used when a region has been closed and should remain closed.
+   * @param regionInfo
+   */
+  public void regionOffline(final HRegionInfo regionInfo) {
+    regionOffline(regionInfo, null);
+  }
+
+  public void offlineDisabledRegion(HRegionInfo regionInfo) {
+    replicasToClose.remove(regionInfo);
+    regionOffline(regionInfo);
+  }
+
+  // Assignment methods
+
+  /**
+   * Assigns the specified region.
+   * <p>
+   * If a RegionPlan is available with a valid destination then it will be used
+   * to determine what server region is assigned to.  If no RegionPlan is
+   * available, region will be assigned to a random available server.
+   * <p>
+   * Updates the RegionState and sends the OPEN RPC.
+   * <p>
+   * This will only succeed if the region is in transition and in a CLOSED or
+   * OFFLINE state or not in transition, and of course, the
+   * chosen server is up and running (It may have just crashed!).
+   *
+   * @param region server to be assigned
+   */
+  public void assign(HRegionInfo region) {
+    assign(region, false);
+  }
+
+  /**
+   * Use care with forceNewPlan. It could cause double assignment.
+   */
+  public void assign(HRegionInfo region, boolean forceNewPlan) {
+    if (isDisabledorDisablingRegionInRIT(region)) {
+      return;
+    }
+    String encodedName = region.getEncodedName();
+    Lock lock = locker.acquireLock(encodedName);
+    try {
+      RegionState state = forceRegionStateToOffline(region, forceNewPlan);
+      if (state != null) {
+        if (regionStates.wasRegionOnDeadServer(encodedName)) {
+          LOG.info("Skip assigning " + region.getRegionNameAsString()
+            + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+            + " is dead but not processed yet");
+          return;
+        }
+        assign(state, forceNewPlan);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Bulk assign regions to <code>destination</code>.
+   * @param destination
+   * @param regions Regions to assign.
+   * @return true if successful
+   */
+  boolean assign(final ServerName destination, final List<HRegionInfo> regions)
+    throws InterruptedException {
+    long startTime = EnvironmentEdgeManager.currentTime();
+    try {
+      int regionCount = regions.size();
+      if (regionCount == 0) {
+        return true;
+      }
+      LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
+      Set<String> encodedNames = new HashSet<>(regionCount);
+      for (HRegionInfo region : regions) {
+        encodedNames.add(region.getEncodedName());
+      }
+
+      List<HRegionInfo> failedToOpenRegions = new ArrayList<>();
+      Map<String, Lock> locks = locker.acquireLocks(encodedNames);
+      try {
+        Map<String, RegionPlan> plans = new HashMap<>(regionCount);
+        List<RegionState> states = new ArrayList<>(regionCount);
+        for (HRegionInfo region : regions) {
+          String encodedName = region.getEncodedName();
+          if (!isDisabledorDisablingRegionInRIT(region)) {
+            RegionState state = forceRegionStateToOffline(region, false);
+            boolean onDeadServer = false;
+            if (state != null) {
+              if (regionStates.wasRegionOnDeadServer(encodedName)) {
+                LOG.info("Skip assigning " + region.getRegionNameAsString()
+                  + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+                  + " is dead but not processed yet");
+                onDeadServer = true;
+              } else {
+                RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
+                plans.put(encodedName, plan);
+                states.add(state);
+                continue;
+              }
+            }
+            // Reassign if the region wasn't on a dead server
+            if (!onDeadServer) {
+              LOG.info("failed to force region state to offline, "
+                + "will reassign later: " + region);
+              failedToOpenRegions.add(region); // assign individually later
+            }
+          }
+          // Release the lock, this region is excluded from bulk assign because
+          // we can't update its state, or set its znode to offline.
+          Lock lock = locks.remove(encodedName);
+          lock.unlock();
+        }
+
+        if (server.isStopped()) {
+          return false;
+        }
+
+        // Add region plans, so we can updateTimers when one region is opened so
+        // that unnecessary timeout on RIT is reduced.
+        this.addPlans(plans);
+
+        List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos = new ArrayList<>(states.size());
+        for (RegionState state: states) {
+          HRegionInfo region = state.getRegion();
+          regionStates.updateRegionState(
+            region, State.PENDING_OPEN, destination);
+          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+          if (shouldAssignFavoredNodes(region)) {
+            favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
+          }
+          regionOpenInfos.add(new Pair<>(region, favoredNodes));
+        }
+
+        // Move on to open regions.
+        try {
+          // Send OPEN RPC. If it fails on a IOE or RemoteException,
+          // regions will be assigned individually.
+          Configuration conf = server.getConfiguration();
+          long maxWaitTime = System.currentTimeMillis() +
+            conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
+          for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
+            try {
+              List<RegionOpeningState> regionOpeningStateList = serverManager
+                .sendRegionOpen(destination, regionOpenInfos);
+              for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
+                RegionOpeningState openingState = regionOpeningStateList.get(k);
+                if (openingState != RegionOpeningState.OPENED) {
+                  HRegionInfo region = regionOpenInfos.get(k).getFirst();
+                  LOG.info("Got opening state " + openingState
+                    + ", will reassign later: " + region);
+                  // Failed opening this region, reassign it later
+                  forceRegionStateToOffline(region, true);
+                  failedToOpenRegions.add(region);
+                }
+              }
+              break;
+            } catch (IOException e) {
+              if (e instanceof RemoteException) {
+                e = ((RemoteException)e).unwrapRemoteException();
+              }
+              if (e instanceof RegionServerStoppedException) {
+                LOG.warn("The region server was shut down, ", e);
+                // No need to retry, the region server is a goner.
+                return false;
+              } else if (e instanceof ServerNotRunningYetException) {
+                long now = System.currentTimeMillis();
+                if (now < maxWaitTime) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Server is not yet up; waiting up to " +
+                      (maxWaitTime - now) + "ms", e);
+                  }
+                  Thread.sleep(100);
+                  i--; // reset the try count
+                  continue;
+                }
+              } else if (e instanceof java.net.SocketTimeoutException
+                  && this.serverManager.isServerOnline(destination)) {
+                // In case socket is timed out and the region server is still online,
+                // the openRegion RPC could have been accepted by the server and
+                // just the response didn't go through.  So we will retry to
+                // open the region on the same server.
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Bulk assigner openRegion() to " + destination
+                    + " has timed out, but the regions might"
+                    + " already be opened on it.", e);
+                }
+                // wait and reset the re-try count, server might be just busy.
+                Thread.sleep(100);
+                i--;
+                continue;
+              } else if (e instanceof FailedServerException && i < maximumAttempts) {
+                // In case the server is in the failed server list, no point to
+                // retry too soon. Retry after the failed_server_expiry time
+                long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+                  RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(destination + " is on failed server list; waiting "
+                    + sleepTime + "ms", e);
+                }
+                Thread.sleep(sleepTime);
+                continue;
+              }
+              throw e;
+            }
+          }
+        } catch (IOException e) {
+          // Can be a socket timeout, EOF, NoRouteToHost, etc
+          LOG.info("Unable to communicate with " + destination
+            + " in order to assign regions, ", e);
+          for (RegionState state: states) {
+            HRegionInfo region = state.getRegion();
+            forceRegionStateToOffline(region, true);
+          }
+          return false;
+        }
+      } finally {
+        for (Lock lock : locks.values()) {
+          lock.unlock();
+        }
+      }
+
+      if (!failedToOpenRegions.isEmpty()) {
+        for (HRegionInfo region : failedToOpenRegions) {
+          if (!regionStates.isRegionOnline(region)) {
+            invokeAssign(region);
+          }
+        }
+      }
+
+      // wait for assignment completion
+      ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions.size());
+      for (HRegionInfo region: regions) {
+        if (!region.getTable().isSystemTable()) {
+          userRegionSet.add(region);
+        }
+      }
+      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+            System.currentTimeMillis())) {
+        LOG.debug("some user regions are still in transition: " + userRegionSet);
+      }
+      LOG.debug("Bulk assigning done for " + destination);
+      return true;
+    } finally {
+      metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
+    }
+  }
+
+  /**
+   * Send CLOSE RPC if the server is online, otherwise, offline the region.
+   *
+   * The RPC will be sent only to the region sever found in the region state
+   * if it is passed in, otherwise, to the src server specified. If region
+   * state is not specified, we don't update region state at all, instead
+   * we just send the RPC call. This is useful for some cleanup without
+   * messing around the region states (see handleRegion, on region opened
+   * on an unexpected server scenario, for an example)
+   */
+  private void unassign(final HRegionInfo region,
+      final ServerName server, final ServerName dest) {
+    for (int i = 1; i <= this.maximumAttempts; i++) {
+      if (this.server.isStopped() || this.server.isAborted()) {
+        LOG.debug("Server stopped/aborted; skipping unassign of " + region);
+        return;
+      }
+      if (!serverManager.isServerOnline(server)) {
+        LOG.debug("Offline " + region.getRegionNameAsString()
+          + ", no need to unassign since it's on a dead server: " + server);
+        regionStates.updateRegionState(region, State.OFFLINE);
+        return;
+      }
+      try {
+        // Send CLOSE RPC
+        if (serverManager.sendRegionClose(server, region, dest)) {
+          LOG.debug("Sent CLOSE to " + server + " for region " +
+            region.getRegionNameAsString());
+          return;
+        }
+        // This never happens. Currently regionserver close always return true.
+        // Todo; this can now happen (0.96) if there is an exception in a coprocessor
+        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
+          region.getRegionNameAsString());
+      } catch (Throwable t) {
+        long sleepTime = 0;
+        Configuration conf = this.server.getConfiguration();
+        if (t instanceof RemoteException) {
+          t = ((RemoteException)t).unwrapRemoteException();
+        }
+        if (t instanceof RegionServerAbortedException
+            || t instanceof RegionServerStoppedException
+            || t instanceof ServerNotRunningYetException) {
+          // RS is aborting, we cannot offline the region since the region may need to do WAL
+          // recovery. Until we see  the RS expiration, we should retry.
+          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+
+        } else if (t instanceof NotServingRegionException) {
+          LOG.debug("Offline " + region.getRegionNameAsString()
+            + ", it's not any more on " + server, t);
+          regionStates.updateRegionState(region, State.OFFLINE);
+          return;
+        } else if (t instanceof FailedServerException && i < maximumAttempts) {
+          // In case the server is in the failed server list, no point to
+          // retry too soon. Retry after the failed_server_expiry time
+          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+          RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
+          }
+       }
+       try {
+         if (sleepTime > 0) {
+           Thread.sleep(sleepTime);
+         }
+       } catch (InterruptedException ie) {
+         LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
+         Thread.currentThread().interrupt();
+         regionStates.updateRegionState(region, State.FAILED_CLOSE);
+         return;
+       }
+       LOG.info("Server " + server + " returned " + t + " for "
+         + region.getRegionNameAsString() + ", try=" + i
+         + " of " + this.maximumAttempts, t);
+      }
+    }
+    // Run out of attempts
+    regionStates.updateRegionState(region, State.FAILED_CLOSE);
+  }
+
+  /**
+   * Set region to OFFLINE unless it is opening and forceNewPlan is false.
+   */
+  private RegionState forceRegionStateToOffline(
+      final HRegionInfo region, final boolean forceNewPlan) {
+    RegionState state = regionStates.getRegionState(region);
+    if (state == null) {
+      LOG.warn("Assigning but not in region states: " + region);
+      state = regionStates.createRegionState(region);
+    }
+
+    if (forceNewPlan && LOG.isDebugEnabled()) {
+      LOG.debug("Force region state offline " + state);
+    }
+
+    switch (state.getState()) {
+    case OPEN:
+    case OPENING:
+    case PENDING_OPEN:
+    case CLOSING:
+    case PENDING_CLOSE:
+      if (!forceNewPlan) {
+        LOG.debug("Skip assigning " +
+          region + ", it is already " + state);
+        return null;
+      }
+    case FAILED_CLOSE:
+    case FAILED_OPEN:
+      regionStates.updateRegionState(region, State.PENDING_CLOSE);
+      unassign(region, state.getServerName(), null);
+      state = regionStates.getRegionState(region);
+      if (!state.isOffline() && !state.isClosed()) {
+        // If the region isn't offline, we can't re-assign
+        // it now. It will be assigned automatically after
+        // the regionserver reports it's closed.
+        return null;
+      }
+    case OFFLINE:
+    case CLOSED:
+      break;
+    default:
+      LOG.error("Trying to assign region " + region
+        + ", which is " + state);
+      return null;
+    }
+    return state;
+  }
+
+  /**
+   * Caller must hold lock on the passed <code>state</code> object.
+   * @param state
+   * @param forceNewPlan
+   */
+  private void assign(RegionState state, boolean forceNewPlan) {
+    long startTime = EnvironmentEdgeManager.currentTime();
+    try {
+      Configuration conf = server.getConfiguration();
+      RegionPlan plan = null;
+      long maxWaitTime = -1;
+      HRegionInfo region = state.getRegion();
+      Throwable previousException = null;
+      for (int i = 1; i <= maximumAttempts; i++) {
+        if (server.isStopped() || server.isAborted()) {
+          LOG.info("Skip assigning " + region.getRegionNameAsString()
+            + ", the server is stopped/aborted");
+          return;
+        }
+
+        if (plan == null) { // Get a server for the region at first
+          try {
+            plan = getRegionPlan(region, forceNewPlan);
+          } catch (HBaseIOException e) {
+            LOG.warn("Failed to get region plan", e);
+          }
+        }
+
+        if (plan == null) {
+          LOG.warn("Unable to determine a plan to assign " + region);
+
+          // For meta region, we have to keep retrying until succeeding
+          if (region.isMetaRegion()) {
+            if (i == maximumAttempts) {
+              i = 0; // re-set attempt count to 0 for at least 1 retry
+
+              LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
+                " after maximumAttempts (" + this.maximumAttempts +
+                "). Reset attempts count and continue retrying.");
+            }
+            waitForRetryingMetaAssignment();
+            continue;
+          }
+
+          regionStates.updateRegionState(region, State.FAILED_OPEN);
+          return;
+        }
+        LOG.info("Assigning " + region.getRegionNameAsString() +
+            " to " + plan.getDestination());
+        // Transition RegionState to PENDING_OPEN
+       regionStates.updateRegionState(region,
+          State.PENDING_OPEN, plan.getDestination());
+
+        boolean needNewPlan = false;
+        final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
+            " to " + plan.getDestination();
+        try {
+          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+          if (shouldAssignFavoredNodes(region)) {
+            favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
+          }
+          serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
+          return; // we're done
+        } catch (Throwable t) {
+          if (t instanceof RemoteException) {
+            t = ((RemoteException) t).unwrapRemoteException();
+          }
+          previousException = t;
+
+          // Should we wait a little before retrying? If the server is starting it's yes.
+          boolean hold = (t instanceof ServerNotRunningYetException);
+
+          // In case socket is timed out and the region server is still online,
+          // the openRegion RPC could have been accepted by the server and
+          // just the response didn't go through.  So we will retry to
+          // open the region on the same server.
+          boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
+              && this.serverManager.isServerOnline(plan.getDestination()));
+
+          if (hold) {
+            LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
+              "try=" + i + " of " + this.maximumAttempts, t);
+
+            if (maxWaitTime < 0) {
+              maxWaitTime = EnvironmentEdgeManager.currentTime()
+                + this.server.getConfiguration().getLong(
+                  "hbase.regionserver.rpc.startup.waittime", 60000);
+            }
+            try {
+              long now = EnvironmentEdgeManager.currentTime();
+              if (now < maxWaitTime) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Server is not yet up; waiting up to "
+                    + (maxWaitTime - now) + "ms", t);
+                }
+                Thread.sleep(100);
+                i--; // reset the try count
+              } else {
+                LOG.debug("Server is not up for a while; try a new one", t);
+                needNewPlan = true;
+              }
+            } catch (InterruptedException ie) {
+              LOG.warn("Failed to assign "
+                  + region.getRegionNameAsString() + " since interrupted", ie);
+              regionStates.updateRegionState(region, State.FAILED_OPEN);
+              Thread.currentThread().interrupt();
+              return;
+            }
+          } else if (retry) {
+            i--; // we want to retry as many times as needed as long as the RS is not dead.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(assignMsg + ", trying to assign to the same region server due ", t);
+            }
+          } else {
+            needNewPlan = true;
+            LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
+                " try=" + i + " of " + this.maximumAttempts, t);
+          }
+        }
+
+        if (i == this.maximumAttempts) {
+          // For meta region, we have to keep retrying until succeeding
+          if (region.isMetaRegion()) {
+            i = 0; // re-set attempt count to 0 for at least 1 retry
+            LOG.warn(assignMsg +
+                ", trying to assign a hbase:meta region reached to maximumAttempts (" +
+                this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
+            waitForRetryingMetaAssignment();
+          }
+          else {
+            // Don't reset the region state or get a new plan any more.
+            // This is the last try.
+            continue;
+          }
+        }
+
+        // If region opened on destination of present plan, reassigning to new
+        // RS may cause double assignments. In case of RegionAlreadyInTransitionException
+        // reassigning to same RS.
+        if (needNewPlan) {
+          // Force a new plan and reassign. Will return null if no servers.
+          // The new plan could be the same as the existing plan since we don't
+          // exclude the server of the original plan, which should not be
+          // excluded since it could be the only server up now.
+          RegionPlan newPlan = null;
+          try {
+            newPlan = getRegionPlan(region, true);
+          } catch (HBaseIOException e) {
+            LOG.warn("Failed to get region plan", e);
+          }
+          if (newPlan == null) {
+            regionStates.updateRegionState(region, State.FAILED_OPEN);
+            LOG.warn("Unable to find a viable location to assign region " +
+                region.getRegionNameAsString());
+            return;
+          }
+
+          if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
+            // Clean out plan we failed execute and one that doesn't look like it'll
+            // succeed anyways; we need a new plan!
+            // Transition back to OFFLINE
+            regionStates.updateRegionState(region, State.OFFLINE);
+            plan = newPlan;
+          } else if(plan.getDestination().equals(newPlan.getDestination()) &&
+              previousException instanceof FailedServerException) {
+            try {
+              LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
+                " to the same failed server.");
+              Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+                RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
+            } catch (InterruptedException ie) {
+              LOG.warn("Failed to assign "
+                  + region.getRegionNameAsString() + " since interrupted", ie);
+              regionStates.updateRegionState(region, State.FAILED_OPEN);
+              Thread.currentThread().interrupt();
+              return;
+            }
+          }
+        }
+      }
+      // Run out of attempts
+      regionStates.updateRegionState(region, State.FAILED_OPEN);
+    } finally {
+      metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
+    }
+  }
+
+  private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
+    if (this.tableStateManager.isTableState(region.getTable(),
+            TableState.State.DISABLED,
+            TableState.State.DISABLING) || replicasToClose.contains(region)) {
+      LOG.info("Table " + region.getTable() + " is disabled or disabling;"
+        + " skipping assign of " + region.getRegionNameAsString());
+      offlineDisabledRegion(region);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @param region the region to assign
+   * @param forceNewPlan If true, then if an existing plan exists, a new plan
+   * will be generated.
+   * @return Plan for passed <code>region</code> (If none currently, it creates one or
+   * if no servers to assign, it returns null).
+   */
+  private RegionPlan getRegionPlan(final HRegionInfo region,
+      final boolean forceNewPlan) throws HBaseIOException {
+    // Pickup existing plan or make a new one
+    final String encodedName = region.getEncodedName();
+    final List<ServerName> destServers =
+      serverManager.createDestinationServersList();
+
+    if (destServers.isEmpty()){
+      LOG.warn("Can't move " + encodedName +
+        ", there is no destination server available.");
+      return null;
+    }
+
+    RegionPlan randomPlan = null;
+    boolean newPlan = false;
+    RegionPlan existingPlan;
+
+    synchronized (this.regionPlans) {
+      existingPlan = this.regionPlans.get(encodedName);
+
+      if (existingPlan != null && existingPlan.getDestination() != null) {
+        LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
+          + " destination server is " + existingPlan.getDestination() +
+            " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
+      }
+
+      if (forceNewPlan
+          || existingPlan == null
+          || existingPlan.getDestination() == null
+          || !destServers.contains(existingPlan.getDestination())) {
+        newPlan = true;
+        try {
+          randomPlan = new RegionPlan(region, null,
+              balancer.randomAssignment(region, destServers));
+        } catch (IOException ex) {
+          LOG.warn("Failed to create new plan.",ex);
+          return null;
+        }
+        this.regionPlans.put(encodedName, randomPlan);
+      }
+    }
+
+    if (newPlan) {
+      if (randomPlan.getDestination() == null) {
+        LOG.warn("Can't find a destination for " + encodedName);
+        return null;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No previous transition plan found (or ignoring " +
+          "an existing plan) for " + region.getRegionNameAsString() +
+          "; generated random plan=" + randomPlan + "; " + destServers.size() +
+          " (online=" + serverManager.getOnlineServers().size() +
+          ") available servers, forceNewPlan=" + forceNewPlan);
+      }
+      return randomPlan;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using pre-existing plan for " +
+        region.getRegionNameAsString() + "; plan=" + existingPlan);
+    }
+    return existingPlan;
+  }
+
+  /**
+   * Wait for some time before retrying meta table region assignment
+   */
+  private void waitForRetryingMetaAssignment() {
+    try {
+      Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
+    } catch (InterruptedException e) {
+      LOG.error("Got exception while waiting for hbase:meta assignment");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Unassigns the specified region.
+   * <p>
+   * Updates the RegionState and sends the CLOSE RPC unless region is being
+   * split by regionserver; then the unassign fails (silently) because we
+   * presume the region being unassigned no longer exists (its been split out
+   * of existence). TODO: What to do if split fails and is rolled back and
+   * parent is revivified?
+   * <p>
+   * If a RegionPlan is already set, it will remain.
+   *
+   * @param region server to be unassigned
+   */
+  public void unassign(HRegionInfo region) {
+    unassign(region, null);
+  }
+
+
+  /**
+   * Unassigns the specified region.
+   * <p>
+   * Updates the RegionState and sends the CLOSE RPC unless region is being
+   * split by regionserver; then the unassign fails (silently) because we
+   * presume the region being unassigned no longer exists (its been split out
+   * of existence). TODO: What to do if split fails and is rolled back and
+   * parent is revivified?
+   * <p>
+   * If a RegionPlan is already set, it will remain.
+   *
+   * @param region server to be unassigned
+   * @param dest the destination server of the region
+   */
+  public void unassign(HRegionInfo region, ServerName dest) {
+    // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
+    LOG.debug("Starting unassign of " + region.getRegionNameAsString()
+      + " (offlining), current state: " + regionStates.getRegionState(region));
+
+    String encodedName = region.getEncodedName();
+    // Grab the state of this region and synchronize on it
+    // We need a lock here as we're going to do a put later and we don't want multiple states
+    //  creation
+    ReentrantLock lock = locker.acquireLock(encodedName);
+    RegionState state = regionStates.getRegionTransitionState(encodedName);
+    try {
+      if (state == null || state.isFailedClose()) {
+        if (state == null) {
+          // Region is not in transition.
+          // We can unassign it only if it's not SPLIT/MERGED.
+          state = regionStates.getRegionState(encodedName);
+          if (state != null && state.isUnassignable()) {
+            LOG.info("Attempting to unassign " + state + ", ignored");
+            // Offline region will be reassigned below
+            return;
+          }
+          if (state == null || state.getServerName() == null) {
+            // We don't know where the region is, offline it.
+            // No need to send CLOSE RPC
+            LOG.warn("Attempting to unassign a region not in RegionStates "
+              + region.getRegionNameAsString() + ", offlined");
+            regionOffline(region);
+            return;
+          }
+        }
+        state = regionStates.updateRegionState(
+          region, State.PENDING_CLOSE);
+      } else if (state.isFailedOpen()) {
+        // The region is not open yet
+        regionOffline(region);
+        return;
+      } else {
+        LOG.debug("Attempting to unassign " +
+          region.getRegionNameAsString() + " but it is " +
+          "already in transition (" + state.getState());
+        return;
+      }
+
+      unassign(region, state.getServerName(), dest);
+    } finally {
+      lock.unlock();
+
+      // Region is expected to be reassigned afterwards
+      if (!replicasToClose.contains(region)
+          && regionStates.isRegionInState(region, State.OFFLINE)) {
+        assign(region);
+      }
+    }
+  }
+
+  /**
+   * Used by unit tests. Return the number of regions opened so far in the life
+   * of the master. Increases by one every time the master opens a region
+   * @return the counter value of the number of regions opened so far
+   */
+  public int getNumRegionsOpened() {
+    return numRegionsOpened.get();
+  }
+
+  /**
+   * Waits until the specified region has completed assignment.
+   * <p>
+   * If the region is already assigned, returns immediately.  Otherwise, method
+   * blocks until the region is assigned.
+   * @param regionInfo region to wait on assignment for
+   * @return true if the region is assigned false otherwise.
+   * @throws InterruptedException
+   */
+  public boolean waitForAssignment(HRegionInfo regionInfo)
+      throws InterruptedException {
+    ArrayList<HRegionInfo> regionSet = new ArrayList<>(1);
+    regionSet.add(regionInfo);
+    return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+  }
+
+  /**
+   * Waits until the specified region has completed assignment, or the deadline is reached.
+   */
+  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+      final boolean waitTillAllAssigned, final int reassigningRegions,
+      final long minEndTime) throws InterruptedException {
+    long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+    if (deadline < 0) { // Overflow
+      deadline = Long.MAX_VALUE; // wait forever
+    }
+    return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+  }
+
+  /**
+   * Waits until the specified region has completed assignment, or the deadline is reached.
+   * @param regionSet set of region to wait on. the set is modified and the assigned regions removed
+   * @param waitTillAllAssigned true if we should wait all the regions to be assigned
+   * @param deadline the timestamp after which the wait is aborted
+   * @return true if all the regions are assigned false otherwise.
+   * @throws InterruptedException
+   */
+  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+      final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+    while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+      int failedOpenCount = 0;
+      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+      while (regionInfoIterator.hasNext()) {
+        HRegionInfo hri = regionInfoIterator.next();
+        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+          regionInfoIterator.remove();
+        } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+          failedOpenCount++;
+        }
+      }
+      if (!waitTillAllAssigned) {
+        // No need to wait, let assignment going on asynchronously
+        break;
+      }
+      if (!regionSet.isEmpty()) {
+        if (failedOpenCount == regionSet.size()) {
+          // all the regions we are waiting had an error on open.
+          break;
+        }
+        regionStates.waitForUpdate(100);
+      }
+    }
+    return regionSet.isEmpty();
+  }
+
+  /**
+   * Assigns the hbase:meta region or a replica.
+   * <p>
+   * Assumes that hbase:meta is currently closed and is not being actively served by
+   * any RegionServer.
+   * @param hri TODO
+   */
+  public void assignMeta(HRegionInfo hri) throws KeeperException {
+    regionStates.updateRegionState(hri, State.OFFLINE);
+    assign(hri);
+  }
+
+  /**
+   * Assigns specified regions retaining assignments, if any.
+   * <p>
+   * This is a synchronous call and will return once every region has been
+   * assigned.  If anything fails, an exception is thrown
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void assign(Map<HRegionInfo, ServerName> regions)
+        throws IOException, InterruptedException {
+    if (regions == null || regions.isEmpty()) {
+      return;
+    }
+    List<ServerName> servers = serverManager.createDestinationServersList();
+    if (servers == null || servers.isEmpty()) {
+      throw new IOException("Found no destination server to assign region(s)");
+    }
+
+    // Reuse existing assignment info
+    Map<ServerName, List<HRegionInfo>> bulkPlan =
+      balancer.retainAssignment(regions, servers);
+    if (bulkPlan == null) {
+      throw new IOException("Unable to determine a plan to assign region(s)");
+    }
+
+    processBogusAssignments(bulkPlan);
+
+    assign(regions.size(), servers.size(),
+      "retainAssignment=true", bulkPlan);
+  }
+
+  /**
+   * Assigns specified regions round robin, if any.
+   * <p>
+   * This is a synchronous call and will return once every region has been
+   * assigned.  If anything fails, an exception is thrown
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void assign(List<HRegionInfo> regions)
+        throws IOException, InterruptedException {
+    if (regions == null || regions.isEmpty()) {
+      return;
+    }
+
+    List<ServerName> servers = serverManager.createDestinationServersList();
+    if (servers == null || servers.isEmpty()) {
+      throw new IOException("Found no destination server to assign region(s)");
+    }
+
+    // Generate a round-robin bulk assignment plan
+    Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
+    if (bulkPlan == null) {
+      throw new IOException("Unable to determine a plan to assign region(s)");
+    }
+
+    processBogusAssignments(bulkPlan);
+
+    assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
+  }
+
+  private void assign(int regions, int totalServers,
+      String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
+          throws InterruptedException, IOException {
+
+    int servers = bulkPlan.size();
+    if (servers == 1 || (regions < bulkAssignThresholdRegions
+        && servers < bulkAssignThresholdServers)) {
+
+      // Not use bulk assignment.  This could be more efficient in small
+      // cluster, especially mini cluster for testing, so that tests won't time out
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Not using bulk assignment since we are assigning only " + regions +
+          " region(s) to " + servers + " server(s)");
+      }
+
+      // invoke assignment (async)
+      ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions);
+      for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
+        if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
+          for (HRegionInfo region: plan.getValue()) {
+            if (!regionStates.isRegionOnline(region)) {
+              invokeAssign(region);
+              if (!region.getTable().isSystemTable()) {
+                userRegionSet.add(region);
+              }
+            }
+          }
+        }
+      }
+
+      // wait for assignment completion
+      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+            System.currentTimeMillis())) {
+        LOG.debug("some user regions are still in transition: " + userRegionSet);
+      }
+    } else {
+      LOG.info("Bulk assigning " + regions + " region(s) across "
+        + totalServers + " server(s), " + message);
+
+      // Use fixed count thread pool assigning.
+      BulkAssigner ba = new GeneralBulkAssigner(
+        this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
+      ba.bulkAssign();
+      LOG.info("Bulk assigning done");
+    }
+  }
+
+  /**
+   * Assigns all user regions, if any exist.  Used during cluster startup.
+   * <p>
+   * This is a synchronous call and will return once every region has been
+   * assigned.  If anything fails, an exception is thrown and the cluster
+   * should be shutdown.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
+      throws IOException, InterruptedException {
+    if (allRegions == null || allRegions.isEmpty()) return;
+
+    // Determine what type of assignment to do on startup
+    boolean retainAssignment = server.getConfiguration().
+      getBoolean("hbase.master.startup.retainassign", true);
+
+    Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
+    if (retainAssignment) {
+      assign(allRegions);
+    } else {
+      List<HRegionInfo> regions = new ArrayList<>(regionsFromMetaScan);
+      assign(regions);
+    }
+
+    for (HRegionInfo hri : regionsFromMetaScan) {
+      TableName tableName = hri.getTable();
+      if (!tableStateManager.isTableState(tableName,
+              TableState.State.ENABLED)) {
+        setEnabledTable(tableName);
+      }
+    }
+    // assign all the replicas that were not recorded in the meta
+    assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
+  }
+
+  /**
+   * Get number of replicas of a table
+   */
+  private static int getNumReplicas(MasterServices master, TableName table) {
+    int numReplica = 1;
+    try {
+      HTableDescriptor htd = master.getTableDescriptors().get(table);
+      if (htd == null) {
+        LOG.warn("master can not get TableDescriptor from table '" + table);
+      } else {
+        numReplica = htd.getRegionReplication();
+      }
+    } catch (IOException e){
+      LOG.warn("Couldn't get the replication attribute of the table " + table + " due to "
+          + e.getMessage());
+    }
+    return numReplica;
+  }
+
+  /**
+   * Get a list of replica regions that are:
+   * not recorded in meta yet. We might not have recorded the locations
+   * for the replicas since the replicas may not have been online yet, master restarted
+   * in the middle of assigning, ZK erased, etc.
+   * @param regionsRecordedInMeta the list of regions we know are recorded in meta
+   * either as a default, or, as the location of a replica
+   * @param master
+   * @return list of replica regions
+   * @throws IOException
+   */
+  public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
+      Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
+    List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<>();
+    for (HRegionInfo hri : regionsRecordedInMeta) {
+      TableName table = hri.getTable();
+      if(master.getTableDescriptors().get(table) == null)
+        continue;
+      int  desiredRegionReplication = getNumReplicas(master, table);
+      for (int i = 0; i < desiredRegionReplication; i++) {
+        HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
+        if (regionsRecordedInMeta.contains(replica)) continue;
+        regionsNotRecordedInMeta.add(replica);
+      }
+    }
+    return regionsNotRecordedInMeta;
+  }
+
+  /**
+   * Rebuild the list of user regions and assignment information.
+   * Updates regionstates with findings as we go through list of regions.
+   * @return set of servers not online that hosted some regions according to a scan of hbase:meta
+   * @throws IOException
+   */
+  Set<ServerName> rebuildUserRegions() throws
+          IOException, KeeperException {
+    Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
+            TableState.State.DISABLED, TableState.State.ENABLING);
+
+    Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
+            TableState.State.DISABLED,
+            TableState.State.DISABLING,
+            TableState.State.ENABLING);
+
+    // Region assignment from META
+    List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
+    // Get any new but slow to checkin region server that joined the cluster
+    Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+    // Set of offline servers to be returned
+    Set<ServerName> offlineServers = new HashSet<>();
+    // Iterate regions in META
+    for (Result result : results) {
+      if (result == null && LOG.isDebugEnabled()){
+        LOG.debug("null result from meta - ignoring but this is strange.");
+        continue;
+      }
+      // keep a track of replicas to close. These were the replicas of the originally
+      // unmerged regions. The master might have closed them before but it mightn't
+      // maybe because it crashed.
+      PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
+      if (p.getFirst() != null && p.getSecond() != null) {
+        int numReplicas = getNumReplicas(server, p.getFirst().getTable());
+        for (HRegionInfo merge : p) {
+          for (int i = 1; i < numReplicas; i++) {
+            replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
+          }
+        }
+      }
+      RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
+      if (rl == null) {
+        continue;
+      }
+      HRegionLocation[] locations = rl.getRegionLocations();
+      if (locations == null) {
+        continue;
+      }
+      for (HRegionLocation hrl : locations) {
+        if (hrl == null) continue;
+        HRegionInfo regionInfo = hrl.getRegionInfo();
+        if (regionInfo == null) continue;
+        int replicaId = regionInfo.getReplicaId();
+        State state = RegionStateStore.getRegionState(result, replicaId);
+        // keep a track of replicas to close. These were the replicas of the split parents
+        // from the previous life of the master. The master should have closed them before
+        // but it couldn't maybe because it crashed
+        if (replicaId == 0 && state.equals(State.SPLIT)) {
+          for (HRegionLocation h : locations) {
+            replicasToClose.add(h.getRegionInfo());
+          }
+        }
+        ServerName lastHost = hrl.getServerName();
+        ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
+        regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
+        if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
+          // Region is not open (either offline or in transition), skip
+          continue;
+        }
+        TableName tableName = regionInfo.getTable();
+        if (!onlineServers.contains(regionLocation)) {
+          // Region is located on a server that isn't online
+          offlineServers.add(regionLocation);
+        } else if (!disabledOrEnablingTables.contains(tableName)) {
+          // Region is being served and on an active server
+          // add only if region not in disabled or enabling table
+          regionStates.regionOnline(regionInfo, regionLocation);
+          balancer.regionOnline(regionInfo, regionLocation);
+        }
+        // need to enable the table if not disabled or disabling or enabling
+        // this will be used in rolling restarts
+        if (!disabledOrDisablingOrEnabling.contains(tableName)
+          && !getTableStateManager().isTableState(tableName,
+                TableState.State.ENABLED)) {
+          setEnabledTable(tableName);
+        }
+      }
+    }
+    return offlineServers;
+  }
+
+  /**
+   * Processes list of regions in transition at startup
+   */
+  void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
+    // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
+    // in case the RPC call is not sent out yet before the master was shut down
+    // since we update the state before we send the RPC call. We can't update
+    // the state after the RPC call. Otherwise, we don't know what's happened
+    // to the region if the master dies right after the RPC call is out.
+    for (RegionState regionState: regionsInTransition) {
+      LOG.info("Processing " + regionState);
+      ServerName serverName = regionState.getServerName();
+      // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
+      // case, try assigning it here.
+      if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
+        LOG.info("Server " + serverName + " isn't online. SSH will handle this");
+        continue; // SSH will handle it
+      }
+      HRegionInfo regionInfo = regionState.getRegion();
+      RegionState.State state = regionState.getState();
+      switch (state) {
+      case CLOSED:
+        invokeAssign(regionState.getRegion());
+        break;
+      case PENDING_OPEN:
+        retrySendRegionOpen(regionState);
+        break;
+      case PENDING_CLOSE:
+        retrySendRegionClose(regionState);
+        break;
+      case FAILED_CLOSE:
+      case FAILED_OPEN:
+        invokeUnAssign(regionInfo);
+        break;
+      default:
+          // No process for other states
+          break;
+      }
+    }
+  }
+
+  /**
+   * At master failover, for pending_open region, make sure
+   * sendRegionOpen RPC call is sent to the target regionserver
+   */
+  private void retrySendRegionOpen(final RegionState regionState) {
+    this.executorService.submit(
+      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+        @Override
+        public void process() throws IOException {
+          HRegionInfo hri = regionState.getRegion();
+          ServerName serverName = regionState.getServerName();
+          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+          try {
+            for (int i = 1; i <= maximumAttempts; i++) {
+              if (!serverManager.isServerOnline(serverName)
+                  || server.isStopped() || server.isAborted()) {
+                return; // No need any more
+              }
+              try {
+                if (!regionState.equals(regionStates.getRegionState(hri))) {
+                  return; // Region is not in the expected state any more
+                }
+                List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+                if (shouldAssignFavoredNodes(hri)) {
+                  FavoredNodesManager fnm = ((MasterServices)server).getFavoredNodesManager();
+                  favoredNodes = fnm.getFavoredNodesWithDNPort(hri);
+                }
+                serverManager.sendRegionOpen(serverName, hri, favoredNodes);
+                return; // we're done
+              } catch (Throwable t) {
+                if (t instanceof RemoteException) {
+                  t = ((RemoteException) t).unwrapRemoteException();
+                }
+                if (t instanceof FailedServerException && i < maximumAttempts) {
+                  // In case the server is in the failed server list, no point to
+                  // retry too soon. Retry after the failed_server_expiry time
+                  try {
+                    Configuration conf = this.server.getConfiguration();
+                    long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+                      RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug(serverName + " is on failed server list; waiting "
+                        + sleepTime + "ms", t);
+                    }
+                    Thread.sleep(sleepTime);
+                    continue;
+                  } catch (InterruptedException ie) {
+                    LOG.warn("Failed to assign "
+                      + hri.getRegionNameAsString() + " since interrupted", ie);
+                    regionStates.updateRegionState(hri, State.FAILED_OPEN);
+                    Thread.currentThread().interrupt();
+                    return;
+                  }
+                }
+                if (serverManager.isServerOnline(serverName)
+                    && t instanceof java.net.SocketTimeoutException) {
+                  i--; // reset the try count
+                } else {
+                  LOG.info("Got exception in retrying sendRegionOpen for "
+                    + regionState + "; try=" + i + " of " + maximumAttempts, t);
+                }
+                Threads.sleep(100);
+              }
+            }
+            // Run out of attempts
+            regionStates.updateRegionState(hri, State.FAILED_OPEN);
+          } finally {
+            lock.unlock();
+          }
+        }
+      });
+  }
+
+  /**
+   * At master failover, for pending_close region, make sure
+   * sendRegionClose RPC call is sent to the target regionserver
+   */
+  private void retrySendRegionClose(final RegionState regionState) {
+    this.executorService.submit(
+      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+        @Override
+        public void process() throws IOException {
+          HRegionInfo hri = regionState.getRegion();
+          ServerName serverName = regionState.getServerName();
+          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+          try {
+            for (int i = 1; i <= maximumAttempts; i++) {
+              if (!serverManager.isServerOnline(serverName)
+                  || server.isStopped() || server.isAborted()) {
+                return; // No need any more
+              }
+              try {
+                if (!regionState.equals(regionStates.getRegionState(hri))) {
+                  return; // Region is not in the expected state any more
+                }
+                serverManager.sendRegionClose(serverName, hri, null);
+                return; // Done.
+              } catch (Throwable t) {
+                if (t instanceof RemoteException) {
+                  t = ((RemoteException) t).unwrapRemoteException();
+                }
+                if (t instanceof FailedServerException && i < maximumAttempts) {
+                  // In case the server is in the failed server list, no point to
+                  // retry too soon. Retry after the failed_server_expiry time
+                  try {
+                    Configuration conf = this.server.getConfiguration();
+                    long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+                      RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug(serverName + " is on failed server list; waiting "
+                        + sleepTime + "ms", t);
+                    }
+                    Thread.sleep(sleepTime);
+                    continue;
+                  } catch (InterruptedException ie) {
+                    LOG.warn("Failed to unassign "
+                      + hri.getRegionNameAsString() + " since interrupted", ie);
+                    regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
+                    Thread.currentThread().interrupt();
+                    return;
+                  }
+                }
+                if (serverManager.isServerOnline(serverName)
+                    && t instanceof java.net.SocketTimeoutException) {
+                  i--; // reset the try count
+                } else {
+                  LOG.info("Got exception in retrying sendRegionClose for "
+                    + regionState + "; try=" + i + " of " + maximumAttempts, t);
+                }
+                Threads.sleep(100);
+              }
+            }
+            // Run out of attempts
+            regionStates.updateRegionState(hri, State.FAILED_CLOSE);
+          } finally {
+            lock.unlock();
+          }
+        }
+      });
+  }
+
+  /**
+   * Set Regions in transitions metrics.
+   * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
+   * This iterator is not fail fast, which may lead to stale read; but that's better than
+   * creating a copy of the map for metrics computation, as this method will be invoked
+   * on a frequent interval.
+   */
+  public void updateRegionsInTransitionMetrics() {
+    long currentTime = System.currentTimeMillis();
+    int totalRITs = 0;
+    int totalRITsOverThreshold = 0;
+    long oldestRITTime = 0;
+    int ritThreshold = this.server.getConfiguration().
+      getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
+    for (RegionState state: regionStates.getRegionsInTransition()) {
+      totalRITs++;
+      long ritTime = currentTime - state.getStamp();
+      if (ritTime > ritThreshold) { // more than the threshold
+        totalRITsOverThreshold++;
+      }
+      if (oldestRITTime < ritTime) {
+        oldestRITTime = ritTime;
+      }
+    }
+    if (this.metricsAssignmentManager != null) {
+      this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
+      this.metricsAssignmentManager.updateRITCount(totalRITs);
+      this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
+    }
+  }
+
+  /**
+   * @param region Region whose plan we are to clear.
+   */
+  private void clearRegionPlan(final HRegionInfo region) {
+    synchronized (this.regionPlans) {
+      this.regionPlans.remove(region.getEncodedName());
+    }
+  }
+
+  /**
+   * Wait on region to clear regions-in-transition.
+   * @param hri Region to wait on.
+   * @throws IOException
+   */
+  public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
+      throws IOException, InterruptedException {
+    waitOnRegionToClearRegionsInTransition(hri, -1L);
+  }
+
+  /**
+   * Wait on region to clear regions-in-transition or time out
+   * @param hri
+   * @param timeOut Milliseconds to wait for current region to be out of transition state.
+   * @return True when a region clears regions-in-transition before timeout otherwise false
+   * @throws InterruptedException
+   */
+  public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
+      throws InterruptedException {
+    if (!regionStates.isRegionInTransition(hri)) {
+      return true;
+    }
+    long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
+        + timeOut;
+    // There is already a timeout monitor on regions in transition so I
+    // should not have to have one here too?
+    LOG.info("Waiting for " + hri.getEncodedName() +
+        " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
+    while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
+      regionStates.waitForUpdate(100);
+      if (EnvironmentEdgeManager.currentTime() > end) {
+        LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
+        return false;
+      }
+    }
+    if (this.server.isStopped()) {
+      LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
+      return false;
+    }
+    return true;
+  }
+
+  void invokeAssign(HRegionInfo regionInfo) {
+    threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
+  }
+
+  void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) {
+    scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(
+        new AssignCallable(this, regionInfo)), sleepMillis, TimeUnit.MILLISECONDS);
+  }
+
+  void invokeUnAssign(HRegionInfo regionInfo) {
+    threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
+  }
+
+  public boolean isCarryingMeta(ServerName serverName) {
+    return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
+  }
+
+  public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
+    return isCarryingRegion(serverName,
+        RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
+  }
+
+  public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
+    return isCarryingRegion(serverName, metaHri);
+  }
+
+  /**
+   * Check if the shutdown server carries the specific region.
+   * @return whether the serverName currently hosts the region
+   */
+  private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
+    RegionState regionState = regionStates.getRegionTransitionState(hri);
+    ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
+    if (transitionAddr != null) {
+      boolean matchTransitionAddr = transitionAddr.equals(serverName);
+      LOG.debug("Checking region=" + hri.getRegionNameAsString()
+        + ", transitioning on server=" + matchTransitionAddr
+        + " server being checked: " + serverName
+        + ", matches=" + matchTransitionAddr);
+      return matchTransitionAddr;
+    }
+
+    ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
+    boolean matchAssignedAddr = serverName.equals(assignedAddr);
+    LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
+      + " is on server=" + assignedAddr + ", server being checked: "
+      + serverName);
+    return matchAssignedAddr;
+  }
+
+  /**
+   * Clean out crashed server removing any assignments.
+   * @param sn Server that went down.
+   * @return list of regions in transition on this server
+   */
+  public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
+    // Clean out any existing assignment plans for this server
+    synchronized (this.regionPlans) {
+      for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
+          i.hasNext();) {
+        Map.Entry<String, RegionPlan> e = i.next();
+        ServerName otherSn = e.getValue().getDestination();
+        // The name will be null if the region is planned for a random assign.
+        if (otherSn != null && otherSn.equals(sn)) {
+          // Use iterator's remove else we'll get CME
+          i.remove();
+        }
+      }
+    }
+    List<HRegionInfo> rits = regionStates.serverOffline(sn);
+    for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
+      HRegionInfo hri = it.next();
+      String encodedName = hri.getEncodedName();
+
+      // We need a lock on the region as we could update it
+      Lock lock = locker.acquireLock(encodedName);
+      try {
+        RegionState regionState = regionStates.getRegionTransitionState(encodedName);
+        if (regionState == null
+            || (regionState.getServerName() != null && !regionState.isOnServer(sn))
+            || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
+                State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
+          LOG.info("Skip " + regionState + " since it is not opening/failed_close"
+            + " on the dead server any more: " + sn);
+          it.remove();
+        } else {
+          if (tab

<TRUNCATED>