You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/24 07:20:41 UTC

[18/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility.

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
deleted file mode 100644
index 69ebd97..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ /dev/null
@@ -1,3053 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CoordinatedStateException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.RegionStateListener;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MasterSwitchType;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.favored.FavoredNodesManager;
-import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
-import org.apache.hadoop.hbase.ipc.FailedServerException;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.QuotaExceededException;
-import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.KeyLocker;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Manages and performs region assignment.
- * Related communications with regionserver are all done over RPC.
- */
-@InterfaceAudience.Private
-public class AssignmentManager {
-  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
-
-  protected final MasterServices server;
-
-  private ServerManager serverManager;
-
-  private boolean shouldAssignRegionsWithFavoredNodes;
-
-  private LoadBalancer balancer;
-
-  private final MetricsAssignmentManager metricsAssignmentManager;
-
-  private AtomicInteger numRegionsOpened = new AtomicInteger(0);
-
-  final private KeyLocker<String> locker = new KeyLocker<>();
-
-  Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
-
-  /**
-   * Map of regions to reopen after the schema of a table is changed. Key -
-   * encoded region name, value - HRegionInfo
-   */
-  private final Map <String, HRegionInfo> regionsToReopen;
-
-  /*
-   * Maximum times we recurse an assignment/unassignment.
-   * See below in {@link #assign()} and {@link #unassign()}.
-   */
-  private final int maximumAttempts;
-
-  /**
-   * The sleep time for which the assignment will wait before retrying in case of
-   * hbase:meta assignment failure due to lack of availability of region plan or bad region plan
-   */
-  private final long sleepTimeBeforeRetryingMetaAssignment;
-
-  /** Plans for region movement. Key is the encoded version of a region name*/
-  // TODO: When do plans get cleaned out?  Ever? In server open and in server
-  // shutdown processing -- St.Ack
-  // All access to this Map must be synchronized.
-  final NavigableMap<String, RegionPlan> regionPlans = new TreeMap<>();
-
-  private final TableStateManager tableStateManager;
-
-  private final ExecutorService executorService;
-
-  private java.util.concurrent.ExecutorService threadPoolExecutorService;
-  private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
-
-  private final RegionStates regionStates;
-
-  // The threshold to use bulk assigning. Using bulk assignment
-  // only if assigning at least this many regions to at least this
-  // many servers. If assigning fewer regions to fewer servers,
-  // bulk assigning may be not as efficient.
-  private final int bulkAssignThresholdRegions;
-  private final int bulkAssignThresholdServers;
-  private final int bulkPerRegionOpenTimeGuesstimate;
-
-  // Should bulk assignment wait till all regions are assigned,
-  // or it is timed out?  This is useful to measure bulk assignment
-  // performance, but not needed in most use cases.
-  private final boolean bulkAssignWaitTillAllAssigned;
-
-  /**
-   * Indicator that AssignmentManager has recovered the region states so
-   * that ServerShutdownHandler can be fully enabled and re-assign regions
-   * of dead servers. So that when re-assignment happens, AssignmentManager
-   * has proper region states.
-   *
-   * Protected to ease testing.
-   */
-  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
-
-  /**
-   * A map to track the count a region fails to open in a row.
-   * So that we don't try to open a region forever if the failure is
-   * unrecoverable.  We don't put this information in region states
-   * because we don't expect this to happen frequently; we don't
-   * want to copy this information over during each state transition either.
-   */
-  private final ConcurrentHashMap<String, AtomicInteger> failedOpenTracker = new ConcurrentHashMap<>();
-
-  // In case not using ZK for region assignment, region states
-  // are persisted in meta with a state store
-  private final RegionStateStore regionStateStore;
-
-  /**
-   * For testing only!  Set to true to skip handling of split.
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
-  public static boolean TEST_SKIP_SPLIT_HANDLING = false;
-
-  /** Listeners that are called on assignment events. */
-  private List<AssignmentListener> listeners = new CopyOnWriteArrayList<>();
-
-  private RegionStateListener regionStateListener;
-
-  private RetryCounter.BackoffPolicy backoffPolicy;
-  private RetryCounter.RetryConfig retryConfig;
-  /**
-   * Constructs a new assignment manager.
-   *
-   * @param server instance of HMaster this AM running inside
-   * @param serverManager serverManager for associated HMaster
-   * @param balancer implementation of {@link LoadBalancer}
-   * @param service Executor service
-   * @param metricsMaster metrics manager
-   * @throws IOException
-   */
-  public AssignmentManager(MasterServices server, ServerManager serverManager,
-      final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster,
-      final TableStateManager tableStateManager)
-          throws IOException {
-    this.server = server;
-    this.serverManager = serverManager;
-    this.executorService = service;
-    this.regionStateStore = new RegionStateStore(server);
-    this.regionsToReopen = Collections.synchronizedMap
-                           (new HashMap<String, HRegionInfo> ());
-    Configuration conf = server.getConfiguration();
-
-    this.tableStateManager = tableStateManager;
-
-    // This is the max attempts, not retries, so it should be at least 1.
-    this.maximumAttempts = Math.max(1,
-      this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
-    this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
-        "hbase.meta.assignment.retry.sleeptime", 1000l);
-    this.balancer = balancer;
-    // Only read favored nodes if using the favored nodes load balancer.
-    this.shouldAssignRegionsWithFavoredNodes = this.balancer instanceof FavoredNodesPromoter;
-    int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
-
-    this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
-        maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
-
-    this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
-        Threads.newDaemonThreadFactory("AM.Scheduler"));
-
-    this.regionStates = new RegionStates(
-      server, tableStateManager, serverManager, regionStateStore);
-
-    this.bulkAssignWaitTillAllAssigned =
-      conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
-    this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
-    this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
-    this.bulkPerRegionOpenTimeGuesstimate =
-      conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
-
-    this.metricsAssignmentManager = new MetricsAssignmentManager();
-
-    // Configurations for retrying opening a region on receiving a FAILED_OPEN
-    this.retryConfig = new RetryCounter.RetryConfig();
-    this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l));
-    // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy
-    // if the user does not set a max sleep limit
-    this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max",
-        retryConfig.getSleepInterval()));
-    this.backoffPolicy = getBackoffPolicy();
-  }
-
-  /**
-   * Returns the backoff policy used for Failed Region Open retries
-   * @return the backoff policy used for Failed Region Open retries
-   */
-  RetryCounter.BackoffPolicy getBackoffPolicy() {
-    return new RetryCounter.ExponentialBackoffPolicyWithLimit();
-  }
-
-  MetricsAssignmentManager getAssignmentManagerMetrics() {
-    return this.metricsAssignmentManager;
-  }
-
-  /**
-   * Add the listener to the notification list.
-   * @param listener The AssignmentListener to register
-   */
-  public void registerListener(final AssignmentListener listener) {
-    this.listeners.add(listener);
-  }
-
-  /**
-   * Remove the listener from the notification list.
-   * @param listener The AssignmentListener to unregister
-   */
-  public boolean unregisterListener(final AssignmentListener listener) {
-    return this.listeners.remove(listener);
-  }
-
-  /**
-   * @return Instance of ZKTableStateManager.
-   */
-  public TableStateManager getTableStateManager() {
-    // These are 'expensive' to make involving trip to zk ensemble so allow
-    // sharing.
-    return this.tableStateManager;
-  }
-
-  /**
-   * This SHOULD not be public. It is public now
-   * because of some unit tests.
-   *
-   * TODO: make it package private and keep RegionStates in the master package
-   */
-  public RegionStates getRegionStates() {
-    return regionStates;
-  }
-
-  /**
-   * Used in some tests to mock up region state in meta
-   */
-  @VisibleForTesting
-  RegionStateStore getRegionStateStore() {
-    return regionStateStore;
-  }
-
-  public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
-    return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
-  }
-
-  /**
-   * Add a regionPlan for the specified region.
-   * @param encodedName
-   * @param plan
-   */
-  public void addPlan(String encodedName, RegionPlan plan) {
-    synchronized (regionPlans) {
-      regionPlans.put(encodedName, plan);
-    }
-  }
-
-  /**
-   * Add a map of region plans.
-   */
-  public void addPlans(Map<String, RegionPlan> plans) {
-    synchronized (regionPlans) {
-      regionPlans.putAll(plans);
-    }
-  }
-
-  /**
-   * Set the list of regions that will be reopened
-   * because of an update in table schema
-   *
-   * @param regions
-   *          list of regions that should be tracked for reopen
-   */
-  public void setRegionsToReopen(List <HRegionInfo> regions) {
-    for(HRegionInfo hri : regions) {
-      regionsToReopen.put(hri.getEncodedName(), hri);
-    }
-  }
-
-  /**
-   * Used by the client to identify if all regions have the schema updates
-   *
-   * @param tableName
-   * @return Pair indicating the status of the alter command
-   * @throws IOException
-   */
-  public Pair<Integer, Integer> getReopenStatus(TableName tableName)
-      throws IOException {
-    List<HRegionInfo> hris;
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
-    } else {
-      hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
-    }
-
-    Integer pending = 0;
-    for (HRegionInfo hri : hris) {
-      String name = hri.getEncodedName();
-      // no lock concurrent access ok: sequential consistency respected.
-      if (regionsToReopen.containsKey(name)
-          || regionStates.isRegionInTransition(name)) {
-        pending++;
-      }
-    }
-    return new Pair<>(pending, hris.size());
-  }
-
-  /**
-   * Used by ServerShutdownHandler to make sure AssignmentManager has completed
-   * the failover cleanup before re-assigning regions of dead servers. So that
-   * when re-assignment happens, AssignmentManager has proper region states.
-   */
-  public boolean isFailoverCleanupDone() {
-    return failoverCleanupDone.get();
-  }
-
-  /**
-   * To avoid racing with AM, external entities may need to lock a region,
-   * for example, when SSH checks what regions to skip re-assigning.
-   */
-  public Lock acquireRegionLock(final String encodedName) {
-    return locker.acquireLock(encodedName);
-  }
-
-  /**
-   * Now, failover cleanup is completed. Notify server manager to
-   * process queued up dead servers processing, if any.
-   */
-  void failoverCleanupDone() {
-    failoverCleanupDone.set(true);
-    serverManager.processQueuedDeadServers();
-  }
-
-  /**
-   * Called on startup.
-   * Figures whether a fresh cluster start of we are joining extant running cluster.
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   * @throws CoordinatedStateException
-   */
-  void joinCluster()
-  throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
-    long startTime = System.currentTimeMillis();
-    // Concurrency note: In the below the accesses on regionsInTransition are
-    // outside of a synchronization block where usually all accesses to RIT are
-    // synchronized.  The presumption is that in this case it is safe since this
-    // method is being played by a single thread on startup.
-
-    // TODO: Regions that have a null location and are not in regionsInTransitions
-    // need to be handled.
-
-    // Scan hbase:meta to build list of existing regions, servers, and assignment
-    // Returns servers who have not checked in (assumed dead) that some regions
-    // were assigned to (according to the meta)
-    Set<ServerName> deadServers = rebuildUserRegions();
-
-    // This method will assign all user regions if a clean server startup or
-    // it will reconstruct master state and cleanup any leftovers from previous master process.
-    boolean failover = processDeadServersAndRegionsInTransition(deadServers);
-
-    LOG.info("Joined the cluster in " + (System.currentTimeMillis()
-      - startTime) + "ms, failover=" + failover);
-  }
-
-  /**
-   * Process all regions that are in transition in zookeeper and also
-   * processes the list of dead servers.
-   * Used by master joining an cluster.  If we figure this is a clean cluster
-   * startup, will assign all user regions.
-   * @param deadServers Set of servers that are offline probably legitimately that were carrying
-   * regions according to a scan of hbase:meta. Can be null.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
-  throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
-    // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
-    boolean failover = !serverManager.getDeadServers().isEmpty();
-    if (failover) {
-      // This may not be a failover actually, especially if meta is on this master.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
-      }
-      // Check if there are any regions on these servers
-      failover = false;
-      for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) {
-        if (regionStates.getRegionAssignments().values().contains(serverName)) {
-          LOG.debug("Found regions on dead server: " + serverName);
-          failover = true;
-          break;
-        }
-      }
-    }
-    Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
-    if (!failover) {
-      // If any one region except meta is assigned, it's a failover.
-      for (Map.Entry<HRegionInfo, ServerName> en:
-          regionStates.getRegionAssignments().entrySet()) {
-        HRegionInfo hri = en.getKey();
-        if (!hri.isMetaTable()
-            && onlineServers.contains(en.getValue())) {
-          LOG.debug("Found region " + hri + " out on cluster");
-          failover = true;
-          break;
-        }
-      }
-    }
-    if (!failover) {
-      // If any region except meta is in transition on a live server, it's a failover.
-      Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition();
-      if (!regionsInTransition.isEmpty()) {
-        for (RegionState regionState: regionsInTransition) {
-          ServerName serverName = regionState.getServerName();
-          if (!regionState.getRegion().isMetaRegion()
-              && serverName != null && onlineServers.contains(serverName)) {
-            LOG.debug("Found " + regionState + " for region " +
-              regionState.getRegion().getRegionNameAsString() + " for server " +
-                serverName + "in RITs");
-            failover = true;
-            break;
-          }
-        }
-      }
-    }
-    if (!failover) {
-      // If we get here, we have a full cluster restart. It is a failover only
-      // if there are some WALs are not split yet. For meta WALs, they should have
-      // been split already, if any. We can walk through those queued dead servers,
-      // if they don't have any WALs, this restart should be considered as a clean one
-      Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
-      if (!queuedDeadServers.isEmpty()) {
-        Configuration conf = server.getConfiguration();
-        Path walRootDir = FSUtils.getWALRootDir(conf);
-        FileSystem walFs = FSUtils.getWALFileSystem(conf);
-        for (ServerName serverName: queuedDeadServers) {
-          // In the case of a clean exit, the shutdown handler would have presplit any WALs and
-          // removed empty directories.
-          Path walDir = new Path(walRootDir,
-            AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
-          Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
-          if (checkWals(walFs, walDir) || checkWals(walFs, splitDir)) {
-            LOG.debug("Found queued dead server " + serverName);
-            failover = true;
-            break;
-          }
-        }
-        if (!failover) {
-          // We figured that it's not a failover, so no need to
-          // work on these re-queued dead servers any more.
-          LOG.info("AM figured that it's not a failover and cleaned up "
-            + queuedDeadServers.size() + " queued dead servers");
-          serverManager.removeRequeuedDeadServers();
-        }
-      }
-    }
-
-    Set<TableName> disabledOrDisablingOrEnabling = null;
-    Map<HRegionInfo, ServerName> allRegions = null;
-
-    if (!failover) {
-      disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
-        TableState.State.DISABLED, TableState.State.DISABLING,
-        TableState.State.ENABLING);
-
-      // Clean re/start, mark all user regions closed before reassignment
-      allRegions = regionStates.closeAllUserRegions(
-        disabledOrDisablingOrEnabling);
-    }
-
-    // Now region states are restored
-    regionStateStore.start();
-
-    if (failover) {
-      if (deadServers != null && !deadServers.isEmpty()) {
-        for (ServerName serverName: deadServers) {
-          if (!serverManager.isServerDead(serverName)) {
-            serverManager.expireServer(serverName); // Let SSH do region re-assign
-          }
-        }
-      }
-      processRegionsInTransition(regionStates.getRegionsInTransition());
-    }
-
-    // Now we can safely claim failover cleanup completed and enable
-    // ServerShutdownHandler for further processing. The nodes (below)
-    // in transition, if any, are for regions not related to those
-    // dead servers at all, and can be done in parallel to SSH.
-    failoverCleanupDone();
-    if (!failover) {
-      // Fresh cluster startup.
-      LOG.info("Clean cluster startup. Don't reassign user regions");
-      assignAllUserRegions(allRegions);
-    } else {
-      LOG.info("Failover! Reassign user regions");
-    }
-    // unassign replicas of the split parents and the merged regions
-    // the daughter replicas are opened in assignAllUserRegions if it was
-    // not already opened.
-    for (HRegionInfo h : replicasToClose) {
-      unassign(h);
-    }
-    replicasToClose.clear();
-    return failover;
-  }
-
-  private boolean checkWals(FileSystem fs, Path dir) throws IOException {
-    if (!fs.exists(dir)) {
-      LOG.debug(dir + " doesn't exist");
-      return false;
-    }
-    if (!fs.getFileStatus(dir).isDirectory()) {
-      LOG.warn(dir + " is not a directory");
-      return false;
-    }
-    FileStatus[] files = FSUtils.listStatus(fs, dir);
-    if (files == null || files.length == 0) {
-      LOG.debug(dir + " has no files");
-      return false;
-    }
-    for (int i = 0; i < files.length; i++) {
-      if (files[i].isFile() && files[i].getLen() > 0) {
-        LOG.debug(dir + " has a non-empty file: " + files[i].getPath());
-        return true;
-      } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) {
-        LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath());
-        return true;
-      }
-    }
-    LOG.debug("Found 0 non-empty wal files for :" + dir);
-    return false;
-  }
-
-  /**
-   * When a region is closed, it should be removed from the regionsToReopen
-   * @param hri HRegionInfo of the region which was closed
-   */
-  public void removeClosedRegion(HRegionInfo hri) {
-    if (regionsToReopen.remove(hri.getEncodedName()) != null) {
-      LOG.debug("Removed region from reopening regions because it was closed");
-    }
-  }
-
-  void processFavoredNodesForDaughters(HRegionInfo parent,
-    HRegionInfo regionA, HRegionInfo regionB) throws IOException {
-    if (shouldAssignFavoredNodes(parent)) {
-      List<ServerName> onlineServers = this.serverManager.getOnlineServersList();
-      ((FavoredNodesPromoter) this.balancer).
-          generateFavoredNodesForDaughter(onlineServers, parent, regionA, regionB);
-    }
-  }
-
-  void processFavoredNodesForMerge(HRegionInfo merged, HRegionInfo regionA, HRegionInfo regionB)
-    throws IOException {
-    if (shouldAssignFavoredNodes(merged)) {
-      ((FavoredNodesPromoter)this.balancer).
-        generateFavoredNodesForMergedRegion(merged, regionA, regionB);
-    }
-  }
-
-  /*
-   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
-   * belongs to a non-system table.
-   */
-  private boolean shouldAssignFavoredNodes(HRegionInfo region) {
-    return this.shouldAssignRegionsWithFavoredNodes
-        && FavoredNodesManager.isFavoredNodeApplicable(region);
-  }
-
-  /**
-   * Marks the region as online.  Removes it from regions in transition and
-   * updates the in-memory assignment information.
-   * <p>
-   * Used when a region has been successfully opened on a region server.
-   * @param regionInfo
-   * @param sn
-   */
-  void regionOnline(HRegionInfo regionInfo, ServerName sn) {
-    regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
-  }
-
-  void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
-    numRegionsOpened.incrementAndGet();
-    regionStates.regionOnline(regionInfo, sn, openSeqNum);
-
-    // Remove plan if one.
-    clearRegionPlan(regionInfo);
-    balancer.regionOnline(regionInfo, sn);
-
-    // Tell our listeners that a region was opened
-    sendRegionOpenedNotification(regionInfo, sn);
-  }
-
-  /**
-   * Marks the region as offline.  Removes it from regions in transition and
-   * removes in-memory assignment information.
-   * <p>
-   * Used when a region has been closed and should remain closed.
-   * @param regionInfo
-   */
-  public void regionOffline(final HRegionInfo regionInfo) {
-    regionOffline(regionInfo, null);
-  }
-
-  public void offlineDisabledRegion(HRegionInfo regionInfo) {
-    replicasToClose.remove(regionInfo);
-    regionOffline(regionInfo);
-  }
-
-  // Assignment methods
-
-  /**
-   * Assigns the specified region.
-   * <p>
-   * If a RegionPlan is available with a valid destination then it will be used
-   * to determine what server region is assigned to.  If no RegionPlan is
-   * available, region will be assigned to a random available server.
-   * <p>
-   * Updates the RegionState and sends the OPEN RPC.
-   * <p>
-   * This will only succeed if the region is in transition and in a CLOSED or
-   * OFFLINE state or not in transition, and of course, the
-   * chosen server is up and running (It may have just crashed!).
-   *
-   * @param region server to be assigned
-   */
-  public void assign(HRegionInfo region) {
-    assign(region, false);
-  }
-
-  /**
-   * Use care with forceNewPlan. It could cause double assignment.
-   */
-  public void assign(HRegionInfo region, boolean forceNewPlan) {
-    if (isDisabledorDisablingRegionInRIT(region)) {
-      return;
-    }
-    String encodedName = region.getEncodedName();
-    Lock lock = locker.acquireLock(encodedName);
-    try {
-      RegionState state = forceRegionStateToOffline(region, forceNewPlan);
-      if (state != null) {
-        if (regionStates.wasRegionOnDeadServer(encodedName)) {
-          LOG.info("Skip assigning " + region.getRegionNameAsString()
-            + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
-            + " is dead but not processed yet");
-          return;
-        }
-        assign(state, forceNewPlan);
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Bulk assign regions to <code>destination</code>.
-   * @param destination
-   * @param regions Regions to assign.
-   * @return true if successful
-   */
-  boolean assign(final ServerName destination, final List<HRegionInfo> regions)
-    throws InterruptedException {
-    long startTime = EnvironmentEdgeManager.currentTime();
-    try {
-      int regionCount = regions.size();
-      if (regionCount == 0) {
-        return true;
-      }
-      LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
-      Set<String> encodedNames = new HashSet<>(regionCount);
-      for (HRegionInfo region : regions) {
-        encodedNames.add(region.getEncodedName());
-      }
-
-      List<HRegionInfo> failedToOpenRegions = new ArrayList<>();
-      Map<String, Lock> locks = locker.acquireLocks(encodedNames);
-      try {
-        Map<String, RegionPlan> plans = new HashMap<>(regionCount);
-        List<RegionState> states = new ArrayList<>(regionCount);
-        for (HRegionInfo region : regions) {
-          String encodedName = region.getEncodedName();
-          if (!isDisabledorDisablingRegionInRIT(region)) {
-            RegionState state = forceRegionStateToOffline(region, false);
-            boolean onDeadServer = false;
-            if (state != null) {
-              if (regionStates.wasRegionOnDeadServer(encodedName)) {
-                LOG.info("Skip assigning " + region.getRegionNameAsString()
-                  + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
-                  + " is dead but not processed yet");
-                onDeadServer = true;
-              } else {
-                RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
-                plans.put(encodedName, plan);
-                states.add(state);
-                continue;
-              }
-            }
-            // Reassign if the region wasn't on a dead server
-            if (!onDeadServer) {
-              LOG.info("failed to force region state to offline, "
-                + "will reassign later: " + region);
-              failedToOpenRegions.add(region); // assign individually later
-            }
-          }
-          // Release the lock, this region is excluded from bulk assign because
-          // we can't update its state, or set its znode to offline.
-          Lock lock = locks.remove(encodedName);
-          lock.unlock();
-        }
-
-        if (server.isStopped()) {
-          return false;
-        }
-
-        // Add region plans, so we can updateTimers when one region is opened so
-        // that unnecessary timeout on RIT is reduced.
-        this.addPlans(plans);
-
-        List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos = new ArrayList<>(states.size());
-        for (RegionState state: states) {
-          HRegionInfo region = state.getRegion();
-          regionStates.updateRegionState(
-            region, State.PENDING_OPEN, destination);
-          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
-          if (shouldAssignFavoredNodes(region)) {
-            favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
-          }
-          regionOpenInfos.add(new Pair<>(region, favoredNodes));
-        }
-
-        // Move on to open regions.
-        try {
-          // Send OPEN RPC. If it fails on a IOE or RemoteException,
-          // regions will be assigned individually.
-          Configuration conf = server.getConfiguration();
-          long maxWaitTime = System.currentTimeMillis() +
-            conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
-          for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
-            try {
-              List<RegionOpeningState> regionOpeningStateList = serverManager
-                .sendRegionOpen(destination, regionOpenInfos);
-              for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
-                RegionOpeningState openingState = regionOpeningStateList.get(k);
-                if (openingState != RegionOpeningState.OPENED) {
-                  HRegionInfo region = regionOpenInfos.get(k).getFirst();
-                  LOG.info("Got opening state " + openingState
-                    + ", will reassign later: " + region);
-                  // Failed opening this region, reassign it later
-                  forceRegionStateToOffline(region, true);
-                  failedToOpenRegions.add(region);
-                }
-              }
-              break;
-            } catch (IOException e) {
-              if (e instanceof RemoteException) {
-                e = ((RemoteException)e).unwrapRemoteException();
-              }
-              if (e instanceof RegionServerStoppedException) {
-                LOG.warn("The region server was shut down, ", e);
-                // No need to retry, the region server is a goner.
-                return false;
-              } else if (e instanceof ServerNotRunningYetException) {
-                long now = System.currentTimeMillis();
-                if (now < maxWaitTime) {
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("Server is not yet up; waiting up to " +
-                      (maxWaitTime - now) + "ms", e);
-                  }
-                  Thread.sleep(100);
-                  i--; // reset the try count
-                  continue;
-                }
-              } else if (e instanceof java.net.SocketTimeoutException
-                  && this.serverManager.isServerOnline(destination)) {
-                // In case socket is timed out and the region server is still online,
-                // the openRegion RPC could have been accepted by the server and
-                // just the response didn't go through.  So we will retry to
-                // open the region on the same server.
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Bulk assigner openRegion() to " + destination
-                    + " has timed out, but the regions might"
-                    + " already be opened on it.", e);
-                }
-                // wait and reset the re-try count, server might be just busy.
-                Thread.sleep(100);
-                i--;
-                continue;
-              } else if (e instanceof FailedServerException && i < maximumAttempts) {
-                // In case the server is in the failed server list, no point to
-                // retry too soon. Retry after the failed_server_expiry time
-                long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-                  RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug(destination + " is on failed server list; waiting "
-                    + sleepTime + "ms", e);
-                }
-                Thread.sleep(sleepTime);
-                continue;
-              }
-              throw e;
-            }
-          }
-        } catch (IOException e) {
-          // Can be a socket timeout, EOF, NoRouteToHost, etc
-          LOG.info("Unable to communicate with " + destination
-            + " in order to assign regions, ", e);
-          for (RegionState state: states) {
-            HRegionInfo region = state.getRegion();
-            forceRegionStateToOffline(region, true);
-          }
-          return false;
-        }
-      } finally {
-        for (Lock lock : locks.values()) {
-          lock.unlock();
-        }
-      }
-
-      if (!failedToOpenRegions.isEmpty()) {
-        for (HRegionInfo region : failedToOpenRegions) {
-          if (!regionStates.isRegionOnline(region)) {
-            invokeAssign(region);
-          }
-        }
-      }
-
-      // wait for assignment completion
-      ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions.size());
-      for (HRegionInfo region: regions) {
-        if (!region.getTable().isSystemTable()) {
-          userRegionSet.add(region);
-        }
-      }
-      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
-            System.currentTimeMillis())) {
-        LOG.debug("some user regions are still in transition: " + userRegionSet);
-      }
-      LOG.debug("Bulk assigning done for " + destination);
-      return true;
-    } finally {
-      metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
-    }
-  }
-
-  /**
-   * Send CLOSE RPC if the server is online, otherwise, offline the region.
-   *
-   * The RPC will be sent only to the region sever found in the region state
-   * if it is passed in, otherwise, to the src server specified. If region
-   * state is not specified, we don't update region state at all, instead
-   * we just send the RPC call. This is useful for some cleanup without
-   * messing around the region states (see handleRegion, on region opened
-   * on an unexpected server scenario, for an example)
-   */
-  private void unassign(final HRegionInfo region,
-      final ServerName server, final ServerName dest) {
-    for (int i = 1; i <= this.maximumAttempts; i++) {
-      if (this.server.isStopped() || this.server.isAborted()) {
-        LOG.debug("Server stopped/aborted; skipping unassign of " + region);
-        return;
-      }
-      if (!serverManager.isServerOnline(server)) {
-        LOG.debug("Offline " + region.getRegionNameAsString()
-          + ", no need to unassign since it's on a dead server: " + server);
-        regionStates.updateRegionState(region, State.OFFLINE);
-        return;
-      }
-      try {
-        // Send CLOSE RPC
-        if (serverManager.sendRegionClose(server, region, dest)) {
-          LOG.debug("Sent CLOSE to " + server + " for region " +
-            region.getRegionNameAsString());
-          return;
-        }
-        // This never happens. Currently regionserver close always return true.
-        // Todo; this can now happen (0.96) if there is an exception in a coprocessor
-        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
-          region.getRegionNameAsString());
-      } catch (Throwable t) {
-        long sleepTime = 0;
-        Configuration conf = this.server.getConfiguration();
-        if (t instanceof RemoteException) {
-          t = ((RemoteException)t).unwrapRemoteException();
-        }
-        if (t instanceof RegionServerAbortedException
-            || t instanceof RegionServerStoppedException
-            || t instanceof ServerNotRunningYetException) {
-          // RS is aborting, we cannot offline the region since the region may need to do WAL
-          // recovery. Until we see  the RS expiration, we should retry.
-          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-
-        } else if (t instanceof NotServingRegionException) {
-          LOG.debug("Offline " + region.getRegionNameAsString()
-            + ", it's not any more on " + server, t);
-          regionStates.updateRegionState(region, State.OFFLINE);
-          return;
-        } else if (t instanceof FailedServerException && i < maximumAttempts) {
-          // In case the server is in the failed server list, no point to
-          // retry too soon. Retry after the failed_server_expiry time
-          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-          RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
-          }
-       }
-       try {
-         if (sleepTime > 0) {
-           Thread.sleep(sleepTime);
-         }
-       } catch (InterruptedException ie) {
-         LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
-         Thread.currentThread().interrupt();
-         regionStates.updateRegionState(region, State.FAILED_CLOSE);
-         return;
-       }
-       LOG.info("Server " + server + " returned " + t + " for "
-         + region.getRegionNameAsString() + ", try=" + i
-         + " of " + this.maximumAttempts, t);
-      }
-    }
-    // Run out of attempts
-    regionStates.updateRegionState(region, State.FAILED_CLOSE);
-  }
-
-  /**
-   * Set region to OFFLINE unless it is opening and forceNewPlan is false.
-   */
-  private RegionState forceRegionStateToOffline(
-      final HRegionInfo region, final boolean forceNewPlan) {
-    RegionState state = regionStates.getRegionState(region);
-    if (state == null) {
-      LOG.warn("Assigning but not in region states: " + region);
-      state = regionStates.createRegionState(region);
-    }
-
-    if (forceNewPlan && LOG.isDebugEnabled()) {
-      LOG.debug("Force region state offline " + state);
-    }
-
-    switch (state.getState()) {
-    case OPEN:
-    case OPENING:
-    case PENDING_OPEN:
-    case CLOSING:
-    case PENDING_CLOSE:
-      if (!forceNewPlan) {
-        LOG.debug("Skip assigning " +
-          region + ", it is already " + state);
-        return null;
-      }
-    case FAILED_CLOSE:
-    case FAILED_OPEN:
-      regionStates.updateRegionState(region, State.PENDING_CLOSE);
-      unassign(region, state.getServerName(), null);
-      state = regionStates.getRegionState(region);
-      if (!state.isOffline() && !state.isClosed()) {
-        // If the region isn't offline, we can't re-assign
-        // it now. It will be assigned automatically after
-        // the regionserver reports it's closed.
-        return null;
-      }
-    case OFFLINE:
-    case CLOSED:
-      break;
-    default:
-      LOG.error("Trying to assign region " + region
-        + ", which is " + state);
-      return null;
-    }
-    return state;
-  }
-
-  /**
-   * Caller must hold lock on the passed <code>state</code> object.
-   * @param state
-   * @param forceNewPlan
-   */
-  private void assign(RegionState state, boolean forceNewPlan) {
-    long startTime = EnvironmentEdgeManager.currentTime();
-    try {
-      Configuration conf = server.getConfiguration();
-      RegionPlan plan = null;
-      long maxWaitTime = -1;
-      HRegionInfo region = state.getRegion();
-      Throwable previousException = null;
-      for (int i = 1; i <= maximumAttempts; i++) {
-        if (server.isStopped() || server.isAborted()) {
-          LOG.info("Skip assigning " + region.getRegionNameAsString()
-            + ", the server is stopped/aborted");
-          return;
-        }
-
-        if (plan == null) { // Get a server for the region at first
-          try {
-            plan = getRegionPlan(region, forceNewPlan);
-          } catch (HBaseIOException e) {
-            LOG.warn("Failed to get region plan", e);
-          }
-        }
-
-        if (plan == null) {
-          LOG.warn("Unable to determine a plan to assign " + region);
-
-          // For meta region, we have to keep retrying until succeeding
-          if (region.isMetaRegion()) {
-            if (i == maximumAttempts) {
-              i = 0; // re-set attempt count to 0 for at least 1 retry
-
-              LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
-                " after maximumAttempts (" + this.maximumAttempts +
-                "). Reset attempts count and continue retrying.");
-            }
-            waitForRetryingMetaAssignment();
-            continue;
-          }
-
-          regionStates.updateRegionState(region, State.FAILED_OPEN);
-          return;
-        }
-        LOG.info("Assigning " + region.getRegionNameAsString() +
-            " to " + plan.getDestination());
-        // Transition RegionState to PENDING_OPEN
-       regionStates.updateRegionState(region,
-          State.PENDING_OPEN, plan.getDestination());
-
-        boolean needNewPlan = false;
-        final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
-            " to " + plan.getDestination();
-        try {
-          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
-          if (shouldAssignFavoredNodes(region)) {
-            favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
-          }
-          serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
-          return; // we're done
-        } catch (Throwable t) {
-          if (t instanceof RemoteException) {
-            t = ((RemoteException) t).unwrapRemoteException();
-          }
-          previousException = t;
-
-          // Should we wait a little before retrying? If the server is starting it's yes.
-          boolean hold = (t instanceof ServerNotRunningYetException);
-
-          // In case socket is timed out and the region server is still online,
-          // the openRegion RPC could have been accepted by the server and
-          // just the response didn't go through.  So we will retry to
-          // open the region on the same server.
-          boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
-              && this.serverManager.isServerOnline(plan.getDestination()));
-
-          if (hold) {
-            LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
-              "try=" + i + " of " + this.maximumAttempts, t);
-
-            if (maxWaitTime < 0) {
-              maxWaitTime = EnvironmentEdgeManager.currentTime()
-                + this.server.getConfiguration().getLong(
-                  "hbase.regionserver.rpc.startup.waittime", 60000);
-            }
-            try {
-              long now = EnvironmentEdgeManager.currentTime();
-              if (now < maxWaitTime) {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Server is not yet up; waiting up to "
-                    + (maxWaitTime - now) + "ms", t);
-                }
-                Thread.sleep(100);
-                i--; // reset the try count
-              } else {
-                LOG.debug("Server is not up for a while; try a new one", t);
-                needNewPlan = true;
-              }
-            } catch (InterruptedException ie) {
-              LOG.warn("Failed to assign "
-                  + region.getRegionNameAsString() + " since interrupted", ie);
-              regionStates.updateRegionState(region, State.FAILED_OPEN);
-              Thread.currentThread().interrupt();
-              return;
-            }
-          } else if (retry) {
-            i--; // we want to retry as many times as needed as long as the RS is not dead.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(assignMsg + ", trying to assign to the same region server due ", t);
-            }
-          } else {
-            needNewPlan = true;
-            LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
-                " try=" + i + " of " + this.maximumAttempts, t);
-          }
-        }
-
-        if (i == this.maximumAttempts) {
-          // For meta region, we have to keep retrying until succeeding
-          if (region.isMetaRegion()) {
-            i = 0; // re-set attempt count to 0 for at least 1 retry
-            LOG.warn(assignMsg +
-                ", trying to assign a hbase:meta region reached to maximumAttempts (" +
-                this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
-            waitForRetryingMetaAssignment();
-          }
-          else {
-            // Don't reset the region state or get a new plan any more.
-            // This is the last try.
-            continue;
-          }
-        }
-
-        // If region opened on destination of present plan, reassigning to new
-        // RS may cause double assignments. In case of RegionAlreadyInTransitionException
-        // reassigning to same RS.
-        if (needNewPlan) {
-          // Force a new plan and reassign. Will return null if no servers.
-          // The new plan could be the same as the existing plan since we don't
-          // exclude the server of the original plan, which should not be
-          // excluded since it could be the only server up now.
-          RegionPlan newPlan = null;
-          try {
-            newPlan = getRegionPlan(region, true);
-          } catch (HBaseIOException e) {
-            LOG.warn("Failed to get region plan", e);
-          }
-          if (newPlan == null) {
-            regionStates.updateRegionState(region, State.FAILED_OPEN);
-            LOG.warn("Unable to find a viable location to assign region " +
-                region.getRegionNameAsString());
-            return;
-          }
-
-          if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
-            // Clean out plan we failed execute and one that doesn't look like it'll
-            // succeed anyways; we need a new plan!
-            // Transition back to OFFLINE
-            regionStates.updateRegionState(region, State.OFFLINE);
-            plan = newPlan;
-          } else if(plan.getDestination().equals(newPlan.getDestination()) &&
-              previousException instanceof FailedServerException) {
-            try {
-              LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
-                " to the same failed server.");
-              Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-                RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
-            } catch (InterruptedException ie) {
-              LOG.warn("Failed to assign "
-                  + region.getRegionNameAsString() + " since interrupted", ie);
-              regionStates.updateRegionState(region, State.FAILED_OPEN);
-              Thread.currentThread().interrupt();
-              return;
-            }
-          }
-        }
-      }
-      // Run out of attempts
-      regionStates.updateRegionState(region, State.FAILED_OPEN);
-    } finally {
-      metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
-    }
-  }
-
-  private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
-    if (this.tableStateManager.isTableState(region.getTable(),
-            TableState.State.DISABLED,
-            TableState.State.DISABLING) || replicasToClose.contains(region)) {
-      LOG.info("Table " + region.getTable() + " is disabled or disabling;"
-        + " skipping assign of " + region.getRegionNameAsString());
-      offlineDisabledRegion(region);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * @param region the region to assign
-   * @param forceNewPlan If true, then if an existing plan exists, a new plan
-   * will be generated.
-   * @return Plan for passed <code>region</code> (If none currently, it creates one or
-   * if no servers to assign, it returns null).
-   */
-  private RegionPlan getRegionPlan(final HRegionInfo region,
-      final boolean forceNewPlan) throws HBaseIOException {
-    // Pickup existing plan or make a new one
-    final String encodedName = region.getEncodedName();
-    final List<ServerName> destServers =
-      serverManager.createDestinationServersList();
-
-    if (destServers.isEmpty()){
-      LOG.warn("Can't move " + encodedName +
-        ", there is no destination server available.");
-      return null;
-    }
-
-    RegionPlan randomPlan = null;
-    boolean newPlan = false;
-    RegionPlan existingPlan;
-
-    synchronized (this.regionPlans) {
-      existingPlan = this.regionPlans.get(encodedName);
-
-      if (existingPlan != null && existingPlan.getDestination() != null) {
-        LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
-          + " destination server is " + existingPlan.getDestination() +
-            " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
-      }
-
-      if (forceNewPlan
-          || existingPlan == null
-          || existingPlan.getDestination() == null
-          || !destServers.contains(existingPlan.getDestination())) {
-        newPlan = true;
-        try {
-          randomPlan = new RegionPlan(region, null,
-              balancer.randomAssignment(region, destServers));
-        } catch (IOException ex) {
-          LOG.warn("Failed to create new plan.",ex);
-          return null;
-        }
-        this.regionPlans.put(encodedName, randomPlan);
-      }
-    }
-
-    if (newPlan) {
-      if (randomPlan.getDestination() == null) {
-        LOG.warn("Can't find a destination for " + encodedName);
-        return null;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No previous transition plan found (or ignoring " +
-          "an existing plan) for " + region.getRegionNameAsString() +
-          "; generated random plan=" + randomPlan + "; " + destServers.size() +
-          " (online=" + serverManager.getOnlineServers().size() +
-          ") available servers, forceNewPlan=" + forceNewPlan);
-      }
-      return randomPlan;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using pre-existing plan for " +
-        region.getRegionNameAsString() + "; plan=" + existingPlan);
-    }
-    return existingPlan;
-  }
-
-  /**
-   * Wait for some time before retrying meta table region assignment
-   */
-  private void waitForRetryingMetaAssignment() {
-    try {
-      Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
-    } catch (InterruptedException e) {
-      LOG.error("Got exception while waiting for hbase:meta assignment");
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Unassigns the specified region.
-   * <p>
-   * Updates the RegionState and sends the CLOSE RPC unless region is being
-   * split by regionserver; then the unassign fails (silently) because we
-   * presume the region being unassigned no longer exists (its been split out
-   * of existence). TODO: What to do if split fails and is rolled back and
-   * parent is revivified?
-   * <p>
-   * If a RegionPlan is already set, it will remain.
-   *
-   * @param region server to be unassigned
-   */
-  public void unassign(HRegionInfo region) {
-    unassign(region, null);
-  }
-
-
-  /**
-   * Unassigns the specified region.
-   * <p>
-   * Updates the RegionState and sends the CLOSE RPC unless region is being
-   * split by regionserver; then the unassign fails (silently) because we
-   * presume the region being unassigned no longer exists (its been split out
-   * of existence). TODO: What to do if split fails and is rolled back and
-   * parent is revivified?
-   * <p>
-   * If a RegionPlan is already set, it will remain.
-   *
-   * @param region server to be unassigned
-   * @param dest the destination server of the region
-   */
-  public void unassign(HRegionInfo region, ServerName dest) {
-    // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
-    LOG.debug("Starting unassign of " + region.getRegionNameAsString()
-      + " (offlining), current state: " + regionStates.getRegionState(region));
-
-    String encodedName = region.getEncodedName();
-    // Grab the state of this region and synchronize on it
-    // We need a lock here as we're going to do a put later and we don't want multiple states
-    //  creation
-    ReentrantLock lock = locker.acquireLock(encodedName);
-    RegionState state = regionStates.getRegionTransitionState(encodedName);
-    try {
-      if (state == null || state.isFailedClose()) {
-        if (state == null) {
-          // Region is not in transition.
-          // We can unassign it only if it's not SPLIT/MERGED.
-          state = regionStates.getRegionState(encodedName);
-          if (state != null && state.isUnassignable()) {
-            LOG.info("Attempting to unassign " + state + ", ignored");
-            // Offline region will be reassigned below
-            return;
-          }
-          if (state == null || state.getServerName() == null) {
-            // We don't know where the region is, offline it.
-            // No need to send CLOSE RPC
-            LOG.warn("Attempting to unassign a region not in RegionStates "
-              + region.getRegionNameAsString() + ", offlined");
-            regionOffline(region);
-            return;
-          }
-        }
-        state = regionStates.updateRegionState(
-          region, State.PENDING_CLOSE);
-      } else if (state.isFailedOpen()) {
-        // The region is not open yet
-        regionOffline(region);
-        return;
-      } else {
-        LOG.debug("Attempting to unassign " +
-          region.getRegionNameAsString() + " but it is " +
-          "already in transition (" + state.getState());
-        return;
-      }
-
-      unassign(region, state.getServerName(), dest);
-    } finally {
-      lock.unlock();
-
-      // Region is expected to be reassigned afterwards
-      if (!replicasToClose.contains(region)
-          && regionStates.isRegionInState(region, State.OFFLINE)) {
-        assign(region);
-      }
-    }
-  }
-
-  /**
-   * Used by unit tests. Return the number of regions opened so far in the life
-   * of the master. Increases by one every time the master opens a region
-   * @return the counter value of the number of regions opened so far
-   */
-  public int getNumRegionsOpened() {
-    return numRegionsOpened.get();
-  }
-
-  /**
-   * Waits until the specified region has completed assignment.
-   * <p>
-   * If the region is already assigned, returns immediately.  Otherwise, method
-   * blocks until the region is assigned.
-   * @param regionInfo region to wait on assignment for
-   * @return true if the region is assigned false otherwise.
-   * @throws InterruptedException
-   */
-  public boolean waitForAssignment(HRegionInfo regionInfo)
-      throws InterruptedException {
-    ArrayList<HRegionInfo> regionSet = new ArrayList<>(1);
-    regionSet.add(regionInfo);
-    return waitForAssignment(regionSet, true, Long.MAX_VALUE);
-  }
-
-  /**
-   * Waits until the specified region has completed assignment, or the deadline is reached.
-   */
-  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
-      final boolean waitTillAllAssigned, final int reassigningRegions,
-      final long minEndTime) throws InterruptedException {
-    long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
-    if (deadline < 0) { // Overflow
-      deadline = Long.MAX_VALUE; // wait forever
-    }
-    return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
-  }
-
-  /**
-   * Waits until the specified region has completed assignment, or the deadline is reached.
-   * @param regionSet set of region to wait on. the set is modified and the assigned regions removed
-   * @param waitTillAllAssigned true if we should wait all the regions to be assigned
-   * @param deadline the timestamp after which the wait is aborted
-   * @return true if all the regions are assigned false otherwise.
-   * @throws InterruptedException
-   */
-  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
-      final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
-    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
-    while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
-      int failedOpenCount = 0;
-      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
-      while (regionInfoIterator.hasNext()) {
-        HRegionInfo hri = regionInfoIterator.next();
-        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
-            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
-          regionInfoIterator.remove();
-        } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
-          failedOpenCount++;
-        }
-      }
-      if (!waitTillAllAssigned) {
-        // No need to wait, let assignment going on asynchronously
-        break;
-      }
-      if (!regionSet.isEmpty()) {
-        if (failedOpenCount == regionSet.size()) {
-          // all the regions we are waiting had an error on open.
-          break;
-        }
-        regionStates.waitForUpdate(100);
-      }
-    }
-    return regionSet.isEmpty();
-  }
-
-  /**
-   * Assigns the hbase:meta region or a replica.
-   * <p>
-   * Assumes that hbase:meta is currently closed and is not being actively served by
-   * any RegionServer.
-   * @param hri TODO
-   */
-  public void assignMeta(HRegionInfo hri) throws KeeperException {
-    regionStates.updateRegionState(hri, State.OFFLINE);
-    assign(hri);
-  }
-
-  /**
-   * Assigns specified regions retaining assignments, if any.
-   * <p>
-   * This is a synchronous call and will return once every region has been
-   * assigned.  If anything fails, an exception is thrown
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  public void assign(Map<HRegionInfo, ServerName> regions)
-        throws IOException, InterruptedException {
-    if (regions == null || regions.isEmpty()) {
-      return;
-    }
-    List<ServerName> servers = serverManager.createDestinationServersList();
-    if (servers == null || servers.isEmpty()) {
-      throw new IOException("Found no destination server to assign region(s)");
-    }
-
-    // Reuse existing assignment info
-    Map<ServerName, List<HRegionInfo>> bulkPlan =
-      balancer.retainAssignment(regions, servers);
-    if (bulkPlan == null) {
-      throw new IOException("Unable to determine a plan to assign region(s)");
-    }
-
-    processBogusAssignments(bulkPlan);
-
-    assign(regions.size(), servers.size(),
-      "retainAssignment=true", bulkPlan);
-  }
-
-  /**
-   * Assigns specified regions round robin, if any.
-   * <p>
-   * This is a synchronous call and will return once every region has been
-   * assigned.  If anything fails, an exception is thrown
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  public void assign(List<HRegionInfo> regions)
-        throws IOException, InterruptedException {
-    if (regions == null || regions.isEmpty()) {
-      return;
-    }
-
-    List<ServerName> servers = serverManager.createDestinationServersList();
-    if (servers == null || servers.isEmpty()) {
-      throw new IOException("Found no destination server to assign region(s)");
-    }
-
-    // Generate a round-robin bulk assignment plan
-    Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
-    if (bulkPlan == null) {
-      throw new IOException("Unable to determine a plan to assign region(s)");
-    }
-
-    processBogusAssignments(bulkPlan);
-
-    assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
-  }
-
-  private void assign(int regions, int totalServers,
-      String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
-          throws InterruptedException, IOException {
-
-    int servers = bulkPlan.size();
-    if (servers == 1 || (regions < bulkAssignThresholdRegions
-        && servers < bulkAssignThresholdServers)) {
-
-      // Not use bulk assignment.  This could be more efficient in small
-      // cluster, especially mini cluster for testing, so that tests won't time out
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Not using bulk assignment since we are assigning only " + regions +
-          " region(s) to " + servers + " server(s)");
-      }
-
-      // invoke assignment (async)
-      ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions);
-      for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
-        if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
-          for (HRegionInfo region: plan.getValue()) {
-            if (!regionStates.isRegionOnline(region)) {
-              invokeAssign(region);
-              if (!region.getTable().isSystemTable()) {
-                userRegionSet.add(region);
-              }
-            }
-          }
-        }
-      }
-
-      // wait for assignment completion
-      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
-            System.currentTimeMillis())) {
-        LOG.debug("some user regions are still in transition: " + userRegionSet);
-      }
-    } else {
-      LOG.info("Bulk assigning " + regions + " region(s) across "
-        + totalServers + " server(s), " + message);
-
-      // Use fixed count thread pool assigning.
-      BulkAssigner ba = new GeneralBulkAssigner(
-        this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
-      ba.bulkAssign();
-      LOG.info("Bulk assigning done");
-    }
-  }
-
-  /**
-   * Assigns all user regions, if any exist.  Used during cluster startup.
-   * <p>
-   * This is a synchronous call and will return once every region has been
-   * assigned.  If anything fails, an exception is thrown and the cluster
-   * should be shutdown.
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
-      throws IOException, InterruptedException {
-    if (allRegions == null || allRegions.isEmpty()) return;
-
-    // Determine what type of assignment to do on startup
-    boolean retainAssignment = server.getConfiguration().
-      getBoolean("hbase.master.startup.retainassign", true);
-
-    Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
-    if (retainAssignment) {
-      assign(allRegions);
-    } else {
-      List<HRegionInfo> regions = new ArrayList<>(regionsFromMetaScan);
-      assign(regions);
-    }
-
-    for (HRegionInfo hri : regionsFromMetaScan) {
-      TableName tableName = hri.getTable();
-      if (!tableStateManager.isTableState(tableName,
-              TableState.State.ENABLED)) {
-        setEnabledTable(tableName);
-      }
-    }
-    // assign all the replicas that were not recorded in the meta
-    assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
-  }
-
-  /**
-   * Get number of replicas of a table
-   */
-  private static int getNumReplicas(MasterServices master, TableName table) {
-    int numReplica = 1;
-    try {
-      HTableDescriptor htd = master.getTableDescriptors().get(table);
-      if (htd == null) {
-        LOG.warn("master can not get TableDescriptor from table '" + table);
-      } else {
-        numReplica = htd.getRegionReplication();
-      }
-    } catch (IOException e){
-      LOG.warn("Couldn't get the replication attribute of the table " + table + " due to "
-          + e.getMessage());
-    }
-    return numReplica;
-  }
-
-  /**
-   * Get a list of replica regions that are:
-   * not recorded in meta yet. We might not have recorded the locations
-   * for the replicas since the replicas may not have been online yet, master restarted
-   * in the middle of assigning, ZK erased, etc.
-   * @param regionsRecordedInMeta the list of regions we know are recorded in meta
-   * either as a default, or, as the location of a replica
-   * @param master
-   * @return list of replica regions
-   * @throws IOException
-   */
-  public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
-      Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
-    List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<>();
-    for (HRegionInfo hri : regionsRecordedInMeta) {
-      TableName table = hri.getTable();
-      if(master.getTableDescriptors().get(table) == null)
-        continue;
-      int  desiredRegionReplication = getNumReplicas(master, table);
-      for (int i = 0; i < desiredRegionReplication; i++) {
-        HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
-        if (regionsRecordedInMeta.contains(replica)) continue;
-        regionsNotRecordedInMeta.add(replica);
-      }
-    }
-    return regionsNotRecordedInMeta;
-  }
-
-  /**
-   * Rebuild the list of user regions and assignment information.
-   * Updates regionstates with findings as we go through list of regions.
-   * @return set of servers not online that hosted some regions according to a scan of hbase:meta
-   * @throws IOException
-   */
-  Set<ServerName> rebuildUserRegions() throws
-          IOException, KeeperException {
-    Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
-            TableState.State.DISABLED, TableState.State.ENABLING);
-
-    Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
-            TableState.State.DISABLED,
-            TableState.State.DISABLING,
-            TableState.State.ENABLING);
-
-    // Region assignment from META
-    List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
-    // Get any new but slow to checkin region server that joined the cluster
-    Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
-    // Set of offline servers to be returned
-    Set<ServerName> offlineServers = new HashSet<>();
-    // Iterate regions in META
-    for (Result result : results) {
-      if (result == null && LOG.isDebugEnabled()){
-        LOG.debug("null result from meta - ignoring but this is strange.");
-        continue;
-      }
-      // keep a track of replicas to close. These were the replicas of the originally
-      // unmerged regions. The master might have closed them before but it mightn't
-      // maybe because it crashed.
-      PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
-      if (p.getFirst() != null && p.getSecond() != null) {
-        int numReplicas = getNumReplicas(server, p.getFirst().getTable());
-        for (HRegionInfo merge : p) {
-          for (int i = 1; i < numReplicas; i++) {
-            replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
-          }
-        }
-      }
-      RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
-      if (rl == null) {
-        continue;
-      }
-      HRegionLocation[] locations = rl.getRegionLocations();
-      if (locations == null) {
-        continue;
-      }
-      for (HRegionLocation hrl : locations) {
-        if (hrl == null) continue;
-        HRegionInfo regionInfo = hrl.getRegionInfo();
-        if (regionInfo == null) continue;
-        int replicaId = regionInfo.getReplicaId();
-        State state = RegionStateStore.getRegionState(result, replicaId);
-        // keep a track of replicas to close. These were the replicas of the split parents
-        // from the previous life of the master. The master should have closed them before
-        // but it couldn't maybe because it crashed
-        if (replicaId == 0 && state.equals(State.SPLIT)) {
-          for (HRegionLocation h : locations) {
-            replicasToClose.add(h.getRegionInfo());
-          }
-        }
-        ServerName lastHost = hrl.getServerName();
-        ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
-        regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
-        if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
-          // Region is not open (either offline or in transition), skip
-          continue;
-        }
-        TableName tableName = regionInfo.getTable();
-        if (!onlineServers.contains(regionLocation)) {
-          // Region is located on a server that isn't online
-          offlineServers.add(regionLocation);
-        } else if (!disabledOrEnablingTables.contains(tableName)) {
-          // Region is being served and on an active server
-          // add only if region not in disabled or enabling table
-          regionStates.regionOnline(regionInfo, regionLocation);
-          balancer.regionOnline(regionInfo, regionLocation);
-        }
-        // need to enable the table if not disabled or disabling or enabling
-        // this will be used in rolling restarts
-        if (!disabledOrDisablingOrEnabling.contains(tableName)
-          && !getTableStateManager().isTableState(tableName,
-                TableState.State.ENABLED)) {
-          setEnabledTable(tableName);
-        }
-      }
-    }
-    return offlineServers;
-  }
-
-  /**
-   * Processes list of regions in transition at startup
-   */
-  void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
-    // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
-    // in case the RPC call is not sent out yet before the master was shut down
-    // since we update the state before we send the RPC call. We can't update
-    // the state after the RPC call. Otherwise, we don't know what's happened
-    // to the region if the master dies right after the RPC call is out.
-    for (RegionState regionState: regionsInTransition) {
-      LOG.info("Processing " + regionState);
-      ServerName serverName = regionState.getServerName();
-      // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
-      // case, try assigning it here.
-      if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
-        LOG.info("Server " + serverName + " isn't online. SSH will handle this");
-        continue; // SSH will handle it
-      }
-      HRegionInfo regionInfo = regionState.getRegion();
-      RegionState.State state = regionState.getState();
-      switch (state) {
-      case CLOSED:
-        invokeAssign(regionState.getRegion());
-        break;
-      case PENDING_OPEN:
-        retrySendRegionOpen(regionState);
-        break;
-      case PENDING_CLOSE:
-        retrySendRegionClose(regionState);
-        break;
-      case FAILED_CLOSE:
-      case FAILED_OPEN:
-        invokeUnAssign(regionInfo);
-        break;
-      default:
-          // No process for other states
-          break;
-      }
-    }
-  }
-
-  /**
-   * At master failover, for pending_open region, make sure
-   * sendRegionOpen RPC call is sent to the target regionserver
-   */
-  private void retrySendRegionOpen(final RegionState regionState) {
-    this.executorService.submit(
-      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
-        @Override
-        public void process() throws IOException {
-          HRegionInfo hri = regionState.getRegion();
-          ServerName serverName = regionState.getServerName();
-          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
-          try {
-            for (int i = 1; i <= maximumAttempts; i++) {
-              if (!serverManager.isServerOnline(serverName)
-                  || server.isStopped() || server.isAborted()) {
-                return; // No need any more
-              }
-              try {
-                if (!regionState.equals(regionStates.getRegionState(hri))) {
-                  return; // Region is not in the expected state any more
-                }
-                List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
-                if (shouldAssignFavoredNodes(hri)) {
-                  FavoredNodesManager fnm = ((MasterServices)server).getFavoredNodesManager();
-                  favoredNodes = fnm.getFavoredNodesWithDNPort(hri);
-                }
-                serverManager.sendRegionOpen(serverName, hri, favoredNodes);
-                return; // we're done
-              } catch (Throwable t) {
-                if (t instanceof RemoteException) {
-                  t = ((RemoteException) t).unwrapRemoteException();
-                }
-                if (t instanceof FailedServerException && i < maximumAttempts) {
-                  // In case the server is in the failed server list, no point to
-                  // retry too soon. Retry after the failed_server_expiry time
-                  try {
-                    Configuration conf = this.server.getConfiguration();
-                    long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-                      RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug(serverName + " is on failed server list; waiting "
-                        + sleepTime + "ms", t);
-                    }
-                    Thread.sleep(sleepTime);
-                    continue;
-                  } catch (InterruptedException ie) {
-                    LOG.warn("Failed to assign "
-                      + hri.getRegionNameAsString() + " since interrupted", ie);
-                    regionStates.updateRegionState(hri, State.FAILED_OPEN);
-                    Thread.currentThread().interrupt();
-                    return;
-                  }
-                }
-                if (serverManager.isServerOnline(serverName)
-                    && t instanceof java.net.SocketTimeoutException) {
-                  i--; // reset the try count
-                } else {
-                  LOG.info("Got exception in retrying sendRegionOpen for "
-                    + regionState + "; try=" + i + " of " + maximumAttempts, t);
-                }
-                Threads.sleep(100);
-              }
-            }
-            // Run out of attempts
-            regionStates.updateRegionState(hri, State.FAILED_OPEN);
-          } finally {
-            lock.unlock();
-          }
-        }
-      });
-  }
-
-  /**
-   * At master failover, for pending_close region, make sure
-   * sendRegionClose RPC call is sent to the target regionserver
-   */
-  private void retrySendRegionClose(final RegionState regionState) {
-    this.executorService.submit(
-      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
-        @Override
-        public void process() throws IOException {
-          HRegionInfo hri = regionState.getRegion();
-          ServerName serverName = regionState.getServerName();
-          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
-          try {
-            for (int i = 1; i <= maximumAttempts; i++) {
-              if (!serverManager.isServerOnline(serverName)
-                  || server.isStopped() || server.isAborted()) {
-                return; // No need any more
-              }
-              try {
-                if (!regionState.equals(regionStates.getRegionState(hri))) {
-                  return; // Region is not in the expected state any more
-                }
-                serverManager.sendRegionClose(serverName, hri, null);
-                return; // Done.
-              } catch (Throwable t) {
-                if (t instanceof RemoteException) {
-                  t = ((RemoteException) t).unwrapRemoteException();
-                }
-                if (t instanceof FailedServerException && i < maximumAttempts) {
-                  // In case the server is in the failed server list, no point to
-                  // retry too soon. Retry after the failed_server_expiry time
-                  try {
-                    Configuration conf = this.server.getConfiguration();
-                    long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-                      RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug(serverName + " is on failed server list; waiting "
-                        + sleepTime + "ms", t);
-                    }
-                    Thread.sleep(sleepTime);
-                    continue;
-                  } catch (InterruptedException ie) {
-                    LOG.warn("Failed to unassign "
-                      + hri.getRegionNameAsString() + " since interrupted", ie);
-                    regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
-                    Thread.currentThread().interrupt();
-                    return;
-                  }
-                }
-                if (serverManager.isServerOnline(serverName)
-                    && t instanceof java.net.SocketTimeoutException) {
-                  i--; // reset the try count
-                } else {
-                  LOG.info("Got exception in retrying sendRegionClose for "
-                    + regionState + "; try=" + i + " of " + maximumAttempts, t);
-                }
-                Threads.sleep(100);
-              }
-            }
-            // Run out of attempts
-            regionStates.updateRegionState(hri, State.FAILED_CLOSE);
-          } finally {
-            lock.unlock();
-          }
-        }
-      });
-  }
-
-  /**
-   * Set Regions in transitions metrics.
-   * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
-   * This iterator is not fail fast, which may lead to stale read; but that's better than
-   * creating a copy of the map for metrics computation, as this method will be invoked
-   * on a frequent interval.
-   */
-  public void updateRegionsInTransitionMetrics() {
-    long currentTime = System.currentTimeMillis();
-    int totalRITs = 0;
-    int totalRITsOverThreshold = 0;
-    long oldestRITTime = 0;
-    int ritThreshold = this.server.getConfiguration().
-      getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
-    for (RegionState state: regionStates.getRegionsInTransition()) {
-      totalRITs++;
-      long ritTime = currentTime - state.getStamp();
-      if (ritTime > ritThreshold) { // more than the threshold
-        totalRITsOverThreshold++;
-      }
-      if (oldestRITTime < ritTime) {
-        oldestRITTime = ritTime;
-      }
-    }
-    if (this.metricsAssignmentManager != null) {
-      this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
-      this.metricsAssignmentManager.updateRITCount(totalRITs);
-      this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
-    }
-  }
-
-  /**
-   * @param region Region whose plan we are to clear.
-   */
-  private void clearRegionPlan(final HRegionInfo region) {
-    synchronized (this.regionPlans) {
-      this.regionPlans.remove(region.getEncodedName());
-    }
-  }
-
-  /**
-   * Wait on region to clear regions-in-transition.
-   * @param hri Region to wait on.
-   * @throws IOException
-   */
-  public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
-      throws IOException, InterruptedException {
-    waitOnRegionToClearRegionsInTransition(hri, -1L);
-  }
-
-  /**
-   * Wait on region to clear regions-in-transition or time out
-   * @param hri
-   * @param timeOut Milliseconds to wait for current region to be out of transition state.
-   * @return True when a region clears regions-in-transition before timeout otherwise false
-   * @throws InterruptedException
-   */
-  public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
-      throws InterruptedException {
-    if (!regionStates.isRegionInTransition(hri)) {
-      return true;
-    }
-    long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
-        + timeOut;
-    // There is already a timeout monitor on regions in transition so I
-    // should not have to have one here too?
-    LOG.info("Waiting for " + hri.getEncodedName() +
-        " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
-    while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
-      regionStates.waitForUpdate(100);
-      if (EnvironmentEdgeManager.currentTime() > end) {
-        LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
-        return false;
-      }
-    }
-    if (this.server.isStopped()) {
-      LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
-      return false;
-    }
-    return true;
-  }
-
-  void invokeAssign(HRegionInfo regionInfo) {
-    threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
-  }
-
-  void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) {
-    scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(
-        new AssignCallable(this, regionInfo)), sleepMillis, TimeUnit.MILLISECONDS);
-  }
-
-  void invokeUnAssign(HRegionInfo regionInfo) {
-    threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
-  }
-
-  public boolean isCarryingMeta(ServerName serverName) {
-    return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
-  }
-
-  public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
-    return isCarryingRegion(serverName,
-        RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
-  }
-
-  public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
-    return isCarryingRegion(serverName, metaHri);
-  }
-
-  /**
-   * Check if the shutdown server carries the specific region.
-   * @return whether the serverName currently hosts the region
-   */
-  private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
-    RegionState regionState = regionStates.getRegionTransitionState(hri);
-    ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
-    if (transitionAddr != null) {
-      boolean matchTransitionAddr = transitionAddr.equals(serverName);
-      LOG.debug("Checking region=" + hri.getRegionNameAsString()
-        + ", transitioning on server=" + matchTransitionAddr
-        + " server being checked: " + serverName
-        + ", matches=" + matchTransitionAddr);
-      return matchTransitionAddr;
-    }
-
-    ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
-    boolean matchAssignedAddr = serverName.equals(assignedAddr);
-    LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
-      + " is on server=" + assignedAddr + ", server being checked: "
-      + serverName);
-    return matchAssignedAddr;
-  }
-
-  /**
-   * Clean out crashed server removing any assignments.
-   * @param sn Server that went down.
-   * @return list of regions in transition on this server
-   */
-  public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
-    // Clean out any existing assignment plans for this server
-    synchronized (this.regionPlans) {
-      for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
-          i.hasNext();) {
-        Map.Entry<String, RegionPlan> e = i.next();
-        ServerName otherSn = e.getValue().getDestination();
-        // The name will be null if the region is planned for a random assign.
-        if (otherSn != null && otherSn.equals(sn)) {
-          // Use iterator's remove else we'll get CME
-          i.remove();
-        }
-      }
-    }
-    List<HRegionInfo> rits = regionStates.serverOffline(sn);
-    for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
-      HRegionInfo hri = it.next();
-      String encodedName = hri.getEncodedName();
-
-      // We need a lock on the region as we could update it
-      Lock lock = locker.acquireLock(encodedName);
-      try {
-        RegionState regionState = regionStates.getRegionTransitionState(encodedName);
-        if (regionState == null
-            || (regionState.getServerName() != null && !regionState.isOnServer(sn))
-            || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
-                State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
-          LOG.info("Skip " + regionState + " since it is not opening/failed_close"
-            + " on the dead server any more: " + sn);
-          it.remove();
-        } else {
-          if 

<TRUNCATED>