You are viewing a plain-text version of this content; the hyperlink to the canonical version ("here") is not preserved in this copy.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/08/20 22:16:27 UTC
[6/7] hbase git commit: HBASE-20881 Introduce a region transition
procedure to handle all the state transitions for a region
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 70a9680..9b020c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -19,14 +19,12 @@ package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,22 +32,23 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
-import org.apache.hadoop.hbase.master.AssignmentListener;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
@@ -59,14 +58,10 @@ import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
-import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -90,7 +85,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -149,10 +143,6 @@ public class AssignmentManager implements ServerListener {
private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
- /** Listeners that are called on assignment events. */
- private final CopyOnWriteArrayList<AssignmentListener> listeners =
- new CopyOnWriteArrayList<AssignmentListener>();
-
private final MetricsAssignmentManager metrics;
private final RegionInTransitionChore ritChore;
private final MasterServices master;
@@ -216,16 +206,55 @@ public class AssignmentManager implements ServerListener {
// it could be null in some tests
if (zkw != null) {
RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
- RegionStateNode regionStateNode =
+ RegionStateNode regionNode =
regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
- synchronized (regionStateNode) {
- regionStateNode.setRegionLocation(regionState.getServerName());
- regionStateNode.setState(regionState.getState());
+ regionNode.lock();
+ try {
+ regionNode.setRegionLocation(regionState.getServerName());
+ regionNode.setState(regionState.getState());
setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
+ } finally {
+ regionNode.unlock();
}
}
}
+ /**
+ * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
+ * <p>
+ * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
+ * method below. And it is also very important as now before submitting a TRSP, we need to attach
+ * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
+ * the very beginning, before we start processing any procedures.
+ */
+ public void setupRIT(List<TransitRegionStateProcedure> procs) {
+ procs.forEach(proc -> {
+ RegionInfo regionInfo = proc.getRegion();
+ RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
+ TransitRegionStateProcedure existingProc = regionNode.getProcedure();
+ if (existingProc != null) {
+ // This is possible, as we will detach the procedure from the RSN before we
+ // actually finish the procedure. This is because that, we will update the region state
+ // directly in the reportTransition method for TRSP, and theoretically the region transition
+ // has been done, so we need to detach the procedure from the RSN. But actually the
+ // procedure has not been marked as done in the pv2 framework yet, so it is possible that we
+ // schedule a new TRSP immediately and when arriving here, we will find out that there are
+ // multiple TRSPs for the region. But we can make sure that, only the last one can take the
+ // charge, the previous ones should have all been finished already.
+ // So here we will compare the proc id, the greater one will win.
+ if (existingProc.getProcId() < proc.getProcId()) {
+ // the new one wins, unset and set it to the new one below
+ regionNode.unsetProcedure(existingProc);
+ } else {
+ // the old one wins, skip
+ return;
+ }
+ }
+ LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
+ regionNode.setProcedure(proc);
+ });
+ }
+
public void stop() {
if (!running.compareAndSet(true, false)) {
return;
@@ -288,22 +317,6 @@ public class AssignmentManager implements ServerListener {
return assignMaxAttempts;
}
- /**
- * Add the listener to the notification list.
- * @param listener The AssignmentListener to register
- */
- public void registerListener(final AssignmentListener listener) {
- this.listeners.add(listener);
- }
-
- /**
- * Remove the listener from the notification list.
- * @param listener The AssignmentListener to unregister
- */
- public boolean unregisterListener(final AssignmentListener listener) {
- return this.listeners.remove(listener);
- }
-
public RegionStates getRegionStates() {
return regionStates;
}
@@ -513,51 +526,89 @@ public class AssignmentManager implements ServerListener {
private List<RegionInfo> getSystemTables(ServerName serverName) {
Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
if (regions == null) {
- return new ArrayList<>();
+ return Collections.emptyList();
}
- return regions.stream()
- .map(RegionStateNode::getRegionInfo)
- .filter(r -> r.getTable().isSystemTable())
- .collect(Collectors.toList());
+ return regions.stream().map(RegionStateNode::getRegionInfo)
+ .filter(r -> r.getTable().isSystemTable()).collect(Collectors.toList());
}
- public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException {
- AssignProcedure proc = createAssignProcedure(regionInfo, sn);
- ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
+ private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
+ throws HBaseIOException {
+ if (regionNode.getProcedure() != null) {
+ throw new HBaseIOException(regionNode + " is currently in transition");
+ }
+ if (!regionNode.isInState(expectedStates)) {
+ throw new DoNotRetryRegionException("Unexpected state for " + regionNode);
+ }
+ if (getTableStateManager().isTableState(regionNode.getTable(), TableState.State.DISABLING,
+ TableState.State.DISABLED)) {
+ throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
+ }
}
- public void assign(final RegionInfo regionInfo) throws IOException {
- AssignProcedure proc = createAssignProcedure(regionInfo);
+ public void assign(RegionInfo regionInfo, ServerName sn) throws IOException {
+ // TODO: should we use getRegionStateNode?
+ RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
+ TransitRegionStateProcedure proc;
+ regionNode.lock();
+ try {
+ preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_OPEN);
+ proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn);
+ regionNode.setProcedure(proc);
+ } finally {
+ regionNode.unlock();
+ }
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
}
- public void unassign(final RegionInfo regionInfo) throws IOException {
- unassign(regionInfo, false);
+ public void assign(RegionInfo regionInfo) throws IOException {
+ assign(regionInfo, null);
}
- public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
- throws IOException {
- // TODO: rename this reassign
- RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
- ServerName destinationServer = node.getRegionLocation();
- if (destinationServer == null) {
- throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
+ public void unassign(RegionInfo regionInfo) throws IOException {
+ RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
+ if (regionNode == null) {
+ throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
+ }
+ TransitRegionStateProcedure proc;
+ regionNode.lock();
+ try {
+ preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_CLOSE);
+ proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
+ regionNode.setProcedure(proc);
+ } finally {
+ regionNode.unlock();
}
- assert destinationServer != null; node.toString();
- UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan);
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
}
- public void move(final RegionInfo regionInfo) throws IOException {
- RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
- ServerName sourceServer = node.getRegionLocation();
- RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
- MoveRegionProcedure proc = createMoveRegionProcedure(plan);
+ private TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
+ ServerName targetServer) throws HBaseIOException {
+ RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
+ if (regionNode == null) {
+ throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
+ }
+ TransitRegionStateProcedure proc;
+ regionNode.lock();
+ try {
+ preTransitCheck(regionNode, RegionStates.STATES_EXPECTED_ON_CLOSE);
+ regionNode.checkOnline();
+ proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
+ regionNode.setProcedure(proc);
+ } finally {
+ regionNode.unlock();
+ }
+ return proc;
+ }
+
+ public void move(RegionInfo regionInfo) throws IOException {
+ TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
}
- public Future<byte[]> moveAsync(final RegionPlan regionPlan) throws HBaseIOException {
- MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan);
+ public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
+ TransitRegionStateProcedure proc =
+ createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
}
@@ -569,7 +620,7 @@ public class AssignmentManager implements ServerListener {
@VisibleForTesting
// TODO: Remove this?
public boolean waitForAssignment(final RegionInfo regionInfo, final long timeout)
- throws IOException {
+ throws IOException {
RegionStateNode node = null;
// This method can be called before the regionInfo has made it into the regionStateMap
// so wait around here a while.
@@ -577,24 +628,27 @@ public class AssignmentManager implements ServerListener {
// Something badly wrong if takes ten seconds to register a region.
long endTime = startTime + 10000;
while ((node = regionStates.getRegionStateNode(regionInfo)) == null && isRunning() &&
- System.currentTimeMillis() < endTime) {
+ System.currentTimeMillis() < endTime) {
// Presume it not yet added but will be added soon. Let it spew a lot so we can tell if
// we are waiting here alot.
LOG.debug("Waiting on " + regionInfo + " to be added to regionStateMap");
Threads.sleep(10);
}
if (node == null) {
- if (!isRunning()) return false;
- throw new RegionException(regionInfo.getRegionNameAsString() + " never registered with Assigment.");
+ if (!isRunning()) {
+ return false;
+ }
+ throw new RegionException(
+ regionInfo.getRegionNameAsString() + " never registered with Assigment.");
}
- RegionTransitionProcedure proc = node.getProcedure();
+ TransitRegionStateProcedure proc = node.getProcedure();
if (proc == null) {
throw new NoSuchProcedureException(node.toString());
}
- ProcedureSyncWait.waitForProcedureToCompleteIOE(
- master.getMasterProcedureExecutor(), proc, timeout);
+ ProcedureSyncWait.waitForProcedureToCompleteIOE(master.getMasterProcedureExecutor(), proc,
+ timeout);
return true;
}
@@ -604,22 +658,23 @@ public class AssignmentManager implements ServerListener {
/**
* Create round-robin assigns. Use on table creation to distribute out regions across cluster.
- * @return AssignProcedures made out of the passed in <code>hris</code> and a call
- * to the balancer to populate the assigns with targets chosen using round-robin (default
- * balancer scheme). If at assign-time, the target chosen is no longer up, thats fine,
- * the AssignProcedure will ask the balancer for a new target, and so on.
+ * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
+ * to populate the assigns with targets chosen using round-robin (default balancer
+ * scheme). If at assign-time, the target chosen is no longer up, thats fine, the
+ * AssignProcedure will ask the balancer for a new target, and so on.
*/
- public AssignProcedure[] createRoundRobinAssignProcedures(final List<RegionInfo> hris) {
+ public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(
+ List<RegionInfo> hris) {
if (hris.isEmpty()) {
- return null;
+ return new TransitRegionStateProcedure[0];
}
try {
// Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
// a better job if it has all the assignments in the one lump.
Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
- this.master.getServerManager().createDestinationServersList(null));
+ this.master.getServerManager().createDestinationServersList(null));
// Return mid-method!
- return createAssignProcedures(assignments, hris.size());
+ return createAssignProcedures(assignments);
} catch (HBaseIOException hioe) {
LOG.warn("Failed roundRobinAssignment", hioe);
}
@@ -627,130 +682,97 @@ public class AssignmentManager implements ServerListener {
return createAssignProcedures(hris);
}
- /**
- * Create an array of AssignProcedures w/o specifying a target server.
- * If no target server, at assign time, we will try to use the former location of the region
- * if one exists. This is how we 'retain' the old location across a server restart.
- * Used by {@link ServerCrashProcedure} assigning regions on a server that has crashed (SCP is
- * also used across a cluster-restart just-in-case to ensure we do cleanup of any old WALs or
- * server processes).
- */
- public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) {
- if (hris.isEmpty()) {
- return null;
- }
- int index = 0;
- AssignProcedure [] procedures = new AssignProcedure[hris.size()];
- for (RegionInfo hri : hris) {
- // Sort the procedures so meta and system regions are first in the returned array.
- procedures[index++] = createAssignProcedure(hri);
- }
- if (procedures.length > 1) {
- // Sort the procedures so meta and system regions are first in the returned array.
- Arrays.sort(procedures, AssignProcedure.COMPARATOR);
- }
- return procedures;
- }
-
- // Make this static for the method below where we use it typing the AssignProcedure array we
- // return as result.
- private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {};
-
- /**
- * @param assignments Map of assignments from which we produce an array of AssignProcedures.
- * @param size Count of assignments to make (the caller may know the total count)
- * @return Assignments made from the passed in <code>assignments</code>
- */
- private AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments,
- int size) {
- List<AssignProcedure> procedures = new ArrayList<>(size > 0? size: 8/*Arbitrary*/);
- for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) {
- for (RegionInfo ri: e.getValue()) {
- AssignProcedure ap = createAssignProcedure(ri, e.getKey());
- ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
- procedures.add(ap);
+ @VisibleForTesting
+ static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
+ if (left.getRegion().isMetaRegion()) {
+ if (right.getRegion().isMetaRegion()) {
+ return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
}
+ return -1;
+ } else if (right.getRegion().isMetaRegion()) {
+ return +1;
}
- if (procedures.size() > 1) {
- // Sort the procedures so meta and system regions are first in the returned array.
- procedures.sort(AssignProcedure.COMPARATOR);
+ if (left.getRegion().getTable().isSystemTable()) {
+ if (right.getRegion().getTable().isSystemTable()) {
+ return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
+ }
+ return -1;
+ } else if (right.getRegion().getTable().isSystemTable()) {
+ return +1;
}
- return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE);
+ return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
}
- // Needed for the following method so it can type the created Array we retur n
- private static final UnassignProcedure [] UNASSIGN_PROCEDURE_ARRAY_TYPE =
- new UnassignProcedure[0];
-
- UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) {
- if (nodes.isEmpty()) return null;
- final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size());
- for (RegionStateNode node: nodes) {
- if (!this.regionStates.include(node, false)) continue;
- // Look for regions that are offline/closed; i.e. already unassigned.
- if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue;
- assert node.getRegionLocation() != null: node.toString();
- procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false));
+ private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
+ ServerName targetServer) {
+ TransitRegionStateProcedure proc;
+ regionNode.lock();
+ try {
+ assert regionNode.getProcedure() == null;
+ proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(),
+ regionNode.getRegionInfo(), targetServer);
+ regionNode.setProcedure(proc);
+ } finally {
+ regionNode.unlock();
}
- return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE);
+ return proc;
}
/**
- * Called by things like DisableTableProcedure to get a list of UnassignProcedure
- * to unassign the regions of the table.
+ * Create an array of TransitRegionStateProcedure w/o specifying a target server.
+ * <p/>
+ * If no target server, at assign time, we will try to use the former location of the region if
+ * one exists. This is how we 'retain' the old location across a server restart.
+ * <p/>
+ * Should only be called when you can make sure that no one can touch these regions other than
+ * you. For example, when you are creating table.
*/
- public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
- return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
+ public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
+ return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
+ .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
+ .toArray(TransitRegionStateProcedure[]::new);
}
- public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
- AssignProcedure proc = new AssignProcedure(regionInfo);
- proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
- return proc;
- }
-
- public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
- final ServerName targetServer) {
- AssignProcedure proc = new AssignProcedure(regionInfo, targetServer);
- proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
- return proc;
- }
-
- UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
- final ServerName destinationServer, final boolean force) {
- return createUnassignProcedure(regionInfo, destinationServer, force, false);
- }
-
- UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
- final ServerName destinationServer, final boolean force,
- final boolean removeAfterUnassigning) {
- // If destinationServer is null, figure it.
- ServerName sn = destinationServer != null? destinationServer:
- getRegionStates().getRegionState(regionInfo).getServerName();
- assert sn != null;
- UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force, removeAfterUnassigning);
- proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
- return proc;
+ /**
+ * @param assignments Map of assignments from which we produce an array of AssignProcedures.
+ * @return Assignments made from the passed in <code>assignments</code>
+ */
+ private TransitRegionStateProcedure[] createAssignProcedures(
+ Map<ServerName, List<RegionInfo>> assignments) {
+ return assignments.entrySet().stream()
+ .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
+ .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
+ .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
}
- private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException {
- if (plan.getRegionInfo().getTable().isSystemTable()) {
- List<ServerName> exclude = getExcludedServersForSystemTable();
- if (plan.getDestination() != null && exclude.contains(plan.getDestination())) {
- try {
- LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() +
- " because the server is not with highest version");
- plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(),
- this.master.getServerManager().createDestinationServersList(exclude)));
- } catch (HBaseIOException e) {
- LOG.warn(e.toString(), e);
+ /**
+ * Called by DisableTableProcedure to unassign all the regions for a table.
+ */
+ public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
+ return regionStates.getTableRegionStateNodes(tableName).stream().map(regionNode -> {
+ regionNode.lock();
+ try {
+ if (!regionStates.include(regionNode, false) ||
+ regionStates.isRegionOffline(regionNode.getRegionInfo())) {
+ return null;
+ }
+ // As in DisableTableProcedure, we will hold the xlock for table, so we can make sure that
+ // this procedure has not been executed yet, as TRSP will hold the shared lock for table all
+ // the time. So here we will unset it and when it is actually executed, it will find that
+ // the attach procedure is not itself and quit immediately.
+ if (regionNode.getProcedure() != null) {
+ regionNode.unsetProcedure(regionNode.getProcedure());
}
+ TransitRegionStateProcedure proc = TransitRegionStateProcedure
+ .unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
+ regionNode.setProcedure(proc);
+ return proc;
+ } finally {
+ regionNode.unlock();
}
- }
- return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
+ }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
}
-
public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
final byte[] splitKey) throws IOException {
return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
@@ -780,8 +802,7 @@ public class AssignmentManager implements ServerListener {
// ============================================================================================
// TODO: Move this code in MasterRpcServices and call on specific event?
public ReportRegionStateTransitionResponse reportRegionStateTransition(
- final ReportRegionStateTransitionRequest req)
- throws PleaseHoldException {
+ final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
final ReportRegionStateTransitionResponse.Builder builder =
ReportRegionStateTransitionResponse.newBuilder();
final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
@@ -819,7 +840,7 @@ public class AssignmentManager implements ServerListener {
}
}
} catch (PleaseHoldException e) {
- if (LOG.isTraceEnabled()) LOG.trace("Failed transition " + e.getMessage());
+ LOG.trace("Failed transition ", e);
throw e;
} catch (UnsupportedOperationException|IOException e) {
// TODO: at the moment we have a single error message and the RS will abort
@@ -830,56 +851,53 @@ public class AssignmentManager implements ServerListener {
return builder.build();
}
- private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
- final RegionInfo regionInfo, final long seqId)
- throws PleaseHoldException, UnexpectedStateException {
+ private void updateRegionTransition(ServerName serverName, TransitionCode state,
+ RegionInfo regionInfo, long seqId) throws IOException {
checkMetaLoaded(regionInfo);
- final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
+ RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
if (regionNode == null) {
// the table/region is gone. maybe a delete, split, merge
throw new UnexpectedStateException(String.format(
"Server %s was trying to transition region %s to %s. but the region was removed.",
serverName, regionInfo, state));
}
+ LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
+ regionNode, state);
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Update region transition serverName=%s region=%s regionState=%s",
- serverName, regionNode, state));
- }
-
- final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
- if (!reportTransition(regionNode, serverNode, state, seqId)) {
- // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
- // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
- // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
- // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
- // to CLOSED
- // These happen because on cluster shutdown, we currently let the RegionServers close
- // regions. This is the only time that region close is not run by the Master (so cluster
- // goes down fast). Consider changing it so Master runs all shutdowns.
- if (this.master.getServerManager().isClusterShutdown() &&
+ ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+ regionNode.lock();
+ try {
+ if (!reportTransition(regionNode, serverNode, state, seqId)) {
+ // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
+ // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
+ // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
+ // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
+ // to CLOSED
+ // These happen because on cluster shutdown, we currently let the RegionServers close
+ // regions. This is the only time that region close is not run by the Master (so cluster
+ // goes down fast). Consider changing it so Master runs all shutdowns.
+ if (this.master.getServerManager().isClusterShutdown() &&
state.equals(TransitionCode.CLOSED)) {
- LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
- } else {
- LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
+ LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
+ } else {
+ LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
+ }
}
+ } finally {
+ regionNode.unlock();
}
}
- // FYI: regionNode is sometimes synchronized by the caller but not always.
- private boolean reportTransition(final RegionStateNode regionNode,
- final ServerStateNode serverNode, final TransitionCode state, final long seqId)
- throws UnexpectedStateException {
- final ServerName serverName = serverNode.getServerName();
- synchronized (regionNode) {
- final RegionTransitionProcedure proc = regionNode.getProcedure();
- if (proc == null) return false;
-
- // serverNode.getReportEvent().removeProcedure(proc);
- proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(),
- serverName, state, seqId);
+ private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
+ TransitionCode state, long seqId) throws IOException {
+ ServerName serverName = serverNode.getServerName();
+ TransitRegionStateProcedure proc = regionNode.getProcedure();
+ if (proc == null) {
+ return false;
}
+ proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
+ serverName, state, seqId);
return true;
}
@@ -984,10 +1002,9 @@ public class AssignmentManager implements ServerListener {
wakeServerReportEvent(serverNode);
}
- void checkOnlineRegionsReportForMeta(final ServerStateNode serverNode,
- final Set<byte[]> regionNames) {
+ void checkOnlineRegionsReportForMeta(ServerStateNode serverNode, Set<byte[]> regionNames) {
try {
- for (byte[] regionName: regionNames) {
+ for (byte[] regionName : regionNames) {
final RegionInfo hri = getMetaRegionFromName(regionName);
if (hri == null) {
if (LOG.isTraceEnabled()) {
@@ -997,18 +1014,23 @@ public class AssignmentManager implements ServerListener {
continue;
}
- final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
+ RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
LOG.info("META REPORTED: " + regionNode);
- if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
- LOG.warn("META REPORTED but no procedure found (complete?); set location=" +
+ regionNode.lock();
+ try {
+ if (!reportTransition(regionNode, serverNode, TransitionCode.OPENED, 0)) {
+ LOG.warn("META REPORTED but no procedure found (complete?); set location=" +
serverNode.getServerName());
- regionNode.setRegionLocation(serverNode.getServerName());
- } else if (LOG.isTraceEnabled()) {
- LOG.trace("META REPORTED: " + regionNode);
+ regionNode.setRegionLocation(serverNode.getServerName());
+ } else if (LOG.isTraceEnabled()) {
+ LOG.trace("META REPORTED: " + regionNode);
+ }
+ } finally {
+ regionNode.unlock();
}
}
- } catch (UnexpectedStateException e) {
- final ServerName serverName = serverNode.getServerName();
+ } catch (IOException e) {
+ ServerName serverName = serverNode.getServerName();
LOG.warn("KILLING " + serverName + ": " + e.getMessage());
killRegionServer(serverNode);
}
@@ -1019,12 +1041,15 @@ public class AssignmentManager implements ServerListener {
final ServerName serverName = serverNode.getServerName();
try {
for (byte[] regionName: regionNames) {
- if (!isRunning()) return;
+ if (!isRunning()) {
+ return;
+ }
final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
if (regionNode == null) {
throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
}
- synchronized (regionNode) {
+ regionNode.lock();
+ try {
if (regionNode.isInState(State.OPENING, State.OPEN)) {
if (!regionNode.getRegionLocation().equals(serverName)) {
throw new UnexpectedStateException(regionNode.toString() +
@@ -1050,9 +1075,11 @@ public class AssignmentManager implements ServerListener {
" reported an unexpected OPEN; time since last update=" + diff);
}
}
+ } finally {
+ regionNode.unlock();
}
}
- } catch (UnexpectedStateException e) {
+ } catch (IOException e) {
LOG.warn("Killing " + serverName + ": " + e.getMessage());
killRegionServer(serverNode);
throw (YouAreDeadException)new YouAreDeadException(e.getMessage()).initCause(e);
@@ -1267,29 +1294,20 @@ public class AssignmentManager implements ServerListener {
localState = State.OFFLINE;
}
- final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
- synchronized (regionNode) {
- if (!regionNode.isInTransition()) {
- regionNode.setState(localState);
- regionNode.setLastHost(lastHost);
- regionNode.setRegionLocation(regionLocation);
- regionNode.setOpenSeqNum(openSeqNum);
-
- if (localState == State.OPEN) {
- assert regionLocation != null : "found null region location for " + regionNode;
- regionStates.addRegionToServer(regionNode);
- } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
- regionStates.addToOfflineRegions(regionNode);
- } else if (localState == State.CLOSED && getTableStateManager().
- isTableState(regionNode.getTable(), TableState.State.DISABLED,
- TableState.State.DISABLING)) {
- // The region is CLOSED and the table is DISABLED/ DISABLING, there is nothing to
- // schedule; the region is inert.
- } else {
- // These regions should have a procedure in replay
- regionStates.addRegionInTransition(regionNode, null);
- }
- }
+ RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
+ // Do not need to lock on regionNode, as we can make sure that before we finish loading
+ // meta, all the related procedures can not be executed. The only exception is formeta
+ // region related operations, but here we do not load the informations for meta region.
+ regionNode.setState(localState);
+ regionNode.setLastHost(lastHost);
+ regionNode.setRegionLocation(regionLocation);
+ regionNode.setOpenSeqNum(openSeqNum);
+
+ if (localState == State.OPEN) {
+ assert regionLocation != null : "found null region location for " + regionNode;
+ regionStates.addRegionToServer(regionNode);
+ } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
+ regionStates.addToOfflineRegions(regionNode);
}
}
});
@@ -1335,9 +1353,11 @@ public class AssignmentManager implements ServerListener {
}
public void offlineRegion(final RegionInfo regionInfo) {
- // TODO used by MasterRpcServices ServerCrashProcedure
- final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
- if (node != null) node.offline();
+ // TODO used by MasterRpcServices
+ RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
+ if (node != null) {
+ node.offline();
+ }
}
public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
@@ -1373,16 +1393,6 @@ public class AssignmentManager implements ServerListener {
// ============================================================================================
// TODO: Region State In Transition
// ============================================================================================
- protected boolean addRegionInTransition(final RegionStateNode regionNode,
- final RegionTransitionProcedure procedure) {
- return regionStates.addRegionInTransition(regionNode, procedure);
- }
-
- protected void removeRegionInTransition(final RegionStateNode regionNode,
- final RegionTransitionProcedure procedure) {
- regionStates.removeRegionInTransition(regionNode, procedure);
- }
-
public boolean hasRegionsInTransition() {
return regionStates.hasRegionsInTransition();
}
@@ -1401,102 +1411,82 @@ public class AssignmentManager implements ServerListener {
}
// ============================================================================================
- // TODO: Region Status update
+ // Region Status update
+ // Should only be called in TransitRegionStateProcedure
// ============================================================================================
- private void sendRegionOpenedNotification(final RegionInfo regionInfo,
- final ServerName serverName) {
- getBalancer().regionOnline(regionInfo, serverName);
- if (!this.listeners.isEmpty()) {
- for (AssignmentListener listener : this.listeners) {
- listener.regionOpened(regionInfo, serverName);
- }
- }
- }
-
- private void sendRegionClosedNotification(final RegionInfo regionInfo) {
- getBalancer().regionOffline(regionInfo);
- if (!this.listeners.isEmpty()) {
- for (AssignmentListener listener : this.listeners) {
- listener.regionClosed(regionInfo);
- }
- }
- }
- public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
- synchronized (regionNode) {
- regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
- regionStates.addRegionToServer(regionNode);
- regionStateStore.updateRegionLocation(regionNode);
- }
+ // Marks the region OPENING and persists it; call while holding the RegionStateNode lock
+ void regionOpening(RegionStateNode regionNode) throws IOException {
+ regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
+ regionStateStore.updateRegionLocation(regionNode);
+ regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
}
- public void undoRegionAsOpening(final RegionStateNode regionNode) {
- boolean opening = false;
- synchronized (regionNode) {
- if (regionNode.isInState(State.OPENING)) {
- opening = true;
- regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
- }
- // Should we update hbase:meta?
+ // Should be called while holding the RegionStateNode lock.
+ // 'giveUp' true means we will not try to open the region again; FAILED_OPEN (with a null
+ // location) is then persisted into hbase:meta. Otherwise only the server mapping is removed.
+ void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
+ if (regionNode.getRegionLocation() != null) {
+ regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
}
- if (opening) {
- // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
+ if (giveUp) {
+ regionNode.setState(State.FAILED_OPEN);
+ regionNode.setRegionLocation(null);
+ regionStateStore.updateRegionLocation(regionNode);
}
}
- public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
- final RegionInfo hri = regionNode.getRegionInfo();
- synchronized (regionNode) {
- regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
- if (isMetaRegion(hri)) {
- // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
- // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
- // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
- // on table that contains state.
- setMetaAssigned(hri, true);
- }
- regionStates.addRegionToServer(regionNode);
- // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
- // That is a lot of hbase:meta writing.
- regionStateStore.updateRegionLocation(regionNode);
- sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
+ // Marks the region OPEN and persists it; call while holding the RegionStateNode lock
+ void regionOpened(RegionStateNode regionNode) throws IOException {
+ regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
+ // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
+ // That is a lot of hbase:meta writing.
+ regionStateStore.updateRegionLocation(regionNode);
+
+ RegionInfo hri = regionNode.getRegionInfo();
+ if (isMetaRegion(hri)) {
+ // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYS enabled, it
+ // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
+ // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
+ // on table that contains state.
+ setMetaAssigned(hri, true);
}
+ regionStates.addRegionToServer(regionNode);
+ regionStates.removeFromFailedOpen(hri);
}
- public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
- final RegionInfo hri = regionNode.getRegionInfo();
- synchronized (regionNode) {
- regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
- // Set meta has not initialized early. so people trying to create/edit tables will wait
- if (isMetaRegion(hri)) {
- setMetaAssigned(hri, false);
- }
- regionStates.addRegionToServer(regionNode);
- regionStateStore.updateRegionLocation(regionNode);
- }
+ // Marks the region CLOSING and persists it; call while holding the RegionStateNode lock
+ void regionClosing(RegionStateNode regionNode) throws IOException {
+ regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
+ regionStateStore.updateRegionLocation(regionNode);
+ RegionInfo hri = regionNode.getRegionInfo();
+ // Mark meta as not assigned early, so people trying to create/edit tables will wait
+ if (isMetaRegion(hri)) {
+ setMetaAssigned(hri, false);
+ }
+ regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
}
- public void undoRegionAsClosing(final RegionStateNode regionNode) {
- // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
- // There is nothing to undo?
- }
-
- public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
- final RegionInfo hri = regionNode.getRegionInfo();
- synchronized (regionNode) {
- regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
- regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
- regionNode.setLastHost(regionNode.getRegionLocation());
+ // Should be called while holding the RegionStateNode lock.
+ // The parameter 'normally' means whether we are closed cleanly; if it is false, then it means
+ // that we are closed due to a RS crash and the region ends up in ABNORMALLY_CLOSED state.
+ void regionClosed(RegionStateNode regionNode, boolean normally) throws IOException {
+ regionNode.transitionState(normally ? State.CLOSED : State.ABNORMALLY_CLOSED,
+ RegionStates.STATES_EXPECTED_ON_CLOSE);
+ ServerName loc = regionNode.getRegionLocation();
+ if (loc != null) {
+ // could be a retry, so check here to avoid setting the lastHost to null.
+ regionNode.setLastHost(loc);
regionNode.setRegionLocation(null);
- regionStateStore.updateRegionLocation(regionNode);
- sendRegionClosedNotification(hri);
+ regionStates.removeRegionFromServer(loc, regionNode);
}
+ regionStateStore.updateRegionLocation(regionNode);
}
public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
@@ -1829,76 +1819,4 @@ public class AssignmentManager implements ServerListener {
private void killRegionServer(final ServerStateNode serverNode) {
master.getServerManager().expireServer(serverNode.getServerName());
}
-
- /**
- * <p>
- * This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is
- * where you go to check on state of 'Servers', what Servers are online, etc.
- * </p>
- * <p>
- * Here we are checking the state of a server that is post expiration, a ServerManager function
- * that moves a server from online to dead. Here we are seeing if the server has moved beyond a
- * particular point in the recovery process such that it is safe to move on with assigns; etc.
- * </p>
- * <p>
- * For now it is only used in
- * {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to
- * see whether we can safely quit without losing data.
- * </p>
- * @param meta whether to check for meta log splitting
- * @return {@code true} if the server does not exist or the log splitting is done, i.e, the server
- * is in OFFLINE state, or for meta log, is in SPLITTING_META_DONE state. If null,
- * presumes the ServerStateNode was cleaned up by SCP.
- * @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)
- */
- boolean isLogSplittingDone(ServerName serverName, boolean meta) {
- ServerStateNode ssn = this.regionStates.getServerNode(serverName);
- if (ssn == null) {
- return true;
- }
- ServerState[] inState =
- meta
- ? new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING,
- ServerState.OFFLINE }
- : new ServerState[] { ServerState.OFFLINE };
- synchronized (ssn) {
- return ssn.isInState(inState);
- }
- }
-
- /**
- * Handle RIT of meta region against crashed server.
- * Only used when ServerCrashProcedure is not enabled.
- * See handleRIT in ServerCrashProcedure for similar function.
- *
- * @param serverName Server that has already crashed
- */
- public void handleMetaRITOnCrashedServer(ServerName serverName) {
- RegionInfo hri = RegionReplicaUtil
- .getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO,
- RegionInfo.DEFAULT_REPLICA_ID);
- RegionState regionStateNode = getRegionStates().getRegionState(hri);
- if (regionStateNode == null) {
- LOG.warn("RegionStateNode is null for " + hri);
- return;
- }
- ServerName rsnServerName = regionStateNode.getServerName();
- if (rsnServerName != null && !rsnServerName.equals(serverName)) {
- return;
- } else if (rsnServerName == null) {
- LOG.warn("Empty ServerName in RegionStateNode; proceeding anyways in case latched " +
- "RecoverMetaProcedure so meta latch gets cleaned up.");
- }
- // meta has been assigned to crashed server.
- LOG.info("Meta assigned to crashed " + serverName + "; reassigning...");
- // Handle failure and wake event
- RegionTransitionProcedure rtp = getRegionStates().getRegionTransitionProcedure(hri);
- // Do not need to consider for REGION_TRANSITION_QUEUE step
- if (rtp != null && rtp.isMeta() &&
- rtp.getTransitionState() == RegionTransitionState.REGION_TRANSITION_DISPATCH) {
- LOG.debug("Failing " + rtp.toString());
- rtp.remoteCallFailed(master.getMasterProcedureExecutor().getEnvironment(), serverName,
- new ServerCrashException(rtp.getProcId(), serverName));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
new file mode 100644
index 0000000..d0dca09
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+
+/**
+ * Utility for this assignment package only.
+ */
+@InterfaceAudience.Private
+final class AssignmentManagerUtil {
+ private AssignmentManagerUtil() {
+ }
+
+ /**
+ * Raw call to remote regionserver to get info on a particular region.
+ * @throws IOException Let it out so can report this IOE as reason for failure
+ */
+ static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
+ final ServerName regionLocation, final RegionInfo hri) throws IOException {
+ return getRegionInfoResponse(env, regionLocation, hri, false);
+ }
+
+ /**
+ * Raw call to remote regionserver to get info on a particular region.
+ * @param includeBestSplitRow when true the request is built with
+ * {@code buildGetRegionInfoRequest(regionName, false, true)}; presumably this asks the
+ * regionserver to also compute the best split row (confirm against RequestConverter)
+ * @throws IOException Let it out so can report this IOE as reason for failure
+ */
+ static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
+ final ServerName regionLocation, final RegionInfo hri, boolean includeBestSplitRow)
+ throws IOException {
+ // TODO: There is no timeout on this controller. Set one!
+ HBaseRpcController controller =
+ env.getMasterServices().getClusterConnection().getRpcControllerFactory().newController();
+ final AdminService.BlockingInterface admin =
+ env.getMasterServices().getClusterConnection().getAdmin(regionLocation);
+ final GetRegionInfoRequest request = includeBestSplitRow
+ ? RequestConverter.buildGetRegionInfoRequest(hri.getRegionName(), false, true)
+ : RequestConverter.buildGetRegionInfoRequest(hri.getRegionName());
+ try {
+ return admin.getRegionInfo(controller, request);
+ } catch (ServiceException e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ }
+
+ /**
+ * Locks every node, in list order. Release with {@link #unlock(List)}.
+ */
+ private static void lock(List<RegionStateNode> regionNodes) {
+ regionNodes.forEach(RegionStateNode::lock);
+ }
+
+ /**
+ * Unlocks every node, in reverse list order (the reverse of {@link #lock(List)}).
+ */
+ private static void unlock(List<RegionStateNode> regionNodes) {
+ for (ListIterator<RegionStateNode> iter = regionNodes.listIterator(regionNodes.size());
+ iter.hasPrevious();) {
+ iter.previous().unlock();
+ }
+ }
+
+ /**
+ * Creates an unassign procedure for every replica of every given region and attaches it to the
+ * corresponding RegionStateNode.
+ * <p/>
+ * All the region nodes are locked for the whole operation. If any of them already has a
+ * procedure attached, the procedures attached so far are detached again before the locks are
+ * released, so either all regions get a procedure or none do.
+ * @param regions the parent regions of the split/merge
+ * @param regionReplication number of replicas per region; replica ids 0 to regionReplication - 1
+ * are covered
+ * @return the attached procedures, in the same order as the expanded list of replicas
+ * @throws HBaseIOException if one of the regions is currently in transition
+ */
+ static TransitRegionStateProcedure[] createUnassignProceduresForSplitOrMerge(
+ MasterProcedureEnv env, Stream<RegionInfo> regions, int regionReplication)
+ throws IOException {
+ List<RegionStateNode> regionNodes = regions
+ .flatMap(hri -> IntStream.range(0, regionReplication)
+ .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i)))
+ .map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
+ .collect(Collectors.toList());
+ TransitRegionStateProcedure[] procs = new TransitRegionStateProcedure[regionNodes.size()];
+ boolean rollback = true;
+ int i = 0;
+ // hold the lock at once, and then release it in finally. This is important as SCP may jump in
+ // if we release the lock in the middle when we want to do rollback, and cause problems.
+ lock(regionNodes);
+ try {
+ for (; i < procs.length; i++) {
+ RegionStateNode regionNode = regionNodes.get(i);
+ // check first, so that we do not build a procedure only to throw it away
+ if (regionNode.getProcedure() != null) {
+ throw new HBaseIOException(
+ "The parent region " + regionNode + " is currently in transition, give up");
+ }
+ TransitRegionStateProcedure proc =
+ TransitRegionStateProcedure.unassign(env, regionNode.getRegionInfo());
+ regionNode.setProcedure(proc);
+ procs[i] = proc;
+ }
+ // all succeeded, set rollback to false
+ rollback = false;
+ } finally {
+ if (rollback) {
+ // procs[0..i-1] were attached before the failure at index i; detach them in reverse order
+ for (int j = i - 1; j >= 0; j--) {
+ regionNodes.get(j).unsetProcedure(procs[j]);
+ }
+ }
+ unlock(regionNodes);
+ }
+ return procs;
+ }
+
+ /**
+ * Creates an assign procedure for every replica of every given region and attaches it to the
+ * corresponding RegionStateNode. Unlike the unassign case, each node is locked only while its
+ * own procedure is attached. The procedures are returned, not submitted.
+ */
+ private static TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env,
+ Stream<RegionInfo> regions, int regionReplication, ServerName targetServer) {
+ return regions
+ .flatMap(hri -> IntStream.range(0, regionReplication)
+ .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i)))
+ .map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
+ .map(regionNode -> {
+ TransitRegionStateProcedure proc =
+ TransitRegionStateProcedure.assign(env, regionNode.getRegionInfo(), targetServer);
+ regionNode.lock();
+ try {
+ // should never fail, as we have the exclusive region lock, and the region is newly
+ // created, or has been successfully closed so should not be on any servers, so SCP will
+ // not process it either. Note that the assert is only evaluated when run with -ea.
+ assert !regionNode.isInTransition();
+ regionNode.setProcedure(proc);
+ } finally {
+ regionNode.unlock();
+ }
+ return proc;
+ }).toArray(TransitRegionStateProcedure[]::new);
+ }
+
+ /**
+ * Creates (but does not submit) assign procedures for the given newly created regions.
+ */
+ static TransitRegionStateProcedure[] createAssignProceduresForOpeningNewRegions(
+ MasterProcedureEnv env, Stream<RegionInfo> regions, int regionReplication,
+ ServerName targetServer) {
+ return createAssignProcedures(env, regions, regionReplication, targetServer);
+ }
+
+ /**
+ * Creates assign procedures for the given regions and submits them to the master procedure
+ * executor right away.
+ */
+ static void reopenRegionsForRollback(MasterProcedureEnv env, Stream<RegionInfo> regions,
+ int regionReplication, ServerName targetServer) {
+ TransitRegionStateProcedure[] procs =
+ createAssignProcedures(env, regions, regionReplication, targetServer);
+ env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
+ }
+
+ /**
+ * Removes the non-default replicas (replica ids 1 to regionReplication - 1) of the given regions
+ * from the in-memory region states, from the ServerManager, and from the favored nodes when a
+ * FavoredNodesManager is present. The default replica is not touched.
+ */
+ static void removeNonDefaultReplicas(MasterProcedureEnv env, Stream<RegionInfo> regions,
+ int regionReplication) {
+ // these lookups do not depend on the replica, so resolve them once rather than per replica
+ RegionStates regionStates = env.getAssignmentManager().getRegionStates();
+ FavoredNodesManager fnm = env.getMasterServices().getFavoredNodesManager();
+ // Remove from in-memory states
+ regions.flatMap(hri -> IntStream.range(1, regionReplication)
+ .mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i))).forEach(hri -> {
+ regionStates.deleteRegion(hri);
+ env.getMasterServices().getServerManager().removeRegion(hri);
+ if (fnm != null) {
+ fnm.deleteFavoredNodesForRegions(Collections.singletonList(hri));
+ }
+ });
+ }
+
+ /**
+ * Fails when recovered.edits are found for the given region, as splitting or merging it then
+ * could lose data.
+ * @throws IOException if WALSplitter reports recovered edits for the region
+ */
+ static void checkClosedRegion(MasterProcedureEnv env, RegionInfo regionInfo) throws IOException {
+ if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getFileSystem(),
+ env.getMasterConfiguration(), regionInfo)) {
+ throw new IOException("Recovered.edits are found in Region: " + regionInfo +
+ ", abort split/merge to prevent data loss");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
new file mode 100644
index 0000000..e446e17
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseRegionProcedureStateData;
+
+/**
+ * The remote procedure used to close a region.
+ */
+@InterfaceAudience.Private
+public class CloseRegionProcedure extends RegionRemoteProcedureBase {
+
+ /**
+ * Where the region is expected to be assigned after this close when the close is part of a move.
+ * It is sent to the RS, which records it so that clients can be told where the region went. May
+ * be null, and may even be wrong (though it should never be made wrong on purpose); clients are
+ * able to cope with a wrong value.
+ */
+ private ServerName assignCandidate;
+
+ /**
+ * No-arg constructor, presumably for rebuilding the procedure from its serialized state.
+ */
+ public CloseRegionProcedure() {
+ }
+
+ /**
+ * @param region the region to close
+ * @param targetServer handed to RegionRemoteProcedureBase (presumably the current host)
+ * @param assignCandidate where the region is expected to go afterwards; may be null
+ */
+ public CloseRegionProcedure(RegionInfo region, ServerName targetServer,
+ ServerName assignCandidate) {
+ super(region, targetServer);
+ this.assignCandidate = assignCandidate;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_UNASSIGN;
+ }
+
+ @Override
+ public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+ return new RegionCloseOperation(this, region, assignCandidate);
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ // the candidate is optional: only put it on the message when we actually have one
+ CloseRegionProcedureStateData.Builder dataBuilder =
+ CloseRegionProcedureStateData.newBuilder();
+ if (assignCandidate != null) {
+ dataBuilder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
+ }
+ CloseRegionProcedureStateData data = dataBuilder.build();
+ serializer.serialize(data);
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ CloseRegionProcedureStateData data =
+ serializer.deserialize(CloseRegionProcedureStateData.class);
+ if (!data.hasAssignCandidate()) {
+ // no candidate was persisted, so keep whatever the field currently holds
+ return;
+ }
+ assignCandidate = ProtobufUtil.toServerName(data.getAssignCandidate());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 20ae444..ff2ba5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
@@ -23,7 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -64,7 +62,9 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -72,10 +72,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.M
/**
* The procedure to Merge a region in a table.
+ * <p/>
* This procedure takes an exclusive table lock since it is working over multiple regions.
+ * <p/>
* It holds the lock for the life of the procedure.
- * <p>Throws exception on construction if determines context hostile to merge (cluster going
- * down or master is shutting down or table is disabled).</p>
+ * <p/>
+ * Throws exception on construction if determines context hostile to merge (cluster going down or
+ * master is shutting down or table is disabled).
*/
@InterfaceAudience.Private
public class MergeTableRegionsProcedure
@@ -216,6 +219,20 @@ public class MergeTableRegionsProcedure
return rid;
}
+
+ private void removeNonDefaultReplicas(MasterProcedureEnv env) throws IOException {
+ AssignmentManagerUtil.removeNonDefaultReplicas(env, Stream.of(regionsToMerge),
+ getRegionReplication(env));
+ }
+
+ private void checkClosedRegions(MasterProcedureEnv env) throws IOException {
+ // theoretically this should not happen any more after we use TRSP, but anyway let's add a check
+ // here
+ for (RegionInfo region : regionsToMerge) {
+ AssignmentManagerUtil.checkClosedRegion(env, region);
+ }
+ }
+
@Override
protected Flow executeFromState(final MasterProcedureEnv env,
MergeTableRegionsState state) {
@@ -234,27 +251,15 @@ public class MergeTableRegionsProcedure
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
break;
case MERGE_TABLE_REGIONS_CLOSE_REGIONS:
- addChildProcedure(createUnassignProcedures(env, getRegionReplication(env)));
+ addChildProcedure(createUnassignProcedures(env));
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CHECK_CLOSED_REGIONS);
break;
case MERGE_TABLE_REGIONS_CHECK_CLOSED_REGIONS:
- List<RegionInfo> ris = hasRecoveredEdits(env);
- if (ris.isEmpty()) {
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
- } else {
- // Need to reopen parent regions to pickup missed recovered.edits. Do it by creating
- // child assigns and then stepping back to MERGE_TABLE_REGIONS_CLOSE_REGIONS.
- // Just assign the primary regions recovering the missed recovered.edits -- no replicas.
- // May need to cycle here a few times if heavy writes.
- // TODO: Add an assign read-only.
- for (RegionInfo ri: ris) {
- LOG.info("Found recovered.edits under {}, reopen to pickup missed edits!", ri);
- addChildProcedure(env.getAssignmentManager().createAssignProcedure(ri));
- }
- setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS);
- }
+ checkClosedRegions(env);
+ setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION);
break;
case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
+ removeNonDefaultReplicas(env);
createMergedRegion(env);
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE);
break;
@@ -275,7 +280,7 @@ public class MergeTableRegionsProcedure
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION);
break;
case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION:
- addChildProcedure(createAssignProcedures(env, getRegionReplication(env)));
+ addChildProcedure(createAssignProcedures(env));
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION);
break;
case MERGE_TABLE_REGIONS_POST_OPERATION:
@@ -471,25 +476,7 @@ public class MergeTableRegionsProcedure
}
/**
- * Return list of regions that have recovered.edits... usually its an empty list.
- * @param env the master env
- * @throws IOException IOException
- */
- private List<RegionInfo> hasRecoveredEdits(final MasterProcedureEnv env) throws IOException {
- List<RegionInfo> ris = new ArrayList<RegionInfo>(regionsToMerge.length);
- for (int i = 0; i < regionsToMerge.length; i++) {
- RegionInfo ri = regionsToMerge[i];
- if (SplitTableRegionProcedure.hasRecoveredEdits(env, ri)) {
- ris.add(ri);
- }
- }
- return ris;
- }
-
- /**
* Prepare merge and do some check
- * @param env MasterProcedureEnv
- * @throws IOException
*/
private boolean prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
// Note: the following logic assumes that we only have 2 regions to merge. In the future,
@@ -559,9 +546,9 @@ public class MergeTableRegionsProcedure
}
private boolean isMergeable(final MasterProcedureEnv env, final RegionState rs)
- throws IOException {
+ throws IOException {
GetRegionInfoResponse response =
- Util.getRegionInfoResponse(env, rs.getServerName(), rs.getRegion());
+ AssignmentManagerUtil.getRegionInfoResponse(env, rs.getServerName(), rs.getRegion());
return response.hasMergeable() && response.getMergeable();
}
@@ -598,9 +585,8 @@ public class MergeTableRegionsProcedure
/**
* Set the region states to MERGING state
- * @param env MasterProcedureEnv
*/
- public void setRegionStateToMerging(final MasterProcedureEnv env) {
+ private void setRegionStateToMerging(final MasterProcedureEnv env) {
// Set State.MERGING to regions to be merged
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
regionStates.getRegionStateNode(regionsToMerge[0]).setState(State.MERGING);
@@ -675,49 +661,22 @@ public class MergeTableRegionsProcedure
/**
* Rollback close regions
- * @param env MasterProcedureEnv
**/
- private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException {
- // Check whether the region is closed; if so, open it in the same server
- final int regionReplication = getRegionReplication(env);
- final ServerName serverName = getServerName(env);
-
- final AssignProcedure[] procs =
- new AssignProcedure[regionsToMerge.length * regionReplication];
- int procsIdx = 0;
- for (int i = 0; i < regionsToMerge.length; ++i) {
- for (int j = 0; j < regionReplication; ++j) {
- final RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j);
- procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, serverName);
- }
- }
- env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
+ private void rollbackCloseRegionsForMerge(MasterProcedureEnv env) throws IOException {
+ AssignmentManagerUtil.reopenRegionsForRollback(env, Stream.of(regionsToMerge),
+ getRegionReplication(env), getServerName(env));
}
- private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env,
- final int regionReplication) {
- final UnassignProcedure[] procs =
- new UnassignProcedure[regionsToMerge.length * regionReplication];
- int procsIdx = 0;
- for (int i = 0; i < regionsToMerge.length; ++i) {
- for (int j = 0; j < regionReplication; ++j) {
- final RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j);
- procs[procsIdx++] = env.getAssignmentManager().
- createUnassignProcedure(hri, null, true, !RegionReplicaUtil.isDefaultReplica(hri));
- }
- }
- return procs;
+ private TransitRegionStateProcedure[] createUnassignProcedures(MasterProcedureEnv env)
+ throws IOException {
+ return AssignmentManagerUtil.createUnassignProceduresForSplitOrMerge(env,
+ Stream.of(regionsToMerge), getRegionReplication(env));
}
- private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env,
- final int regionReplication) {
- final ServerName targetServer = getServerName(env);
- final AssignProcedure[] procs = new AssignProcedure[regionReplication];
- for (int i = 0; i < procs.length; ++i) {
- final RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(mergedRegion, i);
- procs[i] = env.getAssignmentManager().createAssignProcedure(hri, targetServer);
- }
- return procs;
+ private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env)
+ throws IOException {
+ return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env,
+ Stream.of(mergedRegion), getRegionReplication(env), getServerName(env));
}
private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
index 6135ce1..3aadb92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,8 +19,6 @@
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
-
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -30,81 +27,29 @@ import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProced
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionStateData;
/**
- * Procedure that implements a RegionPlan.
- * It first runs an unassign subprocedure followed
- * by an assign subprocedure. It takes a lock on the region being moved.
- * It holds the lock for the life of the procedure.
- *
- * <p>Throws exception on construction if determines context hostile to move (cluster going
- * down or master is shutting down or table is disabled).</p>
+ * Leave here only for checking if we can successfully start the master.
+ * @deprecated Do not use any more.
+ * @see TransitRegionStateProcedure
*/
+@Deprecated
@InterfaceAudience.Private
public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure<MoveRegionState> {
- private static final Logger LOG = LoggerFactory.getLogger(MoveRegionProcedure.class);
private RegionPlan plan;
public MoveRegionProcedure() {
- // Required by the Procedure framework to create the procedure on replay
super();
}
- /**
- * @param check whether we should do some checks in the constructor. We will skip the checks if we
- * are reopening a region as this may fail the whole procedure and cause stuck. We will
- * do the check later when actually executing the procedure so not a big problem.
- * @throws IOException If the cluster is offline or master is stopping or if table is disabled or
- * non-existent.
- */
- public MoveRegionProcedure(MasterProcedureEnv env, RegionPlan plan, boolean check)
- throws HBaseIOException {
- super(env, plan.getRegionInfo());
- this.plan = plan;
- if (check) {
- preflightChecks(env, true);
- checkOnline(env, plan.getRegionInfo());
- }
- }
-
@Override
protected Flow executeFromState(final MasterProcedureEnv env, final MoveRegionState state)
throws InterruptedException {
- LOG.trace("{} execute state={}", this, state);
- switch (state) {
- case MOVE_REGION_PREPARE:
- // Check context again and that region is online; do it here after we have lock on region.
- try {
- preflightChecks(env, true);
- checkOnline(env, this.plan.getRegionInfo());
- if (!env.getMasterServices().getServerManager().isServerOnline(this.plan.getSource())) {
- throw new HBaseIOException(this.plan.getSource() + " not online");
- }
- } catch (HBaseIOException e) {
- LOG.warn(this.toString() + " FAILED because " + e.toString());
- return Flow.NO_MORE_STATE;
- }
- break;
- case MOVE_REGION_UNASSIGN:
- addChildProcedure(new UnassignProcedure(plan.getRegionInfo(), plan.getSource(),
- plan.getDestination(), true));
- setNextState(MoveRegionState.MOVE_REGION_ASSIGN);
- break;
- case MOVE_REGION_ASSIGN:
- AssignProcedure assignProcedure = plan.getDestination() == null ?
- new AssignProcedure(plan.getRegionInfo()):
- new AssignProcedure(plan.getRegionInfo(), plan.getDestination());
- addChildProcedure(assignProcedure);
- return Flow.NO_MORE_STATE;
- default:
- throw new UnsupportedOperationException("unhandled state=" + state);
- }
- return Flow.HAS_MORE_STATE;
+ return Flow.NO_MORE_STATE;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
new file mode 100644
index 0000000..1a79697
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.OpenRegionProcedureStateData;
+
+/**
+ * Remote procedure that asks a region server to open a region via {@link RegionOpenOperation}.
+ */
+@InterfaceAudience.Private
+public class OpenRegionProcedure extends RegionRemoteProcedureBase {
+
+ public OpenRegionProcedure() { // presumably required to re-create the procedure on replay
+ super();
+ }
+
+ public OpenRegionProcedure(RegionInfo region, ServerName targetServer) {
+ super(region, targetServer);
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_ASSIGN;
+ }
+
+ @Override
+ public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+ return new RegionOpenOperation(this, region, env.getAssignmentManager().getFavoredNodes(region),
+ false); // NOTE(review): confirm the meaning of this flag in RegionOpenOperation
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ serializer.serialize(OpenRegionProcedureStateData.getDefaultInstance()); // no extra state
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ serializer.deserialize(OpenRegionProcedureStateData.class); // consume the empty message
+ }
+}