You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 07:36:28 UTC
[16/50] [abbrv] hbase git commit: HBASE-14614 Procedure v2 - Core
Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f56592fd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
deleted file mode 100644
index 69ebd97..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ /dev/null
@@ -1,3053 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CoordinatedStateException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.RegionStateListener;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MasterSwitchType;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.favored.FavoredNodesManager;
-import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
-import org.apache.hadoop.hbase.ipc.FailedServerException;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.QuotaExceededException;
-import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.KeyLocker;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Manages and performs region assignment.
- * Related communications with regionserver are all done over RPC.
- */
-@InterfaceAudience.Private
-public class AssignmentManager {
  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

  /** Master services this manager runs inside; protected to ease testing. */
  protected final MasterServices server;

  /** Tracks online/dead region servers; used when sending open/close RPCs. */
  private ServerManager serverManager;

  /** True only when the configured balancer is a FavoredNodesPromoter (see constructor). */
  private boolean shouldAssignRegionsWithFavoredNodes;

  /** Balancer used to produce assignment plans. */
  private LoadBalancer balancer;

  /** Metrics sink for assignment operations. */
  private final MetricsAssignmentManager metricsAssignmentManager;

  /** Count of regions opened since this manager was constructed. */
  private AtomicInteger numRegionsOpened = new AtomicInteger(0);

  /** Per-encoded-region-name locks guarding concurrent assign/unassign of the same region. */
  final private KeyLocker<String> locker = new KeyLocker<>();

  /** Replica regions of split parents / merged regions still to be unassigned. */
  Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());

  /**
   * Map of regions to reopen after the schema of a table is changed. Key -
   * encoded region name, value - HRegionInfo
   */
  private final Map <String, HRegionInfo> regionsToReopen;

  /*
   * Maximum times we recurse an assignment/unassignment.
   * See below in {@link #assign()} and {@link #unassign()}.
   */
  private final int maximumAttempts;

  /**
   * The sleep time for which the assignment will wait before retrying in case of
   * hbase:meta assignment failure due to lack of availability of region plan or bad region plan
   */
  private final long sleepTimeBeforeRetryingMetaAssignment;

  /** Plans for region movement. Key is the encoded version of a region name*/
  // TODO: When do plans get cleaned out? Ever? In server open and in server
  // shutdown processing -- St.Ack
  // All access to this Map must be synchronized.
  final NavigableMap<String, RegionPlan> regionPlans = new TreeMap<>();

  /** Shared tracker of table states (enabled/disabled/...); injected via constructor. */
  private final TableStateManager tableStateManager;

  /** HBase executor service used for assignment event handlers. */
  private final ExecutorService executorService;

  // JDK thread pools used to run assignment work in parallel / with a delay.
  private java.util.concurrent.ExecutorService threadPoolExecutorService;
  private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

  /** In-memory book-keeping of every region's current state and location. */
  private final RegionStates regionStates;

  // The threshold to use bulk assigning. Using bulk assignment
  // only if assigning at least this many regions to at least this
  // many servers. If assigning fewer regions to fewer servers,
  // bulk assigning may be not as efficient.
  private final int bulkAssignThresholdRegions;
  private final int bulkAssignThresholdServers;
  private final int bulkPerRegionOpenTimeGuesstimate;

  // Should bulk assignment wait till all regions are assigned,
  // or it is timed out? This is useful to measure bulk assignment
  // performance, but not needed in most use cases.
  private final boolean bulkAssignWaitTillAllAssigned;

  /**
   * Indicator that AssignmentManager has recovered the region states so
   * that ServerShutdownHandler can be fully enabled and re-assign regions
   * of dead servers. So that when re-assignment happens, AssignmentManager
   * has proper region states.
   *
   * Protected to ease testing.
   */
  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);

  /**
   * A map to track the count a region fails to open in a row.
   * So that we don't try to open a region forever if the failure is
   * unrecoverable. We don't put this information in region states
   * because we don't expect this to happen frequently; we don't
   * want to copy this information over during each state transition either.
   */
  private final ConcurrentHashMap<String, AtomicInteger> failedOpenTracker = new ConcurrentHashMap<>();

  // In case not using ZK for region assignment, region states
  // are persisted in meta with a state store
  private final RegionStateStore regionStateStore;

  /**
   * For testing only! Set to true to skip handling of split.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_SPLIT_HANDLING = false;

  /** Listeners that are called on assignment events. */
  private List<AssignmentListener> listeners = new CopyOnWriteArrayList<>();

  // NOTE(review): hook notified on region state changes; exact consumer is not
  // visible in this chunk — confirm against the setter's callers.
  private RegionStateListener regionStateListener;

  // Retry/backoff configuration for re-opening regions that FAILED_OPEN.
  private RetryCounter.BackoffPolicy backoffPolicy;
  private RetryCounter.RetryConfig retryConfig;
- /**
- * Constructs a new assignment manager.
- *
- * @param server instance of HMaster this AM running inside
- * @param serverManager serverManager for associated HMaster
- * @param balancer implementation of {@link LoadBalancer}
- * @param service Executor service
- * @param metricsMaster metrics manager
- * @throws IOException
- */
- public AssignmentManager(MasterServices server, ServerManager serverManager,
- final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster,
- final TableStateManager tableStateManager)
- throws IOException {
- this.server = server;
- this.serverManager = serverManager;
- this.executorService = service;
- this.regionStateStore = new RegionStateStore(server);
- this.regionsToReopen = Collections.synchronizedMap
- (new HashMap<String, HRegionInfo> ());
- Configuration conf = server.getConfiguration();
-
- this.tableStateManager = tableStateManager;
-
- // This is the max attempts, not retries, so it should be at least 1.
- this.maximumAttempts = Math.max(1,
- this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
- this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
- "hbase.meta.assignment.retry.sleeptime", 1000l);
- this.balancer = balancer;
- // Only read favored nodes if using the favored nodes load balancer.
- this.shouldAssignRegionsWithFavoredNodes = this.balancer instanceof FavoredNodesPromoter;
- int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
-
- this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
- maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
-
- this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
- Threads.newDaemonThreadFactory("AM.Scheduler"));
-
- this.regionStates = new RegionStates(
- server, tableStateManager, serverManager, regionStateStore);
-
- this.bulkAssignWaitTillAllAssigned =
- conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
- this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
- this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
- this.bulkPerRegionOpenTimeGuesstimate =
- conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
-
- this.metricsAssignmentManager = new MetricsAssignmentManager();
-
- // Configurations for retrying opening a region on receiving a FAILED_OPEN
- this.retryConfig = new RetryCounter.RetryConfig();
- this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l));
- // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy
- // if the user does not set a max sleep limit
- this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max",
- retryConfig.getSleepInterval()));
- this.backoffPolicy = getBackoffPolicy();
- }
-
- /**
- * Returns the backoff policy used for Failed Region Open retries
- * @return the backoff policy used for Failed Region Open retries
- */
- RetryCounter.BackoffPolicy getBackoffPolicy() {
- return new RetryCounter.ExponentialBackoffPolicyWithLimit();
- }
-
- MetricsAssignmentManager getAssignmentManagerMetrics() {
- return this.metricsAssignmentManager;
- }
-
- /**
- * Add the listener to the notification list.
- * @param listener The AssignmentListener to register
- */
- public void registerListener(final AssignmentListener listener) {
- this.listeners.add(listener);
- }
-
- /**
- * Remove the listener from the notification list.
- * @param listener The AssignmentListener to unregister
- */
- public boolean unregisterListener(final AssignmentListener listener) {
- return this.listeners.remove(listener);
- }
-
- /**
- * @return Instance of ZKTableStateManager.
- */
- public TableStateManager getTableStateManager() {
- // These are 'expensive' to make involving trip to zk ensemble so allow
- // sharing.
- return this.tableStateManager;
- }
-
- /**
- * This SHOULD not be public. It is public now
- * because of some unit tests.
- *
- * TODO: make it package private and keep RegionStates in the master package
- */
- public RegionStates getRegionStates() {
- return regionStates;
- }
-
- /**
- * Used in some tests to mock up region state in meta
- */
- @VisibleForTesting
- RegionStateStore getRegionStateStore() {
- return regionStateStore;
- }
-
- public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
- return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
- }
-
- /**
- * Add a regionPlan for the specified region.
- * @param encodedName
- * @param plan
- */
- public void addPlan(String encodedName, RegionPlan plan) {
- synchronized (regionPlans) {
- regionPlans.put(encodedName, plan);
- }
- }
-
- /**
- * Add a map of region plans.
- */
- public void addPlans(Map<String, RegionPlan> plans) {
- synchronized (regionPlans) {
- regionPlans.putAll(plans);
- }
- }
-
- /**
- * Set the list of regions that will be reopened
- * because of an update in table schema
- *
- * @param regions
- * list of regions that should be tracked for reopen
- */
- public void setRegionsToReopen(List <HRegionInfo> regions) {
- for(HRegionInfo hri : regions) {
- regionsToReopen.put(hri.getEncodedName(), hri);
- }
- }
-
- /**
- * Used by the client to identify if all regions have the schema updates
- *
- * @param tableName
- * @return Pair indicating the status of the alter command
- * @throws IOException
- */
- public Pair<Integer, Integer> getReopenStatus(TableName tableName)
- throws IOException {
- List<HRegionInfo> hris;
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
- } else {
- hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
- }
-
- Integer pending = 0;
- for (HRegionInfo hri : hris) {
- String name = hri.getEncodedName();
- // no lock concurrent access ok: sequential consistency respected.
- if (regionsToReopen.containsKey(name)
- || regionStates.isRegionInTransition(name)) {
- pending++;
- }
- }
- return new Pair<>(pending, hris.size());
- }
-
- /**
- * Used by ServerShutdownHandler to make sure AssignmentManager has completed
- * the failover cleanup before re-assigning regions of dead servers. So that
- * when re-assignment happens, AssignmentManager has proper region states.
- */
- public boolean isFailoverCleanupDone() {
- return failoverCleanupDone.get();
- }
-
- /**
- * To avoid racing with AM, external entities may need to lock a region,
- * for example, when SSH checks what regions to skip re-assigning.
- */
- public Lock acquireRegionLock(final String encodedName) {
- return locker.acquireLock(encodedName);
- }
-
- /**
- * Now, failover cleanup is completed. Notify server manager to
- * process queued up dead servers processing, if any.
- */
- void failoverCleanupDone() {
- failoverCleanupDone.set(true);
- serverManager.processQueuedDeadServers();
- }
-
- /**
- * Called on startup.
- * Figures whether a fresh cluster start of we are joining extant running cluster.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- * @throws CoordinatedStateException
- */
- void joinCluster()
- throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
- long startTime = System.currentTimeMillis();
- // Concurrency note: In the below the accesses on regionsInTransition are
- // outside of a synchronization block where usually all accesses to RIT are
- // synchronized. The presumption is that in this case it is safe since this
- // method is being played by a single thread on startup.
-
- // TODO: Regions that have a null location and are not in regionsInTransitions
- // need to be handled.
-
- // Scan hbase:meta to build list of existing regions, servers, and assignment
- // Returns servers who have not checked in (assumed dead) that some regions
- // were assigned to (according to the meta)
- Set<ServerName> deadServers = rebuildUserRegions();
-
- // This method will assign all user regions if a clean server startup or
- // it will reconstruct master state and cleanup any leftovers from previous master process.
- boolean failover = processDeadServersAndRegionsInTransition(deadServers);
-
- LOG.info("Joined the cluster in " + (System.currentTimeMillis()
- - startTime) + "ms, failover=" + failover);
- }
-
- /**
- * Process all regions that are in transition in zookeeper and also
- * processes the list of dead servers.
- * Used by master joining an cluster. If we figure this is a clean cluster
- * startup, will assign all user regions.
- * @param deadServers Set of servers that are offline probably legitimately that were carrying
- * regions according to a scan of hbase:meta. Can be null.
- * @throws IOException
- * @throws InterruptedException
- */
- boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
- throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
- // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
- boolean failover = !serverManager.getDeadServers().isEmpty();
- if (failover) {
- // This may not be a failover actually, especially if meta is on this master.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
- }
- // Check if there are any regions on these servers
- failover = false;
- for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) {
- if (regionStates.getRegionAssignments().values().contains(serverName)) {
- LOG.debug("Found regions on dead server: " + serverName);
- failover = true;
- break;
- }
- }
- }
- Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
- if (!failover) {
- // If any one region except meta is assigned, it's a failover.
- for (Map.Entry<HRegionInfo, ServerName> en:
- regionStates.getRegionAssignments().entrySet()) {
- HRegionInfo hri = en.getKey();
- if (!hri.isMetaTable()
- && onlineServers.contains(en.getValue())) {
- LOG.debug("Found region " + hri + " out on cluster");
- failover = true;
- break;
- }
- }
- }
- if (!failover) {
- // If any region except meta is in transition on a live server, it's a failover.
- Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition();
- if (!regionsInTransition.isEmpty()) {
- for (RegionState regionState: regionsInTransition) {
- ServerName serverName = regionState.getServerName();
- if (!regionState.getRegion().isMetaRegion()
- && serverName != null && onlineServers.contains(serverName)) {
- LOG.debug("Found " + regionState + " for region " +
- regionState.getRegion().getRegionNameAsString() + " for server " +
- serverName + "in RITs");
- failover = true;
- break;
- }
- }
- }
- }
- if (!failover) {
- // If we get here, we have a full cluster restart. It is a failover only
- // if there are some WALs are not split yet. For meta WALs, they should have
- // been split already, if any. We can walk through those queued dead servers,
- // if they don't have any WALs, this restart should be considered as a clean one
- Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
- if (!queuedDeadServers.isEmpty()) {
- Configuration conf = server.getConfiguration();
- Path walRootDir = FSUtils.getWALRootDir(conf);
- FileSystem walFs = FSUtils.getWALFileSystem(conf);
- for (ServerName serverName: queuedDeadServers) {
- // In the case of a clean exit, the shutdown handler would have presplit any WALs and
- // removed empty directories.
- Path walDir = new Path(walRootDir,
- AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
- Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
- if (checkWals(walFs, walDir) || checkWals(walFs, splitDir)) {
- LOG.debug("Found queued dead server " + serverName);
- failover = true;
- break;
- }
- }
- if (!failover) {
- // We figured that it's not a failover, so no need to
- // work on these re-queued dead servers any more.
- LOG.info("AM figured that it's not a failover and cleaned up "
- + queuedDeadServers.size() + " queued dead servers");
- serverManager.removeRequeuedDeadServers();
- }
- }
- }
-
- Set<TableName> disabledOrDisablingOrEnabling = null;
- Map<HRegionInfo, ServerName> allRegions = null;
-
- if (!failover) {
- disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
- TableState.State.DISABLED, TableState.State.DISABLING,
- TableState.State.ENABLING);
-
- // Clean re/start, mark all user regions closed before reassignment
- allRegions = regionStates.closeAllUserRegions(
- disabledOrDisablingOrEnabling);
- }
-
- // Now region states are restored
- regionStateStore.start();
-
- if (failover) {
- if (deadServers != null && !deadServers.isEmpty()) {
- for (ServerName serverName: deadServers) {
- if (!serverManager.isServerDead(serverName)) {
- serverManager.expireServer(serverName); // Let SSH do region re-assign
- }
- }
- }
- processRegionsInTransition(regionStates.getRegionsInTransition());
- }
-
- // Now we can safely claim failover cleanup completed and enable
- // ServerShutdownHandler for further processing. The nodes (below)
- // in transition, if any, are for regions not related to those
- // dead servers at all, and can be done in parallel to SSH.
- failoverCleanupDone();
- if (!failover) {
- // Fresh cluster startup.
- LOG.info("Clean cluster startup. Don't reassign user regions");
- assignAllUserRegions(allRegions);
- } else {
- LOG.info("Failover! Reassign user regions");
- }
- // unassign replicas of the split parents and the merged regions
- // the daughter replicas are opened in assignAllUserRegions if it was
- // not already opened.
- for (HRegionInfo h : replicasToClose) {
- unassign(h);
- }
- replicasToClose.clear();
- return failover;
- }
-
- private boolean checkWals(FileSystem fs, Path dir) throws IOException {
- if (!fs.exists(dir)) {
- LOG.debug(dir + " doesn't exist");
- return false;
- }
- if (!fs.getFileStatus(dir).isDirectory()) {
- LOG.warn(dir + " is not a directory");
- return false;
- }
- FileStatus[] files = FSUtils.listStatus(fs, dir);
- if (files == null || files.length == 0) {
- LOG.debug(dir + " has no files");
- return false;
- }
- for (int i = 0; i < files.length; i++) {
- if (files[i].isFile() && files[i].getLen() > 0) {
- LOG.debug(dir + " has a non-empty file: " + files[i].getPath());
- return true;
- } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) {
- LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath());
- return true;
- }
- }
- LOG.debug("Found 0 non-empty wal files for :" + dir);
- return false;
- }
-
- /**
- * When a region is closed, it should be removed from the regionsToReopen
- * @param hri HRegionInfo of the region which was closed
- */
- public void removeClosedRegion(HRegionInfo hri) {
- if (regionsToReopen.remove(hri.getEncodedName()) != null) {
- LOG.debug("Removed region from reopening regions because it was closed");
- }
- }
-
- void processFavoredNodesForDaughters(HRegionInfo parent,
- HRegionInfo regionA, HRegionInfo regionB) throws IOException {
- if (shouldAssignFavoredNodes(parent)) {
- List<ServerName> onlineServers = this.serverManager.getOnlineServersList();
- ((FavoredNodesPromoter) this.balancer).
- generateFavoredNodesForDaughter(onlineServers, parent, regionA, regionB);
- }
- }
-
- void processFavoredNodesForMerge(HRegionInfo merged, HRegionInfo regionA, HRegionInfo regionB)
- throws IOException {
- if (shouldAssignFavoredNodes(merged)) {
- ((FavoredNodesPromoter)this.balancer).
- generateFavoredNodesForMergedRegion(merged, regionA, regionB);
- }
- }
-
- /*
- * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
- * belongs to a non-system table.
- */
- private boolean shouldAssignFavoredNodes(HRegionInfo region) {
- return this.shouldAssignRegionsWithFavoredNodes
- && FavoredNodesManager.isFavoredNodeApplicable(region);
- }
-
- /**
- * Marks the region as online. Removes it from regions in transition and
- * updates the in-memory assignment information.
- * <p>
- * Used when a region has been successfully opened on a region server.
- * @param regionInfo
- * @param sn
- */
- void regionOnline(HRegionInfo regionInfo, ServerName sn) {
- regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
- }
-
- void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
- numRegionsOpened.incrementAndGet();
- regionStates.regionOnline(regionInfo, sn, openSeqNum);
-
- // Remove plan if one.
- clearRegionPlan(regionInfo);
- balancer.regionOnline(regionInfo, sn);
-
- // Tell our listeners that a region was opened
- sendRegionOpenedNotification(regionInfo, sn);
- }
-
- /**
- * Marks the region as offline. Removes it from regions in transition and
- * removes in-memory assignment information.
- * <p>
- * Used when a region has been closed and should remain closed.
- * @param regionInfo
- */
- public void regionOffline(final HRegionInfo regionInfo) {
- regionOffline(regionInfo, null);
- }
-
- public void offlineDisabledRegion(HRegionInfo regionInfo) {
- replicasToClose.remove(regionInfo);
- regionOffline(regionInfo);
- }
-
- // Assignment methods
-
- /**
- * Assigns the specified region.
- * <p>
- * If a RegionPlan is available with a valid destination then it will be used
- * to determine what server region is assigned to. If no RegionPlan is
- * available, region will be assigned to a random available server.
- * <p>
- * Updates the RegionState and sends the OPEN RPC.
- * <p>
- * This will only succeed if the region is in transition and in a CLOSED or
- * OFFLINE state or not in transition, and of course, the
- * chosen server is up and running (It may have just crashed!).
- *
- * @param region server to be assigned
- */
- public void assign(HRegionInfo region) {
- assign(region, false);
- }
-
- /**
- * Use care with forceNewPlan. It could cause double assignment.
- */
- public void assign(HRegionInfo region, boolean forceNewPlan) {
- if (isDisabledorDisablingRegionInRIT(region)) {
- return;
- }
- String encodedName = region.getEncodedName();
- Lock lock = locker.acquireLock(encodedName);
- try {
- RegionState state = forceRegionStateToOffline(region, forceNewPlan);
- if (state != null) {
- if (regionStates.wasRegionOnDeadServer(encodedName)) {
- LOG.info("Skip assigning " + region.getRegionNameAsString()
- + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
- + " is dead but not processed yet");
- return;
- }
- assign(state, forceNewPlan);
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Bulk assign regions to <code>destination</code>.
- * @param destination
- * @param regions Regions to assign.
- * @return true if successful
- */
- boolean assign(final ServerName destination, final List<HRegionInfo> regions)
- throws InterruptedException {
- long startTime = EnvironmentEdgeManager.currentTime();
- try {
- int regionCount = regions.size();
- if (regionCount == 0) {
- return true;
- }
- LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
- Set<String> encodedNames = new HashSet<>(regionCount);
- for (HRegionInfo region : regions) {
- encodedNames.add(region.getEncodedName());
- }
-
- List<HRegionInfo> failedToOpenRegions = new ArrayList<>();
- Map<String, Lock> locks = locker.acquireLocks(encodedNames);
- try {
- Map<String, RegionPlan> plans = new HashMap<>(regionCount);
- List<RegionState> states = new ArrayList<>(regionCount);
- for (HRegionInfo region : regions) {
- String encodedName = region.getEncodedName();
- if (!isDisabledorDisablingRegionInRIT(region)) {
- RegionState state = forceRegionStateToOffline(region, false);
- boolean onDeadServer = false;
- if (state != null) {
- if (regionStates.wasRegionOnDeadServer(encodedName)) {
- LOG.info("Skip assigning " + region.getRegionNameAsString()
- + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
- + " is dead but not processed yet");
- onDeadServer = true;
- } else {
- RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
- plans.put(encodedName, plan);
- states.add(state);
- continue;
- }
- }
- // Reassign if the region wasn't on a dead server
- if (!onDeadServer) {
- LOG.info("failed to force region state to offline, "
- + "will reassign later: " + region);
- failedToOpenRegions.add(region); // assign individually later
- }
- }
- // Release the lock, this region is excluded from bulk assign because
- // we can't update its state, or set its znode to offline.
- Lock lock = locks.remove(encodedName);
- lock.unlock();
- }
-
- if (server.isStopped()) {
- return false;
- }
-
- // Add region plans, so we can updateTimers when one region is opened so
- // that unnecessary timeout on RIT is reduced.
- this.addPlans(plans);
-
- List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos = new ArrayList<>(states.size());
- for (RegionState state: states) {
- HRegionInfo region = state.getRegion();
- regionStates.updateRegionState(
- region, State.PENDING_OPEN, destination);
- List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
- if (shouldAssignFavoredNodes(region)) {
- favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
- }
- regionOpenInfos.add(new Pair<>(region, favoredNodes));
- }
-
- // Move on to open regions.
- try {
- // Send OPEN RPC. If it fails on a IOE or RemoteException,
- // regions will be assigned individually.
- Configuration conf = server.getConfiguration();
- long maxWaitTime = System.currentTimeMillis() +
- conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
- for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
- try {
- List<RegionOpeningState> regionOpeningStateList = serverManager
- .sendRegionOpen(destination, regionOpenInfos);
- for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
- RegionOpeningState openingState = regionOpeningStateList.get(k);
- if (openingState != RegionOpeningState.OPENED) {
- HRegionInfo region = regionOpenInfos.get(k).getFirst();
- LOG.info("Got opening state " + openingState
- + ", will reassign later: " + region);
- // Failed opening this region, reassign it later
- forceRegionStateToOffline(region, true);
- failedToOpenRegions.add(region);
- }
- }
- break;
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- e = ((RemoteException)e).unwrapRemoteException();
- }
- if (e instanceof RegionServerStoppedException) {
- LOG.warn("The region server was shut down, ", e);
- // No need to retry, the region server is a goner.
- return false;
- } else if (e instanceof ServerNotRunningYetException) {
- long now = System.currentTimeMillis();
- if (now < maxWaitTime) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Server is not yet up; waiting up to " +
- (maxWaitTime - now) + "ms", e);
- }
- Thread.sleep(100);
- i--; // reset the try count
- continue;
- }
- } else if (e instanceof java.net.SocketTimeoutException
- && this.serverManager.isServerOnline(destination)) {
- // In case socket is timed out and the region server is still online,
- // the openRegion RPC could have been accepted by the server and
- // just the response didn't go through. So we will retry to
- // open the region on the same server.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Bulk assigner openRegion() to " + destination
- + " has timed out, but the regions might"
- + " already be opened on it.", e);
- }
- // wait and reset the re-try count, server might be just busy.
- Thread.sleep(100);
- i--;
- continue;
- } else if (e instanceof FailedServerException && i < maximumAttempts) {
- // In case the server is in the failed server list, no point to
- // retry too soon. Retry after the failed_server_expiry time
- long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
- RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
- if (LOG.isDebugEnabled()) {
- LOG.debug(destination + " is on failed server list; waiting "
- + sleepTime + "ms", e);
- }
- Thread.sleep(sleepTime);
- continue;
- }
- throw e;
- }
- }
- } catch (IOException e) {
- // Can be a socket timeout, EOF, NoRouteToHost, etc
- LOG.info("Unable to communicate with " + destination
- + " in order to assign regions, ", e);
- for (RegionState state: states) {
- HRegionInfo region = state.getRegion();
- forceRegionStateToOffline(region, true);
- }
- return false;
- }
- } finally {
- for (Lock lock : locks.values()) {
- lock.unlock();
- }
- }
-
- if (!failedToOpenRegions.isEmpty()) {
- for (HRegionInfo region : failedToOpenRegions) {
- if (!regionStates.isRegionOnline(region)) {
- invokeAssign(region);
- }
- }
- }
-
- // wait for assignment completion
- ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions.size());
- for (HRegionInfo region: regions) {
- if (!region.getTable().isSystemTable()) {
- userRegionSet.add(region);
- }
- }
- if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
- System.currentTimeMillis())) {
- LOG.debug("some user regions are still in transition: " + userRegionSet);
- }
- LOG.debug("Bulk assigning done for " + destination);
- return true;
- } finally {
- metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
- }
- }
-
- /**
- * Send CLOSE RPC if the server is online, otherwise, offline the region.
- *
- * The RPC will be sent only to the region sever found in the region state
- * if it is passed in, otherwise, to the src server specified. If region
- * state is not specified, we don't update region state at all, instead
- * we just send the RPC call. This is useful for some cleanup without
- * messing around the region states (see handleRegion, on region opened
- * on an unexpected server scenario, for an example)
- */
- private void unassign(final HRegionInfo region,
- final ServerName server, final ServerName dest) {
- for (int i = 1; i <= this.maximumAttempts; i++) {
- if (this.server.isStopped() || this.server.isAborted()) {
- LOG.debug("Server stopped/aborted; skipping unassign of " + region);
- return;
- }
- if (!serverManager.isServerOnline(server)) {
- LOG.debug("Offline " + region.getRegionNameAsString()
- + ", no need to unassign since it's on a dead server: " + server);
- regionStates.updateRegionState(region, State.OFFLINE);
- return;
- }
- try {
- // Send CLOSE RPC
- if (serverManager.sendRegionClose(server, region, dest)) {
- LOG.debug("Sent CLOSE to " + server + " for region " +
- region.getRegionNameAsString());
- return;
- }
- // This never happens. Currently regionserver close always return true.
- // Todo; this can now happen (0.96) if there is an exception in a coprocessor
- LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
- region.getRegionNameAsString());
- } catch (Throwable t) {
- long sleepTime = 0;
- Configuration conf = this.server.getConfiguration();
- if (t instanceof RemoteException) {
- t = ((RemoteException)t).unwrapRemoteException();
- }
- if (t instanceof RegionServerAbortedException
- || t instanceof RegionServerStoppedException
- || t instanceof ServerNotRunningYetException) {
- // RS is aborting, we cannot offline the region since the region may need to do WAL
- // recovery. Until we see the RS expiration, we should retry.
- sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
- RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
-
- } else if (t instanceof NotServingRegionException) {
- LOG.debug("Offline " + region.getRegionNameAsString()
- + ", it's not any more on " + server, t);
- regionStates.updateRegionState(region, State.OFFLINE);
- return;
- } else if (t instanceof FailedServerException && i < maximumAttempts) {
- // In case the server is in the failed server list, no point to
- // retry too soon. Retry after the failed_server_expiry time
- sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
- RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
- if (LOG.isDebugEnabled()) {
- LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
- }
- }
- try {
- if (sleepTime > 0) {
- Thread.sleep(sleepTime);
- }
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
- Thread.currentThread().interrupt();
- regionStates.updateRegionState(region, State.FAILED_CLOSE);
- return;
- }
- LOG.info("Server " + server + " returned " + t + " for "
- + region.getRegionNameAsString() + ", try=" + i
- + " of " + this.maximumAttempts, t);
- }
- }
- // Run out of attempts
- regionStates.updateRegionState(region, State.FAILED_CLOSE);
- }
-
- /**
- * Set region to OFFLINE unless it is opening and forceNewPlan is false.
- */
- private RegionState forceRegionStateToOffline(
- final HRegionInfo region, final boolean forceNewPlan) {
- RegionState state = regionStates.getRegionState(region);
- if (state == null) {
- LOG.warn("Assigning but not in region states: " + region);
- state = regionStates.createRegionState(region);
- }
-
- if (forceNewPlan && LOG.isDebugEnabled()) {
- LOG.debug("Force region state offline " + state);
- }
-
- switch (state.getState()) {
- case OPEN:
- case OPENING:
- case PENDING_OPEN:
- case CLOSING:
- case PENDING_CLOSE:
- if (!forceNewPlan) {
- LOG.debug("Skip assigning " +
- region + ", it is already " + state);
- return null;
- }
- case FAILED_CLOSE:
- case FAILED_OPEN:
- regionStates.updateRegionState(region, State.PENDING_CLOSE);
- unassign(region, state.getServerName(), null);
- state = regionStates.getRegionState(region);
- if (!state.isOffline() && !state.isClosed()) {
- // If the region isn't offline, we can't re-assign
- // it now. It will be assigned automatically after
- // the regionserver reports it's closed.
- return null;
- }
- case OFFLINE:
- case CLOSED:
- break;
- default:
- LOG.error("Trying to assign region " + region
- + ", which is " + state);
- return null;
- }
- return state;
- }
-
- /**
- * Caller must hold lock on the passed <code>state</code> object.
- * @param state
- * @param forceNewPlan
- */
- private void assign(RegionState state, boolean forceNewPlan) {
- long startTime = EnvironmentEdgeManager.currentTime();
- try {
- Configuration conf = server.getConfiguration();
- RegionPlan plan = null;
- long maxWaitTime = -1;
- HRegionInfo region = state.getRegion();
- Throwable previousException = null;
- for (int i = 1; i <= maximumAttempts; i++) {
- if (server.isStopped() || server.isAborted()) {
- LOG.info("Skip assigning " + region.getRegionNameAsString()
- + ", the server is stopped/aborted");
- return;
- }
-
- if (plan == null) { // Get a server for the region at first
- try {
- plan = getRegionPlan(region, forceNewPlan);
- } catch (HBaseIOException e) {
- LOG.warn("Failed to get region plan", e);
- }
- }
-
- if (plan == null) {
- LOG.warn("Unable to determine a plan to assign " + region);
-
- // For meta region, we have to keep retrying until succeeding
- if (region.isMetaRegion()) {
- if (i == maximumAttempts) {
- i = 0; // re-set attempt count to 0 for at least 1 retry
-
- LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
- " after maximumAttempts (" + this.maximumAttempts +
- "). Reset attempts count and continue retrying.");
- }
- waitForRetryingMetaAssignment();
- continue;
- }
-
- regionStates.updateRegionState(region, State.FAILED_OPEN);
- return;
- }
- LOG.info("Assigning " + region.getRegionNameAsString() +
- " to " + plan.getDestination());
- // Transition RegionState to PENDING_OPEN
- regionStates.updateRegionState(region,
- State.PENDING_OPEN, plan.getDestination());
-
- boolean needNewPlan = false;
- final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
- " to " + plan.getDestination();
- try {
- List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
- if (shouldAssignFavoredNodes(region)) {
- favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region);
- }
- serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
- return; // we're done
- } catch (Throwable t) {
- if (t instanceof RemoteException) {
- t = ((RemoteException) t).unwrapRemoteException();
- }
- previousException = t;
-
- // Should we wait a little before retrying? If the server is starting it's yes.
- boolean hold = (t instanceof ServerNotRunningYetException);
-
- // In case socket is timed out and the region server is still online,
- // the openRegion RPC could have been accepted by the server and
- // just the response didn't go through. So we will retry to
- // open the region on the same server.
- boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
- && this.serverManager.isServerOnline(plan.getDestination()));
-
- if (hold) {
- LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
- "try=" + i + " of " + this.maximumAttempts, t);
-
- if (maxWaitTime < 0) {
- maxWaitTime = EnvironmentEdgeManager.currentTime()
- + this.server.getConfiguration().getLong(
- "hbase.regionserver.rpc.startup.waittime", 60000);
- }
- try {
- long now = EnvironmentEdgeManager.currentTime();
- if (now < maxWaitTime) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Server is not yet up; waiting up to "
- + (maxWaitTime - now) + "ms", t);
- }
- Thread.sleep(100);
- i--; // reset the try count
- } else {
- LOG.debug("Server is not up for a while; try a new one", t);
- needNewPlan = true;
- }
- } catch (InterruptedException ie) {
- LOG.warn("Failed to assign "
- + region.getRegionNameAsString() + " since interrupted", ie);
- regionStates.updateRegionState(region, State.FAILED_OPEN);
- Thread.currentThread().interrupt();
- return;
- }
- } else if (retry) {
- i--; // we want to retry as many times as needed as long as the RS is not dead.
- if (LOG.isDebugEnabled()) {
- LOG.debug(assignMsg + ", trying to assign to the same region server due ", t);
- }
- } else {
- needNewPlan = true;
- LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
- " try=" + i + " of " + this.maximumAttempts, t);
- }
- }
-
- if (i == this.maximumAttempts) {
- // For meta region, we have to keep retrying until succeeding
- if (region.isMetaRegion()) {
- i = 0; // re-set attempt count to 0 for at least 1 retry
- LOG.warn(assignMsg +
- ", trying to assign a hbase:meta region reached to maximumAttempts (" +
- this.maximumAttempts + "). Reset attempt counts and continue retrying.");
- waitForRetryingMetaAssignment();
- }
- else {
- // Don't reset the region state or get a new plan any more.
- // This is the last try.
- continue;
- }
- }
-
- // If region opened on destination of present plan, reassigning to new
- // RS may cause double assignments. In case of RegionAlreadyInTransitionException
- // reassigning to same RS.
- if (needNewPlan) {
- // Force a new plan and reassign. Will return null if no servers.
- // The new plan could be the same as the existing plan since we don't
- // exclude the server of the original plan, which should not be
- // excluded since it could be the only server up now.
- RegionPlan newPlan = null;
- try {
- newPlan = getRegionPlan(region, true);
- } catch (HBaseIOException e) {
- LOG.warn("Failed to get region plan", e);
- }
- if (newPlan == null) {
- regionStates.updateRegionState(region, State.FAILED_OPEN);
- LOG.warn("Unable to find a viable location to assign region " +
- region.getRegionNameAsString());
- return;
- }
-
- if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
- // Clean out plan we failed execute and one that doesn't look like it'll
- // succeed anyways; we need a new plan!
- // Transition back to OFFLINE
- regionStates.updateRegionState(region, State.OFFLINE);
- plan = newPlan;
- } else if(plan.getDestination().equals(newPlan.getDestination()) &&
- previousException instanceof FailedServerException) {
- try {
- LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
- " to the same failed server.");
- Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
- RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
- } catch (InterruptedException ie) {
- LOG.warn("Failed to assign "
- + region.getRegionNameAsString() + " since interrupted", ie);
- regionStates.updateRegionState(region, State.FAILED_OPEN);
- Thread.currentThread().interrupt();
- return;
- }
- }
- }
- }
- // Run out of attempts
- regionStates.updateRegionState(region, State.FAILED_OPEN);
- } finally {
- metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
- }
- }
-
- private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
- if (this.tableStateManager.isTableState(region.getTable(),
- TableState.State.DISABLED,
- TableState.State.DISABLING) || replicasToClose.contains(region)) {
- LOG.info("Table " + region.getTable() + " is disabled or disabling;"
- + " skipping assign of " + region.getRegionNameAsString());
- offlineDisabledRegion(region);
- return true;
- }
- return false;
- }
-
- /**
- * @param region the region to assign
- * @param forceNewPlan If true, then if an existing plan exists, a new plan
- * will be generated.
- * @return Plan for passed <code>region</code> (If none currently, it creates one or
- * if no servers to assign, it returns null).
- */
- private RegionPlan getRegionPlan(final HRegionInfo region,
- final boolean forceNewPlan) throws HBaseIOException {
- // Pickup existing plan or make a new one
- final String encodedName = region.getEncodedName();
- final List<ServerName> destServers =
- serverManager.createDestinationServersList();
-
- if (destServers.isEmpty()){
- LOG.warn("Can't move " + encodedName +
- ", there is no destination server available.");
- return null;
- }
-
- RegionPlan randomPlan = null;
- boolean newPlan = false;
- RegionPlan existingPlan;
-
- synchronized (this.regionPlans) {
- existingPlan = this.regionPlans.get(encodedName);
-
- if (existingPlan != null && existingPlan.getDestination() != null) {
- LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
- + " destination server is " + existingPlan.getDestination() +
- " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
- }
-
- if (forceNewPlan
- || existingPlan == null
- || existingPlan.getDestination() == null
- || !destServers.contains(existingPlan.getDestination())) {
- newPlan = true;
- try {
- randomPlan = new RegionPlan(region, null,
- balancer.randomAssignment(region, destServers));
- } catch (IOException ex) {
- LOG.warn("Failed to create new plan.",ex);
- return null;
- }
- this.regionPlans.put(encodedName, randomPlan);
- }
- }
-
- if (newPlan) {
- if (randomPlan.getDestination() == null) {
- LOG.warn("Can't find a destination for " + encodedName);
- return null;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("No previous transition plan found (or ignoring " +
- "an existing plan) for " + region.getRegionNameAsString() +
- "; generated random plan=" + randomPlan + "; " + destServers.size() +
- " (online=" + serverManager.getOnlineServers().size() +
- ") available servers, forceNewPlan=" + forceNewPlan);
- }
- return randomPlan;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using pre-existing plan for " +
- region.getRegionNameAsString() + "; plan=" + existingPlan);
- }
- return existingPlan;
- }
-
- /**
- * Wait for some time before retrying meta table region assignment
- */
- private void waitForRetryingMetaAssignment() {
- try {
- Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
- } catch (InterruptedException e) {
- LOG.error("Got exception while waiting for hbase:meta assignment");
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Unassigns the specified region.
- * <p>
- * Updates the RegionState and sends the CLOSE RPC unless region is being
- * split by regionserver; then the unassign fails (silently) because we
- * presume the region being unassigned no longer exists (its been split out
- * of existence). TODO: What to do if split fails and is rolled back and
- * parent is revivified?
- * <p>
- * If a RegionPlan is already set, it will remain.
- *
- * @param region server to be unassigned
- */
- public void unassign(HRegionInfo region) {
- unassign(region, null);
- }
-
-
- /**
- * Unassigns the specified region.
- * <p>
- * Updates the RegionState and sends the CLOSE RPC unless region is being
- * split by regionserver; then the unassign fails (silently) because we
- * presume the region being unassigned no longer exists (its been split out
- * of existence). TODO: What to do if split fails and is rolled back and
- * parent is revivified?
- * <p>
- * If a RegionPlan is already set, it will remain.
- *
- * @param region server to be unassigned
- * @param dest the destination server of the region
- */
- public void unassign(HRegionInfo region, ServerName dest) {
- // TODO: Method needs refactoring. Ugly buried returns throughout. Beware!
- LOG.debug("Starting unassign of " + region.getRegionNameAsString()
- + " (offlining), current state: " + regionStates.getRegionState(region));
-
- String encodedName = region.getEncodedName();
- // Grab the state of this region and synchronize on it
- // We need a lock here as we're going to do a put later and we don't want multiple states
- // creation
- ReentrantLock lock = locker.acquireLock(encodedName);
- RegionState state = regionStates.getRegionTransitionState(encodedName);
- try {
- if (state == null || state.isFailedClose()) {
- if (state == null) {
- // Region is not in transition.
- // We can unassign it only if it's not SPLIT/MERGED.
- state = regionStates.getRegionState(encodedName);
- if (state != null && state.isUnassignable()) {
- LOG.info("Attempting to unassign " + state + ", ignored");
- // Offline region will be reassigned below
- return;
- }
- if (state == null || state.getServerName() == null) {
- // We don't know where the region is, offline it.
- // No need to send CLOSE RPC
- LOG.warn("Attempting to unassign a region not in RegionStates "
- + region.getRegionNameAsString() + ", offlined");
- regionOffline(region);
- return;
- }
- }
- state = regionStates.updateRegionState(
- region, State.PENDING_CLOSE);
- } else if (state.isFailedOpen()) {
- // The region is not open yet
- regionOffline(region);
- return;
- } else {
- LOG.debug("Attempting to unassign " +
- region.getRegionNameAsString() + " but it is " +
- "already in transition (" + state.getState());
- return;
- }
-
- unassign(region, state.getServerName(), dest);
- } finally {
- lock.unlock();
-
- // Region is expected to be reassigned afterwards
- if (!replicasToClose.contains(region)
- && regionStates.isRegionInState(region, State.OFFLINE)) {
- assign(region);
- }
- }
- }
-
- /**
- * Used by unit tests. Return the number of regions opened so far in the life
- * of the master. Increases by one every time the master opens a region
- * @return the counter value of the number of regions opened so far
- */
- public int getNumRegionsOpened() {
- return numRegionsOpened.get();
- }
-
- /**
- * Waits until the specified region has completed assignment.
- * <p>
- * If the region is already assigned, returns immediately. Otherwise, method
- * blocks until the region is assigned.
- * @param regionInfo region to wait on assignment for
- * @return true if the region is assigned false otherwise.
- * @throws InterruptedException
- */
- public boolean waitForAssignment(HRegionInfo regionInfo)
- throws InterruptedException {
- ArrayList<HRegionInfo> regionSet = new ArrayList<>(1);
- regionSet.add(regionInfo);
- return waitForAssignment(regionSet, true, Long.MAX_VALUE);
- }
-
- /**
- * Waits until the specified region has completed assignment, or the deadline is reached.
- */
- protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
- final boolean waitTillAllAssigned, final int reassigningRegions,
- final long minEndTime) throws InterruptedException {
- long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
- if (deadline < 0) { // Overflow
- deadline = Long.MAX_VALUE; // wait forever
- }
- return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
- }
-
- /**
- * Waits until the specified region has completed assignment, or the deadline is reached.
- * @param regionSet set of region to wait on. the set is modified and the assigned regions removed
- * @param waitTillAllAssigned true if we should wait all the regions to be assigned
- * @param deadline the timestamp after which the wait is aborted
- * @return true if all the regions are assigned false otherwise.
- * @throws InterruptedException
- */
- protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
- final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
- // We're not synchronizing on regionsInTransition now because we don't use any iterator.
- while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
- int failedOpenCount = 0;
- Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
- while (regionInfoIterator.hasNext()) {
- HRegionInfo hri = regionInfoIterator.next();
- if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
- State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
- regionInfoIterator.remove();
- } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
- failedOpenCount++;
- }
- }
- if (!waitTillAllAssigned) {
- // No need to wait, let assignment going on asynchronously
- break;
- }
- if (!regionSet.isEmpty()) {
- if (failedOpenCount == regionSet.size()) {
- // all the regions we are waiting had an error on open.
- break;
- }
- regionStates.waitForUpdate(100);
- }
- }
- return regionSet.isEmpty();
- }
-
- /**
- * Assigns the hbase:meta region or a replica.
- * <p>
- * Assumes that hbase:meta is currently closed and is not being actively served by
- * any RegionServer.
- * @param hri TODO
- */
- public void assignMeta(HRegionInfo hri) throws KeeperException {
- regionStates.updateRegionState(hri, State.OFFLINE);
- assign(hri);
- }
-
- /**
- * Assigns specified regions retaining assignments, if any.
- * <p>
- * This is a synchronous call and will return once every region has been
- * assigned. If anything fails, an exception is thrown
- * @throws InterruptedException
- * @throws IOException
- */
- public void assign(Map<HRegionInfo, ServerName> regions)
- throws IOException, InterruptedException {
- if (regions == null || regions.isEmpty()) {
- return;
- }
- List<ServerName> servers = serverManager.createDestinationServersList();
- if (servers == null || servers.isEmpty()) {
- throw new IOException("Found no destination server to assign region(s)");
- }
-
- // Reuse existing assignment info
- Map<ServerName, List<HRegionInfo>> bulkPlan =
- balancer.retainAssignment(regions, servers);
- if (bulkPlan == null) {
- throw new IOException("Unable to determine a plan to assign region(s)");
- }
-
- processBogusAssignments(bulkPlan);
-
- assign(regions.size(), servers.size(),
- "retainAssignment=true", bulkPlan);
- }
-
- /**
- * Assigns specified regions round robin, if any.
- * <p>
- * This is a synchronous call and will return once every region has been
- * assigned. If anything fails, an exception is thrown
- * @throws InterruptedException
- * @throws IOException
- */
- public void assign(List<HRegionInfo> regions)
- throws IOException, InterruptedException {
- if (regions == null || regions.isEmpty()) {
- return;
- }
-
- List<ServerName> servers = serverManager.createDestinationServersList();
- if (servers == null || servers.isEmpty()) {
- throw new IOException("Found no destination server to assign region(s)");
- }
-
- // Generate a round-robin bulk assignment plan
- Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
- if (bulkPlan == null) {
- throw new IOException("Unable to determine a plan to assign region(s)");
- }
-
- processBogusAssignments(bulkPlan);
-
- assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
- }
-
- private void assign(int regions, int totalServers,
- String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
- throws InterruptedException, IOException {
-
- int servers = bulkPlan.size();
- if (servers == 1 || (regions < bulkAssignThresholdRegions
- && servers < bulkAssignThresholdServers)) {
-
- // Not use bulk assignment. This could be more efficient in small
- // cluster, especially mini cluster for testing, so that tests won't time out
- if (LOG.isTraceEnabled()) {
- LOG.trace("Not using bulk assignment since we are assigning only " + regions +
- " region(s) to " + servers + " server(s)");
- }
-
- // invoke assignment (async)
- ArrayList<HRegionInfo> userRegionSet = new ArrayList<>(regions);
- for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
- if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
- for (HRegionInfo region: plan.getValue()) {
- if (!regionStates.isRegionOnline(region)) {
- invokeAssign(region);
- if (!region.getTable().isSystemTable()) {
- userRegionSet.add(region);
- }
- }
- }
- }
- }
-
- // wait for assignment completion
- if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
- System.currentTimeMillis())) {
- LOG.debug("some user regions are still in transition: " + userRegionSet);
- }
- } else {
- LOG.info("Bulk assigning " + regions + " region(s) across "
- + totalServers + " server(s), " + message);
-
- // Use fixed count thread pool assigning.
- BulkAssigner ba = new GeneralBulkAssigner(
- this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
- ba.bulkAssign();
- LOG.info("Bulk assigning done");
- }
- }
-
- /**
- * Assigns all user regions, if any exist. Used during cluster startup.
- * <p>
- * This is a synchronous call and will return once every region has been
- * assigned. If anything fails, an exception is thrown and the cluster
- * should be shutdown.
- * @throws InterruptedException
- * @throws IOException
- */
- private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
- throws IOException, InterruptedException {
- if (allRegions == null || allRegions.isEmpty()) return;
-
- // Determine what type of assignment to do on startup
- boolean retainAssignment = server.getConfiguration().
- getBoolean("hbase.master.startup.retainassign", true);
-
- Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
- if (retainAssignment) {
- assign(allRegions);
- } else {
- List<HRegionInfo> regions = new ArrayList<>(regionsFromMetaScan);
- assign(regions);
- }
-
- for (HRegionInfo hri : regionsFromMetaScan) {
- TableName tableName = hri.getTable();
- if (!tableStateManager.isTableState(tableName,
- TableState.State.ENABLED)) {
- setEnabledTable(tableName);
- }
- }
- // assign all the replicas that were not recorded in the meta
- assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
- }
-
- /**
- * Get number of replicas of a table
- */
- private static int getNumReplicas(MasterServices master, TableName table) {
- int numReplica = 1;
- try {
- HTableDescriptor htd = master.getTableDescriptors().get(table);
- if (htd == null) {
- LOG.warn("master can not get TableDescriptor from table '" + table);
- } else {
- numReplica = htd.getRegionReplication();
- }
- } catch (IOException e){
- LOG.warn("Couldn't get the replication attribute of the table " + table + " due to "
- + e.getMessage());
- }
- return numReplica;
- }
-
- /**
- * Get a list of replica regions that are:
- * not recorded in meta yet. We might not have recorded the locations
- * for the replicas since the replicas may not have been online yet, master restarted
- * in the middle of assigning, ZK erased, etc.
- * @param regionsRecordedInMeta the list of regions we know are recorded in meta
- * either as a default, or, as the location of a replica
- * @param master
- * @return list of replica regions
- * @throws IOException
- */
- public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
- Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
- List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<>();
- for (HRegionInfo hri : regionsRecordedInMeta) {
- TableName table = hri.getTable();
- if(master.getTableDescriptors().get(table) == null)
- continue;
- int desiredRegionReplication = getNumReplicas(master, table);
- for (int i = 0; i < desiredRegionReplication; i++) {
- HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
- if (regionsRecordedInMeta.contains(replica)) continue;
- regionsNotRecordedInMeta.add(replica);
- }
- }
- return regionsNotRecordedInMeta;
- }
-
- /**
- * Rebuild the list of user regions and assignment information.
- * Updates regionstates with findings as we go through list of regions.
- * @return set of servers not online that hosted some regions according to a scan of hbase:meta
- * @throws IOException
- */
- Set<ServerName> rebuildUserRegions() throws
- IOException, KeeperException {
- Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
- TableState.State.DISABLED, TableState.State.ENABLING);
-
- Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
- TableState.State.DISABLED,
- TableState.State.DISABLING,
- TableState.State.ENABLING);
-
- // Region assignment from META
- List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
- // Get any new but slow to checkin region server that joined the cluster
- Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
- // Set of offline servers to be returned
- Set<ServerName> offlineServers = new HashSet<>();
- // Iterate regions in META
- for (Result result : results) {
- if (result == null && LOG.isDebugEnabled()){
- LOG.debug("null result from meta - ignoring but this is strange.");
- continue;
- }
- // keep a track of replicas to close. These were the replicas of the originally
- // unmerged regions. The master might have closed them before but it mightn't
- // maybe because it crashed.
- PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
- if (p.getFirst() != null && p.getSecond() != null) {
- int numReplicas = getNumReplicas(server, p.getFirst().getTable());
- for (HRegionInfo merge : p) {
- for (int i = 1; i < numReplicas; i++) {
- replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
- }
- }
- }
- RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
- if (rl == null) {
- continue;
- }
- HRegionLocation[] locations = rl.getRegionLocations();
- if (locations == null) {
- continue;
- }
- for (HRegionLocation hrl : locations) {
- if (hrl == null) continue;
- HRegionInfo regionInfo = hrl.getRegionInfo();
- if (regionInfo == null) continue;
- int replicaId = regionInfo.getReplicaId();
- State state = RegionStateStore.getRegionState(result, replicaId);
- // keep a track of replicas to close. These were the replicas of the split parents
- // from the previous life of the master. The master should have closed them before
- // but it couldn't maybe because it crashed
- if (replicaId == 0 && state.equals(State.SPLIT)) {
- for (HRegionLocation h : locations) {
- replicasToClose.add(h.getRegionInfo());
- }
- }
- ServerName lastHost = hrl.getServerName();
- ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
- regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
- if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
- // Region is not open (either offline or in transition), skip
- continue;
- }
- TableName tableName = regionInfo.getTable();
- if (!onlineServers.contains(regionLocation)) {
- // Region is located on a server that isn't online
- offlineServers.add(regionLocation);
- } else if (!disabledOrEnablingTables.contains(tableName)) {
- // Region is being served and on an active server
- // add only if region not in disabled or enabling table
- regionStates.regionOnline(regionInfo, regionLocation);
- balancer.regionOnline(regionInfo, regionLocation);
- }
- // need to enable the table if not disabled or disabling or enabling
- // this will be used in rolling restarts
- if (!disabledOrDisablingOrEnabling.contains(tableName)
- && !getTableStateManager().isTableState(tableName,
- TableState.State.ENABLED)) {
- setEnabledTable(tableName);
- }
- }
- }
- return offlineServers;
- }
-
- /**
- * Processes list of regions in transition at startup
- */
- void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
- // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
- // in case the RPC call is not sent out yet before the master was shut down
- // since we update the state before we send the RPC call. We can't update
- // the state after the RPC call. Otherwise, we don't know what's happened
- // to the region if the master dies right after the RPC call is out.
- for (RegionState regionState: regionsInTransition) {
- LOG.info("Processing " + regionState);
- ServerName serverName = regionState.getServerName();
- // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
- // case, try assigning it here.
- if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
- LOG.info("Server " + serverName + " isn't online. SSH will handle this");
- continue; // SSH will handle it
- }
- HRegionInfo regionInfo = regionState.getRegion();
- RegionState.State state = regionState.getState();
- switch (state) {
- case CLOSED:
- invokeAssign(regionState.getRegion());
- break;
- case PENDING_OPEN:
- retrySendRegionOpen(regionState);
- break;
- case PENDING_CLOSE:
- retrySendRegionClose(regionState);
- break;
- case FAILED_CLOSE:
- case FAILED_OPEN:
- invokeUnAssign(regionInfo);
- break;
- default:
- // No process for other states
- break;
- }
- }
- }
-
  /**
   * At master failover, for pending_open region, make sure
   * sendRegionOpen RPC call is sent to the target regionserver.
   * Runs asynchronously on the master-recovery executor; retries the open RPC
   * up to {@code maximumAttempts} times, marking the region FAILED_OPEN when
   * all attempts are exhausted or the handler is interrupted.
   */
  private void retrySendRegionOpen(final RegionState regionState) {
    this.executorService.submit(
      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
        @Override
        public void process() throws IOException {
          HRegionInfo hri = regionState.getRegion();
          ServerName serverName = regionState.getServerName();
          // Serialize with any other actor working on this region.
          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
          try {
            for (int i = 1; i <= maximumAttempts; i++) {
              // Stop retrying if the target server died or the master is going down.
              if (!serverManager.isServerOnline(serverName)
                  || server.isStopped() || server.isAborted()) {
                return; // No need any more
              }
              try {
                if (!regionState.equals(regionStates.getRegionState(hri))) {
                  return; // Region is not in the expected state any more
                }
                List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
                if (shouldAssignFavoredNodes(hri)) {
                  FavoredNodesManager fnm = ((MasterServices)server).getFavoredNodesManager();
                  favoredNodes = fnm.getFavoredNodesWithDNPort(hri);
                }
                serverManager.sendRegionOpen(serverName, hri, favoredNodes);
                return; // we're done
              } catch (Throwable t) {
                // Unwrap so the instanceof checks below see the real cause.
                if (t instanceof RemoteException) {
                  t = ((RemoteException) t).unwrapRemoteException();
                }
                if (t instanceof FailedServerException && i < maximumAttempts) {
                  // In case the server is in the failed server list, no point to
                  // retry too soon. Retry after the failed_server_expiry time
                  try {
                    Configuration conf = this.server.getConfiguration();
                    long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                      RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
                    if (LOG.isDebugEnabled()) {
                      LOG.debug(serverName + " is on failed server list; waiting "
                        + sleepTime + "ms", t);
                    }
                    Thread.sleep(sleepTime);
                    continue;
                  } catch (InterruptedException ie) {
                    LOG.warn("Failed to assign "
                      + hri.getRegionNameAsString() + " since interrupted", ie);
                    regionStates.updateRegionState(hri, State.FAILED_OPEN);
                    // Restore the interrupt flag for callers up the stack.
                    Thread.currentThread().interrupt();
                    return;
                  }
                }
                if (serverManager.isServerOnline(serverName)
                    && t instanceof java.net.SocketTimeoutException) {
                  i--; // reset the try count
                } else {
                  LOG.info("Got exception in retrying sendRegionOpen for "
                    + regionState + "; try=" + i + " of " + maximumAttempts, t);
                }
                Threads.sleep(100);
              }
            }
            // Run out of attempts
            regionStates.updateRegionState(hri, State.FAILED_OPEN);
          } finally {
            lock.unlock();
          }
        }
      });
  }
-
  /**
   * At master failover, for pending_close region, make sure
   * sendRegionClose RPC call is sent to the target regionserver.
   * Runs asynchronously on the master-recovery executor; retries the close RPC
   * up to {@code maximumAttempts} times, marking the region FAILED_CLOSE when
   * all attempts are exhausted or the handler is interrupted.
   */
  private void retrySendRegionClose(final RegionState regionState) {
    this.executorService.submit(
      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
        @Override
        public void process() throws IOException {
          HRegionInfo hri = regionState.getRegion();
          ServerName serverName = regionState.getServerName();
          // Serialize with any other actor working on this region.
          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
          try {
            for (int i = 1; i <= maximumAttempts; i++) {
              // Stop retrying if the target server died or the master is going down.
              if (!serverManager.isServerOnline(serverName)
                  || server.isStopped() || server.isAborted()) {
                return; // No need any more
              }
              try {
                if (!regionState.equals(regionStates.getRegionState(hri))) {
                  return; // Region is not in the expected state any more
                }
                serverManager.sendRegionClose(serverName, hri, null);
                return; // Done.
              } catch (Throwable t) {
                // Unwrap so the instanceof checks below see the real cause.
                if (t instanceof RemoteException) {
                  t = ((RemoteException) t).unwrapRemoteException();
                }
                if (t instanceof FailedServerException && i < maximumAttempts) {
                  // In case the server is in the failed server list, no point to
                  // retry too soon. Retry after the failed_server_expiry time
                  try {
                    Configuration conf = this.server.getConfiguration();
                    long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                      RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
                    if (LOG.isDebugEnabled()) {
                      LOG.debug(serverName + " is on failed server list; waiting "
                        + sleepTime + "ms", t);
                    }
                    Thread.sleep(sleepTime);
                    continue;
                  } catch (InterruptedException ie) {
                    LOG.warn("Failed to unassign "
                      + hri.getRegionNameAsString() + " since interrupted", ie);
                    regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
                    // Restore the interrupt flag for callers up the stack.
                    Thread.currentThread().interrupt();
                    return;
                  }
                }
                if (serverManager.isServerOnline(serverName)
                    && t instanceof java.net.SocketTimeoutException) {
                  i--; // reset the try count
                } else {
                  LOG.info("Got exception in retrying sendRegionClose for "
                    + regionState + "; try=" + i + " of " + maximumAttempts, t);
                }
                Threads.sleep(100);
              }
            }
            // Run out of attempts
            regionStates.updateRegionState(hri, State.FAILED_CLOSE);
          } finally {
            lock.unlock();
          }
        }
      });
  }
-
- /**
- * Set Regions in transitions metrics.
- * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
- * This iterator is not fail fast, which may lead to stale read; but that's better than
- * creating a copy of the map for metrics computation, as this method will be invoked
- * on a frequent interval.
- */
- public void updateRegionsInTransitionMetrics() {
- long currentTime = System.currentTimeMillis();
- int totalRITs = 0;
- int totalRITsOverThreshold = 0;
- long oldestRITTime = 0;
- int ritThreshold = this.server.getConfiguration().
- getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
- for (RegionState state: regionStates.getRegionsInTransition()) {
- totalRITs++;
- long ritTime = currentTime - state.getStamp();
- if (ritTime > ritThreshold) { // more than the threshold
- totalRITsOverThreshold++;
- }
- if (oldestRITTime < ritTime) {
- oldestRITTime = ritTime;
- }
- }
- if (this.metricsAssignmentManager != null) {
- this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
- this.metricsAssignmentManager.updateRITCount(totalRITs);
- this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
- }
- }
-
- /**
- * @param region Region whose plan we are to clear.
- */
- private void clearRegionPlan(final HRegionInfo region) {
- synchronized (this.regionPlans) {
- this.regionPlans.remove(region.getEncodedName());
- }
- }
-
- /**
- * Wait on region to clear regions-in-transition.
- * @param hri Region to wait on.
- * @throws IOException
- */
- public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
- throws IOException, InterruptedException {
- waitOnRegionToClearRegionsInTransition(hri, -1L);
- }
-
- /**
- * Wait on region to clear regions-in-transition or time out
- * @param hri
- * @param timeOut Milliseconds to wait for current region to be out of transition state.
- * @return True when a region clears regions-in-transition before timeout otherwise false
- * @throws InterruptedException
- */
- public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
- throws InterruptedException {
- if (!regionStates.isRegionInTransition(hri)) {
- return true;
- }
- long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
- + timeOut;
- // There is already a timeout monitor on regions in transition so I
- // should not have to have one here too?
- LOG.info("Waiting for " + hri.getEncodedName() +
- " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
- while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
- regionStates.waitForUpdate(100);
- if (EnvironmentEdgeManager.currentTime() > end) {
- LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
- return false;
- }
- }
- if (this.server.isStopped()) {
- LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
- return false;
- }
- return true;
- }
-
- void invokeAssign(HRegionInfo regionInfo) {
- threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
- }
-
- void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) {
- scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(
- new AssignCallable(this, regionInfo)), sleepMillis, TimeUnit.MILLISECONDS);
- }
-
- void invokeUnAssign(HRegionInfo regionInfo) {
- threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
- }
-
- public boolean isCarryingMeta(ServerName serverName) {
- return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
- }
-
- public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
- return isCarryingRegion(serverName,
- RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
- }
-
- public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
- return isCarryingRegion(serverName, metaHri);
- }
-
- /**
- * Check if the shutdown server carries the specific region.
- * @return whether the serverName currently hosts the region
- */
- private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
- RegionState regionState = regionStates.getRegionTransitionState(hri);
- ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
- if (transitionAddr != null) {
- boolean matchTransitionAddr = transitionAddr.equals(serverName);
- LOG.debug("Checking region=" + hri.getRegionNameAsString()
- + ", transitioning on server=" + matchTransitionAddr
- + " server being checked: " + serverName
- + ", matches=" + matchTransitionAddr);
- return matchTransitionAddr;
- }
-
- ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
- boolean matchAssignedAddr = serverName.equals(assignedAddr);
- LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
- + " is on server=" + assignedAddr + ", server being checked: "
- + serverName);
- return matchAssignedAddr;
- }
-
- /**
- * Clean out crashed server removing any assignments.
- * @param sn Server that went down.
- * @return list of regions in transition on this server
- */
- public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
- // Clean out any existing assignment plans for this server
- synchronized (this.regionPlans) {
- for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
- i.hasNext();) {
- Map.Entry<String, RegionPlan> e = i.next();
- ServerName otherSn = e.getValue().getDestination();
- // The name will be null if the region is planned for a random assign.
- if (otherSn != null && otherSn.equals(sn)) {
- // Use iterator's remove else we'll get CME
- i.remove();
- }
- }
- }
- List<HRegionInfo> rits = regionStates.serverOffline(sn);
- for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
- HRegionInfo hri = it.next();
- String encodedName = hri.getEncodedName();
-
- // We need a lock on the region as we could update it
- Lock lock = locker.acquireLock(encodedName);
- try {
- RegionState regionState = regionStates.getRegionTransitionState(encodedName);
- if (regionState == null
- || (regionState.getServerName() != null && !regionState.isOnServer(sn))
- || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
- State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
- LOG.info("Skip " + regionState + " since it is not opening/failed_close"
- + " on the dead server any more: " + sn);
- it.remove();
- } else {
- if
<TRUNCATED>