You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/23 15:39:08 UTC
[33/50] [abbrv] hbase git commit: Doc and adding
undoMarkRegionAsOpening/Closing to undo OPENING state if failure
Doc and adding undoMarkRegionAsOpening/Closing to undo OPENING state if failure
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/599b5c64
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/599b5c64
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/599b5c64
Branch: refs/heads/HBASE-14614
Commit: 599b5c6446dc537b5fd6d498a7fd85a87ac5285c
Parents: e151281
Author: Michael Stack <st...@apache.org>
Authored: Sun May 7 13:56:09 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Tue May 23 08:36:53 2017 -0700
----------------------------------------------------------------------
.../hbase/procedure2/ProcedureExecutor.java | 4 +-
.../hbase/procedure2/ProcedureScheduler.java | 3 +-
.../master/assignment/AssignProcedure.java | 38 ++++++++++++------
.../master/assignment/AssignmentManager.java | 17 ++++++--
.../hbase/master/assignment/RegionStates.java | 4 ++
.../assignment/RegionTransitionProcedure.java | 42 +++++++++++++++-----
.../master/assignment/UnassignProcedure.java | 22 +++++++---
.../assignment/TestAssignmentManager.java | 3 +-
8 files changed, 99 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index e819ae8..ffb09c9 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -1640,7 +1640,7 @@ public class ProcedureExecutor<TEnvironment> {
int activeCount = activeExecutorCount.incrementAndGet();
int runningCount = store.setRunningProcedureCount(activeCount);
if (LOG.isDebugEnabled()) {
- LOG.debug("Run pid=" + procedure.getProcId() +
+ LOG.debug("Execute pid=" + procedure.getProcId() +
" runningCount=" + runningCount + ", activeCount=" + activeCount);
}
executionStartTime.set(EnvironmentEdgeManager.currentTime());
@@ -1653,7 +1653,7 @@ public class ProcedureExecutor<TEnvironment> {
activeCount = activeExecutorCount.decrementAndGet();
runningCount = store.setRunningProcedureCount(activeCount);
if (LOG.isDebugEnabled()) {
- LOG.debug("Done pid=" + procedure.getProcId() +
+ LOG.debug("Leave pid=" + procedure.getProcId() +
" runningCount=" + runningCount + ", activeCount=" + activeCount);
}
lastUpdate = EnvironmentEdgeManager.currentTime();
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index b5295e7..a2ae514 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -93,7 +93,7 @@ public interface ProcedureScheduler {
/**
* Mark the event as not ready.
- * procedures calling waitEvent() will be suspended.
+ * Procedures calling waitEvent() will be suspended.
* @param event the event to mark as suspended/not ready
*/
void suspendEvent(ProcedureEvent event);
@@ -125,6 +125,7 @@ public interface ProcedureScheduler {
* List lock queues.
* @return the locks
*/
+ // TODO: This seems to be the wrong place to hang this method.
List<LockInfo> listLocks();
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
index 8555925..158155e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
@@ -194,23 +194,21 @@ public class AssignProcedure extends RegionTransitionProcedure {
setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
return true;
} else if (this.server == null) {
- // Update our server reference to align with regionNode so toString
- // aligns with what regionNode has.
+ // Update our server reference target to align with regionNode regionLocation
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Setting tgt=" + regionNode.getRegionLocation() +
+ " from regionStateNode.getRegionLocation " + this + "; " + regionNode.toShortString());
+ }
this.server = regionNode.getRegionLocation();
}
if (!isServerOnline(env, regionNode)) {
// TODO: is this correct? should we wait the chore/ssh?
- LOG.info("Server not online: " + this + "; " + regionNode.toShortString());
+ LOG.info("Server not online, re-queuing " + this + "; " + regionNode.toShortString());
setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
return true;
}
- // Wait until server reported. If we have resumed the region may already be assigned.
- if (LOG.isTraceEnabled()) {
- LOG.trace("Wait report on " +
- this /*Full detail on this procedure -- includes server name*/);
- }
if (env.getAssignmentManager().waitServerReportEvent(regionNode.getRegionLocation(), this)) {
LOG.info("Early suspend! " + this + "; " + regionNode.toShortString());
throw new ProcedureSuspendedException();
@@ -221,15 +219,23 @@ public class AssignProcedure extends RegionTransitionProcedure {
return false;
}
- // Set OPENING in hbase:meta and add region to list of regions on server.
+ // Transition regionNode State. Set it to OPENING. Update hbase:meta, and add
+ // region to list of regions on the target regionserver. Need to UNDO if failure!
env.getAssignmentManager().markRegionAsOpening(regionNode);
// TODO: Requires a migration to be open by the RS?
// regionNode.getFormatVersion()
- addToRemoteDispatcher(env, regionNode.getRegionLocation());
- // We always return true, even if we fail dispatch because failiure sets
- // state back to beginning so we retry assign.
+ if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
+ // Failed the dispatch BUT addToRemoteDispatcher internally does
+ // cleanup on failure -- even the undoing of markRegionAsOpening above --
+ // so nothing more to do here; in fact we need to get out of here
+ // fast since we've been put back on the scheduler.
+ }
+
+ // We always return true, even if we fail dispatch because addToRemoteDispatcher
+ // failure processing sets state back to REGION_TRANSITION_QUEUE so we try again;
+ // i.e. return true to keep the Procedure running; it has been reset to start over.
return true;
}
@@ -271,6 +277,12 @@ public class AssignProcedure extends RegionTransitionProcedure {
}
}
+ /**
+ * Called when dispatch or subsequent OPEN request fails. Can be run by the
+ * inline dispatch call or later by the ServerCrashProcedure. Our state is
+ * generally OPENING. Cleanup and reset to OFFLINE and put our Procedure
+ * State back to REGION_TRANSITION_QUEUE so the Assign starts over.
+ */
private void handleFailure(final MasterProcedureEnv env, final RegionStateNode regionNode) {
if (incrementAndCheckMaxAttempts(env, regionNode)) {
aborted.set(true);
@@ -278,6 +290,8 @@ public class AssignProcedure extends RegionTransitionProcedure {
this.forceNewPlan = true;
this.server = null;
regionNode.offline();
+ // We were moved to OPENING state before dispatch. Undo. It is safe to call
+ // this method because it checks for OPENING first.
env.getAssignmentManager().undoRegionAsOpening(regionNode);
setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 1e42ea6..e13a052 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -839,11 +839,11 @@ public class AssignmentManager implements ServerListener {
final RegionTransitionProcedure proc = regionNode.getProcedure();
if (proc == null) return false;
- //serverNode.getReportEvent().removeProcedure(proc);
+ // serverNode.getReportEvent().removeProcedure(proc);
proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(),
serverName, state, seqId);
- return true;
}
+ return true;
}
private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
@@ -1447,13 +1447,17 @@ public class AssignmentManager implements ServerListener {
}
public void undoRegionAsOpening(final RegionStateNode regionNode) {
- // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
+ boolean opening = false;
synchronized (regionNode) {
if (regionNode.isInState(State.OPENING)) {
- regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode);
+ opening = true;
+ regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
}
// Should we update hbase:meta?
}
+ if (opening) {
+ // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
+ }
}
public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
@@ -1495,6 +1499,11 @@ public class AssignmentManager implements ServerListener {
metrics.incrementOperationCounter();
}
+ public void undoRegionAsClosing(final RegionStateNode regionNode) throws IOException {
+ // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
+ // There is nothing to undo?
+ }
+
public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
final HRegionInfo hri = regionNode.getRegionInfo();
synchronized (regionNode) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index 1c852c9..3390168 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -84,6 +84,10 @@ public class RegionStates {
}
}
+ /**
+ * Current Region State.
+ * In-memory only. Not persisted.
+ */
// Mutable/Immutable? Changes have to be synchronized or not?
// Data members are volatile which seems to say multi-threaded access is fine.
// In the below we do check and set but the check state could change before
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index ebae62c..5f19bdc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -47,11 +47,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* <p>This procedure is asynchronous and responds to external events.
* The AssignmentManager will notify this procedure when the RS completes
* the operation and reports the transitioned state
- * (see the Assign and Unassign class for more details).
+ * (see the Assign and Unassign class for more detail).
* <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
* first submitted, to the REGION_TRANSITION_DISPATCH state when the request
- * to remote server is done. They end in the REGION_TRANSITION_FINISH state.
- * the
+ * to remote server is sent and the Procedure is suspended waiting on external
+ * event to be woken again. Once the external event is triggered, Procedure
+ * moves to the REGION_TRANSITION_FINISH state.
*/
@InterfaceAudience.Private
public abstract class RegionTransitionProcedure
@@ -123,8 +124,16 @@ public abstract class RegionTransitionProcedure
protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException, ProcedureSuspendedException;
+
+ /**
+ * Called when the Procedure is in the REGION_TRANSITION_DISPATCH state.
+ * In here we do the RPC call to OPEN/CLOSE the region. The suspending of
+ * the thread so it sleeps until it gets an update that the OPEN/CLOSE has
+ * succeeded is complicated. Read the implementations to learn more.
+ */
protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException, ProcedureSuspendedException;
+
protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException, ProcedureSuspendedException;
@@ -150,9 +159,21 @@ public abstract class RegionTransitionProcedure
exception.getMessage();
LOG.warn("Failed " + this + "; " + regionNode.toShortString() + "; exception=" + msg);
remoteCallFailed(env, regionNode, exception);
+ // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
+ // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
+ // this method. Just get out of this current processing quickly.
env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
}
+ /**
+ * Be careful! At the end of this method, the procedure has either succeeded
+ * and this procedure has been set into a suspended state OR, we failed and
+ * this procedure has been put back on the scheduler ready for another worker
+ * to pick it up. In both cases, we need to exit the current Worker processing
+ * tout de suite!
+ * @return True if we successfully dispatched the call and false if we failed;
+ * if failed, we need to roll back any setup done for the dispatch.
+ */
protected boolean addToRemoteDispatcher(final MasterProcedureEnv env,
final ServerName targetServer) {
assert targetServer.equals(getRegionState(env).getRegionLocation()) :
@@ -162,11 +183,13 @@ public abstract class RegionTransitionProcedure
LOG.info("Dispatch " + this + "; " + getRegionState(env).toShortString());
// Put this procedure into suspended mode to wait on report of state change
- // from remote regionserver.
+ // from remote regionserver. This means the Procedure's associated ProcedureEvent is marked not 'ready'.
env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());
+ // Tricky because this can fail. If it fails need to backtrack on stuff like
+ // the 'suspend' done above -- tricky as the 'wake' requeues us -- and ditto
+ // up in the caller; it needs to undo state changes.
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
- // Undo the 'suspend' done above.
remoteCallFailed(env, targetServer,
new FailedRemoteDispatchException(this + " to " + targetServer));
return false;
@@ -194,10 +217,11 @@ public abstract class RegionTransitionProcedure
reportTransition(env, regionNode, code, seqId);
- // NOTE: This call actual adds this procedure back on the scheduler.
- // This makes it so that this procedure may be picked up by another
- // worker even though another worker may currently be running this
- // procedure. TODO.
+ // NOTE: This call adds this procedure back on the scheduler.
+ // This makes it so this procedure can run again. Another worker will take
+ // processing to the next stage. At an extreme, the other worker may run in
+ // parallel so DO NOT CHANGE any state hereafter! This should be last thing
+ // done in this processing step.
env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
index e5fac68..01570a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
@@ -152,11 +153,18 @@ public class UnassignProcedure extends RegionTransitionProcedure {
return false;
}
- // Mark the region as closing
+ // Mark the region as CLOSING.
env.getAssignmentManager().markRegionAsClosing(regionNode);
// Add the close region operation the the server dispatch queue.
- addToRemoteDispatcher(env, regionNode.getRegionLocation());
+ if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
+ // If addToRemoteDispatcher fails, it calls #remoteCallFailed which
+ // does all cleanup.
+ }
+
+ // We always return true, even if we fail dispatch because addToRemoteDispatcher
+ // failure processing sets state back to REGION_TRANSITION_QUEUE so we try again;
+ // i.e. return true to keep the Procedure running; it has been reset to start over.
return true;
}
@@ -193,9 +201,13 @@ public class UnassignProcedure extends RegionTransitionProcedure {
if (exception instanceof ServerCrashException) {
// This exception comes from ServerCrashProcedure after log splitting.
// It is ok to let this procedure go on to complete close now.
- // This will release lock on this region so the subsequent assign
- // can succeed.
- setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
+ // This will release lock on this region so the subsequent assign can succeed.
+ try {
+ reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
+ } catch (UnexpectedStateException e) {
+ // Should never happen.
+ throw new RuntimeException(e);
+ }
} else if (exception instanceof RegionServerAbortedException ||
exception instanceof RegionServerStoppedException ||
exception instanceof ServerNotRunningYetException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/599b5c64/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 61e2a71..9afb63f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -84,6 +84,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -154,7 +155,7 @@ public class TestAssignmentManager {
if (this.am.waitServerReportEvent(null, null)) throw new UnexpectedStateException();
}
- @Test
+ @Ignore @Test // TODO
public void testGoodSplit() throws Exception {
TableName tableName = TableName.valueOf(this.name.getMethodName());
HRegionInfo hri = new HRegionInfo(tableName, Bytes.toBytes(0), Bytes.toBytes(2), false, 0);