Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/26 23:47:26 UTC
[18/59] [abbrv] hbase git commit: Revert "HBASE-14614 Procedure v2 -
Core Assignment Manager (Matteo Bertozzi)" Revert a mistaken commit!!!
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
new file mode 100644
index 0000000..69ebd97
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -0,0 +1,3053 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.RegionStateListener;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.QuotaExceededException;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.KeyLocker;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Manages and performs region assignment.
+ * Related communications with regionserver are all done over RPC.
+ */
+@InterfaceAudience.Private
+public class AssignmentManager {
+ private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
+
+ protected final MasterServices server;
+
+ private ServerManager serverManager;
+
+ private boolean shouldAssignRegionsWithFavoredNodes;
+
+ private LoadBalancer balancer;
+
+ private final MetricsAssignmentManager metricsAssignmentManager;
+
+ private AtomicInteger numRegionsOpened = new AtomicInteger(0);
+
+ final private KeyLocker<String> locker = new KeyLocker<>();
+
+ Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
+
+ /**
+ * Map of regions to reopen after the schema of a table is changed. Key -
+ * encoded region name, value - HRegionInfo
+ */
+ private final Map <String, HRegionInfo> regionsToReopen;
+
+ /*
+ * Maximum number of attempts we make for an assignment/unassignment.
+ * See below in {@link #assign()} and {@link #unassign()}.
+ */
+ private final int maximumAttempts;
+
+ /**
+ * The time to sleep before retrying an hbase:meta assignment that failed
+ * because no region plan was available or the region plan was bad.
+ */
+ private final long sleepTimeBeforeRetryingMetaAssignment;
+
+ /** Plans for region movement. Key is the encoded version of a region name. */
+ // TODO: When do plans get cleaned out? Ever? In server open and in server
+ // shutdown processing -- St.Ack
+ // All access to this Map must be synchronized.
+ final NavigableMap<String, RegionPlan> regionPlans = new TreeMap<>();
+
+ private final TableStateManager tableStateManager;
+
+ private final ExecutorService executorService;
+
+ private java.util.concurrent.ExecutorService threadPoolExecutorService;
+ private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+ private final RegionStates regionStates;
+
+ // The threshold for using bulk assignment. Bulk assignment is used
+ // only if we are assigning at least this many regions to at least this
+ // many servers. If assigning fewer regions to fewer servers,
+ // bulk assignment may not be as efficient.
+ private final int bulkAssignThresholdRegions;
+ private final int bulkAssignThresholdServers;
+ private final int bulkPerRegionOpenTimeGuesstimate;
+
+ // Should bulk assignment wait till all regions are assigned,
+ // or until it times out? This is useful for measuring bulk assignment
+ // performance, but not needed in most use cases.
+ private final boolean bulkAssignWaitTillAllAssigned;
+
+ /**
+ * Indicator that AssignmentManager has recovered the region states so
+ * that ServerShutdownHandler can be fully enabled and re-assign regions
+ * of dead servers. When re-assignment happens, AssignmentManager then
+ * has proper region states.
+ *
+ * Protected to ease testing.
+ */
+ protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
+
+ /**
+ * A map tracking how many times in a row a region has failed to open,
+ * so that we don't try to open a region forever if the failure is
+ * unrecoverable. We don't keep this information in the region states
+ * because we don't expect this to happen frequently; we don't
+ * want to copy this information over during each state transition either.
+ */
+ private final ConcurrentHashMap<String, AtomicInteger> failedOpenTracker = new ConcurrentHashMap<>();
+
+ // In case not using ZK for region assignment, region states
+ // are persisted in meta with a state store
+ private final RegionStateStore regionStateStore;
+
+ /**
+ * For testing only! Set to true to skip handling of split.
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
+ public static boolean TEST_SKIP_SPLIT_HANDLING = false;
+
+ /** Listeners that are called on assignment events. */
+ private List<AssignmentListener> listeners = new CopyOnWriteArrayList<>();
+
+ private RegionStateListener regionStateListener;
+
+ private RetryCounter.BackoffPolicy backoffPolicy;
+ private RetryCounter.RetryConfig retryConfig;
+ /**
+ * Constructs a new assignment manager.
+ *
+ * @param server the HMaster instance this AM is running inside
+ * @param serverManager serverManager for associated HMaster
+ * @param balancer implementation of {@link LoadBalancer}
+ * @param service Executor service
+ * @param metricsMaster metrics manager
+ * @throws IOException
+ */
+ public AssignmentManager(MasterServices server, ServerManager serverManager,
+ final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster,
+ final TableStateManager tableStateManager)
+ throws IOException {
+ this.server = server;
+ this.serverManager = serverManager;
+ this.executorService = service;
+ this.regionStateStore = new RegionStateStore(server);
+ this.regionsToReopen = Collections.synchronizedMap(new HashMap<String, HRegionInfo>());
+ Configuration conf = server.getConfiguration();
+
+ this.tableStateManager = tableStateManager;
+
+ // This is the max attempts, not retries, so it should be at least 1.
+ this.maximumAttempts = Math.max(1,
+ this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
+ this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
+ "hbase.meta.assignment.retry.sleeptime", 1000l);
+ this.balancer = balancer;
+ // Only read favored nodes if using the favored nodes load balancer.
+ this.shouldAssignRegionsWithFavoredNodes = this.balancer instanceof FavoredNodesPromoter;
+ int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
+
+ this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
+ maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
+
+ this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
+ Threads.newDaemonThreadFactory("AM.Scheduler"));
+
+ this.regionStates = new RegionStates(
+ server, tableStateManager, serverManager, regionStateStore);
+
+ this.bulkAssignWaitTillAllAssigned =
+ conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
+ this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
+ this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+ this.bulkPerRegionOpenTimeGuesstimate =
+ conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
+
+ this.metricsAssignmentManager = new MetricsAssignmentManager();
+
+ // Configurations for retrying opening a region on receiving a FAILED_OPEN
+ this.retryConfig = new RetryCounter.RetryConfig();
+ this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l));
+ // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy
+ // if the user does not set a max sleep limit
+ this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max",
+ retryConfig.getSleepInterval()));
+ this.backoffPolicy = getBackoffPolicy();
+ }
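+
+ // Illustrative sketch (not exhaustive): the constructor above reads its tuning knobs from
+ // the Configuration, so a hypothetical caller or test could override the defaults before
+ // building the manager, e.g. (assuming the standard HBaseConfiguration factory):
+ //
+ //   Configuration conf = HBaseConfiguration.create();
+ //   conf.setInt("hbase.assignment.maximum.attempts", 5);          // default 10
+ //   conf.setLong("hbase.meta.assignment.retry.sleeptime", 2000L); // default 1000
+ //   conf.setInt("hbase.bulk.assignment.threshold.regions", 20);   // default 7
+ //   conf.setInt("hbase.bulk.assignment.threshold.servers", 5);    // default 3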
+
+ /**
+ * Returns the backoff policy used for Failed Region Open retries
+ * @return the backoff policy used for Failed Region Open retries
+ */
+ RetryCounter.BackoffPolicy getBackoffPolicy() {
+ return new RetryCounter.ExponentialBackoffPolicyWithLimit();
+ }
+
+ MetricsAssignmentManager getAssignmentManagerMetrics() {
+ return this.metricsAssignmentManager;
+ }
+
+ /**
+ * Add the listener to the notification list.
+ * @param listener The AssignmentListener to register
+ */
+ public void registerListener(final AssignmentListener listener) {
+ this.listeners.add(listener);
+ }
+
+ /**
+ * Remove the listener from the notification list.
+ * @param listener The AssignmentListener to unregister
+ */
+ public boolean unregisterListener(final AssignmentListener listener) {
+ return this.listeners.remove(listener);
+ }
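+
+ // Illustrative sketch (hypothetical caller; 'am' is an existing AssignmentManager and
+ // 'listener' an AssignmentListener implementation, whose exact callbacks are declared in
+ // AssignmentListener.java, not shown here): register while active, unregister on shutdown.
+ //
+ //   am.registerListener(listener);
+ //   try {
+ //     // receive assignment notifications while registered
+ //   } finally {
+ //     am.unregisterListener(listener);
+ //   }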
+
+ /**
+ * @return Instance of TableStateManager.
+ */
+ public TableStateManager getTableStateManager() {
+ // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
+ // sharing.
+ return this.tableStateManager;
+ }
+
+ /**
+ * This SHOULD not be public. It is public now
+ * because of some unit tests.
+ *
+ * TODO: make it package private and keep RegionStates in the master package
+ */
+ public RegionStates getRegionStates() {
+ return regionStates;
+ }
+
+ /**
+ * Used in some tests to mock up region state in meta
+ */
+ @VisibleForTesting
+ RegionStateStore getRegionStateStore() {
+ return regionStateStore;
+ }
+
+ public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
+ return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
+ }
+
+ /**
+ * Add a regionPlan for the specified region.
+ * @param encodedName
+ * @param plan
+ */
+ public void addPlan(String encodedName, RegionPlan plan) {
+ synchronized (regionPlans) {
+ regionPlans.put(encodedName, plan);
+ }
+ }
+
+ /**
+ * Add a map of region plans.
+ */
+ public void addPlans(Map<String, RegionPlan> plans) {
+ synchronized (regionPlans) {
+ regionPlans.putAll(plans);
+ }
+ }
+
+ /**
+ * Set the list of regions that will be reopened
+ * because of an update in table schema
+ *
+ * @param regions
+ * list of regions that should be tracked for reopen
+ */
+ public void setRegionsToReopen(List <HRegionInfo> regions) {
+ for(HRegionInfo hri : regions) {
+ regionsToReopen.put(hri.getEncodedName(), hri);
+ }
+ }
+
+ /**
+ * Used by the client to identify if all regions have received the schema updates
+ *
+ * @param tableName
+ * @return Pair indicating the status of the alter command
+ * @throws IOException
+ */
+ public Pair<Integer, Integer> getReopenStatus(TableName tableName)
+ throws IOException {
+ List<HRegionInfo> hris;
+ if (TableName.META_TABLE_NAME.equals(tableName)) {
+ hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
+ } else {
+ hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
+ }
+
+ Integer pending = 0;
+ for (HRegionInfo hri : hris) {
+ String name = hri.getEncodedName();
+ // no lock concurrent access ok: sequential consistency respected.
+ if (regionsToReopen.containsKey(name)
+ || regionStates.isRegionInTransition(name)) {
+ pending++;
+ }
+ }
+ return new Pair<>(pending, hris.size());
+ }
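+
+ // Illustrative sketch of the reopen-tracking flow (hypothetical caller; 'am' is an existing
+ // AssignmentManager, 'regionsOfAlteredTable' and 'tableName' come from the alter handler):
+ //
+ //   am.setRegionsToReopen(regionsOfAlteredTable);          // start tracking the regions
+ //   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
+ //   int pending = status.getFirst();                       // regions still to reopen or in transition
+ //   int total = status.getSecond();                        // total regions of the table
+ //   // as each tracked region closes, removeClosedRegion(hri) drops it from regionsToReopen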
+
+ /**
+ * Used by ServerShutdownHandler to make sure AssignmentManager has completed
+ * the failover cleanup before re-assigning regions of dead servers, so that
+ * when re-assignment happens, AssignmentManager has proper region states.
+ */
+ public boolean isFailoverCleanupDone() {
+ return failoverCleanupDone.get();
+ }
+
+ /**
+ * To avoid racing with AM, external entities may need to lock a region,
+ * for example, when SSH checks what regions to skip re-assigning.
+ */
+ public Lock acquireRegionLock(final String encodedName) {
+ return locker.acquireLock(encodedName);
+ }
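+
+ // Illustrative sketch (hypothetical caller; 'am' is an existing AssignmentManager and 'hri'
+ // the region of interest): hold the lock around any check that must not race an assign/unassign.
+ //
+ //   Lock lock = am.acquireRegionLock(hri.getEncodedName());
+ //   try {
+ //     // inspect or update state for this region without racing the AssignmentManager
+ //   } finally {
+ //     lock.unlock();
+ //   }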
+
+ /**
+ * Now that failover cleanup is completed, notify the server manager to
+ * process queued up dead servers, if any.
+ */
+ void failoverCleanupDone() {
+ failoverCleanupDone.set(true);
+ serverManager.processQueuedDeadServers();
+ }
+
+ /**
+ * Called on startup.
+ * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws CoordinatedStateException
+ */
+ void joinCluster()
+ throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
+ long startTime = System.currentTimeMillis();
+ // Concurrency note: In the below the accesses on regionsInTransition are
+ // outside of a synchronization block where usually all accesses to RIT are
+ // synchronized. The presumption is that in this case it is safe since this
+ // method is being played by a single thread on startup.
+
+ // TODO: Regions that have a null location and are not in regionsInTransitions
+ // need to be handled.
+
+ // Scan hbase:meta to build list of existing regions, servers, and assignment
+ // Returns servers who have not checked in (assumed dead) that some regions
+ // were assigned to (according to the meta)
+ Set<ServerName> deadServers = rebuildUserRegions();
+
+ // This method will assign all user regions if a clean server startup or
+ // it will reconstruct master state and cleanup any leftovers from previous master process.
+ boolean failover = processDeadServersAndRegionsInTransition(deadServers);
+
+ LOG.info("Joined the cluster in " + (System.currentTimeMillis()
+ - startTime) + "ms, failover=" + failover);
+ }
+
+ /**
+ * Processes all regions that are in transition in zookeeper and also
+ * processes the list of dead servers.
+ * Used by a master joining a cluster. If we figure this is a clean cluster
+ * startup, will assign all user regions.
+ * @param deadServers Set of servers that are offline (probably legitimately) and were carrying
+ * regions according to a scan of hbase:meta. Can be null.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
+ throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
+ // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
+ boolean failover = !serverManager.getDeadServers().isEmpty();
+ if (failover) {
+ // This may not be a failover actually, especially if meta is on this master.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
+ }
+ // Check if there are any regions on these servers
+ failover = false;
+ for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) {
+ if (regionStates.getRegionAssignments().values().contains(serverName)) {
+ LOG.debug("Found regions on dead server: " + serverName);
+ failover = true;
+ break;
+ }
+ }
+ }
+ Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+ if (!failover) {
+ // If any one region except meta is assigned, it's a failover.
+ for (Map.Entry<HRegionInfo, ServerName> en:
+ regionStates.getRegionAssignments().entrySet()) {
+ HRegionInfo hri = en.getKey();
+ if (!hri.isMetaTable()
+ && onlineServers.contains(en.getValue())) {
+ LOG.debug("Found region " + hri + " out on cluster");
+ failover = true;
+ break;
+ }
+ }
+ }
+ if (!failover) {
+ // If any region except meta is in transition on a live server, it's a failover.
+ Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition();
+ if (!regionsInTransition.isEmpty()) {
+ for (RegionState regionState: regionsInTransition) {
+ ServerName serverName = regionState.getServerName();
+ if (!regionState.getRegion().isMetaRegion()
+ && serverName != null && onlineServers.contains(serverName)) {
+ LOG.debug("Found " + regionState + " for region " +
+ regionState.getRegion().getRegionNameAsString() + " for server " +
+ serverName + " in RITs");
+ failover = true;
+ break;
+ }
+ }
+ }
+ }
+ if (!failover) {
+ // If we get here, we have a full cluster restart. It is a failover only
+ // if some WALs are not split yet. For meta WALs, they should have
+ // been split already, if any. We can walk through those queued dead servers,
+ // if they don't have any WALs, this restart should be considered as a clean one
+ Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
+ if (!queuedDeadServers.isEmpty()) {
+ Configuration conf = server.getConfiguration();
+ Path walRootDir = FSUtils.getWALRootDir(conf);
+ FileSystem walFs = FSUtils.getWALFileSystem(conf);
+ for (ServerName serverName: queuedDeadServers) {
+ // In the case of a clean exit, the shutdown handler would have presplit any WALs and
+ // removed empty directories.
+ Path walDir = new Path(walRootDir,
+ AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+ Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
+ if (checkWals(walFs, walDir) || checkWals(walFs, splitDir)) {
+ LOG.debug("Found queued dead server " + serverName);
+ failover = true;
+ break;
+ }
+ }
+ if (!failover) {
+ // We figured that it's not a failover, so no need to
+ // work on these re-queued dead servers any more.
+ LOG.info("AM figured that it's not a failover and cleaned up "
+ + queuedDeadServers.size() + " queued dead servers");
+ serverManager.removeRequeuedDeadServers();
+ }
+ }
+ }
+
+ Set<TableName> disabledOrDisablingOrEnabling = null;
+ Map<HRegionInfo, ServerName> allRegions = null;
+
+ if (!failover) {
+ disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
+ TableState.State.DISABLED, TableState.State.DISABLING,
+ TableState.State.ENABLING);
+
+ // Clean re/start, mark all user regions closed before reassignment
+ allRegions = regionStates.closeAllUserRegions(
+ disabledOrDisablingOrEnabling);
+ }
+
+ // Now region states are restored
+ regionStateStore.start();
+
+ if (failover) {
+ if (deadServers != null && !deadServers.isEmpty()) {
+ for (ServerName serverName: deadServers) {
+ if (!serverManager.isServerDead(serverName)) {
+ serverManager.expireServer(serverName); // Let SSH do region re-assign
+ }
+ }
+ }
+ processRegionsInTransition(regionStates.getRegionsInTransition());
+ }
+
+ // Now we can safely claim failover cleanup completed and enable
+ // ServerShutdownHandler for further processing. The nodes (below)
+ // in transition, if any, are for regions not related to those
+ // dead servers at all, and can be done in parallel to SSH.
+ failoverCleanupDone();
+ if (!failover) {
+ // Fresh cluster startup.
+ LOG.info("Clean cluster startup. Don't reassign user regions");
+ assignAllUserRegions(allRegions);
+ } else {
+ LOG.info("Failover! Reassign user regions");
+ }
+ // unassign replicas of the split parents and the merged regions
+ // the daughter replicas are opened in assignAllUserRegions if it was
+ // not already opened.
+ for (HRegionInfo h : replicasToClose) {
+ unassign(h);
+ }
+ replicasToClose.clear();
+ return failover;
+ }
+
+ private boolean checkWals(FileSystem fs, Path dir) throws IOException {
+ if (!fs.exists(dir)) {
+ LOG.debug(dir + " doesn't exist");
+ return false;
+ }
+ if (!fs.getFileStatus(dir).isDirectory()) {
+ LOG.warn(dir + " is not a directory");
+ return false;
+ }
+ FileStatus[] files = FSUtils.listStatus(fs, dir);
+ if (files == null || files.length == 0) {
+ LOG.debug(dir + " has no files");
+ return false;
+ }
+ for (int i = 0; i < files.length; i++) {
+ if (files[i].isFile() && files[i].getLen() > 0) {
+ LOG.debug(dir + " has a non-empty file: " + files[i].getPath());
+ return true;
+ } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) {
+ LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath());
+ return true;
+ }
+ }
+ LOG.debug("Found 0 non-empty wal files for :" + dir);
+ return false;
+ }
+
+ /**
+ * When a region is closed, it should be removed from the regionsToReopen
+ * @param hri HRegionInfo of the region which was closed
+ */
+ public void removeClosedRegion(HRegionInfo hri) {
+ if (regionsToReopen.remove(hri.getEncodedName()) != null) {
+ LOG.debug("Removed region from reopening regions because it was closed");
+ }
+ }
+
+ void processFavoredNodesForDaughters(HRegionInfo parent,
+ HRegionInfo regionA, HRegionInfo regionB) throws IOException {
+ if (shouldAssignFavoredNodes(parent)) {
+ List<ServerName> onlineServers = this.serverManager.getOnlineServersList();
+ ((FavoredNodesPromoter) this.balancer).
+ generateFavoredNodesForDaughter(onlineServers, parent, regionA, regionB);
+ }
+ }
+
+ void processFavoredNodesForMerge(HRegionInfo merged, HRegionInfo regionA, HRegionInfo regionB)
+ throws IOException {
+ if (shouldAssignFavoredNodes(merged)) {
+ ((FavoredNodesPromoter)this.balancer).
+ generateFavoredNodesForMergedRegion(merged, regionA, regionB);
+ }
+ }
+
+ /*
+ * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
+ * belongs to a non-system table.
+ */
+ private boolean shouldAssignFavoredNodes(HRegionInfo region) {
+ return this.shouldAssignRegionsWithFavoredNodes
+ && FavoredNodesManager.isFavoredNodeApplicable(region);
+ }
+
+ /**
+ * Marks the region as online. Removes it from regions in transition and
+ * updates the in-memory assignment information.
+ * <p>
+ * Used when a region has been successfully opened on a region server.
+ * @param regionInfo
+ * @param sn
+ */
+ void regionOnline(HRegionInfo regionInfo, ServerName sn) {
+ regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
+ }
+
+ void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
+ numRegionsOpened.incrementAndGet();
+ regionStates.regionOnline(regionInfo, sn, openSeqNum);
+
+ // Remove plan if one.
+ clearRegionPlan(regionInfo);
+ balancer.regionOnline(regionInfo, sn);
+
+ // Tell our listeners that a region was opened
+ sendRegionOpenedNotification(regionInfo, sn);
+ }
+
+ /**
+ * Marks the region as offline. Removes it from regions in transition and
+ * removes in-memory assignment information.
+ * <p>
+ * Used when a region has been closed and should remain closed.
+ * @param regionInfo
+ */
+ public void regionOffline(final HRegionInfo regionInfo) {
+ regionOffline(regionInfo, null);
+ }
+
+ public void offlineDisabledRegion(HRegionInfo regionInfo) {
+ replicasToClose.remove(regionInfo);
+ regionOffline(regionInfo);
+ }
+
+ // Assignment methods
+
+ /**
+ * Assigns the specified region.
+ * <p>
+ * If a RegionPlan is available with a valid destination then it will be used
+ * to determine what server region is assigned to. If no RegionPlan is
+ * available, region will be assigned to a random available server.
+ * <p>
+ * Updates the RegionState and sends the OPEN RPC.
+ * <p>
+ * This will only succeed if the region is not in transition, or is in
+ * transition but in a CLOSED or OFFLINE state, and, of course, the
+ * chosen server is up and running (it may have just crashed!).
+ *
+ * @param region region to be assigned
+ */
+ public void assign(HRegionInfo region) {
+ assign(region, false);
+ }
+
+ /**
+ * Use care with forceNewPlan. It could cause double assignment.
+ */
+ public void assign(HRegionInfo region, boolean forceNewPlan) {
+ if (isDisabledorDisablingRegionInRIT(region)) {
+ return;
+ }
+ String encodedName = region.getEncodedName();
+ Lock lock = locker.acquireLock(encodedName);
+ try {
+ RegionState state = forceRegionStateToOffline(region, forceNewPlan);
+ if (state != null) {
+ if (regionStates.wasRegionOnDeadServer(encodedName)) {
+ LOG.info("Skip assigning " + region.getRegionNameAsString()
+ + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+ + " is dead but not processed yet");
+ return;
+ }
+ assign(state, forceNewPlan);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
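+
+ // Illustrative sketch (hypothetical caller; 'am' is an existing AssignmentManager and 'hri'
+ // the region to assign): forceNewPlan discards any existing plan, which the javadoc above
+ // warns can lead to double assignment.
+ //
+ //   am.assign(hri);          // use an existing RegionPlan if one is available
+ //   am.assign(hri, true);    // force a new plan; use with care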
+
+ /**
+ * Bulk assign regions to <code>destination</code>.
+ * @param destination
+ * @param regions Regions to assign.
+ * @return true if successful
+ */
+ boolean assign(final ServerName destination, final List<HRegionInfo> regions)
+ throws InterruptedException {
+ long startTime = EnvironmentEdgeManager.currentTime();
+ try {
+ int regionCount = regions.size();
+ if (regionCount == 0) {
+ return true;
+ }
+ LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
+ Set<String> encodedNames = new HashSet<>(regionCount);
+ for (HRegionInfo region : regions) {
+ encodedNames.add(region.getEncodedName());
+ }
+
+ List<HRegionInfo> failedToOpenRegions = new ArrayList<>();
+ Map<String, Lock> locks = locker.acquireLocks(encodedNames);
+ try {
+ Map<String, RegionPlan> plans = new HashMap<>(regionCount);
+ List<RegionState> states = new ArrayList<>(regionCount);
+ for (HRegionInfo region : regions) {
+ String encodedName = region.getEncodedName();
+ if (!isDisabledorDisablingRegionInRIT(region)) {
+ RegionState state = forceRegionStateToOffline(region, false);
+ boolean onDeadServer = false;
+ if (state != null) {
+ if (regionStates.wasRegionOnDeadServer(encodedName)) {
+ LOG.info("Skip assigning " + region.getRegionNameAsString()
+ + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+ + " is dead but not processed yet");
+ onDeadServer = true;
+ } else {
+ RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
+ plans.put(encodedName, plan);
+ states.add(state);
+ continue;
+ }
+ }
+ // Reassign if the region wasn't on a dead server
+ if (!onDeadServer) {
+ LOG.info("failed to force region state to offline, "
+ + "will reassign later: " + region);
+ failedToOpenRegions.add(region); // assign individually later
+ }
+ }
+ // Release the lock, this region is excluded from bulk assign because
+ // we can't update its state, or set its znode to offline.
+ Lock lock = locks.remove(encodedName);
+ lock.unlock();
+ }
+
+ if (server.isStopped()) {
+ return false;
+ }
+
+ // Add region plans, so we can updateTimers when a region is opened, which
+ // reduces unnecessary timeouts on RIT.
+ this.addPlans(plans);
+
+ List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos = new ArrayList<>(states.size());
+ for (RegionState state: states) {
+ HRegionInfo region = state.getRegion();
+ regionStates.updateRegionState(
+ region, State.PENDING_OPEN, destination);
+ List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+ if (shouldAssignFavoredNodes(region)) {
+ favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
+ }
+ regionOpenInfos.add(new Pair<>(region, favoredNodes));
+ }
+
+ // Move on to open regions.
+ try {
+ // Send OPEN RPC. If it fails on an IOE or RemoteException,
+ // regions will be assigned individually.
+ Configuration conf = server.getConfiguration();
+ long maxWaitTime = System.currentTimeMillis() +
+ conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
+ for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
+ try {
+ List<RegionOpeningState> regionOpeningStateList = serverManager
+ .sendRegionOpen(destination, regionOpenInfos);
+ for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
+ RegionOpeningState openingState = regionOpeningStateList.get(k);
+ if (openingState != RegionOpeningState.OPENED) {
+ HRegionInfo region = regionOpenInfos.get(k).getFirst();
+ LOG.info("Got opening state " + openingState
+ + ", will reassign later: " + region);
+ // Failed opening this region, reassign it later
+ forceRegionStateToOffline(region, true);
+ failedToOpenRegions.add(region);
+ }
+ }
+ break;
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e = ((RemoteException)e).unwrapRemoteException();
+ }
+ if (e instanceof RegionServerStoppedException) {
+ LOG.warn("The region server was shut down, ", e);
+ // No need to retry, the region server is a goner.
+ return false;
+ } else if (e instanceof ServerNotRunningYetException) {
+ long now = System.currentTimeMillis();
+ if (now < maxWaitTime) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Server is not yet up; waiting up to " +
+ (maxWaitTime - now) + "ms", e);
+ }
+ Thread.sleep(100);
+ i--; // reset the try count
+ continue;
+ }
+ } else if (e instanceof java.net.SocketTimeoutException
+ && this.serverManager.isServerOnline(destination)) {
+ // In case socket is timed out and the region server is still online,
+ // the openRegion RPC could have been accepted by the server and
+ // just the response didn't go through. So we will retry to
+ // open the region on the same server.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Bulk assigner openRegion() to " + destination
+ + " has timed out, but the regions might"
+ + " already be opened on it.", e);
+ }
+ // wait and reset the re-try count, server might be just busy.
+ Thread.sleep(100);
+ i--;
+ continue;
+ } else if (e instanceof FailedServerException && i < maximumAttempts) {
+ // In case the server is in the failed server list, there is no point in
+ // retrying too soon. Retry after the failed_server_expiry time
+ long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(destination + " is on failed server list; waiting "
+ + sleepTime + "ms", e);
+ }
+ Thread.sleep(sleepTime);
+ continue;
+ }
+ throw e;
+ }
+ }
+ } catch (IOException e) {
+ // Can be a socket timeout, EOF, NoRouteToHost, etc
+ LOG.info("Unable to communicate with " + destination
+ + " in order to assign regions, ", e);
+ for (RegionState state: states) {
+ HRegionInfo region = state.getRegion();
+ forceRegionStateToOffline(region, true);
+ }
+ return false;
+ }
+ } finally {
+ for (Lock lock : locks.values()) {
+ lock.unlock();
+ }
+ }
+
+ if (!failedToOpenRegions.isEmpty()) {
+ for (HRegionInfo region : failedToOpenRegions) {
+ if (!regionStates.isRegionOnline(region)) {
+ invokeAssign(region);
+ }
+ }
+ }
+
+ // wait for assignment completion
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions.size());
+ for (HRegionInfo region: regions) {
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
+ }
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
+ LOG.debug("Bulk assigning done for " + destination);
+ return true;
+ } finally {
+ metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
+ }
+ }
+
+ /**
+ * Send CLOSE RPC if the server is online, otherwise, offline the region.
+ *
+ * The RPC will be sent only to the region server found in the region state
+ * if it is passed in; otherwise, to the src server specified. If region
+ * state is not specified, we don't update the region state at all, instead
+ * we just send the RPC call. This is useful for doing some cleanup without
+ * messing around with the region states (see handleRegion, on the "region
+ * opened on an unexpected server" scenario, for an example)
+ */
+ private void unassign(final HRegionInfo region,
+ final ServerName server, final ServerName dest) {
+ for (int i = 1; i <= this.maximumAttempts; i++) {
+ if (this.server.isStopped() || this.server.isAborted()) {
+ LOG.debug("Server stopped/aborted; skipping unassign of " + region);
+ return;
+ }
+ if (!serverManager.isServerOnline(server)) {
+ LOG.debug("Offline " + region.getRegionNameAsString()
+ + ", no need to unassign since it's on a dead server: " + server);
+ regionStates.updateRegionState(region, State.OFFLINE);
+ return;
+ }
+ try {
+ // Send CLOSE RPC
+ if (serverManager.sendRegionClose(server, region, dest)) {
+ LOG.debug("Sent CLOSE to " + server + " for region " +
+ region.getRegionNameAsString());
+ return;
+ }
+ // This never happens. Currently the regionserver close always returns true.
+ // TODO: this can now happen (0.96) if there is an exception in a coprocessor
+ LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
+ region.getRegionNameAsString());
+ } catch (Throwable t) {
+ long sleepTime = 0;
+ Configuration conf = this.server.getConfiguration();
+ if (t instanceof RemoteException) {
+ t = ((RemoteException)t).unwrapRemoteException();
+ }
+ if (t instanceof RegionServerAbortedException
+ || t instanceof RegionServerStoppedException
+ || t instanceof ServerNotRunningYetException) {
+ // RS is aborting, we cannot offline the region since the region may need to do WAL
+ // recovery. Until we see the RS expiration, we should retry.
+ sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+
+ } else if (t instanceof NotServingRegionException) {
+ LOG.debug("Offline " + region.getRegionNameAsString()
+ + ", it's not any more on " + server, t);
+ regionStates.updateRegionState(region, State.OFFLINE);
+ return;
+ } else if (t instanceof FailedServerException && i < maximumAttempts) {
+ // In case the server is in the failed server list, there is no point in
+ // retrying too soon. Retry after the failed_server_expiry time
+ sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
+ }
+ }
+ try {
+ if (sleepTime > 0) {
+ Thread.sleep(sleepTime);
+ }
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
+ Thread.currentThread().interrupt();
+ regionStates.updateRegionState(region, State.FAILED_CLOSE);
+ return;
+ }
+ LOG.info("Server " + server + " returned " + t + " for "
+ + region.getRegionNameAsString() + ", try=" + i
+ + " of " + this.maximumAttempts, t);
+ }
+ }
+ // Run out of attempts
+ regionStates.updateRegionState(region, State.FAILED_CLOSE);
+ }
+
+ /**
+ * Set region to OFFLINE unless it is opening and forceNewPlan is false.
+ */
+ private RegionState forceRegionStateToOffline(
+ final HRegionInfo region, final boolean forceNewPlan) {
+ RegionState state = regionStates.getRegionState(region);
+ if (state == null) {
+ LOG.warn("Assigning but not in region states: " + region);
+ state = regionStates.createRegionState(region);
+ }
+
+ if (forceNewPlan && LOG.isDebugEnabled()) {
+ LOG.debug("Force region state offline " + state);
+ }
+
+ switch (state.getState()) {
+ case OPEN:
+ case OPENING:
+ case PENDING_OPEN:
+ case CLOSING:
+ case PENDING_CLOSE:
+ if (!forceNewPlan) {
+ LOG.debug("Skip assigning " +
+ region + ", it is already " + state);
+ return null;
+ }
+ case FAILED_CLOSE:
+ case FAILED_OPEN:
+ regionStates.updateRegionState(region, State.PENDING_CLOSE);
+ unassign(region, state.getServerName(), null);
+ state = regionStates.getRegionState(region);
+ if (!state.isOffline() && !state.isClosed()) {
+ // If the region isn't offline, we can't re-assign
+ // it now. It will be assigned automatically after
+ // the regionserver reports it's closed.
+ return null;
+ }
+ case OFFLINE:
+ case CLOSED:
+ break;
+ default:
+ LOG.error("Trying to assign region " + region
+ + ", which is " + state);
+ return null;
+ }
+ return state;
+ }
+
+ /**
+ * Caller must hold lock on the passed <code>state</code> object.
+ * @param state
+ * @param forceNewPlan
+ */
+ private void assign(RegionState state, boolean forceNewPlan) {
+ long startTime = EnvironmentEdgeManager.currentTime();
+ try {
+ Configuration conf = server.getConfiguration();
+ RegionPlan plan = null;
+ long maxWaitTime = -1;
+ HRegionInfo region = state.getRegion();
+ Throwable previousException = null;
+ for (int i = 1; i <= maximumAttempts; i++) {
+ if (server.isStopped() || server.isAborted()) {
+ LOG.info("Skip assigning " + region.getRegionNameAsString()
+ + ", the server is stopped/aborted");
+ return;
+ }
+
+ if (plan == null) { // Get a server for the region at first
+ try {
+ plan = getRegionPlan(region, forceNewPlan);
+ } catch (HBaseIOException e) {
+ LOG.warn("Failed to get region plan", e);
+ }
+ }
+
+ if (plan == null) {
+ LOG.warn("Unable to determine a plan to assign " + region);
+
+ // For the meta region, we have to keep retrying until we succeed
+ if (region.isMetaRegion()) {
+ if (i == maximumAttempts) {
+ i = 0; // re-set attempt count to 0 for at least 1 retry
+
+ LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
+ " after maximumAttempts (" + this.maximumAttempts +
+ "). Reset attempts count and continue retrying.");
+ }
+ waitForRetryingMetaAssignment();
+ continue;
+ }
+
+ regionStates.updateRegionState(region, State.FAILED_OPEN);
+ return;
+ }
+ LOG.info("Assigning " + region.getRegionNameAsString() +
+ " to " + plan.getDestination());
+ // Transition RegionState to PENDING_OPEN
+ regionStates.updateRegionState(region,
+ State.PENDING_OPEN, plan.getDestination());
+
+ boolean needNewPlan = false;
+ final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
+ " to " + plan.getDestination();
+ try {
+ List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+ if (shouldAssignFavoredNodes(region)) {
+ favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
+ }
+ serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
+ return; // we're done
+ } catch (Throwable t) {
+ if (t instanceof RemoteException) {
+ t = ((RemoteException) t).unwrapRemoteException();
+ }
+ previousException = t;
+
+ // Should we wait a little before retrying? If the server is starting, yes.
+ boolean hold = (t instanceof ServerNotRunningYetException);
+
+ // In case socket is timed out and the region server is still online,
+ // the openRegion RPC could have been accepted by the server and
+ // just the response didn't go through. So we will retry to
+ // open the region on the same server.
+ boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
+ && this.serverManager.isServerOnline(plan.getDestination()));
+
+ if (hold) {
+ LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
+ "try=" + i + " of " + this.maximumAttempts, t);
+
+ if (maxWaitTime < 0) {
+ maxWaitTime = EnvironmentEdgeManager.currentTime()
+ + this.server.getConfiguration().getLong(
+ "hbase.regionserver.rpc.startup.waittime", 60000);
+ }
+ try {
+ long now = EnvironmentEdgeManager.currentTime();
+ if (now < maxWaitTime) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Server is not yet up; waiting up to "
+ + (maxWaitTime - now) + "ms", t);
+ }
+ Thread.sleep(100);
+ i--; // reset the try count
+ } else {
+ LOG.debug("Server is not up for a while; try a new one", t);
+ needNewPlan = true;
+ }
+ } catch (InterruptedException ie) {
+ LOG.warn("Failed to assign "
+ + region.getRegionNameAsString() + " since interrupted", ie);
+ regionStates.updateRegionState(region, State.FAILED_OPEN);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ } else if (retry) {
+ i--; // we want to retry as many times as needed as long as the RS is not dead.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(assignMsg + ", trying to assign to the same region server due ", t);
+ }
+ } else {
+ needNewPlan = true;
+ LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
+ " try=" + i + " of " + this.maximumAttempts, t);
+ }
+ }
+
+ if (i == this.maximumAttempts) {
+ // For the meta region, we have to keep retrying until we succeed
+ if (region.isMetaRegion()) {
+ i = 0; // re-set attempt count to 0 for at least 1 retry
+ LOG.warn(assignMsg +
+ ", trying to assign a hbase:meta region reached to maximumAttempts (" +
+ this.maximumAttempts + "). Reset attempt counts and continue retrying.");
+ waitForRetryingMetaAssignment();
+ }
+ else {
+ // Don't reset the region state or get a new plan any more.
+ // This is the last try.
+ continue;
+ }
+ }
+
+ // If the region opened on the destination of the present plan, reassigning to a new
+ // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
+ // reassign to the same RS.
+ if (needNewPlan) {
+ // Force a new plan and reassign. Will return null if no servers.
+ // The new plan could be the same as the existing plan since we don't
+ // exclude the server of the original plan, which should not be
+ // excluded since it could be the only server up now.
+ RegionPlan newPlan = null;
+ try {
+ newPlan = getRegionPlan(region, true);
+ } catch (HBaseIOException e) {
+ LOG.warn("Failed to get region plan", e);
+ }
+ if (newPlan == null) {
+ regionStates.updateRegionState(region, State.FAILED_OPEN);
+ LOG.warn("Unable to find a viable location to assign region " +
+ region.getRegionNameAsString());
+ return;
+ }
+
+ if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
+ // Clean out the plan we failed to execute and any that doesn't look like it'll
+ // succeed anyway; we need a new plan!
+ // Transition back to OFFLINE
+ regionStates.updateRegionState(region, State.OFFLINE);
+ plan = newPlan;
+ } else if(plan.getDestination().equals(newPlan.getDestination()) &&
+ previousException instanceof FailedServerException) {
+ try {
+ LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
+ " to the same failed server.");
+ Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
+ } catch (InterruptedException ie) {
+ LOG.warn("Failed to assign "
+ + region.getRegionNameAsString() + " since interrupted", ie);
+ regionStates.updateRegionState(region, State.FAILED_OPEN);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ }
+ // Run out of attempts
+ regionStates.updateRegionState(region, State.FAILED_OPEN);
+ } finally {
+ metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
+ }
+ }
+
+ private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
+ if (this.tableStateManager.isTableState(region.getTable(),
+ TableState.State.DISABLED,
+ TableState.State.DISABLING) || replicasToClose.contains(region)) {
+ LOG.info("Table " + region.getTable() + " is disabled or disabling;"
+ + " skipping assign of " + region.getRegionNameAsString());
+ offlineDisabledRegion(region);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @param region the region to assign
+ * @param forceNewPlan If true, then if an existing plan exists, a new plan
+ * will be generated.
+ * @return Plan for the passed <code>region</code> (if none exists currently, it creates one;
+ * if there are no servers to assign to, it returns null).
+ */
+ private RegionPlan getRegionPlan(final HRegionInfo region,
+ final boolean forceNewPlan) throws HBaseIOException {
+ // Pickup existing plan or make a new one
+ final String encodedName = region.getEncodedName();
+ final List<ServerName> destServers =
+ serverManager.createDestinationServersList();
+
+ if (destServers.isEmpty()){
+ LOG.warn("Can't move " + encodedName +
+ ", there is no destination server available.");
+ return null;
+ }
+
+ RegionPlan randomPlan = null;
+ boolean newPlan = false;
+ RegionPlan existingPlan;
+
+ synchronized (this.regionPlans) {
+ existingPlan = this.regionPlans.get(encodedName);
+
+ if (existingPlan != null && existingPlan.getDestination() != null) {
+ LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
+ + " destination server is " + existingPlan.getDestination() +
+ " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
+ }
+
+ if (forceNewPlan
+ || existingPlan == null
+ || existingPlan.getDestination() == null
+ || !destServers.contains(existingPlan.getDestination())) {
+ newPlan = true;
+ try {
+ randomPlan = new RegionPlan(region, null,
+ balancer.randomAssignment(region, destServers));
+ } catch (IOException ex) {
+ LOG.warn("Failed to create new plan.",ex);
+ return null;
+ }
+ this.regionPlans.put(encodedName, randomPlan);
+ }
+ }
+
+ if (newPlan) {
+ if (randomPlan.getDestination() == null) {
+ LOG.warn("Can't find a destination for " + encodedName);
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No previous transition plan found (or ignoring " +
+ "an existing plan) for " + region.getRegionNameAsString() +
+ "; generated random plan=" + randomPlan + "; " + destServers.size() +
+ " (online=" + serverManager.getOnlineServers().size() +
+ ") available servers, forceNewPlan=" + forceNewPlan);
+ }
+ return randomPlan;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using pre-existing plan for " +
+ region.getRegionNameAsString() + "; plan=" + existingPlan);
+ }
+ return existingPlan;
+ }
+
+ /**
+ * Wait for some time before retrying meta table region assignment
+ */
+ private void waitForRetryingMetaAssignment() {
+ try {
+ Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
+ } catch (InterruptedException e) {
+ LOG.error("Got exception while waiting for hbase:meta assignment");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Unassigns the specified region.
+ * <p>
+ * Updates the RegionState and sends the CLOSE RPC unless region is being
+ * split by regionserver; then the unassign fails (silently) because we
+ * presume the region being unassigned no longer exists (it has been split out
+ * of existence). TODO: What to do if split fails and is rolled back and
+ * parent is revivified?
+ * <p>
+ * If a RegionPlan is already set, it will remain.
+ *
+ * @param region region to be unassigned
+ */
+ public void unassign(HRegionInfo region) {
+ unassign(region, null);
+ }
+
+
+ /**
+ * Unassigns the specified region.
+ * <p>
+ * Updates the RegionState and sends the CLOSE RPC unless region is being
+ * split by regionserver; then the unassign fails (silently) because we
+ * presume the region being unassigned no longer exists (it has been split out
+ * of existence). TODO: What to do if split fails and is rolled back and
+ * parent is revivified?
+ * <p>
+ * If a RegionPlan is already set, it will remain.
+ *
+ * @param region region to be unassigned
+ * @param dest the destination server of the region
+ */
+ public void unassign(HRegionInfo region, ServerName dest) {
+ // TODO: Method needs refactoring. Ugly buried returns throughout. Beware!
+ LOG.debug("Starting unassign of " + region.getRegionNameAsString()
+ + " (offlining), current state: " + regionStates.getRegionState(region));
+
+ String encodedName = region.getEncodedName();
+ // Grab the state of this region and synchronize on it
+ // We need a lock here as we're going to do a put later and we don't want multiple
+ // state creations
+ ReentrantLock lock = locker.acquireLock(encodedName);
+ RegionState state = regionStates.getRegionTransitionState(encodedName);
+ try {
+ if (state == null || state.isFailedClose()) {
+ if (state == null) {
+ // Region is not in transition.
+ // We can unassign it only if it's not SPLIT/MERGED.
+ state = regionStates.getRegionState(encodedName);
+ if (state != null && state.isUnassignable()) {
+ LOG.info("Attempting to unassign " + state + ", ignored");
+ // Offline region will be reassigned below
+ return;
+ }
+ if (state == null || state.getServerName() == null) {
+ // We don't know where the region is, offline it.
+ // No need to send CLOSE RPC
+ LOG.warn("Attempting to unassign a region not in RegionStates "
+ + region.getRegionNameAsString() + ", offlined");
+ regionOffline(region);
+ return;
+ }
+ }
+ state = regionStates.updateRegionState(
+ region, State.PENDING_CLOSE);
+ } else if (state.isFailedOpen()) {
+ // The region is not open yet
+ regionOffline(region);
+ return;
+ } else {
+ LOG.debug("Attempting to unassign " +
+ region.getRegionNameAsString() + " but it is " +
+ "already in transition (" + state.getState());
+ return;
+ }
+
+ unassign(region, state.getServerName(), dest);
+ } finally {
+ lock.unlock();
+
+ // Region is expected to be reassigned afterwards
+ if (!replicasToClose.contains(region)
+ && regionStates.isRegionInState(region, State.OFFLINE)) {
+ assign(region);
+ }
+ }
+ }
+
+ /**
+ * Used by unit tests. Returns the number of regions opened so far in the life
+ * of the master. Increases by one every time the master opens a region.
+ * @return the counter value of the number of regions opened so far
+ */
+ public int getNumRegionsOpened() {
+ return numRegionsOpened.get();
+ }
+
+ /**
+ * Waits until the specified region has completed assignment.
+ * <p>
+ * If the region is already assigned, returns immediately. Otherwise, method
+ * blocks until the region is assigned.
+ * @param regionInfo region to wait on assignment for
+ * @return true if the region is assigned, false otherwise.
+ * @throws InterruptedException
+ */
+ public boolean waitForAssignment(HRegionInfo regionInfo)
+ throws InterruptedException {
+ ArrayList<HRegionInfo> regionSet = new ArrayList<>(1);
+ regionSet.add(regionInfo);
+ return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+ }
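+
+ // Illustrative sketch (hypothetical caller; 'am' is an existing AssignmentManager and 'hri'
+ // the region that must be online before proceeding):
+ //
+ //   am.assign(hri);
+ //   if (!am.waitForAssignment(hri)) {
+ //     // the region did not come online (e.g. the master is stopping); handle the failure
+ //   }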
+
+ /**
+ * Waits until the specified region has completed assignment, or the deadline is reached.
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final int reassigningRegions,
+ final long minEndTime) throws InterruptedException {
+ long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+ if (deadline < 0) { // Overflow
+ deadline = Long.MAX_VALUE; // wait forever
+ }
+ return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+ }
+
+ /**
+ * Waits until the specified region has completed assignment, or the deadline is reached.
+ * @param regionSet set of regions to wait on. The set is modified and the assigned regions are removed
+ * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
+ * @param deadline the timestamp after which the wait is aborted
+ * @return true if all the regions are assigned, false otherwise.
+ * @throws InterruptedException
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+ // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+ while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+ int failedOpenCount = 0;
+ Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+ while (regionInfoIterator.hasNext()) {
+ HRegionInfo hri = regionInfoIterator.next();
+ if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+ State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+ regionInfoIterator.remove();
+ } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+ failedOpenCount++;
+ }
+ }
+ if (!waitTillAllAssigned) {
+ // No need to wait, let the assignment go on asynchronously
+ break;
+ }
+ if (!regionSet.isEmpty()) {
+ if (failedOpenCount == regionSet.size()) {
+ // all the regions we are waiting on had an error on open.
+ break;
+ }
+ regionStates.waitForUpdate(100);
+ }
+ }
+ return regionSet.isEmpty();
+ }
+
+ /**
+ * Assigns the hbase:meta region or a replica.
+ * <p>
+ * Assumes that hbase:meta is currently closed and is not being actively served by
+ * any RegionServer.
+ * @param hri TODO
+ */
+ public void assignMeta(HRegionInfo hri) throws KeeperException {
+ regionStates.updateRegionState(hri, State.OFFLINE);
+ assign(hri);
+ }
+
+ /**
+ * Assigns specified regions retaining assignments, if any.
+ * <p>
+ * This is a synchronous call and will return once every region has been
+ * assigned. If anything fails, an exception is thrown
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public void assign(Map<HRegionInfo, ServerName> regions)
+ throws IOException, InterruptedException {
+ if (regions == null || regions.isEmpty()) {
+ return;
+ }
+ List<ServerName> servers = serverManager.createDestinationServersList();
+ if (servers == null || servers.isEmpty()) {
+ throw new IOException("Found no destination server to assign region(s)");
+ }
+
+ // Reuse existing assignment info
+ Map<ServerName, List<HRegionInfo>> bulkPlan =
+ balancer.retainAssignment(regions, servers);
+ if (bulkPlan == null) {
+ throw new IOException("Unable to determine a plan to assign region(s)");
+ }
+
+ processBogusAssignments(bulkPlan);
+
+ assign(regions.size(), servers.size(),
+ "retainAssignment=true", bulkPlan);
+ }
+
+ /**
+ * Assigns specified regions round robin, if any.
+ * <p>
+ * This is a synchronous call and will return once every region has been
+ * assigned. If anything fails, an exception is thrown
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public void assign(List<HRegionInfo> regions)
+ throws IOException, InterruptedException {
+ if (regions == null || regions.isEmpty()) {
+ return;
+ }
+
+ List<ServerName> servers = serverManager.createDestinationServersList();
+ if (servers == null || servers.isEmpty()) {
+ throw new IOException("Found no destination server to assign region(s)");
+ }
+
+ // Generate a round-robin bulk assignment plan
+ Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
+ if (bulkPlan == null) {
+ throw new IOException("Unable to determine a plan to assign region(s)");
+ }
+
+ processBogusAssignments(bulkPlan);
+
+ assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
+ }
+
+ private void assign(int regions, int totalServers,
+ String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
+ throws InterruptedException, IOException {
+
+ int servers = bulkPlan.size();
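+ // Use direct (non-bulk) assignment when the plan targets a single server, or when both
+ // the region count and the target server count are below the bulk-assign thresholds;
+ // otherwise hand the whole plan to GeneralBulkAssigner below.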
+ if (servers == 1 || (regions < bulkAssignThresholdRegions
+ && servers < bulkAssignThresholdServers)) {
+
+ // Not using bulk assignment. This could be more efficient in a small
+ // cluster, especially a mini cluster used for testing, so that tests won't time out
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Not using bulk assignment since we are assigning only " + regions +
+ " region(s) to " + servers + " server(s)");
+ }
+
+ // invoke assignment (async)
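+ // For each server in the plan, try a direct assign of its region list; if that fails and
+ // the master is not stopping, re-queue each still-offline region via invokeAssign() and
+ // remember user-table regions so we can wait on them below.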
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions);
+ for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
+ if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
+ for (HRegionInfo region: plan.getValue()) {
+ if (!regionStates.isRegionOnline(region)) {
+ invokeAssign(region);
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
+ }
+ }
+ }
+ }
+
+ // wait for assignment completion
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
+ } else {
+ LOG.info("Bulk assigning " + regions + " region(s) across "
+ + totalServers + " server(s), " + message);
+
+ // Use fixed count thread pool assigning.
+ BulkAssigner ba = new GeneralBulkAssigner(
+ this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
+ ba.bulkAssign();
+ LOG.info("Bulk assigning done");
+ }
+ }
+
+ /**
+ * Assigns all user regions, if any exist. Used during cluster startup.
+ * <p>
+ * This is a synchronous call and will return once every region has been
+ * assigned. If anything fails, an exception is thrown and the cluster
+ * should be shut down.
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
+ throws IOException, InterruptedException {
+ if (allRegions == null || allRegions.isEmpty()) return;
+
+ // Determine what type of assignment to do on startup
+ boolean retainAssignment = server.getConfiguration().
+ getBoolean("hbase.master.startup.retainassign", true);
+
+ Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
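+ // With retainassign (the default) the balancer tries to put each region back on its
+ // previous host; otherwise regions are spread round-robin across the live servers.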
+ if (retainAssignment) {
+ assign(allRegions);
+ } else {
+ List<HRegionInfo> regions = new ArrayList<>(regionsFromMetaScan);
+ assign(regions);
+ }
+
+ for (HRegionInfo hri : regionsFromMetaScan) {
+ TableName tableName = hri.getTable();
+ if (!tableStateManager.isTableState(tableName,
+ TableState.State.ENABLED)) {
+ setEnabledTable(tableName);
+ }
+ }
+ // assign all the replicas that were not recorded in the meta
+ assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
+ }
+
+ /**
+ * Get number of replicas of a table
+ */
+ private static int getNumReplicas(MasterServices master, TableName table) {
+ int numReplica = 1;
+ try {
+ HTableDescriptor htd = master.getTableDescriptors().get(table);
+ if (htd == null) {
+ LOG.warn("master can not get TableDescriptor from table '" + table);
+ } else {
+ numReplica = htd.getRegionReplication();
+ }
+ } catch (IOException e){
+ LOG.warn("Couldn't get the replication attribute of the table " + table + " due to "
+ + e.getMessage());
+ }
+ return numReplica;
+ }
+
+ /**
+ * Get a list of replica regions that are not yet recorded in meta. We might not have
+ * recorded the locations of the replicas because the replicas may not have been online yet,
+ * the master restarted in the middle of assigning, ZK was erased, etc.
+ * @param regionsRecordedInMeta the list of regions we know are recorded in meta
+ * either as a default, or, as the location of a replica
+ * @param master
+ * @return list of replica regions
+ * @throws IOException
+ */
+ public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
+ Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
+ List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<>();
+ for (HRegionInfo hri : regionsRecordedInMeta) {
+ TableName table = hri.getTable();
+ if (master.getTableDescriptors().get(table) == null) {
+ continue;
+ }
+ int desiredRegionReplication = getNumReplicas(master, table);
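+ // Walk replica ids 0..desiredRegionReplication-1 and collect the ones meta does not know about yet.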
+ for (int i = 0; i < desiredRegionReplication; i++) {
+ HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
+ if (regionsRecordedInMeta.contains(replica)) continue;
+ regionsNotRecordedInMeta.add(replica);
+ }
+ }
+ return regionsNotRecordedInMeta;
+ }
+
+ /**
+ * Rebuild the list of user regions and assignment information.
+ * Updates region states with findings as we go through the list of regions.
+ * @return set of servers not online that hosted some regions according to a scan of hbase:meta
+ * @throws IOException
+ */
+ Set<ServerName> rebuildUserRegions() throws
+ IOException, KeeperException {
+ Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
+ TableState.State.DISABLED, TableState.State.ENABLING);
+
+ Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
+ TableState.State.DISABLED,
+ TableState.State.DISABLING,
+ TableState.State.ENABLING);
+
+ // Region assignment from META
+ List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
+ // Get any new but slow-to-check-in region servers that joined the cluster
+ Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+ // Set of offline servers to be returned
+ Set<ServerName> offlineServers = new HashSet<>();
+ // Iterate regions in META
+ for (Result result : results) {
+ if (result == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("null result from meta - ignoring but this is strange.");
+ }
+ continue;
+ }
+ // Keep track of replicas to close. These were the replicas of the originally
+ // unmerged regions. The master should have closed them before, but it might not have,
+ // perhaps because it crashed.
+ PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
+ if (p.getFirst() != null && p.getSecond() != null) {
+ int numReplicas = getNumReplicas(server, p.getFirst().getTable());
+ for (HRegionInfo merge : p) {
+ for (int i = 1; i < numReplicas; i++) {
+ replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
+ }
+ }
+ }
+ RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
+ if (rl == null) {
+ continue;
+ }
+ HRegionLocation[] locations = rl.getRegionLocations();
+ if (locations == null) {
+ continue;
+ }
+ for (HRegionLocation hrl : locations) {
+ if (hrl == null) continue;
+ HRegionInfo regionInfo = hrl.getRegionInfo();
+ if (regionInfo == null) continue;
+ int replicaId = regionInfo.getReplicaId();
+ State state = RegionStateStore.getRegionState(result, replicaId);
+ // Keep track of replicas to close. These were the replicas of the split parents
+ // from the previous life of the master. The master should have closed them before,
+ // but it might not have, perhaps because it crashed.
+ if (replicaId == 0 && state.equals(State.SPLIT)) {
+ for (HRegionLocation h : locations) {
+ replicasToClose.add(h.getRegionInfo());
+ }
+ }
+ ServerName lastHost = hrl.getServerName();
+ ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
+ regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
+ if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
+ // Region is not open (either offline or in transition), skip
+ continue;
+ }
+ TableName tableName = regionInfo.getTable();
+ if (!onlineServers.contains(regionLocation)) {
+ // Region is located on a server that isn't online
+ offlineServers.add(regionLocation);
+ } else if (!disabledOrEnablingTables.contains(tableName)) {
+ // Region is being served and on an active server
+ // add only if region not in disabled or enabling table
+ regionStates.regionOnline(regionInfo, regionLocation);
+ balancer.regionOnline(regionInfo, regionLocation);
+ }
+ // need to enable the table if not disabled or disabling or enabling
+ // this will be used in rolling restarts
+ if (!disabledOrDisablingOrEnabling.contains(tableName)
+ && !getTableStateManager().isTableState(tableName,
+ TableState.State.ENABLED)) {
+ setEnabledTable(tableName);
+ }
+ }
+ }
+ return offlineServers;
+ }
+
+ /**
+ * Processes list of regions in transition at startup
+ */
+ void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
+ // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
+ // in case the RPC call is not sent out yet before the master was shut down
+ // since we update the state before we send the RPC call. We can't update
+ // the state after the RPC call. Otherwise, we don't know what's happened
+ // to the region if the master dies right after the RPC call is out.
+ for (RegionState regionState: regionsInTransition) {
+ LOG.info("Processing " + regionState);
+ ServerName serverName = regionState.getServerName();
+ // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
+ // case, try assigning it here.
+ if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
+ LOG.info("Server " + serverName + " isn't online. SSH will handle this");
+ continue; // SSH will handle it
+ }
+ HRegionInfo regionInfo = regionState.getRegion();
+ RegionState.State state = regionState.getState();
+ switch (state) {
+ case CLOSED:
+ invokeAssign(regionState.getRegion());
+ break;
+ case PENDING_OPEN:
+ retrySendRegionOpen(regionState);
+ break;
+ case PENDING_CLOSE:
+ retrySendRegionClose(regionState);
+ break;
+ case FAILED_CLOSE:
+ case FAILED_OPEN:
+ invokeUnAssign(regionInfo);
+ break;
+ default:
+ // No process for other states
+ break;
+ }
+ }
+ }
+
+ /**
+ * At master failover, for pending_open region, make sure
+ * sendRegionOpen RPC call is sent to the target regionserver
+ */
+ private void retrySendRegionOpen(final RegionState regionState) {
+ this.executorService.submit(
+ new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+ @Override
+ public void process() throws IOException {
+ HRegionInfo hri = regionState.getRegion();
+ ServerName serverName = regionState.getServerName();
+ ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+ try {
+ for (int i = 1; i <= maximumAttempts; i++) {
+ if (!serverManager.isServerOnline(serverName)
+ || server.isStopped() || server.isAborted()) {
+ return; // Nothing more to do here
+ }
+ try {
+ if (!regionState.equals(regionStates.getRegionState(hri))) {
+ return; // Region is not in the expected state any more
+ }
+ List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+ if (shouldAssignFavoredNodes(hri)) {
+ FavoredNodesManager fnm = ((MasterServices)server).getFavoredNodesManager();
+ favoredNodes = fnm.getFavoredNodesWithDNPort(hri);
+ }
+ serverManager.sendRegionOpen(serverName, hri, favoredNodes);
+ return; // we're done
+ } catch (Throwable t) {
+ if (t instanceof RemoteException) {
+ t = ((RemoteException) t).unwrapRemoteException();
+ }
+ if (t instanceof FailedServerException && i < maximumAttempts) {
+ // In case the server is in the failed server list, no point to
+ // retry too soon. Retry after the failed_server_expiry time
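+ // Sleep slightly longer than the failed-server expiry window so the next
+ // attempt is not rejected straight away.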
+ try {
+ Configuration conf = this.server.getConfiguration();
+ long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(serverName + " is on failed server list; waiting "
+ + sleepTime + "ms", t);
+ }
+ Thread.sleep(sleepTime);
+ continue;
+ } catch (InterruptedException ie) {
+ LOG.warn("Failed to assign "
+ + hri.getRegionNameAsString() + " since interrupted", ie);
+ regionStates.updateRegionState(hri, State.FAILED_OPEN);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ if (serverManager.isServerOnline(serverName)
+ && t instanceof java.net.SocketTimeoutException) {
+ i--; // reset the try count
+ } else {
+ LOG.info("Got exception in retrying sendRegionOpen for "
+ + regionState + "; try=" + i + " of " + maximumAttempts, t);
+ }
+ Threads.sleep(100);
+ }
+ }
+ // Ran out of attempts
+ regionStates.updateRegionState(hri, State.FAILED_OPEN);
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ }
+
+ /**
+ * At master failover, for pending_close region, make sure
+ * sendRegionClose RPC call is sent to the target regionserver
+ */
+ private void retrySendRegionClose(final RegionState regionState) {
+ this.executorService.submit(
+ new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+ @Override
+ public void process() throws IOException {
+ HRegionInfo hri = regionState.getRegion();
+ ServerName serverName = regionState.getServerName();
+ ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+ try {
+ for (int i = 1; i <= maximumAttempts; i++) {
+ if (!serverManager.isServerOnline(serverName)
+ || server.isStopped() || server.isAborted()) {
+ return; // Nothing more to do here
+ }
+ try {
+ if (!regionState.equals(regionStates.getRegionState(hri))) {
+ return; // Region is not in the expected state any more
+ }
+ serverManager.sendRegionClose(serverName, hri, null);
+ return; // Done.
+ } catch (Throwable t) {
+ if (t instanceof RemoteException) {
+ t = ((RemoteException) t).unwrapRemoteException();
+ }
+ if (t instanceof FailedServerException && i < maximumAttempts) {
+ // In case the server is in the failed server list, no point to
+ // retry too soon. Retry after the failed_server_expiry time
+ try {
+ Configuration conf = this.server.getConfiguration();
+ long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(serverName + " is on failed server list; waiting "
+ + sleepTime + "ms", t);
+ }
+ Thread.sleep(sleepTime);
+ continue;
+ } catch (InterruptedException ie) {
+ LOG.warn("Failed to unassign "
+ + hri.getRegionNameAsString() + " since interrupted", ie);
+ regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ if (serverManager.isServerOnline(serverName)
+ && t instanceof java.net.SocketTimeoutException) {
+ i--; // reset the try count
+ } else {
+ LOG.info("Got exception in retrying sendRegionClose for "
+ + regionState + "; try=" + i + " of " + maximumAttempts, t);
+ }
+ Threads.sleep(100);
+ }
+ }
+ // Ran out of attempts
+ regionStates.updateRegionState(hri, State.FAILED_CLOSE);
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ }
+
+ /**
+ * Set Regions in transitions metrics.
+ * This takes an iterator on the regions-in-transition map (CLSM), and is not synchronized.
+ * The iterator is not fail-fast, which may lead to stale reads; but that's better than
+ * creating a copy of the map for metrics computation, as this method is invoked
+ * at a frequent interval.
+ */
+ public void updateRegionsInTransitionMetrics() {
+ long currentTime = System.currentTimeMillis();
+ int totalRITs = 0;
+ int totalRITsOverThreshold = 0;
+ long oldestRITTime = 0;
+ int ritThreshold = this.server.getConfiguration().
+ getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
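+ // Regions in transition longer than this threshold (60s by default) are counted
+ // separately as "over threshold".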
+ for (RegionState state: regionStates.getRegionsInTransition()) {
+ totalRITs++;
+ long ritTime = currentTime - state.getStamp();
+ if (ritTime > ritThreshold) { // more than the threshold
+ totalRITsOverThreshold++;
+ }
+ if (oldestRITTime < ritTime) {
+ oldestRITTime = ritTime;
+ }
+ }
+ if (this.metricsAssignmentManager != null) {
+ this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
+ this.metricsAssignmentManager.updateRITCount(totalRITs);
+ this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
+ }
+ }
+
+ /**
+ * @param region Region whose plan we are to clear.
+ */
+ private void clearRegionPlan(final HRegionInfo region) {
+ synchronized (this.regionPlans) {
+ this.regionPlans.remove(region.getEncodedName());
+ }
+ }
+
+ /**
+ * Wait on region to clear regions-in-transition.
+ * @param hri Region to wait on.
+ * @throws IOException
+ */
+ public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
+ throws IOException, InterruptedException {
+ waitOnRegionToClearRegionsInTransition(hri, -1L);
+ }
+
+ /**
+ * Wait on region to clear regions-in-transition or time out
+ * @param hri Region to wait on.
+ * @param timeOut Milliseconds to wait for current region to be out of transition state.
+ * @return True when a region clears regions-in-transition before timeout otherwise false
+ * @throws InterruptedException
+ */
+ public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
+ throws InterruptedException {
+ if (!regionStates.isRegionInTransition(hri)) {
+ return true;
+ }
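+ // A non-positive timeout means wait indefinitely (until the region leaves transition
+ // or the master is stopped).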
+ long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
+ + timeOut;
+ // There is already a timeout monitor on regions in transition so I
+ // should not have to have one here too?
+ LOG.info("Waiting for " + hri.getEncodedName() +
+ " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
+ while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
+ regionStates.waitForUpdate(100);
+ if (EnvironmentEdgeManager.currentTime() > end) {
+ LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
+ return false;
+ }
+ }
+ if (this.server.isStopped()) {
+ LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
+ return false;
+ }
+ return true;
+ }
+
+ void invokeAssign(HRegionInfo regionInfo) {
+ threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
+ }
+
+ void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) {
+ scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(
+ new AssignCallable(this, regionInfo)), sleepMillis, TimeUnit.MILLISECONDS);
+ }
+
+ void invokeUnAssign(HRegionInfo regionInfo) {
+ threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
+ }
+
+ public boolean isCarryingMeta(ServerName serverName) {
+ return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
+ }
+
+ public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
+ return isCarryingRegion(serverName,
+ RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
+ }
+
+ public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
+ return isCarryingRegion(serverName, metaHri);
+ }
+
+ /**
+ * Check if the shutdown server carries the specific region.
+ * @return whether the serverName currently hosts the region
+ */
+ private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
+ RegionState regionState = regionStates.getRegionTransitionState(hri);
+ ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
+ if (transitionAddr != null) {
+ boolean matchTransitionAddr = transitionAddr.equals(serverName);
+ LOG.debug("Checking region=" + hri.getRegionNameAsString()
+ + ", transitioning on server=" + matchTransitionAddr
+ + " server being checked: " + serverName
+ + ", matches=" + matchTransitionAddr);
+ return matchTransitionAddr;
+ }
+
+ ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
+ boolean matchAssignedAddr = serverName.equals(assignedAddr);
+ LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
+ + " is on server=" + assignedAddr + ", server being checked: "
+ + serverName);
+ return matchAssignedAddr;
+ }
+
+ /**
+ * Clean out crashed server removing any assignments.
+ * @param sn Server that went down.
+ * @return list of regions in transition on this server
+ */
+ public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
+ // Clean out any existing assignment plans for this server
+ synchronized (this.regionPlans) {
+ for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
+ i.hasNext();) {
+ Map.Entry<String, RegionPlan> e = i.next();
+ ServerName otherSn = e.getValue().getDestination();
+ // The name will be null if the region is planned for a random assign.
+ if (otherSn != null && otherSn.equals(sn)) {
+ // Use iterator's remove else we'll get CME
+ i.remove();
+ }
+ }
+ }
+ List<HRegionInfo> rits = regionStates.serverOffline(sn);
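+ // serverOffline() returns the regions that were in transition on the dead server;
+ // below we drop those that no longer need handling here.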
+ for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
+ HRegionInfo hri = it.next();
+ String encodedName = hri.getEncodedName();
+
+ // We need a lock on the region as we could update it
+ Lock lock = locker.acquireLock(encodedName);
+ try {
+ RegionState regionState = regionStates.getRegionTransitionState(encodedName);
+ if (regionState == null
+ || (regionState.getServerName() != null && !regionState.isOnServer(sn))
+ || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
+ State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
+ LOG.info("Skip " + regionState + " since it is not opening/failed_close"
+ + " on the dead server any more: " + sn);
+ it.remove();
+ } else {
+ if (tab
<TRUNCATED>