You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/26 23:47:23 UTC
[15/59] [abbrv] hbase git commit: Revert "HBASE-14614 Procedure v2 -
Core Assignment Manager (Matteo Bertozzi)" Revert a mistaken commit!!!
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
deleted file mode 100644
index 864b7f1..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ /dev/null
@@ -1,1709 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.assignment;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.PleaseHoldException;
-import org.apache.hadoop.hbase.RegionException;
-import org.apache.hadoop.hbase.RegionStateListener;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
-import org.apache.hadoop.hbase.favored.FavoredNodesManager;
-import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
-import org.apache.hadoop.hbase.master.AssignmentListener;
-import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
-import org.apache.hadoop.hbase.master.NoSuchProcedureException;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.ServerListener;
-import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
-// TODO: why are they here?
-import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
-import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
-import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
-import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.quotas.QuotaExceededException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * The AssignmentManager is the coordinator for region assign/unassign operations.
- * <ul>
- * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
- * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
- * </ul>
- * Regions are created by CreateTable, Split, Merge.
- * Regions are deleted by DeleteTable, Split, Merge.
- * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
- * Unassigns are triggered by DisableTable, Split, Merge
- */
-@InterfaceAudience.Private
-public class AssignmentManager implements ServerListener {
- private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
-
- // TODO: AMv2
- // - handle region migration from hbase1 to hbase2.
- // - handle sys table assignment first (e.g. acl, namespace)
- // - handle table priorities
- // - If ServerBusyException trying to update hbase:meta, we abort the Master
- // See updateRegionLocation in RegionStateStore.
- //
- // See also
- // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
- // for other TODOs.
-
- public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
- "hbase.assignment.bootstrap.thread.pool.size";
-
- public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
- "hbase.assignment.dispatch.wait.msec";
- private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;
-
- public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
- "hbase.assignment.dispatch.wait.queue.max.size";
- private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;
-
- public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
- "hbase.assignment.rit.chore.interval.msec";
- private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 5 * 1000;
-
- public static final String ASSIGN_MAX_ATTEMPTS =
- "hbase.assignment.maximum.attempts";
- private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = 10;
-
- /** Region in Transition metrics threshold time */
- public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
- "hbase.metrics.rit.stuck.warning.threshold";
- private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
-
- private final ProcedureEvent<?> metaInitializedEvent = new ProcedureEvent<>("meta initialized");
- private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
-
- /**
- * Indicator that AssignmentManager has recovered the region states so
- * that ServerCrashProcedure can be fully enabled and re-assign regions
- * of dead servers. So that when re-assignment happens, AssignmentManager
- * has proper region states.
- */
- private final ProcedureEvent<?> failoverCleanupDone = new ProcedureEvent<>("failover cleanup");
-
- /** Listeners that are called on assignment events. */
- private final CopyOnWriteArrayList<AssignmentListener> listeners =
- new CopyOnWriteArrayList<AssignmentListener>();
-
- // TODO: why is this different from the listeners (carried over from the old AM)
- private RegionStateListener regionStateListener;
-
- private final MetricsAssignmentManager metrics;
- private final RegionInTransitionChore ritChore;
- private final MasterServices master;
-
- private final AtomicBoolean running = new AtomicBoolean(false);
- private final RegionStates regionStates = new RegionStates();
- private final RegionStateStore regionStateStore;
-
- private final boolean shouldAssignRegionsWithFavoredNodes;
- private final int assignDispatchWaitQueueMaxSize;
- private final int assignDispatchWaitMillis;
- private final int assignMaxAttempts;
-
- private Thread assignThread;
-
/**
 * Creates an AssignmentManager whose {@link RegionStateStore} is built from {@code master}.
 *
 * @param master the master services this manager works for
 */
public AssignmentManager(final MasterServices master) {
  this(master, new RegionStateStore(master));
}

/**
 * Creates an AssignmentManager with an explicit state store and reads its tuning values
 * from the master's configuration.
 *
 * @param master the master services this manager works for
 * @param stateStore the store used for hbase:meta region state updates
 */
public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
  this.master = master;
  this.regionStateStore = stateStore;
  this.metrics = new MetricsAssignmentManager();

  final Configuration conf = master.getConfiguration();

  // Favored nodes are only consulted when the configured balancer is favored-node aware.
  final Class<?> balancerClass =
      conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class);
  this.shouldAssignRegionsWithFavoredNodes =
      FavoredStochasticBalancer.class.isAssignableFrom(balancerClass);

  this.assignDispatchWaitMillis =
      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
  this.assignDispatchWaitQueueMaxSize =
      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);

  // Always allow at least one attempt, whatever the configuration says.
  final int configuredAttempts = conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS);
  this.assignMaxAttempts = Math.max(1, configuredAttempts);

  this.ritChore = new RegionInTransitionChore(
      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC));
}
-
- public void start() throws IOException {
- if (!running.compareAndSet(false, true)) {
- return;
- }
-
- LOG.info("Starting assignment manager");
-
- // Register Server Listener
- master.getServerManager().registerListener(this);
-
- // Start the RegionStateStore
- regionStateStore.start();
-
- // Start the Assignment Thread
- startAssignmentThread();
- }
-
- public void stop() {
- if (!running.compareAndSet(true, false)) {
- return;
- }
-
- LOG.info("Stopping assignment manager");
-
- // The AM is started before the procedure executor,
- // but the actual work will be loaded/submitted only once we have the executor
- final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
-
- // Remove the RIT chore
- if (hasProcExecutor) {
- master.getMasterProcedureExecutor().removeChore(this.ritChore);
- }
-
- // Stop the Assignment Thread
- stopAssignmentThread();
-
- // Stop the RegionStateStore
- regionStates.clear();
- regionStateStore.stop();
-
- // Unregister Server Listener
- master.getServerManager().unregisterListener(this);
-
- // Update meta events (for testing)
- if (hasProcExecutor) {
- getProcedureScheduler().suspendEvent(metaLoadEvent);
- setFailoverCleanupDone(false);
- for (HRegionInfo hri: getMetaRegionSet()) {
- setMetaInitialized(hri, false);
- }
- }
- }
-
- public boolean isRunning() {
- return running.get();
- }
-
- public Configuration getConfiguration() {
- return master.getConfiguration();
- }
-
- public MetricsAssignmentManager getAssignmentManagerMetrics() {
- return metrics;
- }
-
- private LoadBalancer getBalancer() {
- return master.getLoadBalancer();
- }
-
- private MasterProcedureEnv getProcedureEnvironment() {
- return master.getMasterProcedureExecutor().getEnvironment();
- }
-
- private MasterProcedureScheduler getProcedureScheduler() {
- return getProcedureEnvironment().getProcedureScheduler();
- }
-
- protected int getAssignMaxAttempts() {
- return assignMaxAttempts;
- }
-
- /**
- * Add the listener to the notification list.
- * @param listener The AssignmentListener to register
- */
- public void registerListener(final AssignmentListener listener) {
- this.listeners.add(listener);
- }
-
- /**
- * Remove the listener from the notification list.
- * @param listener The AssignmentListener to unregister
- */
- public boolean unregisterListener(final AssignmentListener listener) {
- return this.listeners.remove(listener);
- }
-
- public void setRegionStateListener(final RegionStateListener listener) {
- this.regionStateListener = listener;
- }
-
- public RegionStates getRegionStates() {
- return regionStates;
- }
-
- public RegionStateStore getRegionStateStore() {
- return regionStateStore;
- }
-
- public List<ServerName> getFavoredNodes(final HRegionInfo regionInfo) {
- return this.shouldAssignRegionsWithFavoredNodes?
- ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo):
- ServerName.EMPTY_SERVER_LIST;
- }
-
- // ============================================================================================
- // Table State Manager helpers
- // ============================================================================================
- TableStateManager getTableStateManager() {
- return master.getTableStateManager();
- }
-
- public boolean isTableEnabled(final TableName tableName) {
- return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
- }
-
- public boolean isTableDisabled(final TableName tableName) {
- return getTableStateManager().isTableState(tableName,
- TableState.State.DISABLED, TableState.State.DISABLING);
- }
-
- // ============================================================================================
- // META Helpers
- // ============================================================================================
- private boolean isMetaRegion(final HRegionInfo regionInfo) {
- return regionInfo.isMetaRegion();
- }
-
- public boolean isMetaRegion(final byte[] regionName) {
- return getMetaRegionFromName(regionName) != null;
- }
-
- public HRegionInfo getMetaRegionFromName(final byte[] regionName) {
- for (HRegionInfo hri: getMetaRegionSet()) {
- if (Bytes.equals(hri.getRegionName(), regionName)) {
- return hri;
- }
- }
- return null;
- }
-
- public boolean isCarryingMeta(final ServerName serverName) {
- for (HRegionInfo hri: getMetaRegionSet()) {
- if (isCarryingRegion(serverName, hri)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean isCarryingRegion(final ServerName serverName, final HRegionInfo regionInfo) {
- // TODO: check for state?
- final RegionStateNode node = regionStates.getRegionNode(regionInfo);
- return(node != null && serverName.equals(node.getRegionLocation()));
- }
-
- private HRegionInfo getMetaForRegion(final HRegionInfo regionInfo) {
- //if (regionInfo.isMetaRegion()) return regionInfo;
- // TODO: handle multiple meta. if the region provided is not meta lookup
- // which meta the region belongs to.
- return HRegionInfo.FIRST_META_REGIONINFO;
- }
-
- // TODO: handle multiple meta.
- private static final Set<HRegionInfo> META_REGION_SET =
- Collections.singleton(HRegionInfo.FIRST_META_REGIONINFO);
- public Set<HRegionInfo> getMetaRegionSet() {
- return META_REGION_SET;
- }
-
- // ============================================================================================
- // META Event(s) helpers
- // ============================================================================================
- public boolean isMetaInitialized() {
- return metaInitializedEvent.isReady();
- }
-
- public boolean isMetaRegionInTransition() {
- return !isMetaInitialized();
- }
-
- public boolean waitMetaInitialized(final Procedure proc) {
- // TODO: handle multiple meta. should this wait on all meta?
- // this is used by the ServerCrashProcedure...
- return waitMetaInitialized(proc, HRegionInfo.FIRST_META_REGIONINFO);
- }
-
- public boolean waitMetaInitialized(final Procedure proc, final HRegionInfo regionInfo) {
- return getProcedureScheduler().waitEvent(
- getMetaInitializedEvent(getMetaForRegion(regionInfo)), proc);
- }
-
- private void setMetaInitialized(final HRegionInfo metaRegionInfo, final boolean isInitialized) {
- assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
- final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo);
- if (isInitialized) {
- getProcedureScheduler().wakeEvent(metaInitEvent);
- } else {
- getProcedureScheduler().suspendEvent(metaInitEvent);
- }
- }
-
- private ProcedureEvent getMetaInitializedEvent(final HRegionInfo metaRegionInfo) {
- assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
- // TODO: handle multiple meta.
- return metaInitializedEvent;
- }
-
- public boolean waitMetaLoaded(final Procedure proc) {
- return getProcedureScheduler().waitEvent(metaLoadEvent, proc);
- }
-
- protected void wakeMetaLoadedEvent() {
- getProcedureScheduler().wakeEvent(metaLoadEvent);
- assert isMetaLoaded() : "expected meta to be loaded";
- }
-
- public boolean isMetaLoaded() {
- return metaLoadEvent.isReady();
- }
-
- // ============================================================================================
- // TODO: Sync helpers
- // ============================================================================================
- public void assignMeta(final HRegionInfo metaRegionInfo) throws IOException {
- assignMeta(metaRegionInfo, null);
- }
-
- public void assignMeta(final HRegionInfo metaRegionInfo, final ServerName serverName)
- throws IOException {
- assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
- AssignProcedure proc;
- if (serverName != null) {
- LOG.debug("Try assigning Meta " + metaRegionInfo + " to " + serverName);
- proc = createAssignProcedure(metaRegionInfo, serverName);
- } else {
- LOG.debug("Assigning " + metaRegionInfo.getRegionNameAsString());
- proc = createAssignProcedure(metaRegionInfo, false);
- }
- ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
- }
-
- public void assign(final HRegionInfo regionInfo) throws IOException {
- assign(regionInfo, true);
- }
-
- public void assign(final HRegionInfo regionInfo, final boolean forceNewPlan) throws IOException {
- AssignProcedure proc = createAssignProcedure(regionInfo, forceNewPlan);
- ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
- }
-
- public void unassign(final HRegionInfo regionInfo) throws IOException {
- unassign(regionInfo, false);
- }
-
- public void unassign(final HRegionInfo regionInfo, final boolean forceNewPlan)
- throws IOException {
- // TODO: rename this reassign
- RegionStateNode node = this.regionStates.getRegionNode(regionInfo);
- ServerName destinationServer = node.getRegionLocation();
- if (destinationServer == null) {
- throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
- }
- assert destinationServer != null; node.toString();
- UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan);
- ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
- }
-
- public Future<byte[]> moveAsync(final RegionPlan regionPlan) {
- MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan);
- return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
- }
-
- @VisibleForTesting
- public boolean waitForAssignment(final HRegionInfo regionInfo) throws IOException {
- return waitForAssignment(regionInfo, Long.MAX_VALUE);
- }
-
- @VisibleForTesting
- // TODO: Remove this?
- public boolean waitForAssignment(final HRegionInfo regionInfo, final long timeout)
- throws IOException {
- RegionStateNode node = null;
- // This method can be called before the regionInfo has made it into the regionStateMap
- // so wait around here a while.
- long startTime = System.currentTimeMillis();
- // Something badly wrong if takes ten seconds to register a region.
- long endTime = startTime + 10000;
- while ((node = regionStates.getRegionNode(regionInfo)) == null && isRunning() &&
- System.currentTimeMillis() < endTime) {
- // Presume it not yet added but will be added soon. Let it spew a lot so we can tell if
- // we are waiting here alot.
- LOG.debug("Waiting on " + regionInfo + " to be added to regionStateMap");
- Threads.sleep(10);
- }
- if (node == null) {
- if (!isRunning()) return false;
- throw new RegionException(regionInfo.getRegionNameAsString() + " never registered with Assigment.");
- }
-
- RegionTransitionProcedure proc = node.getProcedure();
- if (proc == null) {
- throw new NoSuchProcedureException(node.toString());
- }
-
- ProcedureSyncWait.waitForProcedureToCompleteIOE(
- master.getMasterProcedureExecutor(), proc.getProcId(), timeout);
- return true;
- }
-
- // ============================================================================================
- // RegionTransition procedures helpers
- // ============================================================================================
-
- public AssignProcedure[] createAssignProcedures(final Collection<HRegionInfo> regionInfo) {
- return createAssignProcedures(regionInfo, false);
- }
-
- public AssignProcedure[] createAssignProcedures(final Collection<HRegionInfo> regionInfo,
- final boolean forceNewPlan) {
- if (regionInfo.isEmpty()) return null;
- final AssignProcedure[] procs = new AssignProcedure[regionInfo.size()];
- int index = 0;
- for (HRegionInfo hri: regionInfo) {
- procs[index++] = createAssignProcedure(hri, forceNewPlan);
- }
- return procs;
- }
-
- // Needed for the following method so it can type the created Array we return
- private static final UnassignProcedure [] UNASSIGNED_PROCEDURE_FOR_TYPE_INFO =
- new UnassignProcedure[0];
-
- UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) {
- if (nodes.isEmpty()) return null;
- final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size());
- for (RegionStateNode node: nodes) {
- if (!this.regionStates.include(node, false)) continue;
- // Look for regions that are offline/closed; i.e. already unassigned.
- if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue;
- assert node.getRegionLocation() != null: node.toString();
- procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false));
- }
- return procs.toArray(UNASSIGNED_PROCEDURE_FOR_TYPE_INFO);
- }
-
- public MoveRegionProcedure[] createReopenProcedures(final Collection<HRegionInfo> regionInfo) {
- final MoveRegionProcedure[] procs = new MoveRegionProcedure[regionInfo.size()];
- int index = 0;
- for (HRegionInfo hri: regionInfo) {
- final ServerName serverName = regionStates.getRegionServerOfRegion(hri);
- final RegionPlan plan = new RegionPlan(hri, serverName, serverName);
- procs[index++] = createMoveRegionProcedure(plan);
- }
- return procs;
- }
-
- /**
- * Called by things like EnableTableProcedure to get a list of AssignProcedure
- * to assign the regions of the table.
- */
- public AssignProcedure[] createAssignProcedures(final TableName tableName) {
- return createAssignProcedures(regionStates.getRegionsOfTable(tableName));
- }
-
- /**
- * Called by things like DisableTableProcedure to get a list of UnassignProcedure
- * to unassign the regions of the table.
- */
- public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
- return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
- }
-
- /**
- * Called by things like ModifyColumnFamilyProcedure to get a list of MoveRegionProcedure
- * to reopen the regions of the table.
- */
- public MoveRegionProcedure[] createReopenProcedures(final TableName tableName) {
- return createReopenProcedures(regionStates.getRegionsOfTable(tableName));
- }
-
- public AssignProcedure createAssignProcedure(final HRegionInfo regionInfo,
- final boolean forceNewPlan) {
- AssignProcedure proc = new AssignProcedure(regionInfo, forceNewPlan);
- proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
- return proc;
- }
-
- public AssignProcedure createAssignProcedure(final HRegionInfo regionInfo,
- final ServerName targetServer) {
- AssignProcedure proc = new AssignProcedure(regionInfo, targetServer);
- proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
- return proc;
- }
-
- public UnassignProcedure createUnassignProcedure(final HRegionInfo regionInfo,
- final ServerName destinationServer, final boolean force) {
- // If destinationServer is null, figure it.
- ServerName sn = destinationServer != null? destinationServer:
- getRegionStates().getRegionState(regionInfo).getServerName();
- assert sn != null;
- UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force);
- proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
- return proc;
- }
-
- public MoveRegionProcedure createMoveRegionProcedure(final RegionPlan plan) {
- return new MoveRegionProcedure(getProcedureEnvironment(), plan);
- }
-
-
- public SplitTableRegionProcedure createSplitProcedure(final HRegionInfo regionToSplit,
- final byte[] splitKey) throws IOException {
- return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
- }
-
- public MergeTableRegionsProcedure createMergeProcedure(final HRegionInfo regionToMergeA,
- final HRegionInfo regionToMergeB) throws IOException {
- return new MergeTableRegionsProcedure(getProcedureEnvironment(), regionToMergeA,regionToMergeB);
- }
-
- /**
- * Delete the region states. This is called by "DeleteTable"
- */
- public void deleteTable(final TableName tableName) throws IOException {
- final ArrayList<HRegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
- regionStateStore.deleteRegions(regions);
- for (int i = 0; i < regions.size(); ++i) {
- final HRegionInfo regionInfo = regions.get(i);
- // we expect the region to be offline
- regionStates.removeFromOfflineRegions(regionInfo);
- regionStates.deleteRegion(regionInfo);
- }
- }
-
- // ============================================================================================
- // RS Region Transition Report helpers
- // ============================================================================================
- // TODO: Move this code in MasterRpcServices and call on specific event?
- public ReportRegionStateTransitionResponse reportRegionStateTransition(
- final ReportRegionStateTransitionRequest req)
- throws PleaseHoldException {
- final ReportRegionStateTransitionResponse.Builder builder =
- ReportRegionStateTransitionResponse.newBuilder();
- final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
- try {
- for (RegionStateTransition transition: req.getTransitionList()) {
- switch (transition.getTransitionCode()) {
- case OPENED:
- case FAILED_OPEN:
- case CLOSED:
- assert transition.getRegionInfoCount() == 1 : transition;
- final HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
- updateRegionTransition(serverName, transition.getTransitionCode(), hri,
- transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
- break;
- case READY_TO_SPLIT:
- case SPLIT_PONR:
- case SPLIT:
- case SPLIT_REVERTED:
- assert transition.getRegionInfoCount() == 3 : transition;
- final HRegionInfo parent = HRegionInfo.convert(transition.getRegionInfo(0));
- final HRegionInfo splitA = HRegionInfo.convert(transition.getRegionInfo(1));
- final HRegionInfo splitB = HRegionInfo.convert(transition.getRegionInfo(2));
- updateRegionSplitTransition(serverName, transition.getTransitionCode(),
- parent, splitA, splitB);
- break;
- case READY_TO_MERGE:
- case MERGE_PONR:
- case MERGED:
- case MERGE_REVERTED:
- assert transition.getRegionInfoCount() == 3 : transition;
- final HRegionInfo merged = HRegionInfo.convert(transition.getRegionInfo(0));
- final HRegionInfo mergeA = HRegionInfo.convert(transition.getRegionInfo(1));
- final HRegionInfo mergeB = HRegionInfo.convert(transition.getRegionInfo(2));
- updateRegionMergeTransition(serverName, transition.getTransitionCode(),
- merged, mergeA, mergeB);
- break;
- }
- }
- } catch (PleaseHoldException e) {
- if (LOG.isTraceEnabled()) LOG.trace("Failed transition " + e.getMessage());
- throw e;
- } catch (UnsupportedOperationException|IOException e) {
- // TODO: at the moment we have a single error message and the RS will abort
- // if the master says that one of the region transitions failed.
- LOG.warn("Failed transition", e);
- builder.setErrorMessage("Failed transition " + e.getMessage());
- }
- return builder.build();
- }
-
- private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
- final HRegionInfo regionInfo, final long seqId)
- throws PleaseHoldException, UnexpectedStateException {
- checkFailoverCleanupCompleted(regionInfo);
-
- final RegionStateNode regionNode = regionStates.getRegionNode(regionInfo);
- if (regionNode == null) {
- // the table/region is gone. maybe a delete, split, merge
- throw new UnexpectedStateException(String.format(
- "Server %s was trying to transition region %s to %s. but the region was removed.",
- serverName, regionInfo, state));
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Update region transition serverName=%s region=%s state=%s",
- serverName, regionNode, state));
- }
-
- final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
- if (!reportTransition(regionNode, serverNode, state, seqId)) {
- LOG.warn(String.format(
- "No procedure for %s. server=%s to transition to %s", regionNode, serverName, state));
- }
- }
-
- // FYI: regionNode is sometimes synchronized by the caller but not always.
- private boolean reportTransition(final RegionStateNode regionNode,
- final ServerStateNode serverNode, final TransitionCode state, final long seqId)
- throws UnexpectedStateException {
- final ServerName serverName = serverNode.getServerName();
- synchronized (regionNode) {
- final RegionTransitionProcedure proc = regionNode.getProcedure();
- if (proc == null) return false;
-
- // serverNode.getReportEvent().removeProcedure(proc);
- proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(),
- serverName, state, seqId);
- }
- return true;
- }
-
-  /**
-   * Handles a READY_TO_SPLIT request from a RegionServer by submitting a split procedure on
-   * the master.
-   * @throws PleaseHoldException if the master is not ready (see checkFailoverCleanupCompleted)
-   * @throws UnexpectedStateException if {@code state} is anything but READY_TO_SPLIT
-   * @throws UnsupportedOperationException if daughter A's end key differs from daughter B's
-   *     start key, or if the reporting RegionServer is older than 2.0 (to abort the RS-side
-   *     split, since the master is handling it)
-   * @throws QuotaExceededException rethrown from the region state listener after the split
-   *     plan is marked as skipped in the normalizer
-   */
-  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
-      final HRegionInfo parent, final HRegionInfo hriA, final HRegionInfo hriB)
-      throws IOException {
-    checkFailoverCleanupCompleted(parent);
-
-    if (state != TransitionCode.READY_TO_SPLIT) {
-      throw new UnexpectedStateException("unsupported split state=" + state +
-          " for parent region " + parent +
-          " maybe an old RS (< 2.0) had the operation in progress");
-    }
-
-    // The daughters must be contiguous: A ends exactly where B begins (the split key).
-    final byte[] splitKey = hriB.getStartKey();
-    if (!Bytes.equals(hriA.getEndKey(), splitKey)) {
-      throw new UnsupportedOperationException(
-          "unsupported split request with bad keys: parent=" + parent +
-          " hriA=" + hriA + " hriB=" + hriB);
-    }
-
-    if (regionStateListener != null) {
-      try {
-        regionStateListener.onRegionSplit(parent);
-      } catch (QuotaExceededException e) {
-        // TODO: does this really belong here?
-        master.getRegionNormalizer().planSkipped(parent, PlanType.SPLIT);
-        throw e;
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Split request from " + serverName +
-          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
-    }
-    master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
-
-    // Pre-2.0 RegionServers would run the split themselves; make them abort it because the
-    // master-side procedure now owns the operation.
-    final int rsVersion = regionStates.getOrCreateServer(serverName).getVersionNumber();
-    if (rsVersion < 0x0200000) {
-      throw new UnsupportedOperationException(String.format(
-          "Split handled by the master: parent=%s hriA=%s hriB=%s",
-          parent.getShortNameToLog(), hriA, hriB));
-    }
-  }
-
-  /**
-   * Handles a READY_TO_MERGE request from a RegionServer by submitting a merge procedure for
-   * {@code hriA} and {@code hriB} on the master.
-   * @param serverName the RegionServer that sent the request
-   * @param state the reported transition code; only READY_TO_MERGE is accepted
-   * @param merged the resulting merged region named in the request
-   * @throws PleaseHoldException if the master is not ready (see checkFailoverCleanupCompleted)
-   * @throws UnexpectedStateException if {@code state} is not READY_TO_MERGE
-   * @throws UnsupportedOperationException if the RegionServer is older than 2.0, to abort the
-   *     RS-side merge since the master is handling it
-   */
-  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
-      final HRegionInfo merged, final HRegionInfo hriA, final HRegionInfo hriB)
-      throws PleaseHoldException, UnexpectedStateException, IOException {
-    checkFailoverCleanupCompleted(merged);
-
-    if (state != TransitionCode.READY_TO_MERGE) {
-      throw new UnexpectedStateException("Unsupported merge state=" + state +
-          " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
-          " maybe an old RS (< 2.0) had the operation in progress");
-    }
-
-    // Submit the Merge procedure
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Handling merge request from RS=" + serverName + ", merged=" + merged);
-    }
-    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
-
-    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
-    if (regionStates.getOrCreateServer(serverName).getVersionNumber() < 0x0200000) {
-      throw new UnsupportedOperationException(String.format(
-          "Merge not handled yet: state=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, hriB));
-    }
-  }
-
- // ============================================================================================
- // RS Status update (report online regions) helpers
- // ============================================================================================
-  /**
-   * Called by the master when a RegionServer sends its regionServerReport().
-   * The report carries the RegionServer's hbase version number and its online regions.
-   * The version number is recorded on the server node. The online regions are then checked
-   * against the in-memory state of the AM; on a mismatch the RegionServer is fenced out
-   * (killed) on the assumption that something went wrong on its side.
-   * Reports from servers in SPLITTING or OFFLINE state are ignored.
-   */
-  public void reportOnlineRegions(final ServerName serverName,
-      final int versionNumber, final Set<byte[]> regionNames) {
-    if (!isRunning()) return;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() +
-          ", metaLoaded=" + isMetaLoaded() + " " +
-          regionNames.stream().map(name -> Bytes.toStringBinary(name)).
-          collect(Collectors.toList()));
-    }
-
-    final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
-
-    // update the server version number. This will be used for live upgrades.
-    synchronized (serverNode) {
-      serverNode.setVersionNumber(versionNumber);
-      if (serverNode.isInState(ServerState.SPLITTING, ServerState.OFFLINE)) {
-        LOG.warn("Got a report from server=" + serverName + " which is in state " +
-            serverNode.getState() + "; ignoring it");
-        return;
-      }
-    }
-
-    if (regionNames.isEmpty()) {
-      // nothing to do if we don't have regions
-      LOG.trace("no online region found on " + serverName);
-    } else if (!isMetaLoaded()) {
-      // Still starting up: only the hbase:meta parts of the report are used.
-      checkOnlineRegionsReportForMeta(serverNode, regionNames);
-    } else {
-      // The heartbeat tells us which regions are online; verify them against our state.
-      checkOnlineRegionsReport(serverNode, regionNames);
-    }
-
-    // wake report event
-    wakeServerReportEvent(serverNode);
-  }
-
-  /**
-   * Processes a RegionServer report received while hbase:meta is still loading. Only names
-   * that resolve to a meta region are considered. Each one is reported to its procedure as
-   * OPENED; when no procedure is attached, the region location is set to the reporting server.
-   * If a procedure rejects the transition, the reporting server is killed.
-   */
-  public void checkOnlineRegionsReportForMeta(final ServerStateNode serverNode,
-      final Set<byte[]> regionNames) {
-    try {
-      for (byte[] regionName: regionNames) {
-        final HRegionInfo metaHri = getMetaRegionFromName(regionName);
-        if (metaHri == null) {
-          // No meta region matches this name; leave it until meta has been loaded.
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Skip online report for region=" + Bytes.toStringBinary(regionName) +
-                " while meta is loading");
-          }
-          continue;
-        }
-
-        final RegionStateNode metaNode = regionStates.getOrCreateRegionNode(metaHri);
-        LOG.info("META REPORTED: " + metaNode);
-        final boolean forwarded =
-            reportTransition(metaNode, serverNode, TransitionCode.OPENED, 0);
-        if (forwarded) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("META REPORTED: " + metaNode);
-          }
-        } else {
-          LOG.warn("META REPORTED but no procedure found");
-          metaNode.setRegionLocation(serverNode.getServerName());
-        }
-      }
-    } catch (UnexpectedStateException e) {
-      LOG.warn("KILLING " + serverNode.getServerName() + ": " + e.getMessage());
-      killRegionServer(serverNode);
-    }
-  }
-
-  /**
-   * Verifies the regions a RegionServer reports as online against the in-memory state.
-   * The reporting server is killed in three cases:
-   * <ul>
-   * <li>a reported region is unknown to the master;</li>
-   * <li>the master believes the region is OPEN/OPENING on a different server;</li>
-   * <li>the region is in some state other than OPEN/OPENING/CLOSING/SPLITTING and has not
-   *     been updated for more than one second.</li>
-   * </ul>
-   * A region that is OPENING on this server is reported to its procedure as OPENED.
-   */
-  void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) {
-    final ServerName serverName = serverNode.getServerName();
-    try {
-      for (byte[] regionName: regionNames) {
-        if (!isRunning()) return;
-        final RegionStateNode regionNode = regionStates.getRegionNodeFromName(regionName);
-        if (regionNode == null) {
-          throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
-        }
-        synchronized (regionNode) {
-          if (regionNode.isInState(State.OPENING, State.OPEN)) {
-            // Compare from the reporting side so a missing location cannot NPE.
-            if (!serverName.equals(regionNode.getRegionLocation())) {
-              throw new UnexpectedStateException(regionNode.toString() +
-                  " reported OPEN on server=" + serverName +
-                  " but state has otherwise.");
-            } else if (regionNode.isInState(State.OPENING)) {
-              try {
-                if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
-                  LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName +
-                      " but state has otherwise AND NO procedure is running");
-                }
-              } catch (UnexpectedStateException e) {
-                LOG.warn(regionNode.toString() + " reported unexpected OPEN: " + e.getMessage(), e);
-              }
-            }
-          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
-            // A heartbeat can arrive at about the same time as a region transition (e.g. a
-            // region just CLOSED or SPLIT), so only treat the report as bogus once the region's
-            // state is older than one second. Elapsed time is "now - last update"; the reverse
-            // order would always be <= 0 and the server would never be fenced.
-            final long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
-            if (diff > 1000/*One Second... make configurable if an issue*/) {
-              throw new UnexpectedStateException(regionNode.toString() +
-                  " reported an unexpected OPEN; time since last update=" + diff);
-            }
-          }
-        }
-      }
-    } catch (UnexpectedStateException e) {
-      LOG.warn("Killing " + serverName + ": " + e.getMessage());
-      killRegionServer(serverNode);
-    }
-  }
-
-  /**
-   * Asks the procedure scheduler to make {@code proc} wait on the report event of
-   * {@code serverName}; the server node is created if it does not exist yet.
-   * @return the value returned by the scheduler's waitEvent for that event
-   */
-  protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) {
-    final ServerStateNode node = regionStates.getOrCreateServer(serverName);
-    return getProcedureScheduler().waitEvent(node.getReportEvent(), proc);
-  }
-
-  /** Wakes the report event of {@code serverNode} through the procedure scheduler. */
-  protected void wakeServerReportEvent(final ServerStateNode serverNode) {
-    getProcedureScheduler().wakeEvent(serverNode.getReportEvent());
-  }
-
- // ============================================================================================
- // RIT chore
- // ============================================================================================
-  /**
-   * In-memory procedure chore that computes region-in-transition statistics, hands every
-   * region over the stuck-warning threshold to the AM, and publishes RIT metrics.
-   */
-  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
-    public RegionInTransitionChore(final int timeoutMsec) {
-      super(timeoutMsec);
-    }
-
-    @Override
-    protected void periodicExecute(final MasterProcedureEnv env) {
-      final AssignmentManager assignmentManager = env.getAssignmentManager();
-      final RegionInTransitionStat stat = assignmentManager.computeRegionInTransitionStat();
-
-      if (stat.hasRegionsOverThreshold()) {
-        for (RegionState stuck : stat.getRegionOverThreshold()) {
-          assignmentManager.handleRegionOverStuckWarningThreshold(stuck.getRegion());
-        }
-      }
-
-      // update metrics
-      assignmentManager.updateRegionsInTransitionMetrics(stat);
-    }
-  }
-
-  /**
-   * Builds a fresh snapshot of region-in-transition statistics from the current region states.
-   * @return a newly computed {@link RegionInTransitionStat}
-   */
-  public RegionInTransitionStat computeRegionInTransitionStat() {
-    final RegionInTransitionStat stat = new RegionInTransitionStat(getConfiguration());
-    stat.update(this);
-    return stat;
-  }
-
- public static class RegionInTransitionStat {
- private final int ritThreshold;
-
- private HashMap<String, RegionState> ritsOverThreshold = null;
- private long statTimestamp;
- private long oldestRITTime = 0;
- private int totalRITsTwiceThreshold = 0;
- private int totalRITs = 0;
-
- @VisibleForTesting
- public RegionInTransitionStat(final Configuration conf) {
- this.ritThreshold =
- conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
- }
-
- public int getRITThreshold() {
- return ritThreshold;
- }
-
- public long getTimestamp() {
- return statTimestamp;
- }
-
- public int getTotalRITs() {
- return totalRITs;
- }
-
- public long getOldestRITTime() {
- return oldestRITTime;
- }
-
- public int getTotalRITsOverThreshold() {
- Map<String, RegionState> m = this.ritsOverThreshold;
- return m != null ? m.size() : 0;
- }
-
- public boolean hasRegionsTwiceOverThreshold() {
- return totalRITsTwiceThreshold > 0;
- }
-
- public boolean hasRegionsOverThreshold() {
- Map<String, RegionState> m = this.ritsOverThreshold;
- return m != null && !m.isEmpty();
- }
-
- public Collection<RegionState> getRegionOverThreshold() {
- Map<String, RegionState> m = this.ritsOverThreshold;
- return m != null? m.values(): Collections.EMPTY_SET;
- }
-
- public boolean isRegionOverThreshold(final HRegionInfo regionInfo) {
- Map<String, RegionState> m = this.ritsOverThreshold;
- return m != null? m.containsKey(regionInfo.getEncodedName()): false;
- }
-
- public boolean isRegionTwiceOverThreshold(final HRegionInfo regionInfo) {
- Map<String, RegionState> m = this.ritsOverThreshold;
- if (m == null) return false;
- final RegionState state = m.get(regionInfo.getEncodedName());
- if (state == null) return false;
- return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
- }
-
- protected void update(final AssignmentManager am) {
- final RegionStates regionStates = am.getRegionStates();
- this.statTimestamp = EnvironmentEdgeManager.currentTime();
- update(regionStates.getRegionsStateInTransition(), statTimestamp);
- update(regionStates.getRegionFailedOpen(), statTimestamp);
- }
-
- private void update(final Collection<RegionState> regions, final long currentTime) {
- for (RegionState state: regions) {
- totalRITs++;
- final long ritTime = currentTime - state.getStamp();
- if (ritTime > ritThreshold) {
- if (ritsOverThreshold == null) {
- ritsOverThreshold = new HashMap<String, RegionState>();
- }
- ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
- totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
- }
- if (oldestRITTime < ritTime) {
- oldestRITTime = ritTime;
- }
- }
- }
- }
-
- private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
- metrics.updateRITOldestAge(ritStat.getOldestRITTime());
- metrics.updateRITCount(ritStat.getTotalRITs());
- metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
- }
-
-  /**
-   * Called for each region that has been in transition longer than the warning threshold.
-   * Currently this only logs a warning.
-   */
-  private void handleRegionOverStuckWarningThreshold(final HRegionInfo regionInfo) {
-    final RegionStateNode stuckNode = regionStates.getRegionNode(regionInfo);
-    // TODO: actually handle the stuck region (an isStuck() check on the node was considered).
-    LOG.warn("TODO Handle stuck in transition: " + stuckNode);
-  }
-
- // ============================================================================================
- // TODO: Master load/bootstrap
- // ============================================================================================
-  /**
-   * Brings the AM into the cluster. It loads region state from hbase:meta, then waits
-   * (polling every 250ms) until at least one RegionServer is known. It then reconciles
-   * offline servers and assigns offline regions, and finally starts the RIT chore.
-   * @throws IOException propagated from loadMeta()
-   */
-  public void joinCluster() throws IOException {
-    final long startTime = System.currentTimeMillis();
-
-    LOG.info("Joining the cluster...");
-
-    // Scan hbase:meta to build list of existing regions, servers, and assignment
-    loadMeta();
-
-    // Nothing can be placed until at least one RegionServer has joined.
-    while (master.getServerManager().countOfRegionServers() < 1) {
-      LOG.info("waiting for RS to join");
-      Threads.sleep(250);
-    }
-    LOG.info("RS joined " + master.getServerManager().countOfRegionServers());
-
-    // This method will assign all user regions if a clean server startup or
-    // it will reconstruct master state and cleanup any leftovers from previous master process.
-    final boolean failover = processofflineServersWithOnlineRegions();
-
-    // Start the RIT chore
-    master.getMasterProcedureExecutor().addChore(this.ritChore);
-
-    LOG.info(String.format("Joined the cluster in %s, failover=%s",
-        StringUtils.humanTimeDiff(System.currentTimeMillis() - startTime), failover));
-  }
-
-  /**
-   * Scans hbase:meta and seeds the in-memory region states. Regions already in transition
-   * are left untouched. For every other region, state, last host, location and open seqnum
-   * are copied from meta. Then:
-   * <ul>
-   * <li>OPEN regions are attached to their server;</li>
-   * <li>OFFLINE (or offline-flagged) regions go to the offline set;</li>
-   * <li>anything else is registered as in transition with no procedure.</li>
-   * </ul>
-   * Finally, the meta-loaded event is woken.
-   */
-  private void loadMeta() throws IOException {
-    // TODO: use a thread pool
-    regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() {
-      @Override
-      public void visitRegionState(final HRegionInfo regionInfo, final State state,
-          final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
-        final RegionStateNode node = regionStates.getOrCreateRegionNode(regionInfo);
-        synchronized (node) {
-          // Leave regions that are already in transition untouched.
-          if (node.isInTransition()) {
-            return;
-          }
-          node.setState(state);
-          node.setLastHost(lastHost);
-          node.setRegionLocation(regionLocation);
-          node.setOpenSeqNum(openSeqNum);
-
-          if (state == State.OPEN) {
-            assert regionLocation != null : "found null region location for " + node;
-            regionStates.addRegionToServer(regionLocation, node);
-          } else if (state == State.OFFLINE || regionInfo.isOffline()) {
-            regionStates.addToOfflineRegions(node);
-          } else {
-            // These regions should have a procedure in replay
-            regionStates.addRegionInTransition(node, null);
-          }
-        }
-      }
-    });
-
-    // every assignment is blocked until meta is loaded.
-    wakeMetaLoadedEvent();
-  }
-
-  // TODO: the assumption here is that if RSs are crashing while we are executing this
-  // they will be handled by the SSH that are put in the ServerManager "queue".
-  // we can integrate this a bit better.
-  /**
-   * Reconciles the state loaded from hbase:meta with the live cluster in three timed steps:
-   * <ol>
-   * <li>collect servers that host OPEN regions but are not online, and OFFLINE regions of
-   *     enabled tables;</li>
-   * <li>kill (expire) each collected server that is still not online;</li>
-   * <li>mark failover cleanup done and submit an assign procedure per collected region.</li>
-   * </ol>
-   * @return true if the ServerManager already had dead servers when this started
-   */
-  private boolean processofflineServersWithOnlineRegions() {
-    final boolean failover = !master.getServerManager().getDeadServers().isEmpty();
-
-    final Set<ServerName> deadHosts = new HashSet<ServerName>();
-    final ArrayList<HRegionInfo> toAssign = new ArrayList<HRegionInfo>();
-
-    // STEP-1: classify every known region.
-    long stepStart = System.currentTimeMillis();
-    for (RegionStateNode node: regionStates.getRegionNodes()) {
-      final State nodeState = node.getState();
-      if (nodeState == State.OPEN) {
-        final ServerName host = node.getRegionLocation();
-        if (!master.getServerManager().isServerOnline(host)) {
-          deadHosts.add(host);
-        }
-      } else if (nodeState == State.OFFLINE && isTableEnabled(node.getTable())) {
-        toAssign.add(node.getRegionInfo());
-      }
-    }
-    LOG.info("[STEP-1] " + StringUtils.humanTimeDiff(System.currentTimeMillis() - stepStart));
-
-    // STEP-2: kill servers with online regions (re-checked, it may have come up meanwhile).
-    stepStart = System.currentTimeMillis();
-    for (ServerName host: deadHosts) {
-      if (master.getServerManager().isServerOnline(host)) {
-        continue;
-      }
-      LOG.info("KILL RS hosting regions but not online " + host +
-          " (master=" + master.getServerName() + ")");
-      killRegionServer(host);
-    }
-    LOG.info("[STEP-2] " + StringUtils.humanTimeDiff(System.currentTimeMillis() - stepStart));
-
-    setFailoverCleanupDone(true);
-
-    // STEP-3: assign offline regions.
-    stepStart = System.currentTimeMillis();
-    for (HRegionInfo hri: toAssign) {
-      master.getMasterProcedureExecutor().submitProcedure(createAssignProcedure(hri, false));
-    }
-    LOG.info("[STEP-3] " + StringUtils.humanTimeDiff(System.currentTimeMillis() - stepStart));
-
-    return failover;
-  }
-
-  /**
-   * Used by ServerCrashProcedure to make sure AssignmentManager has completed the failover
-   * cleanup before re-assigning regions of dead servers, so that re-assignment works from
-   * proper region states.
-   * @return true once the failover-cleanup event is ready
-   */
-  public boolean isFailoverCleanupDone() {
-    return failoverCleanupDone.isReady();
-  }
-
-  /**
-   * Marks the failover-cleanup event ready or not ready. ServerCrashProcedure tests use this
-   * to verify the ability to suspend the execution of the ServerCrashProcedure.
-   */
-  @VisibleForTesting
-  public void setFailoverCleanupDone(final boolean b) {
-    final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
-    env.setEventReady(failoverCleanupDone, b);
-  }
-
-  /** @return the procedure event that tracks whether failover cleanup is done */
-  public ProcedureEvent getFailoverCleanupEvent() {
-    return failoverCleanupDone;
-  }
-
- /**
- * Used to check if the failover cleanup is done.
- * if not we throw PleaseHoldException since we are rebuilding the RegionStates
- * @param hri region to check if it is already rebuild
- * @throws PleaseHoldException if the failover cleanup is not completed
- */
- private void checkFailoverCleanupCompleted(final HRegionInfo hri) throws PleaseHoldException {
- if (!isRunning()) {
- throw new PleaseHoldException("AssignmentManager not running");
- }
-
- // TODO: can we avoid throwing an exception if hri is already loaded?
- // at the moment we bypass only meta
- boolean meta = isMetaRegion(hri);
- boolean cleanup = isFailoverCleanupDone();
- if (!isMetaRegion(hri) && !isFailoverCleanupDone()) {
- String msg = "Master not fully online; hbase:meta=" + meta + ", failoverCleanup=" + cleanup;
- throw new PleaseHoldException(msg);
- }
- }
-
- // ============================================================================================
- // TODO: Metrics
- // ============================================================================================
-  /**
-   * Placeholder metric; always returns 0.
-   * TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
-   */
-  public int getNumRegionsOpened() {
-    return 0;
-  }
-
-  /**
-   * Submits a ServerCrashProcedure for {@code serverName}, recording whether it carried meta.
-   * @param shouldSplitWal passed through to the ServerCrashProcedure
-   */
-  public void submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
-    final boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(serverName);
-    final ProcedureExecutor<MasterProcedureEnv> executor = this.master.getMasterProcedureExecutor();
-    final ServerCrashProcedure crashProc = new ServerCrashProcedure(executor.getEnvironment(),
-        serverName, shouldSplitWal, carryingMeta);
-    executor.submitProcedure(crashProc);
-    LOG.debug("Added=" + serverName +
-        " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
-  }
-
-  /**
-   * Offlines the region's state node when the region is known; unknown regions are ignored.
-   * TODO used by MasterRpcServices ServerCrashProcedure
-   */
-  public void offlineRegion(final HRegionInfo regionInfo) throws IOException {
-    final RegionStateNode node = regionStates.getRegionNode(regionInfo);
-    if (node == null) {
-      return;
-    }
-    node.offline();
-  }
-
-  /** No-op. TODO used by TestSplitTransactionOnCluster.java */
-  public void onlineRegion(final HRegionInfo regionInfo, final ServerName serverName) {
-  }
-
-  /** @return the server-to-regions assignment snapshot for {@code regions} from RegionStates */
-  public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(
-      final Collection<HRegionInfo> regions) {
-    return regionStates.getSnapShotOfAssignment(regions);
-  }
-
- // ============================================================================================
- // TODO: UTILS/HELPERS?
- // ============================================================================================
-  /**
-   * Used by the client (via master) to identify if all regions have the schema updates.
-   *
-   * @param tableName table being altered
-   * @return Pair of (regions of the table not yet opened, total regions of the table);
-   *     (0, 0) for a disabled table
-   * @throws IOException propagated from the table-state or region-state lookups
-   */
-  public Pair<Integer, Integer> getReopenStatus(TableName tableName)
-      throws IOException {
-    if (isTableDisabled(tableName)) {
-      return new Pair<Integer, Integer>(0, 0);
-    }
-
-    final List<RegionState> tableStates = regionStates.getTableRegionStates(tableName);
-    int pending = 0;
-    for (RegionState rs: tableStates) {
-      pending += rs.isOpened() ? 0 : 1;
-    }
-    return new Pair<Integer, Integer>(pending, tableStates.size());
-  }
-
- // ============================================================================================
- // TODO: Region State In Transition
- // ============================================================================================
-  /** Registers {@code regionNode} as in transition under {@code procedure} in RegionStates. */
-  protected boolean addRegionInTransition(final RegionStateNode regionNode,
-      final RegionTransitionProcedure procedure) {
-    return regionStates.addRegionInTransition(regionNode, procedure);
-  }
-
-  /** Unregisters {@code regionNode}/{@code procedure} from the in-transition set. */
-  protected void removeRegionInTransition(final RegionStateNode regionNode,
-      final RegionTransitionProcedure procedure) {
-    regionStates.removeRegionInTransition(regionNode, procedure);
-  }
-
-  /** @return whether RegionStates reports any region in transition */
-  public boolean hasRegionsInTransition() {
-    return regionStates.hasRegionsInTransition();
-  }
-
-  /** @return the region nodes RegionStates reports as in transition */
-  public List<RegionStateNode> getRegionsInTransition() {
-    return regionStates.getRegionsInTransition();
-  }
-
-  /** @return the regions RegionStates reports as assigned */
-  public List<HRegionInfo> getAssignedRegions() {
-    return regionStates.getAssignedRegions();
-  }
-
-  /** @return the HRegionInfo for {@code regionName}, or null if the region is unknown */
-  public HRegionInfo getRegionInfo(final byte[] regionName) {
-    final RegionStateNode node = regionStates.getRegionNodeFromName(regionName);
-    if (node == null) {
-      return null;
-    }
-    return node.getRegionInfo();
-  }
-
- // ============================================================================================
- // TODO: Region Status update
- // ============================================================================================
-  /** Tells the balancer, then every registered AssignmentListener, that a region opened. */
-  private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
-      final ServerName serverName) {
-    getBalancer().regionOnline(regionInfo, serverName);
-    // Iterating an empty listener collection is a no-op, so no emptiness guard is needed.
-    for (AssignmentListener listener : this.listeners) {
-      listener.regionOpened(regionInfo, serverName);
-    }
-  }
-
-  /** Tells the balancer, then every registered AssignmentListener, that a region closed. */
-  private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
-    getBalancer().regionOffline(regionInfo);
-    for (AssignmentListener listener : this.listeners) {
-      listener.regionClosed(regionInfo);
-    }
-  }
-
- /**
- * Moves {@code regionNode} to OPENING (transitionState is given
- * RegionStates.STATES_EXPECTED_ON_OPEN as the allowed source states), attaches it to its
- * target server in RegionStates, persists the new state to hbase:meta through the
- * RegionStateStore, and then bumps the operation counter metric.
- * NOTE(review): regionNode.getProcedure() is dereferenced without a null check, so a
- * procedure is presumably always attached when this is called — confirm.
- * @throws IOException propagated from transitionState or updateRegionLocation
- */
- public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
- synchronized (regionNode) {
- State state = regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
- regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode);
- // NO_SEQNUM: no open sequence id is written while the region is only OPENING.
- regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
- regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM,
- regionNode.getProcedure().getProcId());
- }
-
- // update the operation count metrics
- metrics.incrementOperationCounter();
- }
-
-  /**
-   * Reverts the in-memory part of markRegionAsOpening: if the region is still OPENING it is
-   * detached from its target server in RegionStates. hbase:meta is not touched.
-   */
-  public void undoRegionAsOpening(final RegionStateNode regionNode) {
-    synchronized (regionNode) {
-      if (regionNode.isInState(State.OPENING)) {
-        regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
-        // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
-      }
-      // Should we update hbase:meta?
-    }
-  }
-
- /**
- * Moves {@code regionNode} to OPEN (allowed source states: STATES_EXPECTED_ON_OPEN). If the
- * region is a meta region, meta is marked initialized. The region is attached to its server,
- * and the state plus open sequence number are persisted to hbase:meta. The balancer and
- * listeners are then notified, and the attached procedure's elapsed time is recorded as the
- * assign time.
- * @throws IOException propagated from transitionState or updateRegionLocation
- */
- public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
- final HRegionInfo hri = regionNode.getRegionInfo();
- synchronized (regionNode) {
- State state = regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
- if (isMetaRegion(hri)) {
- setMetaInitialized(hri, true);
- }
- regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode);
- // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
- // That is a lot of hbase:meta writing.
- // NOTE(review): getProcedure() is dereferenced here without a null check but is
- // null-checked below; one of the two is inconsistent — confirm whether a procedure is
- // always attached at this point.
- regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
- regionNode.getRegionLocation(), regionNode.getLastHost(), regionNode.getOpenSeqNum(),
- regionNode.getProcedure().getProcId());
- sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
- // update assignment metrics
- if (regionNode.getProcedure() != null) {
- metrics.updateAssignTime(regionNode.getProcedure().elapsedTime());
- }
- }
- }
-
- /**
- * Moves {@code regionNode} to CLOSING (allowed source states: STATES_EXPECTED_ON_CLOSE).
- * If the region is a meta region, meta is marked as not initialized. The region is
- * (re)added to its server in RegionStates, the new state is persisted to hbase:meta, and
- * the operation counter metric is bumped.
- * @throws IOException propagated from transitionState or updateRegionLocation
- */
- public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
- final HRegionInfo hri = regionNode.getRegionInfo();
- synchronized (regionNode) {
- State state = regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
- // Mark meta as not initialized early so people trying to create/edit tables will wait.
- if (isMetaRegion(hri)) {
- setMetaInitialized(hri, false);
- }
- regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode);
- regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
- regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM,
- regionNode.getProcedure().getProcId());
- }
-
- // update the operation count metrics
- metrics.incrementOperationCounter();
- }
-
-  /**
-   * Counterpart of markRegionAsClosing; currently a no-op.
-   * TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
-   * There is nothing to undo?
-   */
-  public void undoRegionAsClosing(final RegionStateNode regionNode) throws IOException {
-  }
-
- /**
- * Moves {@code regionNode} to CLOSED (allowed source states: STATES_EXPECTED_ON_CLOSE). The
- * region is detached from its server, the old location is remembered as lastHost, and the
- * location is cleared. That is persisted to hbase:meta. The balancer and listeners are then
- * notified, and the attached procedure's elapsed time is recorded as the unassign time.
- * @throws IOException propagated from transitionState or updateRegionLocation
- */
- public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
- final HRegionInfo hri = regionNode.getRegionInfo();
- synchronized (regionNode) {
- State state = regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
- regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
- regionNode.setLastHost(regionNode.getRegionLocation());
- regionNode.setRegionLocation(null);
- // NOTE(review): as in markRegionAsOpened, getProcedure() is dereferenced here but
- // null-checked below — confirm which is intended.
- regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state,
- regionNode.getRegionLocation()/*null*/, regionNode.getLastHost(),
- HConstants.NO_SEQNUM, regionNode.getProcedure().getProcId());
- sendRegionClosedNotification(hri);
- // Update assignment metrics
- if (regionNode.getProcedure() != null) {
- metrics.updateUnassignTime(regionNode.getProcedure().elapsedTime());
- }
- }
- }
-
-  /**
-   * Records a completed split. The parent is marked SPLIT in memory and hbase:meta is updated
-   * with the parent and its two daughters. When favored nodes apply, the balancer is asked to
-   * generate favored nodes for the daughters.
-   */
-  public void markRegionAsSplit(final HRegionInfo parent, final ServerName serverName,
-      final HRegionInfo daughterA, final HRegionInfo daughterB)
-      throws IOException {
-    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
-    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
-    // Update its state in regionStates so it shows as offline and split when read
-    // later figuring what regions are in a table and what are not: see
-    // regionStates#getRegionsOfTable
-    regionStates.getOrCreateRegionNode(parent).setState(State.SPLIT);
-    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
-    if (!shouldAssignFavoredNodes(parent)) {
-      return;
-    }
-    final List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
-    final FavoredNodesPromoter promoter = (FavoredNodesPromoter) getBalancer();
-    promoter.generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB);
-  }
-
-  /**
-   * When called here, the merge has happened. The two merged regions have been unassigned
-   * and markRegionAsClosed has been called on each, so they have been disassociated from a
-   * hosting Server. The merged region will be open after this call. The merged regions are
-   * removed from hbase:meta below. Later they are deleted from the filesystem by the catalog
-   * janitor running against hbase:meta, which notices when the merged region no longer holds
-   * references to the old regions.
-   */
-  public void markRegionAsMerged(final HRegionInfo child, final ServerName serverName,
-      final HRegionInfo mother, final HRegionInfo father) throws IOException {
-    regionStates.getOrCreateRegionNode(child).setState(State.MERGED);
-    // Drop both source regions from the in-memory states, then record the merge in meta.
-    regionStates.deleteRegion(mother);
-    regionStates.deleteRegion(father);
-    regionStateStore.mergeRegions(child, mother, father, serverName);
-    if (shouldAssignFavoredNodes(child)) {
-      final FavoredNodesPromoter promoter = (FavoredNodesPromoter) getBalancer();
-      promoter.generateFavoredNodesForMergedRegion(child, mother, father);
-    }
-  }
-
-  /**
-   * Favored nodes should be applied only when the FavoredNodes balancer is configured and the
-   * region belongs to a non-system table.
-   */
-  private boolean shouldAssignFavoredNodes(HRegionInfo region) {
-    if (!this.shouldAssignRegionsWithFavoredNodes) {
-      return false;
-    }
-    return FavoredNodesManager.isFavoredNodeApplicable(region);
-  }
-
- // ============================================================================================
- // Assign Queue (Assign/Balance)
- // ============================================================================================
-  // Regions waiting to be placed by the assignment thread; the queue helpers access it while
-  // holding assignQueueLock.
-  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<>();
-  private final ReentrantLock assignQueueLock = new ReentrantLock();
-  // Signalled to wake the assignment thread so it drains the queue.
-  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
-
- /**
- * Add the assign operation to the assignment queue.
- * The pending assignment operation will be processed,
- * and each region will be assigned by a server using the balancer.
- */
- protected void queueAssign(final RegionStateNode regionNode) {
- getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent());
-
- // TODO: quick-start for meta and the other sys-tables?
- assignQueueLock.lock();
- try {
- pendingAssignQueue.add(regionNode);
- if (regionNode.isSystemTable() ||
- pendingAssignQueue.size() == 1 ||
- pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
- assignQueueFullCond.signal();
- }
- } finally {
- assignQueueLock.unlock();
- }
- }
-
-  /**
-   * Starts the "AssignmentThread", which keeps draining the assign queue while the AM is
-   * running and clears whatever is left once it stops.
-   */
-  private void startAssignmentThread() {
-    final Runnable drainLoop = () -> {
-      while (isRunning()) {
-        processAssignQueue();
-      }
-      pendingAssignQueue.clear();
-    };
-    assignThread = new Thread(drainLoop, "AssignmentThread");
-    assignThread.start();
-  }
-
- private void stopAssignmentThread() {
- assignQueueSignal();
- try {
- while (assignThread.isAlive()) {
- assignQueueSignal();
- assignThread.join(250);
- }
- } catch (InterruptedException e) {
- LOG.warn("join interrupted", e);
- Thread.currentThread().interrupt();
- }
- }
-
- private void assignQueueSignal() {
- assignQueueLock.lock();
- try {
- assignQueueFullCond.signal();
- } finally {
- assignQueueLock.unlock();
- }
- }
-
- /**
- * Blocks until there is work in the assign queue (or the AM stops). It then waits up to
- * assignDispatchWaitMillis more, or until signalled, so more regions can be batched, and
- * finally drains the queue.
- * @return the drained regions keyed by HRegionInfo; null if the AM stopped or the wait was
- * interrupted; possibly empty (the first await is not in a loop, so a spurious wakeup can
- * reach the drain with nothing queued)
- * NOTE(review): on interrupt the flag is restored and null returned; the assignment loop
- * calls this again right away, and await() would presumably throw again immediately,
- * spinning while the AM is running — confirm whether that can happen.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
- private HashMap<HRegionInfo, RegionStateNode> waitOnAssignQueue() {
- HashMap<HRegionInfo, RegionStateNode> regions = null;
-
- assignQueueLock.lock();
- try {
- // Untimed wait for the first region; queueAssign signals when the queue goes from 0 to 1.
- if (pendingAssignQueue.isEmpty() && isRunning()) {
- assignQueueFullCond.await();
- }
-
- if (!isRunning()) return null;
- // Deliberate bounded second wait to batch more regions. A signal (system table, full
- // queue, stop) ends it early. Not re-checking a predicate in a loop is intentional,
- // hence the findbugs suppression.
- assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
- regions = new HashMap<HRegionInfo, RegionStateNode>(pendingAssignQueue.size());
- for (RegionStateNode regionNode: pendingAssignQueue) {
- regions.put(regionNode.getRegionInfo(), regionNode);
- }
- pendingAssignQueue.clear();
- } catch (InterruptedException e) {
- LOG.warn("got interrupted ", e);
- Thread.currentThread().interrupt();
- } finally {
- assignQueueLock.unlock();
- }
- return regions;
- }
-
-  /**
-   * Drains one batch from the assign queue and asks the balancer to place it:
-   * <ul>
-   * <li>regions with a known location go through retainAssignment;</li>
-   * <li>the rest, sorted, go through roundRobinAssignment.</li>
-   * </ul>
-   * Blocks, polling every 250ms, until at least one destination server exists. Regions whose
-   * plan could not be computed are put back on the queue.
-   */
-  private void processAssignQueue() {
-    final HashMap<HRegionInfo, RegionStateNode> regions = waitOnAssignQueue();
-    if (regions == null || regions.isEmpty() || !isRunning()) {
-      return;
-    }
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
-    }
-
-    // Split the batch: retain the previous location when there is one, else round-robin.
-    // TODO: Optimize balancer. pass a RegionPlan?
-    final HashMap<HRegionInfo, ServerName> retainMap = new HashMap<HRegionInfo, ServerName>();
-    final List<HRegionInfo> rrList = new ArrayList<HRegionInfo>();
-    for (RegionStateNode node: regions.values()) {
-      final ServerName previous = node.getRegionLocation();
-      if (previous == null) {
-        rrList.add(node.getRegionInfo());
-      } else {
-        retainMap.put(node.getRegionInfo(), previous);
-      }
-    }
-
-    // TODO: connect with the listener to invalidate the cache
-    final LoadBalancer balancer = getBalancer();
-
-    // TODO use events
-    List<ServerName> servers = master.getServerManager().createDestinationServersList();
-    for (int attempt = 0; servers.isEmpty(); attempt++) {
-      if (attempt % 4 == 0) {
-        LOG.warn("no server available, unable to find a location for " + regions.size() +
-            " unassigned regions. waiting");
-      }
-
-      // the AM was killed
-      if (!isRunning()) {
-        LOG.debug("aborting assignment-queue with " + regions.size() + " not assigned");
-        return;
-      }
-
-      Threads.sleep(250);
-      servers = master.getServerManager().createDestinationServersList();
-    }
-
-    final boolean traceOn = LOG.isTraceEnabled();
-    if (traceOn) {
-      LOG.trace("available servers count=" + servers.size() + ": " + servers);
-    }
-
-    // ask the balancer where to place regions
-    if (!retainMap.isEmpty()) {
-      if (traceOn) {
-        LOG.trace("retain assign regions=" + retainMap);
-      }
-      try {
-        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
-      } catch (HBaseIOException e) {
-        LOG.warn("unable to retain assignment", e);
-        addToPendingAssignment(regions, retainMap.keySet());
-      }
-    }
-
-    // TODO: Do we need to split retain and round-robin?
-    // the retain seems to fallback to round-robin/random if the region is not in the map.
-    if (!rrList.isEmpty()) {
-      Collections.sort(rrList);
-      if (traceOn) {
-        LOG.trace("round robin regions=" + rrList);
-      }
-      try {
-        acceptPlan(regions, balancer.roundRobinAssignment(rrList, servers));
-      } catch (HBaseIOException e) {
-        LOG.warn("unable to round-robin assignment", e);
-        addToPendingAssignment(regions, rrList);
-      }
-    }
-  }
-
-  /**
-   * Applies a balancer plan. Each planned region's location is set to its chosen server, and
-   * the procedure events of those regions are woken in one batch.
-   * @param regions the batch being assigned, keyed by region
-   * @param plan server to regions, as returned by the balancer
-   * @throws HBaseIOException if the balancer returned no plan (null)
-   */
-  private void acceptPlan(final HashMap<HRegionInfo, RegionStateNode> regions,
-      final Map<ServerName, List<HRegionInfo>> plan) throws HBaseIOException {
-    if (plan == null) {
-      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
-    }
-    if (plan.isEmpty()) return;
-
-    final long st = System.currentTimeMillis();
-    final ProcedureEvent[] events = new ProcedureEvent[regions.size()];
-    int evcount = 0;
-    for (Map.Entry<ServerName, List<HRegionInfo>> entry: plan.entrySet()) {
-      final ServerName server = entry.getKey();
-      for (HRegionInfo hri: entry.getValue()) {
-        final RegionStateNode regionNode = regions.get(hri);
-        if (regionNode == null) {
-          // Not part of this batch. An NPE here would escape the assignment thread's loop
-          // (only HBaseIOException is caught there) and stop all assignment.
-          LOG.warn("balancer plan placed region " + hri + " on " + server +
-              " but it is not part of this assignment batch; ignoring it");
-          continue;
-        }
-        regionNode.setRegionLocation(server);
-        events[evcount++] = regionNode.getProcedureEvent();
-      }
-    }
-    getProcedureScheduler().wakeEvents(evcount, events);
-
-    if (LOG.isTraceEnabled()) {
-      // Report the number of events actually woken, not the array capacity.
-      LOG.trace("ASSIGN ACCEPT " + evcount + " -> " +
-          StringUtils.humanTimeDiff(System.currentTimeMillis() - st));
-    }
-  }
-
-  /**
-   * Puts the given regions of a batch back on the assign queue, e.g. after the balancer failed
-   * to plan them. Does not signal the assignment thread.
-   */
-  private void addToPendingAssignment(final HashMap<HRegionInfo, RegionStateNode> regions,
-      final Collection<HRegionInfo> pendingRegions) {
-    assignQueueLock.lock();
-    try {
-      pendingRegions.forEach(hri -> pendingAssignQueue.add(regions.get(hri)));
-    } finally {
-      assignQueueLock.unlock();
-    }
-  }
-
- // ============================================================================================
- // Server Helpers
- // ============================================================================================
-  /** No-op: nothing to do when a server joins. */
-  @Override
-  public void serverAdded(final ServerName serverName) {
-  }
-
-  /**
-   * Just in case, wakes procedures waiting on the removed server's report event. Servers
-   * without a state node are ignored.
-   */
-  @Override
-  public void serverRemoved(final ServerName serverName) {
-    final ServerStateNode serverNode = regionStates.getServerNode(serverName);
-    if (serverNode != null) {
-      wakeServerReportEvent(serverNode);
-    }
-  }
-
-  /** @return the version number recorded for the server, or 0 if it has no state node */
-  public int getServerVersion(final ServerName serverName) {
-    final ServerStateNode node = regionStates.getServerNode(serverName);
-    if (node == null) {
-      return 0;
-    }
-    return node.getVersionNumber();
-  }
-
-  /**
-   * Expires {@code serverName} through the ServerManager. This also works when RegionStates
-   * has no state node for the server.
-   */
-  public void killRegionServer(final ServerName serverName) {
-    final ServerStateNode serverNode = regionStates.getServerNode(serverName);
-    if (serverNode == null) {
-      // getServerNode may return null (see serverRemoved); expire directly instead of NPE-ing.
-      master.getServerManager().expireServer(serverName);
-      return;
-    }
-    killRegionServer(serverNode);
-  }
-
-  /** Expires the node's server through the ServerManager. */
-  public void killRegionServer(final ServerStateNode serverNode) {
-    // Intentionally do NOT offline the node's regions here: that messes up accounting.
-    // Let ServerCrashProcedure do it.
-    final ServerName victim = serverNode.getServerName();
-    master.getServerManager().expireServer(victim);
-  }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java
deleted file mode 100644
index 111b525..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/FailedRemoteDispatchException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.assignment;
-
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Internal signal that a remote procedure operation could not be queued
- * for dispatch.
- */
-@SuppressWarnings("serial")
-@InterfaceAudience.Private
-public class FailedRemoteDispatchException extends HBaseIOException {
-  /**
-   * @param message description of why the dispatch could not be queued
-   */
-  public FailedRemoteDispatchException(String message) {
-    super(message);
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCMergedRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCMergedRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCMergedRegionsProcedure.java
deleted file mode 100644
index c7d97ee..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCMergedRegionsProcedure.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.assignment;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineTableProcedure;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.GCMergedRegionsState;
-
-/**
- * Garbage-collects the two parent regions left behind by a completed merge.
- * <p>The caller decides whether it is GC time; this procedure performs no such check.
- * <p>This is a Table Procedure that takes a read lock on the Table. It does NOT hold a
- * lock for its whole lifetime; the {@link GCRegionProcedure} children it spawns take
- * locks on the Regions they purge.
- */
-@InterfaceAudience.Private
-public class GCMergedRegionsProcedure
-    extends AbstractStateMachineTableProcedure<GCMergedRegionsState> {
-  private static final Log LOG = LogFactory.getLog(GCMergedRegionsProcedure.class);
-
-  /** First merge parent; persisted as parent A. */
-  private HRegionInfo father;
-  /** Second merge parent; persisted as parent B. */
-  private HRegionInfo mother;
-  /** Region produced by the merge; its merge qualifiers are deleted from meta at the end. */
-  private HRegionInfo mergedChild;
-
-  public GCMergedRegionsProcedure(final MasterProcedureEnv env,
-      final HRegionInfo mergedChild,
-      final HRegionInfo father,
-      final HRegionInfo mother) {
-    super(env);
-    this.father = father;
-    this.mother = mother;
-    this.mergedChild = mergedChild;
-  }
-
-  /** No-arg constructor the Procedure framework needs to recreate this procedure on replay. */
-  public GCMergedRegionsProcedure() {
-    super();
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.MERGED_REGIONS_GC;
-  }
-
-  /**
-   * Runs one step of the PREPARE -> PURGE -> EDIT_METADATA state machine.
-   * An IOException is logged and HAS_MORE_STATE is returned so the step is retried.
-   */
-  @Override
-  protected Flow executeFromState(MasterProcedureEnv env, GCMergedRegionsState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-    try {
-      switch (state) {
-        case GC_MERGED_REGIONS_PREPARE:
-          // There is nothing to prepare; move straight on to purging.
-          setNextState(GCMergedRegionsState.GC_MERGED_REGIONS_PURGE);
-          return Flow.HAS_MORE_STATE;
-        case GC_MERGED_REGIONS_PURGE:
-          // One GCRegionProcedure per parent, run as children of this procedure.
-          addChildProcedure(createGCRegionProcedures(env));
-          setNextState(GCMergedRegionsState.GC_REGION_EDIT_METADATA);
-          return Flow.HAS_MORE_STATE;
-        case GC_REGION_EDIT_METADATA:
-          MetaTableAccessor.deleteMergeQualifiers(env.getMasterServices().getConnection(), mergedChild);
-          return Flow.NO_MORE_STATE;
-        default:
-          throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      }
-    } catch (IOException ioe) {
-      // TODO: This is going to spew log?
-      LOG.warn("Error trying to GC merged regions " + this.father.getShortNameToLog() +
-          " & " + this.mother.getShortNameToLog() + "; retrying...", ioe);
-      return Flow.HAS_MORE_STATE;
-    }
-  }
-
-  /** Builds one GCRegionProcedure per merge parent, each owned by the requesting user. */
-  private GCRegionProcedure[] createGCRegionProcedures(final MasterProcedureEnv env) {
-    final HRegionInfo[] parents = new HRegionInfo[] {this.father, this.mother};
-    final GCRegionProcedure[] children = new GCRegionProcedure[parents.length];
-    for (int i = 0; i < parents.length; i++) {
-      final GCRegionProcedure child = new GCRegionProcedure(env, parents[i]);
-      child.setOwner(env.getRequestUser().getShortName());
-      children[i] = child;
-    }
-    return children;
-  }
-
-  /** Rollback is a no-op for this procedure. */
-  @Override
-  protected void rollbackState(MasterProcedureEnv env, GCMergedRegionsState state)
-      throws IOException, InterruptedException {
-    // intentionally empty
-  }
-
-  @Override
-  protected GCMergedRegionsState getState(int stateId) {
-    return GCMergedRegionsState.forNumber(stateId);
-  }
-
-  @Override
-  protected int getStateId(GCMergedRegionsState state) {
-    return state.getNumber();
-  }
-
-  @Override
-  protected GCMergedRegionsState getInitialState() {
-    return GCMergedRegionsState.GC_MERGED_REGIONS_PREPARE;
-  }
-
-  /** Persists both parents and the merged child after the superclass state. */
-  @Override
-  protected void serializeStateData(OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-    MasterProcedureProtos.GCMergedRegionsStateData.newBuilder()
-        .setParentA(HRegionInfo.convert(this.father))
-        .setParentB(HRegionInfo.convert(this.mother))
-        .setMergedChild(HRegionInfo.convert(this.mergedChild))
-        .build()
-        .writeDelimitedTo(stream);
-  }
-
-  /** Restores the fields written by {@link #serializeStateData(OutputStream)}. */
-  @Override
-  protected void deserializeStateData(InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-    final MasterProcedureProtos.GCMergedRegionsStateData data =
-        MasterProcedureProtos.GCMergedRegionsStateData.parseDelimitedFrom(stream);
-    this.father = HRegionInfo.convert(data.getParentA());
-    this.mother = HRegionInfo.convert(data.getParentB());
-    this.mergedChild = HRegionInfo.convert(data.getMergedChild());
-  }
-
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    sb.append(getClass().getSimpleName())
-        .append(" child=").append(this.mergedChild.getShortNameToLog())
-        .append(", father=").append(this.father.getShortNameToLog())
-        .append(", mother=").append(this.mother.getShortNameToLog());
-  }
-
-  @Override
-  public TableName getTableName() {
-    return this.mergedChild.getTable();
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
deleted file mode 100644
index 3874232..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.assignment;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.favored.FavoredNodesManager;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.GCRegionState;
-
-import com.google.common.collect.Lists;
-
-/**
- * GC a Region that is no longer in use because it has been split or merged away.
- * The caller decides whether it is GC time; this procedure performs no such check.
- * <p>This is a Region StateMachine Procedure: it takes a read lock on the Table and then an
- * exclusive lock on the Region, using the locking inherited from
- * AbstractStateMachineRegionProcedure.
- * <p>States: PREPARE -> ARCHIVE (archive the region's files via HFileArchiver if present)
- * -> PURGE_METADATA (remove the region from RegionStates, hbase:meta, the ServerManager and
- * favored nodes).
- */
-@InterfaceAudience.Private
-public class GCRegionProcedure extends AbstractStateMachineRegionProcedure<GCRegionState> {
-  private static final Log LOG = LogFactory.getLog(GCRegionProcedure.class);
-
-  public GCRegionProcedure(final MasterProcedureEnv env, final HRegionInfo hri) {
-    super(env, hri);
-  }
-
-  /** No-arg constructor the Procedure framework needs to recreate this procedure on replay. */
-  public GCRegionProcedure() {
-    super();
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.REGION_GC;
-  }
-
-  /**
-   * Runs one step of the PREPARE -> ARCHIVE -> PURGE_METADATA state machine.
-   * An IOException is logged and HAS_MORE_STATE is returned so the step is retried.
-   */
-  @Override
-  protected Flow executeFromState(MasterProcedureEnv env, GCRegionState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-    MasterServices masterServices = env.getMasterServices();
-    try {
-      switch (state) {
-        case GC_REGION_PREPARE:
-          // Nothing to do to prepare.
-          setNextState(GCRegionState.GC_REGION_ARCHIVE);
-          break;
-        case GC_REGION_ARCHIVE:
-          FileSystem fs = masterServices.getMasterFileSystem().getFileSystem();
-          // Only archive when there is something on the filesystem to archive.
-          if (HFileArchiver.exists(masterServices.getConfiguration(), fs, getRegion())) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Archiving region=" + getRegion().getShortNameToLog());
-            }
-            HFileArchiver.archiveRegion(masterServices.getConfiguration(), fs, getRegion());
-          }
-          setNextState(GCRegionState.GC_REGION_PURGE_METADATA);
-          break;
-        case GC_REGION_PURGE_METADATA:
-          // TODO: Purge metadata before removing from HDFS? This ordering is copied
-          // from CatalogJanitor.
-          AssignmentManager am = masterServices.getAssignmentManager();
-          if (am != null && am.getRegionStates() != null) {
-            am.getRegionStates().deleteRegion(getRegion());
-          }
-          MetaTableAccessor.deleteRegion(masterServices.getConnection(), getRegion());
-          masterServices.getServerManager().removeRegion(getRegion());
-          FavoredNodesManager fnm = masterServices.getFavoredNodesManager();
-          if (fnm != null) {
-            fnm.deleteFavoredNodesForRegions(Lists.newArrayList(getRegion()));
-          }
-          return Flow.NO_MORE_STATE;
-        default:
-          throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      }
-    } catch (IOException ioe) {
-      // TODO: This is going to spew log?
-      LOG.warn("Error trying to GC " + getRegion().getShortNameToLog() + "; retrying...", ioe);
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  /** Rollback is a no-op for this procedure. */
-  @Override
-  protected void rollbackState(MasterProcedureEnv env, GCRegionState state)
-      throws IOException, InterruptedException {
-    // no-op
-  }
-
-  @Override
-  protected GCRegionState getState(int stateId) {
-    return GCRegionState.forNumber(stateId);
-  }
-
-  @Override
-  protected int getStateId(GCRegionState state) {
-    return state.getNumber();
-  }
-
-  @Override
-  protected GCRegionState getInitialState() {
-    return GCRegionState.GC_REGION_PREPARE;
-  }
-
-  /** Writes the region info after the superclass state. */
-  @Override
-  protected void serializeStateData(OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-    // TODO: Double serialization of regionname. Superclass is also serializing. Fix.
-    // Kept for now because deserializeStateData reads this record back.
-    final MasterProcedureProtos.GCRegionStateData.Builder msg =
-        MasterProcedureProtos.GCRegionStateData.newBuilder()
-            .setRegionInfo(HRegionInfo.convert(getRegion()));
-    msg.build().writeDelimitedTo(stream);
-  }
-
-  /** Restores the region written by {@link #serializeStateData(OutputStream)}. */
-  @Override
-  protected void deserializeStateData(InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-    final MasterProcedureProtos.GCRegionStateData msg =
-        MasterProcedureProtos.GCRegionStateData.parseDelimitedFrom(stream);
-    setRegion(HRegionInfo.convert(msg.getRegionInfo()));
-  }
-}
\ No newline at end of file