You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2018/08/05 00:34:37 UTC
[1/2] hbase git commit: HBASE-20924 Backport "HBASE-20846 Restore
procedure locks when master restarts"
Repository: hbase
Updated Branches:
refs/heads/branch-2.0 174229101 -> f3c010da2
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
index 46185ea..2fc0030 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
@@ -22,11 +22,9 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
@@ -42,8 +40,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-class RootProcedureState {
- private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);
+class RootProcedureState<TEnvironment> {
private enum State {
RUNNING, // The Procedure is running or ready to run
@@ -51,8 +48,8 @@ class RootProcedureState {
ROLLINGBACK, // The Procedure failed and the execution was rolledback
}
- private Set<Procedure> subprocs = null;
- private ArrayList<Procedure> subprocStack = null;
+ private Set<Procedure<TEnvironment>> subprocs = null;
+ private ArrayList<Procedure<TEnvironment>> subprocStack = null;
private State state = State.RUNNING;
private int running = 0;
@@ -91,22 +88,19 @@ class RootProcedureState {
}
protected synchronized long[] getSubprocedureIds() {
- if (subprocs == null) return null;
- int index = 0;
- final long[] subIds = new long[subprocs.size()];
- for (Procedure proc: subprocs) {
- subIds[index++] = proc.getProcId();
+ if (subprocs == null) {
+ return null;
}
- return subIds;
+ return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
}
- protected synchronized List<Procedure> getSubproceduresStack() {
+ protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
return subprocStack;
}
protected synchronized RemoteProcedureException getException() {
if (subprocStack != null) {
- for (Procedure proc: subprocStack) {
+ for (Procedure<TEnvironment> proc: subprocStack) {
if (proc.hasException()) {
return proc.getException();
}
@@ -118,8 +112,10 @@ class RootProcedureState {
/**
* Called by the ProcedureExecutor to mark the procedure step as running.
*/
- protected synchronized boolean acquire(final Procedure proc) {
- if (state != State.RUNNING) return false;
+ protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
+ if (state != State.RUNNING) {
+ return false;
+ }
running++;
return true;
@@ -128,7 +124,7 @@ class RootProcedureState {
/**
* Called by the ProcedureExecutor to mark the procedure step as finished.
*/
- protected synchronized void release(final Procedure proc) {
+ protected synchronized void release(Procedure<TEnvironment> proc) {
running--;
}
@@ -142,7 +138,7 @@ class RootProcedureState {
* Called by the ProcedureExecutor after the procedure step is completed,
* to add the step to the rollback list (or procedure stack)
*/
- protected synchronized void addRollbackStep(final Procedure proc) {
+ protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
if (proc.isFailed()) {
state = State.FAILED;
}
@@ -153,8 +149,10 @@ class RootProcedureState {
subprocStack.add(proc);
}
- protected synchronized void addSubProcedure(final Procedure proc) {
- if (!proc.hasParent()) return;
+ protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
+ if (!proc.hasParent()) {
+ return;
+ }
if (subprocs == null) {
subprocs = new HashSet<>();
}
@@ -168,7 +166,7 @@ class RootProcedureState {
* to the store only the Procedure we executed, and nothing else.
* on load we recreate the full stack by aggregating each procedure stack-positions.
*/
- protected synchronized void loadStack(final Procedure proc) {
+ protected synchronized void loadStack(Procedure<TEnvironment> proc) {
addSubProcedure(proc);
int[] stackIndexes = proc.getStackIndexes();
if (stackIndexes != null) {
@@ -196,7 +194,7 @@ class RootProcedureState {
*/
protected synchronized boolean isValid() {
if (subprocStack != null) {
- for (Procedure proc: subprocStack) {
+ for (Procedure<TEnvironment> proc : subprocStack) {
if (proc == null) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index e5e3230..9e050a2 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -31,15 +31,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* @see InlineChore
*/
@InterfaceAudience.Private
-class TimeoutExecutorThread extends StoppableThread {
+class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class);
- private final ProcedureExecutor<?> executor;
+ private final ProcedureExecutor<TEnvironment> executor;
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
- public TimeoutExecutorThread(ProcedureExecutor<?> executor, ThreadGroup group) {
+ public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
super(group, "ProcExecTimeout");
setDaemon(true);
this.executor = executor;
@@ -65,7 +65,7 @@ class TimeoutExecutorThread extends StoppableThread {
if (task instanceof InlineChore) {
execInlineChore((InlineChore) task);
} else if (task instanceof DelayedProcedure) {
- execDelayedProcedure((DelayedProcedure) task);
+ execDelayedProcedure((DelayedProcedure<TEnvironment>) task);
} else {
LOG.error("CODE-BUG unknown timeout task type {}", task);
}
@@ -77,15 +77,15 @@ class TimeoutExecutorThread extends StoppableThread {
queue.add(chore);
}
- public void add(Procedure<?> procedure) {
+ public void add(Procedure<TEnvironment> procedure) {
assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp());
- queue.add(new DelayedProcedure(procedure));
+ queue.add(new DelayedProcedure<>(procedure));
}
- public boolean remove(Procedure<?> procedure) {
- return queue.remove(new DelayedProcedure(procedure));
+ public boolean remove(Procedure<TEnvironment> procedure) {
+ return queue.remove(new DelayedProcedure<>(procedure));
}
private void execInlineChore(InlineChore chore) {
@@ -93,13 +93,13 @@ class TimeoutExecutorThread extends StoppableThread {
add(chore);
}
- private void execDelayedProcedure(DelayedProcedure delayed) {
+ private void execDelayedProcedure(DelayedProcedure<TEnvironment> delayed) {
// TODO: treat this as a normal procedure, add it to the scheduler and
// let one of the workers handle it.
// Today we consider ProcedureInMemoryChore as InlineChores
- Procedure<?> procedure = delayed.getObject();
+ Procedure<TEnvironment> procedure = delayed.getObject();
if (procedure instanceof ProcedureInMemoryChore) {
- executeInMemoryChore((ProcedureInMemoryChore) procedure);
+ executeInMemoryChore((ProcedureInMemoryChore<TEnvironment>) procedure);
// if the procedure is in a waiting state again, put it back in the queue
procedure.updateTimestamp();
if (procedure.isWaiting()) {
@@ -111,7 +111,7 @@ class TimeoutExecutorThread extends StoppableThread {
}
}
- private void executeInMemoryChore(ProcedureInMemoryChore chore) {
+ private void executeInMemoryChore(ProcedureInMemoryChore<TEnvironment> chore) {
if (!chore.isWaiting()) {
return;
}
@@ -126,12 +126,12 @@ class TimeoutExecutorThread extends StoppableThread {
}
}
- private void executeTimedoutProcedure(Procedure proc) {
+ private void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
// The procedure received a timeout. if the procedure itself does not handle it,
// call abort() and add the procedure back in the queue for rollback.
if (proc.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(proc);
- RootProcedureState procStack = executor.getProcStack(rootProcId);
+ RootProcedureState<TEnvironment> procStack = executor.getProcStack(rootProcId);
procStack.abort();
executor.getStore().update(proc);
executor.getScheduler().addFront(proc);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
index 319ddb2..2bbd53d 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
@@ -42,7 +43,12 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
-@Category({MasterTests.class, LargeTests.class})
+/**
+ * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so
+ * we should use lock to obtain the correct order. Ignored.
+ */
+@Ignore
+@Category({ MasterTests.class, LargeTests.class })
public class TestProcedureReplayOrder {
@ClassRule
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
index a9e919c..c1c9187 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -227,7 +227,6 @@ public class TestProcedureSuspended {
protected void releaseLock(final TestProcEnv env) {
LOG.info("RELEASE LOCK " + this + " " + hasLock);
lock.set(false);
- hasLock = false;
}
@Override
@@ -235,11 +234,6 @@ public class TestProcedureSuspended {
return true;
}
- @Override
- protected boolean hasLock(final TestProcEnv env) {
- return hasLock;
- }
-
public ArrayList<Long> getTimestamps() {
return timestamps;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-protocol-shaded/src/main/protobuf/Procedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto
index 2c5f1aa..b4a3107 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto
@@ -63,6 +63,9 @@ message Procedure {
// Nonce to prevent same procedure submit by multiple times
optional uint64 nonce_group = 13 [default = 0];
optional uint64 nonce = 14 [default = 0];
+
+ // whether the procedure has held the lock
+ optional bool locked = 16 [default = false];
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java
index 7ad5b56..5af7614 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java
@@ -81,8 +81,8 @@ class ClusterSchemaServiceImpl extends AbstractService implements ClusterSchemaS
return this.tableNamespaceManager;
}
- private long submitProcedure(final Procedure<?> procedure, final NonceKey nonceKey)
- throws ServiceNotRunningException {
+ private long submitProcedure(final Procedure<MasterProcedureEnv> procedure,
+ final NonceKey nonceKey) throws ServiceNotRunningException {
checkIsRunning();
ProcedureExecutor<MasterProcedureEnv> pe = this.masterServices.getMasterProcedureExecutor();
return pe.submitProcedure(procedure, nonceKey);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 696760c..15d44f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -874,7 +874,7 @@ public class HMaster extends HRegionServer implements MasterServices {
InitMetaProcedure initMetaProc = null;
if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO)
.isOffline()) {
- Optional<Procedure<?>> optProc = procedureExecutor.getProcedures().stream()
+ Optional<Procedure<MasterProcedureEnv>> optProc = procedureExecutor.getProcedures().stream()
.filter(p -> p instanceof InitMetaProcedure).findAny();
if (optProc.isPresent()) {
initMetaProc = (InitMetaProcedure) optProc.get();
@@ -3162,7 +3162,8 @@ public class HMaster extends HRegionServer implements MasterServices {
cpHost.preGetProcedures();
}
- final List<Procedure<?>> procList = this.procedureExecutor.getProcedures();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
if (cpHost != null) {
cpHost.postGetProcedures(procList);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
index bbb27e1..0b6e45b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java
@@ -148,9 +148,4 @@ public class GCRegionProcedure extends AbstractStateMachineRegionProcedure<GCReg
serializer.deserialize(MasterProcedureProtos.GCRegionStateData.class);
setRegion(ProtobufUtil.toRegionInfo(msg.getRegionInfo()));
}
-
- @Override
- protected org.apache.hadoop.hbase.procedure2.Procedure.LockState acquireLock(MasterProcedureEnv env) {
- return super.acquireLock(env);
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index d4b061f..20ae444 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -82,7 +82,6 @@ public class MergeTableRegionsProcedure
extends AbstractStateMachineTableProcedure<MergeTableRegionsState> {
private static final Logger LOG = LoggerFactory.getLogger(MergeTableRegionsProcedure.class);
private Boolean traceEnabled;
- private volatile boolean lock = false;
private ServerName regionLocation;
private RegionInfo[] regionsToMerge;
private RegionInfo mergedRegion;
@@ -433,24 +432,20 @@ public class MergeTableRegionsProcedure
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitRegions(this, getTableName(),
mergedRegion, regionsToMerge[0], regionsToMerge[1])) {
try {
LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks());
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ // Ignore, just for logging
}
return LockState.LOCK_EVENT_WAIT;
}
- this.lock = true;
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
- this.lock = false;
env.getProcedureScheduler().wakeRegions(this, getTableName(),
mergedRegion, regionsToMerge[0], regionsToMerge[1]);
}
@@ -461,11 +456,6 @@ public class MergeTableRegionsProcedure
}
@Override
- protected boolean hasLock(MasterProcedureEnv env) {
- return this.lock;
- }
-
- @Override
public TableName getTableName() {
return mergedRegion.getTable();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index 54721c8..de39f4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -35,14 +35,16 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+
/**
* Base class for the Assign and Unassign Procedure.
*
@@ -115,8 +117,6 @@ public abstract class RegionTransitionProcedure
*/
private int attempt;
- private volatile boolean lock = false;
-
// Required by the Procedure framework to create the procedure on replay
public RegionTransitionProcedure() {}
@@ -426,15 +426,17 @@ public abstract class RegionTransitionProcedure
}
@Override
- protected LockState acquireLock(final MasterProcedureEnv env) {
+ protected boolean waitInitialized(MasterProcedureEnv env) {
// Unless we are assigning meta, wait for meta to be available and loaded.
- if (!isMeta()) {
- AssignmentManager am = env.getAssignmentManager();
- if (am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo)) {
- return LockState.LOCK_EVENT_WAIT;
- }
+ if (isMeta()) {
+ return false;
}
+ AssignmentManager am = env.getAssignmentManager();
+ return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo);
+ }
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
// TODO: Revisit this and move it to the executor
if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
try {
@@ -445,14 +447,12 @@ public abstract class RegionTransitionProcedure
}
return LockState.LOCK_EVENT_WAIT;
}
- this.lock = true;
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().wakeRegion(this, getRegionInfo());
- lock = false;
}
@Override
@@ -461,11 +461,6 @@ public abstract class RegionTransitionProcedure
}
@Override
- protected boolean hasLock(final MasterProcedureEnv env) {
- return lock;
- }
-
- @Override
protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
// The operation is triggered internally on the server
// the client does not know about this procedure.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
index b4c55f4..3a87bbc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
@@ -76,8 +76,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
private String description;
// True when recovery of master lock from WALs
private boolean recoveredMasterLock;
- // this is for internal working
- private boolean hasLock;
private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this);
// True if this proc acquired relevant locks. This value is for client checks.
@@ -306,7 +304,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
protected LockState acquireLock(final MasterProcedureEnv env) {
boolean ret = lock.acquireLock(env);
locked.set(ret);
- hasLock = ret;
if (ret) {
if (LOG.isDebugEnabled()) {
LOG.debug("LOCKED " + toString());
@@ -321,7 +318,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
@Override
protected void releaseLock(final MasterProcedureEnv env) {
lock.releaseLock(env);
- hasLock = false;
}
/**
@@ -423,11 +419,6 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
return true;
}
- @Override
- public boolean hasLock(final MasterProcedureEnv env) {
- return hasLock;
- }
-
///////////////////////
// LOCK IMPLEMENTATIONS
///////////////////////
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
index 574706a..341d116 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java
@@ -66,8 +66,12 @@ public abstract class AbstractStateMachineNamespaceProcedure<TState>
}
@Override
+ protected boolean waitInitialized(MasterProcedureEnv env) {
+ return env.waitInitialized(this);
+ }
+
+ @Override
protected LockState acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) {
return LockState.LOCK_EVENT_WAIT;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java
index e711ca0..3b5e3b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
public abstract class AbstractStateMachineRegionProcedure<TState>
extends AbstractStateMachineTableProcedure<TState> {
private RegionInfo hri;
- private volatile boolean lock = false;
public AbstractStateMachineRegionProcedure(final MasterProcedureEnv env,
final RegionInfo hri) {
@@ -100,25 +99,17 @@ public abstract class AbstractStateMachineRegionProcedure<TState>
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitRegions(this, getTableName(), getRegion())) {
return LockState.LOCK_EVENT_WAIT;
}
- this.lock = true;
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
- this.lock = false;
env.getProcedureScheduler().wakeRegions(this, getTableName(), getRegion());
}
- @Override
- protected boolean hasLock(final MasterProcedureEnv env) {
- return this.lock;
- }
-
protected void setFailure(Throwable cause) {
super.setFailure(getClass().getSimpleName(), cause);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
index a924552..8c13ef4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
@@ -92,10 +92,12 @@ public abstract class AbstractStateMachineTableProcedure<TState>
}
@Override
+ protected boolean waitInitialized(MasterProcedureEnv env) {
+ return env.waitInitialized(this);
+ }
+
+ @Override
protected LockState acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) {
- return LockState.LOCK_EVENT_WAIT;
- }
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
index c63f420..2f56e83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
@@ -131,7 +131,7 @@ public class CreateNamespaceProcedure
@Override
protected CreateNamespaceState getState(final int stateId) {
- return CreateNamespaceState.valueOf(stateId);
+ return CreateNamespaceState.forNumber(stateId);
}
@Override
@@ -171,15 +171,18 @@ public class CreateNamespaceProcedure
}
@Override
- protected LockState acquireLock(final MasterProcedureEnv env) {
- if (!env.getMasterServices().isInitialized()) {
- // Namespace manager might not be ready if master is not fully initialized,
- // return false to reject user namespace creation; return true for default
- // and system namespace creation (this is part of master initialization).
- if (!isBootstrapNamespace() && env.waitInitialized(this)) {
- return LockState.LOCK_EVENT_WAIT;
- }
+ protected boolean waitInitialized(MasterProcedureEnv env) {
+ // Namespace manager might not be ready if master is not fully initialized,
+ // return false to reject user namespace creation; return true for default
+ // and system namespace creation (this is part of master initialization).
+ if (isBootstrapNamespace()) {
+ return false;
}
+ return env.waitInitialized(this);
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) {
return LockState.LOCK_EVENT_WAIT;
}
@@ -263,20 +266,6 @@ public class CreateNamespaceProcedure
}
}
- /**
- * remove quota for the namespace if exists
- * @param env MasterProcedureEnv
- * @throws IOException
- **/
- private void rollbackSetNamespaceQuota(final MasterProcedureEnv env) throws IOException {
- try {
- DeleteNamespaceProcedure.removeNamespaceQuota(env, nsDescriptor.getName());
- } catch (Exception e) {
- // Ignore exception
- LOG.debug("Rollback of setNamespaceQuota throws exception: " + e);
- }
- }
-
private static TableNamespaceManager getTableNamespaceManager(final MasterProcedureEnv env) {
return env.getMasterServices().getClusterSchema().getTableNamespaceManager();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index acee1af..faad3dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -220,10 +220,16 @@ public class CreateTableProcedure
}
@Override
- protected LockState acquireLock(final MasterProcedureEnv env) {
- if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
- return LockState.LOCK_EVENT_WAIT;
+ protected boolean waitInitialized(MasterProcedureEnv env) {
+ if (getTableName().isSystemTable()) {
+ // Creating system table is part of the initialization, so do not wait here.
+ return false;
}
+ return super.waitInitialized(env);
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
index 4736d65..d984632 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
@@ -63,8 +63,13 @@ public class InitMetaProcedure extends AbstractStateMachineTableProcedure<InitMe
}
@Override
- protected LockState acquireLock(MasterProcedureEnv env) {
+ protected boolean waitInitialized(MasterProcedureEnv env) {
// we do not need to wait for master initialized, we are part of the initialization.
+ return false;
+ }
+
+ @Override
+ protected LockState acquireLock(MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index de3e984..a97e88d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -137,21 +137,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
final Queue<T> queue, final Procedure proc, final boolean addFront) {
- if (!queue.getLockStatus().hasExclusiveLock() ||
- queue.getLockStatus().isLockOwner(proc.getProcId())) {
- // if the queue was not remove for an xlock execution
- // or the proc is the lock owner, put the queue back into execution
+ if (!queue.getLockStatus().hasExclusiveLock()) {
+ // if the queue was not remove for an xlock execution,put the queue back into execution
queue.add(proc, addFront);
addToRunQueue(fairq, queue);
- } else if (queue.getLockStatus().hasParentLock(proc)) {
- // always add it to front as its parent has the xlock
- // usually the addFront is true if we arrive here as we will call addFront for adding sub
- // proc, but sometimes we may retry on the proc which means we will arrive here through yield,
- // so it is possible the addFront here is false.
+ } else if (queue.getLockStatus().hasLockAccess(proc)) {
+ // always add it to front as the have the lock access.
queue.add(proc, true);
- // our (proc) parent has the xlock,
- // so the queue is not in the fairq (run-queue)
- // add it back to let the child run (inherit the lock)
addToRunQueue(fairq, queue);
} else {
queue.add(proc, addFront);
@@ -370,9 +362,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
if (proc != null) {
priority = MasterProcedureUtil.getServerPriority(proc);
} else {
- LOG.warn("Usually this should not happen as proc can only be null when calling from " +
- "wait/wake lock, which means at least we should have one procedure in the queue which " +
- "wants to acquire the lock or just released the lock.");
priority = 1;
}
node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
@@ -788,9 +777,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
try {
final LockAndQueue lock = locking.getServerLock(serverName);
if (lock.tryExclusiveLock(procedure)) {
- // We do not need to create a new queue so just pass null, as in tests we may pass
- // procedures other than ServerProcedureInterface
- removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null));
+ // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
+ // so.
+ removeFromRunQueue(serverRunQueue,
+ getServerQueue(serverName,
+ procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
+ : null));
return false;
}
waitProcedure(lock, procedure);
@@ -813,9 +805,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
final LockAndQueue lock = locking.getServerLock(serverName);
// Only SCP will acquire/release server lock so do not need to check the return value here.
lock.releaseExclusiveLock(procedure);
- // We do not need to create a new queue so just pass null, as in tests we may pass procedures
- // other than ServerProcedureInterface
- addToRunQueue(serverRunQueue, getServerQueue(serverName, null));
+ // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
+ // so.
+ addToRunQueue(serverRunQueue,
+ getServerQueue(serverName,
+ procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
+ : null));
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 587cc82..58263d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -99,7 +99,7 @@ public final class MasterProcedureUtil {
protected abstract void run() throws IOException;
protected abstract String getDescription();
- protected long submitProcedure(final Procedure<?> proc) {
+ protected long submitProcedure(final Procedure<MasterProcedureEnv> proc) {
assert procId == null : "submitProcedure() was already called, running procId=" + procId;
procId = getProcedureExecutor().submitProcedure(proc, nonceKey);
return procId;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index df0875e..328ac00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -106,7 +106,7 @@ public final class ProcedureSyncWait {
}
public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
- final Procedure<?> proc) {
+ final Procedure<MasterProcedureEnv> proc) {
if (proc.isInitializing()) {
procExec.submitProcedure(proc);
}
@@ -114,7 +114,7 @@ public final class ProcedureSyncWait {
}
public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
- final Procedure<?> proc) throws IOException {
+ final Procedure<MasterProcedureEnv> proc) throws IOException {
if (proc.isInitializing()) {
procExec.submitProcedure(proc);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
index f7bea2a..43e66d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
@@ -63,7 +63,18 @@ abstract class Queue<TKey extends Comparable<TKey>> extends AvlLinkedNode<Queue<
// This should go away when we have the new AM and its events
// and we move xlock to the lock-event-queue.
public boolean isAvailable() {
- return !lockStatus.hasExclusiveLock() && !isEmpty();
+ if (isEmpty()) {
+ return false;
+ }
+ if (getLockStatus().hasExclusiveLock()) {
+ // If we have an exclusive lock already taken, only child of the lock owner can be executed
+ // And now we will restore locks when master restarts, so it is possible that the procedure
+ // which is holding the lock is also in the queue, so we need to use hasLockAccess here
+ // instead of hasParentLock
+ Procedure<?> nextProc = peek();
+ return nextProc != null && getLockStatus().hasLockAccess(nextProc);
+ }
+ return true;
}
// ======================================================================
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
index 4e546cd..f617237 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
@@ -46,7 +46,7 @@
long millisFromLastRoll = walStore.getMillisFromLastRoll();
ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
- List<Procedure<?>> procedures = procExecutor.getProcedures();
+ List<Procedure<MasterProcedureEnv>> procedures = procExecutor.getProcedures();
Collections.sort(procedures, new Comparator<Procedure>() {
@Override
public int compare(Procedure lhs, Procedure rhs) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java
index 8b1584f..4186594 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java
@@ -124,7 +124,7 @@ public class TestGetProcedureResult {
@Test
public void testRace() throws Exception {
- ProcedureExecutor<?> executor =
+ ProcedureExecutor<MasterProcedureEnv> executor =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
DummyProcedure p = new DummyProcedure();
long procId = executor.submitProcedure(p);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 0c1cc1f..730e314 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -434,7 +435,7 @@ public class TestAssignmentManager {
am.wakeMetaLoadedEvent();
}
- private Future<byte[]> submitProcedure(final Procedure<?> proc) {
+ private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
index 9a0e2f6..a56e842 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -111,7 +111,7 @@ public class TestMasterProcedureEvents {
}
private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent<?> event,
- final Procedure<?> proc) throws Exception {
+ final Procedure<MasterProcedureEnv> proc) throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java
index c003379..02f0257 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java
@@ -207,7 +207,7 @@ public class TestProcedureAdmin {
// Wait for one step to complete
ProcedureTestingUtility.waitProcedure(procExec, procId);
- List<Procedure<?>> procedures = procExec.getProcedures();
+ List<Procedure<MasterProcedureEnv>> procedures = procExec.getProcedures();
assertTrue(procedures.size() >= 1);
boolean found = false;
for (Procedure<?> proc: procedures) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java
index 3e21951..1402bbd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -88,7 +89,7 @@ public class TestFailedProcCleanup {
LOG.debug("Ignoring exception: ", e);
Thread.sleep(evictionDelay * 3);
}
- List<Procedure<?>> procedureInfos =
+ List<Procedure<MasterProcedureEnv>> procedureInfos =
TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures();
for (Procedure procedureInfo : procedureInfos) {
if (procedureInfo.getProcName().equals("CreateTableProcedure")
@@ -109,7 +110,7 @@ public class TestFailedProcCleanup {
LOG.debug("Ignoring exception: ", e);
Thread.sleep(evictionDelay * 3);
}
- List<Procedure<?>> procedureInfos =
+ List<Procedure<MasterProcedureEnv>> procedureInfos =
TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures();
for (Procedure procedureInfo : procedureInfos) {
if (procedureInfo.getProcName().equals("CreateTableProcedure")
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 8adf366..70a4e11 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -576,7 +576,7 @@ public class TestAccessController extends SecureTestUtil {
Procedure proc = new TestTableDDLProcedure(procExec.getEnvironment(), tableName);
proc.setOwner(USER_OWNER);
procExec.submitProcedure(proc);
- final List<Procedure<?>> procList = procExec.getProcedures();
+ final List<Procedure<MasterProcedureEnv>> procList = procExec.getProcedures();
AccessTestAction getProceduresAction = new AccessTestAction() {
@Override
[2/2] hbase git commit: HBASE-20924 Backport "HBASE-20846 Restore
procedure locks when master restarts"
Posted by st...@apache.org.
HBASE-20924 Backport "HBASE-20846 Restore procedure locks when master restarts"
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f3c010da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f3c010da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f3c010da
Branch: refs/heads/branch-2.0
Commit: f3c010da21895897f7acd93e781f18b50ffe56e9
Parents: 1742291
Author: zhangduo <zh...@apache.org>
Authored: Sun Jul 22 15:10:06 2018 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Sat Aug 4 06:57:55 2018 -0700
----------------------------------------------------------------------
.../procedure2/AbstractProcedureScheduler.java | 2 +-
.../hbase/procedure2/DelayedProcedure.java | 5 +-
.../hadoop/hbase/procedure2/Procedure.java | 384 +++++++++-------
.../hbase/procedure2/ProcedureExecutor.java | 439 ++++++++++---------
.../hadoop/hbase/procedure2/ProcedureUtil.java | 7 +
.../hbase/procedure2/RootProcedureState.java | 44 +-
.../hbase/procedure2/TimeoutExecutorThread.java | 28 +-
.../procedure2/TestProcedureReplayOrder.java | 8 +-
.../procedure2/TestProcedureSuspended.java | 6 -
.../src/main/protobuf/Procedure.proto | 3 +
.../hbase/master/ClusterSchemaServiceImpl.java | 4 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 5 +-
.../master/assignment/GCRegionProcedure.java | 5 -
.../assignment/MergeTableRegionsProcedure.java | 12 +-
.../assignment/RegionTransitionProcedure.java | 33 +-
.../hbase/master/locking/LockProcedure.java | 9 -
.../AbstractStateMachineNamespaceProcedure.java | 6 +-
.../AbstractStateMachineRegionProcedure.java | 9 -
.../AbstractStateMachineTableProcedure.java | 8 +-
.../procedure/CreateNamespaceProcedure.java | 35 +-
.../master/procedure/CreateTableProcedure.java | 12 +-
.../master/procedure/InitMetaProcedure.java | 7 +-
.../procedure/MasterProcedureScheduler.java | 37 +-
.../master/procedure/MasterProcedureUtil.java | 2 +-
.../master/procedure/ProcedureSyncWait.java | 4 +-
.../hadoop/hbase/master/procedure/Queue.java | 13 +-
.../hbase-webapps/master/procedures.jsp | 2 +-
.../hbase/client/TestGetProcedureResult.java | 2 +-
.../assignment/TestAssignmentManager.java | 3 +-
.../procedure/TestMasterProcedureEvents.java | 2 +-
.../master/procedure/TestProcedureAdmin.java | 2 +-
.../hbase/procedure/TestFailedProcCleanup.java | 5 +-
.../security/access/TestAccessController.java | 2 +-
33 files changed, 618 insertions(+), 527 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index c036163..5645f89 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -163,8 +163,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
return null;
}
}
-
final Procedure pollResult = dequeue();
+
pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0;
return pollResult;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
index a9f3e7d..3fc9750 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
@@ -24,8 +24,9 @@ import org.apache.yetus.audience.InterfaceAudience;
* Vessel that carries a Procedure and a timeout.
*/
@InterfaceAudience.Private
-class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> {
- public DelayedProcedure(Procedure<?> procedure) {
+class DelayedProcedure<TEnvironment>
+ extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<TEnvironment>> {
+ public DelayedProcedure(Procedure<TEnvironment> procedure) {
super(procedure, procedure.getTimeoutTimestamp());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 545bedf..58757bb 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -22,76 +22,94 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
/**
- * Base Procedure class responsible for Procedure Metadata;
- * e.g. state, submittedTime, lastUpdate, stack-indexes, etc.
- *
- * <p>Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then
- * the ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done.
- * Execute may be called multiple times in the case of failure or a restart, so code must be
- * idempotent. The return from an execute call is either: null to indicate we are done;
- * ourself if there is more to do; or, a set of sub-procedures that need to
- * be run to completion before the framework resumes our execution.
- *
- * <p>The ProcedureExecutor keeps its
- * notion of Procedure State in the Procedure itself; e.g. it stamps the Procedure as INITIALIZING,
- * RUNNABLE, SUCCESS, etc. Here are some of the States defined in the ProcedureState enum from
- * protos:
- *<ul>
- * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure
- * may or may not have rolled back yet. Any procedure in FAILED state will be eventually moved
- * to ROLLEDBACK state.</li>
- *
+ * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
+ * stack-indexes, etc.
+ * <p/>
+ * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
+ * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
+ * be called multiple times in the case of failure or a restart, so code must be idempotent. The
+ * return from an execute call is either: null to indicate we are done; ourself if there is more to
+ * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
+ * our execution.
+ * <p/>
+ * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
+ * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
+ * ProcedureState enum from protos:
+ * <ul>
+ * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
+ * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
+ * ROLLEDBACK state.</li>
* <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
- *
* <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only
* condition when scheduler/ executor will drop procedure from further processing is when procedure
* state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li>
- *
* <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
* ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
- *</ul>
- * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep
- * their own state. This can lead to confusion. Try to keep the two distinct.
- *
- * <p>rollback() is called when the procedure or one of the sub-procedures
- * has failed. The rollback step is supposed to cleanup the resources created
- * during the execute() step. In case of failure and restart, rollback() may be
- * called multiple times, so again the code must be idempotent.
- *
- * <p>Procedure can be made respect a locking regime. It has acquire/release methods as
- * well as an {@link #hasLock(Object)}. The lock implementation is up to the implementor.
- * If an entity needs to be locked for the life of a procedure -- not just the calls to
- * execute -- then implementations should say so with the {@link #holdLock(Object)}
- * method.
- *
- * <p>Procedures can be suspended or put in wait state with a callback that gets executed on
+ * </ul>
+ * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
+ * own state. This can lead to confusion. Try to keep the two distinct.
+ * <p/>
+ * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
+ * step is supposed to cleanup the resources created during the execute() step. In case of failure
+ * and restart, rollback() may be called multiple times, so again the code must be idempotent.
+ * <p/>
+ * Procedure can be made respect a locking regime. It has acquire/release methods as well as an
+ * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be
+ * locked for the life of a procedure -- not just the calls to execute -- then implementations
+ * should say so with the {@link #holdLock(Object)} method.
+ * <p/>
+ * And since we need to restore the lock when restarting to keep the logic correct(HBASE-20846), the
+ * implementation is a bit tricky so we add some comments hrre about it.
+ * <ul>
+ * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
+ * whether we have the lock. We will set it to {@code true} in
+ * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
+ * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
+ * more.</li>
+ * <li>Also added a locked field in the proto message. When storing, the field will be set according
+ * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
+ * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
+ * message is {@code true}.</li>
+ * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
+ * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
+ * need to wait until master is initialized. So the solution here is that, we introduced a new
+ * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
+ * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
+ * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
+ * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
+ * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
+ * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
+ * {@code true}, when we just set the {@link #locked} field to true and return, without actually
+ * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
+ * </ul>
+ * <p/>
+ * Procedures can be suspended or put in wait state with a callback that gets executed on
* Procedure-specified timeout. See {@link #setTimeout(int)}}, and
- * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the
- * TestTimeoutEventProcedure class for an example usage.</p>
- *
- * <p>There are hooks for collecting metrics on submit of the procedure and on finish.
- * See {@link #updateMetricsOnSubmit(Object)} and
- * {@link #updateMetricsOnFinish(Object, long, boolean)}.
+ * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
+ * class for an example usage.
+ * </p>
+ * <p/>
+ * There are hooks for collecting metrics on submit of the procedure and on finish. See
+ * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
*/
@InterfaceAudience.Private
-@InterfaceStability.Evolving
public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
public static final long NO_PROC_ID = -1;
@@ -122,6 +140,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
private volatile byte[] result = null;
+ private volatile boolean locked = false;
+
+ private boolean lockedWhenLoading = false;
+
/**
* The main code of the procedure. It must be idempotent since execute()
* may be called multiple times in case of machine failure in the middle
@@ -170,7 +192,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* be able to resume on failure.
* @param serializer stores the serializable state
*/
- protected abstract void serializeStateData(final ProcedureStateSerializer serializer)
+ protected abstract void serializeStateData(ProcedureStateSerializer serializer)
throws IOException;
/**
@@ -178,52 +200,65 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* state.
* @param serializer contains the serialized state
*/
- protected abstract void deserializeStateData(final ProcedureStateSerializer serializer)
+ protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException;
/**
- * The user should override this method if they need a lock on an Entity.
- * A lock can be anything, and it is up to the implementor. The Procedure
- * Framework will call this method just before it invokes {@link #execute(Object)}.
- * It calls {@link #releaseLock(Object)} after the call to execute.
- *
- * <p>If you need to hold the lock for the life of the Procedure -- i.e. you do not
- * want any other Procedure interfering while this Procedure is running, see
- * {@link #holdLock(Object)}.
- *
- * <p>Example: in our Master we can execute request in parallel for different tables.
- * We can create t1 and create t2 and these creates can be executed at the same time.
- * Anything else on t1/t2 is queued waiting that specific table create to happen.
- *
- * <p>There are 3 LockState:
- * <ul><li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is
- * ready to execute.</li>
- * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework
- * should take care of readding the procedure back to the runnable set for retry</li>
- * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will
- * take care of readding the procedure back to the runnable set when the lock is available.
- * </li></ul>
+ * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
+ * call us to determine whether we need to wait for initialization, second, it will call
+ * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
+ * <p/>
+ * This is because that when master restarts, we need to restore the lock state for all the
+ * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
+ * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
+ * of the initialization!), so we need to split the code into two steps, and when restore, we just
+ * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
+ * @return true means we need to wait until the environment has been initialized, otherwise true.
+ */
+ protected boolean waitInitialized(TEnvironment env) {
+ return false;
+ }
+
+ /**
+ * The user should override this method if they need a lock on an Entity. A lock can be anything,
+ * and it is up to the implementor. The Procedure Framework will call this method just before it
+ * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
+ * execute.
+ * <p/>
+ * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
+ * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
+ * <p/>
+ * Example: in our Master we can execute request in parallel for different tables. We can create
+ * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
+ * queued waiting that specific table create to happen.
+ * <p/>
+ * There are 3 LockState:
+ * <ul>
+ * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
+ * execute.</li>
+ * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
+ * take care of readding the procedure back to the runnable set for retry</li>
+ * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
+ * care of readding the procedure back to the runnable set when the lock is available.</li>
+ * </ul>
* @return the lock state as described above.
*/
- protected LockState acquireLock(final TEnvironment env) {
+ protected LockState acquireLock(TEnvironment env) {
return LockState.LOCK_ACQUIRED;
}
/**
* The user should override this method, and release lock if necessary.
*/
- protected void releaseLock(final TEnvironment env) {
+ protected void releaseLock(TEnvironment env) {
// no-op
}
/**
* Used to keep the procedure lock even when the procedure is yielding or suspended.
- * Must implement {@link #hasLock(Object)} if you want to hold the lock for life
- * of the Procedure.
- * @see #hasLock(Object)
* @return true if the procedure should hold on the lock until completionCleanup()
*/
- protected boolean holdLock(final TEnvironment env) {
+ protected boolean holdLock(TEnvironment env) {
return false;
}
@@ -235,8 +270,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @see #holdLock(Object)
* @return true if the procedure has the lock, false otherwise.
*/
- protected boolean hasLock(final TEnvironment env) {
- return false;
+ protected final boolean hasLock() {
+ return locked;
}
/**
@@ -245,7 +280,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* operation before replay.
* e.g. failing the procedure if the state on replay may be unknown.
*/
- protected void beforeReplay(final TEnvironment env) {
+ protected void beforeReplay(TEnvironment env) {
// no-op
}
@@ -253,7 +288,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called when the procedure is ready to be added to the queue after
* the loading/replay operation.
*/
- protected void afterReplay(final TEnvironment env) {
+ protected void afterReplay(TEnvironment env) {
// no-op
}
@@ -263,7 +298,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* This operation will not be retried on failure. If a procedure took a lock,
* it will have been released when this method runs.
*/
- protected void completionCleanup(final TEnvironment env) {
+ protected void completionCleanup(TEnvironment env) {
// no-op
}
@@ -275,7 +310,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @return Return true if the executor should yield on completion of an execution step.
* Defaults to return false.
*/
- protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
+ protected boolean isYieldAfterExecutionStep(TEnvironment env) {
return false;
}
@@ -288,7 +323,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @return true if the executor should wait the client ack for the result.
* Defaults to return true.
*/
- protected boolean shouldWaitClientAck(final TEnvironment env) {
+ protected boolean shouldWaitClientAck(TEnvironment env) {
return true;
}
@@ -298,7 +333,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @param env The environment passed to the procedure executor
* @return Container object for procedure related metric
*/
- protected ProcedureMetrics getProcedureMetrics(final TEnvironment env) {
+ protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
return null;
}
@@ -308,7 +343,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
* {@link ProcedureMetrics}.
*/
- protected void updateMetricsOnSubmit(final TEnvironment env) {
+ protected void updateMetricsOnSubmit(TEnvironment env) {
ProcedureMetrics metrics = getProcedureMetrics(env);
if (metrics == null) {
return;
@@ -322,21 +357,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* This function will be called just after procedure execution is finished. Override this method
- * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)}
- * returns non-null {@link ProcedureMetrics}, the default implementation adds runtime of a
- * procedure to a time histogram for successfully completed procedures. Increments failed
- * counter for failed procedures.
- *
- * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack,
- * including successfully finished siblings, this function may get called twice in certain
- * cases for certain procedures. Explore further if this can be called once.
- *
+ * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
+ * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
+ * time histogram for successfully completed procedures. Increments failed counter for failed
+ * procedures.
+ * <p/>
+ * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
+ * successfully finished siblings, this function may get called twice in certain cases for certain
+ * procedures. Explore further if this can be called once.
* @param env The environment passed to the procedure executor
* @param runtime Runtime of the procedure in milliseconds
* @param success true if procedure is completed successfully
*/
- protected void updateMetricsOnFinish(final TEnvironment env, final long runtime,
- boolean success) {
+ protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
ProcedureMetrics metrics = getProcedureMetrics(env);
if (metrics == null) {
return;
@@ -362,8 +395,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
/**
- * Build the StringBuilder for the simple form of
- * procedure string.
+ * Build the StringBuilder for the simple form of procedure string.
* @return the StringBuilder
*/
protected StringBuilder toStringSimpleSB() {
@@ -389,6 +421,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
sb.append(", state="); // pState for Procedure State as opposed to any other kind.
toStringState(sb);
+ sb.append(", hasLock=").append(locked);
+
if (hasException()) {
sb.append(", exception=" + getException());
}
@@ -400,8 +434,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
/**
- * Extend the toString() information with more procedure
- * details
+ * Extend the toString() information with more procedure details
*/
public String toStringDetails() {
final StringBuilder sb = toStringSimpleSB();
@@ -429,8 +462,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
/**
- * Called from {@link #toString()} when interpolating {@link Procedure} State.
- * Allows decorating generic Procedure State with Procedure particulars.
+ * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
+ * generic Procedure State with Procedure particulars.
* @param builder Append current {@link ProcedureState}
*/
protected void toStringState(StringBuilder builder) {
@@ -493,8 +526,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called by the ProcedureExecutor to assign the ID to the newly created procedure.
*/
@VisibleForTesting
- @InterfaceAudience.Private
- protected void setProcId(final long procId) {
+ protected void setProcId(long procId) {
this.procId = procId;
this.submittedTime = EnvironmentEdgeManager.currentTime();
setState(ProcedureState.RUNNABLE);
@@ -503,13 +535,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor to assign the parent to the newly created procedure.
*/
- @InterfaceAudience.Private
- protected void setParentProcId(final long parentProcId) {
+ protected void setParentProcId(long parentProcId) {
this.parentProcId = parentProcId;
}
- @InterfaceAudience.Private
- protected void setRootProcId(final long rootProcId) {
+ protected void setRootProcId(long rootProcId) {
this.rootProcId = rootProcId;
}
@@ -517,18 +547,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called by the ProcedureExecutor to set the value to the newly created procedure.
*/
@VisibleForTesting
- @InterfaceAudience.Private
- protected void setNonceKey(final NonceKey nonceKey) {
+ protected void setNonceKey(NonceKey nonceKey) {
this.nonceKey = nonceKey;
}
@VisibleForTesting
- @InterfaceAudience.Private
- public void setOwner(final String owner) {
+ public void setOwner(String owner) {
this.owner = StringUtils.isEmpty(owner) ? null : owner;
}
- public void setOwner(final User owner) {
+ public void setOwner(User owner) {
assert owner != null : "expected owner to be not null";
setOwner(owner.getShortName());
}
@@ -537,8 +565,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
- @InterfaceAudience.Private
- protected void setSubmittedTime(final long submittedTime) {
+ protected void setSubmittedTime(long submittedTime) {
this.submittedTime = submittedTime;
}
@@ -548,7 +575,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* @param timeout timeout interval in msec
*/
- protected void setTimeout(final int timeout) {
+ protected void setTimeout(int timeout) {
this.timeout = timeout;
}
@@ -567,15 +594,13 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
- @InterfaceAudience.Private
- protected void setLastUpdate(final long lastUpdate) {
+ protected void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
/**
* Called by ProcedureExecutor after each time a procedure step is executed.
*/
- @InterfaceAudience.Private
protected void updateTimestamp() {
this.lastUpdate = EnvironmentEdgeManager.currentTime();
}
@@ -590,7 +615,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* the procedure is in the waiting queue.
* @return the timestamp of the next timeout.
*/
- @InterfaceAudience.Private
protected long getTimeoutTimestamp() {
return getLastUpdate() + getTimeout();
}
@@ -616,10 +640,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* The procedure may leave a "result" on completion.
* @param result the serialized result that will be passed to the client
*/
- protected void setResult(final byte[] result) {
+ protected void setResult(byte[] result) {
this.result = result;
}
+ /**
+ * Will only be called when loading procedures from procedure store, where we need to record
+ * whether the procedure has already held a lock. Later we will call
+ * {@link #doAcquireLock(Object)} to actually acquire the lock.
+ */
+ final void lockedWhenLoading() {
+ this.lockedWhenLoading = true;
+ }
+
// ==============================================================================================
// Runtime state, updated every operation by the ProcedureExecutor
//
@@ -677,13 +710,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
@VisibleForTesting
- @InterfaceAudience.Private
protected synchronized void setState(final ProcedureState state) {
this.state = state;
updateTimestamp();
}
- @InterfaceAudience.Private
public synchronized ProcedureState getState() {
return state;
}
@@ -705,10 +736,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
- * @return true to let the framework handle the timeout as abort,
- * false in case the procedure handled the timeout itself.
+ * @return true to let the framework handle the timeout as abort, false in case the procedure
+ * handled the timeout itself.
*/
- protected synchronized boolean setTimeoutFailure(final TEnvironment env) {
+ protected synchronized boolean setTimeoutFailure(TEnvironment env) {
if (state == ProcedureState.WAITING_TIMEOUT) {
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
setFailure("ProcedureExecutor", new TimeoutIOException(
@@ -729,8 +760,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor on procedure-load to restore the latch state
*/
- @InterfaceAudience.Private
- protected synchronized void setChildrenLatch(final int numChildren) {
+ protected synchronized void setChildrenLatch(int numChildren) {
this.childrenLatch = numChildren;
if (LOG.isTraceEnabled()) {
LOG.trace("CHILD LATCH INCREMENT SET " +
@@ -741,7 +771,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor on procedure-load to restore the latch state
*/
- @InterfaceAudience.Private
protected synchronized void incChildrenLatch() {
// TODO: can this be inferred from the stack? I think so...
this.childrenLatch++;
@@ -753,7 +782,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
*/
- @InterfaceAudience.Private
private synchronized boolean childrenCountDown() {
assert childrenLatch > 0: this;
boolean b = --childrenLatch == 0;
@@ -770,17 +798,18 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
*/
synchronized boolean tryRunnable() {
// Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
- boolean b = getState() == ProcedureState.WAITING && childrenCountDown();
- if (b) setState(ProcedureState.RUNNABLE);
- return b;
+ if (getState() == ProcedureState.WAITING && childrenCountDown()) {
+ setState(ProcedureState.RUNNABLE);
+ return true;
+ } else {
+ return false;
+ }
}
- @InterfaceAudience.Private
protected synchronized boolean hasChildren() {
return childrenLatch > 0;
}
- @InterfaceAudience.Private
protected synchronized int getChildrenLatch() {
return childrenLatch;
}
@@ -789,7 +818,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called by the RootProcedureState on procedure execution.
* Each procedure store its stack-index positions.
*/
- @InterfaceAudience.Private
protected synchronized void addStackIndex(final int index) {
if (stackIndexes == null) {
stackIndexes = new int[] { index };
@@ -800,7 +828,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
}
- @InterfaceAudience.Private
protected synchronized boolean removeStackIndex() {
if (stackIndexes != null && stackIndexes.length > 1) {
stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
@@ -815,7 +842,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
- @InterfaceAudience.Private
protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
this.stackIndexes = new int[stackIndexes.size()];
for (int i = 0; i < this.stackIndexes.length; ++i) {
@@ -823,12 +849,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
}
- @InterfaceAudience.Private
protected synchronized boolean wasExecuted() {
return stackIndexes != null;
}
- @InterfaceAudience.Private
protected synchronized int[] getStackIndexes() {
return stackIndexes;
}
@@ -840,10 +864,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Internal method called by the ProcedureExecutor that starts the user-level code execute().
* @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
- * skip out without changing states or releasing any locks held.
+ * skip out without changing states or releasing any locks held.
*/
- @InterfaceAudience.Private
- protected Procedure<TEnvironment>[] doExecute(final TEnvironment env)
+ protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
updateTimestamp();
@@ -856,8 +879,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Internal method called by the ProcedureExecutor that starts the user-level code rollback().
*/
- @InterfaceAudience.Private
- protected void doRollback(final TEnvironment env)
+ protected void doRollback(TEnvironment env)
throws IOException, InterruptedException {
try {
updateTimestamp();
@@ -867,19 +889,60 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
}
}
+ final void restoreLock(TEnvironment env) {
+ if (!lockedWhenLoading) {
+ LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
+ return;
+ }
+
+ LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
+ LockState state = acquireLock(env);
+ assert state == LockState.LOCK_ACQUIRED;
+ }
+
/**
* Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
*/
- @InterfaceAudience.Private
- protected LockState doAcquireLock(final TEnvironment env) {
- return acquireLock(env);
+ final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
+ if (waitInitialized(env)) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ if (lockedWhenLoading) {
+ // reset it so we will not consider it anymore
+ lockedWhenLoading = false;
+ locked = true;
+ // Here we return without persist the locked state, as lockedWhenLoading is true means
+ // that the locked field of the procedure stored in procedure store is true, so we do not need
+ // to store it again.
+ return LockState.LOCK_ACQUIRED;
+ }
+ LockState state = acquireLock(env);
+ if (state == LockState.LOCK_ACQUIRED) {
+ locked = true;
+ // persist that we have held the lock. This must be done before we actually execute the
+ // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
+ // but it may have already done some changes as we have already executed it, and if another
+ // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
+ // not expect that another procedure can be executed in the middle.
+ store.update(this);
+ }
+ return state;
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
*/
- @InterfaceAudience.Private
- protected void doReleaseLock(final TEnvironment env) {
+ final void doReleaseLock(TEnvironment env, ProcedureStore store) {
+ locked = false;
+ // persist that we have released the lock. This must be done before we actually release the
+ // lock. Another procedure may take this lock immediately after we release the lock, and if we
+ // crash before persist the information that we have already released the lock, then when
+ // restarting there will be two procedures which both have the lock and cause problems.
+ if (getState() != ProcedureState.ROLLEDBACK) {
+ // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
+ // procedure store, so do not need to log the release operation any more.
+ store.update(this);
+ }
releaseLock(env);
}
@@ -896,7 +959,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* Get an hashcode for the specified Procedure ID
* @return the hashcode for the specified procId
*/
- public static long getProcIdHashCode(final long procId) {
+ public static long getProcIdHashCode(long procId) {
long h = procId;
h ^= h >> 16;
h *= 0x85ebca6b;
@@ -906,15 +969,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
return h;
}
- /*
+ /**
* Helper to lookup the root Procedure ID given a specified procedure.
*/
- @InterfaceAudience.Private
- protected static Long getRootProcedureId(final Map<Long, Procedure> procedures,
- Procedure<?> proc) {
+ protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
+ Procedure<T> proc) {
while (proc.hasParent()) {
proc = procedures.get(proc.getParentProcId());
- if (proc == null) return null;
+ if (proc == null) {
+ return null;
+ }
}
return proc.getProcId();
}
@@ -924,7 +988,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @param b the second procedure to be compared.
* @return true if the two procedures have the same parent
*/
- public static boolean haveSameParent(final Procedure<?> a, final Procedure<?> b) {
+ public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 263b6be..5925e55 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -49,7 +51,6 @@ import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +73,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* and get the result via getResult(procId)
*/
@InterfaceAudience.Private
-@InterfaceStability.Evolving
public class ProcedureExecutor<TEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
@@ -109,16 +109,16 @@ public class ProcedureExecutor<TEnvironment> {
void procedureFinished(long procId);
}
- private static class CompletedProcedureRetainer {
- private final Procedure<?> procedure;
+ private static final class CompletedProcedureRetainer<TEnvironment> {
+ private final Procedure<TEnvironment> procedure;
private long clientAckTime;
- public CompletedProcedureRetainer(Procedure<?> procedure) {
+ public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
this.procedure = procedure;
clientAckTime = -1;
}
- public Procedure<?> getProcedure() {
+ public Procedure<TEnvironment> getProcedure() {
return procedure;
}
@@ -173,13 +173,13 @@ public class ProcedureExecutor<TEnvironment> {
private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
private static final int DEFAULT_BATCH_SIZE = 32;
- private final Map<Long, CompletedProcedureRetainer> completed;
+ private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
private final ProcedureStore store;
private Configuration conf;
- public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
- final Map<Long, CompletedProcedureRetainer> completedMap,
+ public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store,
+ final Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
// set the timeout interval that triggers the periodic-procedure
super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
@@ -206,10 +206,11 @@ public class ProcedureExecutor<TEnvironment> {
int batchCount = 0;
final long now = EnvironmentEdgeManager.currentTime();
- final Iterator<Map.Entry<Long, CompletedProcedureRetainer>> it = completed.entrySet().iterator();
+ final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
+ completed.entrySet().iterator();
while (it.hasNext() && store.isRunning()) {
- final Map.Entry<Long, CompletedProcedureRetainer> entry = it.next();
- final CompletedProcedureRetainer retainer = entry.getValue();
+ final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
+ final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
final Procedure<?> proc = retainer.getProcedure();
// TODO: Select TTL based on Procedure type
@@ -241,28 +242,32 @@ public class ProcedureExecutor<TEnvironment> {
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
* The user of ProcedureExecutor should call getResult(procId) to get the result.
*/
- private final ConcurrentHashMap<Long, CompletedProcedureRetainer> completed = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
+ new ConcurrentHashMap<>();
/**
* Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
* The RootProcedureState contains the execution stack of the Root-Procedure,
* It is added to the map by submitProcedure() and removed on procedure completion.
*/
- private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
+ new ConcurrentHashMap<>();
/**
* Helper map to lookup the live procedures by ID.
* This map contains every procedure. root-procedures and subprocedures.
*/
- private final ConcurrentHashMap<Long, Procedure> procedures = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
+ new ConcurrentHashMap<>();
/**
- * Helper map to lookup whether the procedure already issued from the same client.
- * This map contains every root procedure.
+ * Helper map to lookup whether the procedure already issued from the same client. This map
+ * contains every root procedure.
*/
private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
- private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>();
+ private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
+ new CopyOnWriteArrayList<>();
private Configuration conf;
@@ -288,7 +293,7 @@ public class ProcedureExecutor<TEnvironment> {
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (Should be ok).
*/
- private TimeoutExecutorThread timeoutExecutor;
+ private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
private int corePoolSize;
private int maxPoolSize;
@@ -366,27 +371,68 @@ public class ProcedureExecutor<TEnvironment> {
});
}
- private void loadProcedures(final ProcedureIterator procIter,
- final boolean abortOnCorruption) throws IOException {
- final boolean debugEnabled = LOG.isDebugEnabled();
+ private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
+ proc.restoreLock(getEnvironment());
+ restored.add(proc.getProcId());
+ }
+
+ private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
+ while (!stack.isEmpty()) {
+ restoreLock(stack.pop(), restored);
+ }
+ }
+
+ // Restore the locks for all the procedures.
+ // Notice that we need to restore the locks starting from the root proc, otherwise there will be
+ // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
+ // calling the acquireLock method for the parent procedure.
+ // The algorithm is straight-forward:
+ // 1. Use a set to record the procedures which locks have already been restored.
+ // 2. Use a stack to store the hierarchy of the procedures
+ // 3. For all the procedure, we will first try to find its parent and push it into the stack,
+ // unless
+ // a. We have no parent, i.e, we are the root procedure
+ // b. The lock has already been restored(by checking the set introduced in #1)
+ // then we start to pop the stack and call acquireLock for each procedure.
+ // Notice that this should be done for all procedures, not only the ones in runnableList.
+ private void restoreLocks() {
+ Set<Long> restored = new HashSet<>();
+ Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
+ procedures.values().forEach(proc -> {
+ for (;;) {
+ if (restored.contains(proc.getProcId())) {
+ restoreLocks(stack, restored);
+ return;
+ }
+ if (!proc.hasParent()) {
+ restoreLock(proc, restored);
+ restoreLocks(stack, restored);
+ return;
+ }
+ stack.push(proc);
+ proc = procedures.get(proc.getParentProcId());
+ }
+ });
+ }
+ private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
+ throws IOException {
// 1. Build the rollback stack
int runnablesCount = 0;
+ int failedCount = 0;
while (procIter.hasNext()) {
boolean finished = procIter.isNextFinished();
- Procedure proc = procIter.next();
+ Procedure<TEnvironment> proc = procIter.next();
NonceKey nonceKey = proc.getNonceKey();
long procId = proc.getProcId();
if (finished) {
- completed.put(proc.getProcId(), new CompletedProcedureRetainer(proc));
- if (debugEnabled) {
- LOG.debug("Completed " + proc);
- }
+ completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
+ LOG.debug("Completed {}", proc);
} else {
if (!proc.hasParent()) {
assert !proc.isFinished() : "unexpected finished procedure";
- rollbackStack.put(proc.getProcId(), new RootProcedureState());
+ rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
}
// add the procedure to the map
@@ -395,6 +441,8 @@ public class ProcedureExecutor<TEnvironment> {
if (proc.getState() == ProcedureState.RUNNABLE) {
runnablesCount++;
+ } else if (proc.getState() == ProcedureState.FAILED) {
+ failedCount++;
}
}
@@ -405,8 +453,19 @@ public class ProcedureExecutor<TEnvironment> {
}
// 2. Initialize the stacks
- final ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
- HashSet<Procedure> waitingSet = null;
+ // In the old implementation, for procedures in FAILED state, we will push it into the
+ // ProcedureScheduler directly to execute the rollback. But this does not work after we
+ // introduce the restore lock stage.
+ // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
+ // then when a procedure which has lock access, for example, a sub procedure of the procedure
+ // which has the xlock, is pushed into the scheduler, we will add the queue back to let the
+ // workers poll from it. The assumption here is that, the procedure which has the xlock should
+ // have been polled out already, so when loading we can not add the procedure to scheduler first
+ // and then call acquireLock, since the procedure is still in the queue, and since we will
+ // remove the queue from runQueue, then no one can poll it out, then there is a dead lock
+ List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnablesCount);
+ List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
+ Set<Procedure<TEnvironment>> waitingSet = null;
procIter.reset();
while (procIter.hasNext()) {
if (procIter.isNextFinished()) {
@@ -414,12 +473,10 @@ public class ProcedureExecutor<TEnvironment> {
continue;
}
- Procedure proc = procIter.next();
+ Procedure<TEnvironment> proc = procIter.next();
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
- if (debugEnabled) {
- LOG.debug(String.format("Loading %s", proc));
- }
+ LOG.debug("Loading {}", proc);
Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
@@ -429,14 +486,14 @@ public class ProcedureExecutor<TEnvironment> {
}
if (proc.hasParent()) {
- Procedure parent = procedures.get(proc.getParentProcId());
+ Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
// corrupted procedures are handled later at step 3
if (parent != null && !proc.isFinished()) {
parent.incChildrenLatch();
}
}
- RootProcedureState procStack = rollbackStack.get(rootProcId);
+ RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
procStack.loadStack(proc);
proc.setRootProcId(rootProcId);
@@ -456,8 +513,7 @@ public class ProcedureExecutor<TEnvironment> {
waitingSet.add(proc);
break;
case FAILED:
- // add the proc to the scheduler to perform the rollback
- scheduler.addBack(proc);
+ failedList.add(proc);
break;
case ROLLEDBACK:
case INITIALIZING:
@@ -471,13 +527,14 @@ public class ProcedureExecutor<TEnvironment> {
// 3. Validate the stacks
int corruptedCount = 0;
- Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
+ Iterator<Map.Entry<Long, RootProcedureState<TEnvironment>>> itStack =
+ rollbackStack.entrySet().iterator();
while (itStack.hasNext()) {
- Map.Entry<Long, RootProcedureState> entry = itStack.next();
- RootProcedureState procStack = entry.getValue();
+ Map.Entry<Long, RootProcedureState<TEnvironment>> entry = itStack.next();
+ RootProcedureState<TEnvironment> procStack = entry.getValue();
if (procStack.isValid()) continue;
- for (Procedure proc: procStack.getSubproceduresStack()) {
+ for (Procedure<TEnvironment> proc : procStack.getSubproceduresStack()) {
LOG.error("Corrupted " + proc);
procedures.remove(proc.getProcId());
runnableList.remove(proc);
@@ -493,30 +550,22 @@ public class ProcedureExecutor<TEnvironment> {
// 4. Push the procedures to the timeout executor
if (waitingSet != null && !waitingSet.isEmpty()) {
- for (Procedure proc: waitingSet) {
+ for (Procedure<TEnvironment> proc: waitingSet) {
proc.afterReplay(getEnvironment());
timeoutExecutor.add(proc);
}
}
-
- // 5. Push the procedure to the scheduler
- if (!runnableList.isEmpty()) {
- // TODO: See ProcedureWALFormatReader#hasFastStartSupport
- // some procedure may be started way before this stuff.
- for (int i = runnableList.size() - 1; i >= 0; --i) {
- Procedure proc = runnableList.get(i);
- proc.afterReplay(getEnvironment());
- if (!proc.hasParent()) {
- sendProcedureLoadedNotification(proc.getProcId());
- }
- if (proc.wasExecuted()) {
- scheduler.addFront(proc);
- } else {
- // if it was not in execution, it can wait.
- scheduler.addBack(proc);
- }
+ // 5. restore locks
+ restoreLocks();
+ // 6. Push the procedure to the scheduler
+ failedList.forEach(scheduler::addBack);
+ runnableList.forEach(p -> {
+ p.afterReplay(getEnvironment());
+ if (!p.hasParent()) {
+ sendProcedureLoadedNotification(p.getProcId());
}
- }
+ scheduler.addBack(p);
+ });
}
/**
@@ -538,7 +587,7 @@ public class ProcedureExecutor<TEnvironment> {
corePoolSize, maxPoolSize);
this.threadGroup = new ThreadGroup("PEWorkerGroup");
- this.timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
+ this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
// Create the workers
workerId.set(0);
@@ -590,7 +639,7 @@ public class ProcedureExecutor<TEnvironment> {
timeoutExecutor.add(new WorkerMonitor());
// Add completed cleaner chore
- addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
+ addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
}
public void stop() {
@@ -695,7 +744,7 @@ public class ProcedureExecutor<TEnvironment> {
* Add a chore procedure to the executor
* @param chore the chore to add
*/
- public void addChore(final ProcedureInMemoryChore chore) {
+ public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
chore.setState(ProcedureState.WAITING_TIMEOUT);
timeoutExecutor.add(chore);
}
@@ -705,7 +754,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param chore the chore to remove
* @return whether the chore is removed, or it will be removed later
*/
- public boolean removeChore(final ProcedureInMemoryChore chore) {
+ public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
chore.setState(ProcedureState.SUCCESS);
return timeoutExecutor.remove(chore);
}
@@ -839,17 +888,21 @@ public class ProcedureExecutor<TEnvironment> {
* @param procOwner name of the owner of the procedure, used to inform the user
* @param exception the failure to report to the user
*/
- public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
- final User procOwner, final IOException exception) {
- if (nonceKey == null) return;
+ public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
+ IOException exception) {
+ if (nonceKey == null) {
+ return;
+ }
- final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
- if (procId == null || completed.containsKey(procId)) return;
+ Long procId = nonceKeysToProcIdsMap.get(nonceKey);
+ if (procId == null || completed.containsKey(procId)) {
+ return;
+ }
- Procedure<?> proc = new FailedProcedure(procId.longValue(),
- procName, procOwner, nonceKey, exception);
+ Procedure<TEnvironment> proc =
+ new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
- completed.putIfAbsent(procId, new CompletedProcedureRetainer(proc));
+ completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
}
// ==========================================================================
@@ -860,7 +913,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param proc the new procedure to execute.
* @return the procedure id, that can be used to monitor the operation
*/
- public long submitProcedure(final Procedure proc) {
+ public long submitProcedure(Procedure<TEnvironment> proc) {
return submitProcedure(proc, null);
}
@@ -872,7 +925,7 @@ public class ProcedureExecutor<TEnvironment> {
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification = "FindBugs is blind to the check-for-null")
- public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
+ public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
Preconditions.checkArgument(lastProcId.get() >= 0);
prepareProcedure(proc);
@@ -892,9 +945,7 @@ public class ProcedureExecutor<TEnvironment> {
// Commit the transaction
store.insert(proc, null);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stored " + proc);
- }
+ LOG.debug("Stored {}", proc);
// Add the procedure to the executor
return pushProcedure(proc);
@@ -905,7 +956,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param procs the new procedures to execute.
*/
// TODO: Do we need to take nonces here?
- public void submitProcedures(final Procedure[] procs) {
+ public void submitProcedures(Procedure<TEnvironment>[] procs) {
Preconditions.checkArgument(lastProcId.get() >= 0);
if (procs == null || procs.length <= 0) {
return;
@@ -928,7 +979,7 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- private Procedure prepareProcedure(final Procedure proc) {
+ private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
if (this.checkOwnerSet) {
@@ -937,14 +988,14 @@ public class ProcedureExecutor<TEnvironment> {
return proc;
}
- private long pushProcedure(final Procedure proc) {
+ private long pushProcedure(Procedure<TEnvironment> proc) {
final long currentProcId = proc.getProcId();
// Update metrics on start of a procedure
proc.updateMetricsOnSubmit(getEnvironment());
// Create the rollback stack for the procedure
- RootProcedureState stack = new RootProcedureState();
+ RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
rollbackStack.put(currentProcId, stack);
// Submit the new subprocedures
@@ -961,7 +1012,7 @@ public class ProcedureExecutor<TEnvironment> {
* @param procId the procedure to abort
* @return true if the procedure exists and has received the abort, otherwise false.
*/
- public boolean abort(final long procId) {
+ public boolean abort(long procId) {
return abort(procId, true);
}
@@ -972,8 +1023,8 @@ public class ProcedureExecutor<TEnvironment> {
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if the procedure exists and has received the abort, otherwise false.
*/
- public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
- final Procedure proc = procedures.get(procId);
+ public boolean abort(long procId, boolean mayInterruptIfRunning) {
+ Procedure<TEnvironment> proc = procedures.get(procId);
if (proc != null) {
if (!mayInterruptIfRunning && proc.wasExecuted()) {
return false;
@@ -986,20 +1037,20 @@ public class ProcedureExecutor<TEnvironment> {
// ==========================================================================
// Executor query helpers
// ==========================================================================
- public Procedure getProcedure(final long procId) {
+ public Procedure<TEnvironment> getProcedure(final long procId) {
return procedures.get(procId);
}
- public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) {
- final Procedure proc = getProcedure(procId);
+ public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
+ Procedure<TEnvironment> proc = getProcedure(procId);
if (clazz.isInstance(proc)) {
- return (T)proc;
+ return clazz.cast(proc);
}
return null;
}
- public Procedure getResult(final long procId) {
- CompletedProcedureRetainer retainer = completed.get(procId);
+ public Procedure<TEnvironment> getResult(long procId) {
+ CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
return null;
} else {
@@ -1023,8 +1074,8 @@ public class ProcedureExecutor<TEnvironment> {
* @param procId the ID of the procedure to check
* @return true if the procedure execution is started, otherwise false.
*/
- public boolean isStarted(final long procId) {
- final Procedure proc = procedures.get(procId);
+ public boolean isStarted(long procId) {
+ Procedure<?> proc = procedures.get(procId);
if (proc == null) {
return completed.get(procId) != null;
}
@@ -1035,13 +1086,11 @@ public class ProcedureExecutor<TEnvironment> {
* Mark the specified completed procedure, as ready to remove.
* @param procId the ID of the procedure to remove
*/
- public void removeResult(final long procId) {
- CompletedProcedureRetainer retainer = completed.get(procId);
+ public void removeResult(long procId) {
+ CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
- if (LOG.isDebugEnabled()) {
- LOG.debug("pid=" + procId + " already removed by the cleaner.");
- }
+ LOG.debug("pid={} already removed by the cleaner.", procId);
return;
}
@@ -1049,8 +1098,8 @@ public class ProcedureExecutor<TEnvironment> {
retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
}
- public Procedure getResultOrProcedure(final long procId) {
- CompletedProcedureRetainer retainer = completed.get(procId);
+ public Procedure<TEnvironment> getResultOrProcedure(long procId) {
+ CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
return procedures.get(procId);
} else {
@@ -1065,15 +1114,16 @@ public class ProcedureExecutor<TEnvironment> {
* @return true if the user is the owner of the procedure,
* false otherwise or the owner is unknown.
*/
- public boolean isProcedureOwner(final long procId, final User user) {
- if (user == null) return false;
-
- final Procedure runningProc = procedures.get(procId);
+ public boolean isProcedureOwner(long procId, User user) {
+ if (user == null) {
+ return false;
+ }
+ final Procedure<TEnvironment> runningProc = procedures.get(procId);
if (runningProc != null) {
return runningProc.getOwner().equals(user.getShortName());
}
- final CompletedProcedureRetainer retainer = completed.get(procId);
+ final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer != null) {
return retainer.getProcedure().getOwner().equals(user.getShortName());
}
@@ -1087,19 +1137,17 @@ public class ProcedureExecutor<TEnvironment> {
* Get procedures.
* @return the procedures in a list
*/
- public List<Procedure<?>> getProcedures() {
- final List<Procedure<?>> procedureLists = new ArrayList<>(procedures.size() + completed.size());
- for (Procedure<?> procedure : procedures.values()) {
- procedureLists.add(procedure);
- }
+ public List<Procedure<TEnvironment>> getProcedures() {
+ List<Procedure<TEnvironment>> procedureList =
+ new ArrayList<>(procedures.size() + completed.size());
+ procedureList.addAll(procedures.values());
// Note: The procedure could show up twice in the list with different state, as
// it could complete after we walk through procedures list and insert into
// procedureList - it is ok, as we will use the information in the Procedure
// to figure it out; to prevent this would increase the complexity of the logic.
- for (CompletedProcedureRetainer retainer: completed.values()) {
- procedureLists.add(retainer.getProcedure());
- }
- return procedureLists;
+ completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
+ .forEach(procedureList::add);
+ return procedureList;
}
// ==========================================================================
@@ -1178,14 +1226,14 @@ public class ProcedureExecutor<TEnvironment> {
return procedures.keySet();
}
- Long getRootProcedureId(Procedure proc) {
+ Long getRootProcedureId(Procedure<TEnvironment> proc) {
return Procedure.getRootProcedureId(procedures, proc);
}
// ==========================================================================
// Executions
// ==========================================================================
- private void executeProcedure(final Procedure proc) {
+ private void executeProcedure(Procedure<TEnvironment> proc) {
final Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback
@@ -1194,7 +1242,7 @@ public class ProcedureExecutor<TEnvironment> {
return;
}
- final RootProcedureState procStack = rollbackStack.get(rootProcId);
+ RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
if (procStack == null) {
LOG.warn("RootProcedureState is null for " + proc.getProcId());
return;
@@ -1206,7 +1254,7 @@ public class ProcedureExecutor<TEnvironment> {
// we have the 'rollback-lock' we can start rollingback
switch (executeRollback(rootProcId, procStack)) {
case LOCK_ACQUIRED:
- break;
+ break;
case LOCK_YIELD_WAIT:
procStack.unsetRollback();
scheduler.yield(proc);
@@ -1248,7 +1296,6 @@ public class ProcedureExecutor<TEnvironment> {
switch (lockState) {
case LOCK_ACQUIRED:
execProcedure(procStack, proc);
- releaseLock(proc, false);
break;
case LOCK_YIELD_WAIT:
LOG.info(lockState + " " + proc);
@@ -1263,12 +1310,6 @@ public class ProcedureExecutor<TEnvironment> {
}
procStack.release(proc);
- // allows to kill the executor before something is stored to the wal.
- // useful to test the procedure recovery.
- if (testing != null && !isRunning()) {
- break;
- }
-
if (proc.isSuccess()) {
// update metrics on finishing the procedure
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
@@ -1284,33 +1325,31 @@ public class ProcedureExecutor<TEnvironment> {
} while (procStack.isFailed());
}
- private LockState acquireLock(final Procedure proc) {
- final TEnvironment env = getEnvironment();
- // hasLock() is used in conjunction with holdLock().
- // This allows us to not rewrite or carry around the hasLock() flag
- // for every procedure. the hasLock() have meaning only if holdLock() is true.
- if (proc.holdLock(env) && proc.hasLock(env)) {
+ private LockState acquireLock(Procedure<TEnvironment> proc) {
+ TEnvironment env = getEnvironment();
+ // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
+ // hasLock is true.
+ if (proc.hasLock()) {
return LockState.LOCK_ACQUIRED;
}
- return proc.doAcquireLock(env);
+ return proc.doAcquireLock(env, store);
}
- private void releaseLock(final Procedure proc, final boolean force) {
- final TEnvironment env = getEnvironment();
+ private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
+ TEnvironment env = getEnvironment();
// For how the framework works, we know that we will always have the lock
// when we call releaseLock(), so we can avoid calling proc.hasLock()
- if (force || !proc.holdLock(env)) {
- proc.doReleaseLock(env);
+ if (force || !proc.holdLock(env) || proc.isFinished()) {
+ proc.doReleaseLock(env, store);
}
}
/**
- * Execute the rollback of the full procedure stack.
- * Once the procedure is rolledback, the root-procedure will be visible as
- * finished to user, and the result will be the fatal exception.
+ * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
+ * root-procedure will be visible as finished to user, and the result will be the fatal exception.
*/
- private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
- final Procedure rootProc = procedures.get(rootProcId);
+ private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
+ Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
RemoteProcedureException exception = rootProc.getException();
// TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
// rolling back because the subprocedure does. Clarify.
@@ -1320,13 +1359,13 @@ public class ProcedureExecutor<TEnvironment> {
store.update(rootProc);
}
- final List<Procedure> subprocStack = procStack.getSubproceduresStack();
+ List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
int stackTail = subprocStack.size();
boolean reuseLock = false;
while (stackTail --> 0) {
- final Procedure proc = subprocStack.get(stackTail);
+ Procedure<TEnvironment> proc = subprocStack.get(stackTail);
LockState lockState;
if (!reuseLock && (lockState = acquireLock(proc)) != LockState.LOCK_ACQUIRED) {
@@ -1343,7 +1382,7 @@ public class ProcedureExecutor<TEnvironment> {
// (e.g. StateMachineProcedure reuse the same instance)
// we can avoid to lock/unlock each step
reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
- if (!reuseLock) {
+ if (!reuseLock && proc.hasLock()) {
releaseLock(proc, false);
}
@@ -1377,13 +1416,11 @@ public class ProcedureExecutor<TEnvironment> {
* It updates the store with the new state (stack index)
* or will remove completly the procedure in case it is a child.
*/
- private LockState executeRollback(final Procedure proc) {
+ private LockState executeRollback(Procedure<TEnvironment> proc) {
try {
proc.doRollback(getEnvironment());
} catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Roll back attempt failed for " + proc, e);
- }
+ LOG.debug("Roll back attempt failed for {}", proc, e);
return LockState.LOCK_YIELD_WAIT;
} catch (InterruptedException e) {
handleInterruptedException(proc, e);
@@ -1396,9 +1433,10 @@ public class ProcedureExecutor<TEnvironment> {
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
- LOG.debug("TESTING: Kill before store update");
+ String msg = "TESTING: Kill before store update";
+ LOG.debug(msg);
stop();
- return LockState.LOCK_YIELD_WAIT;
+ throw new RuntimeException(msg);
}
if (proc.removeStackIndex()) {
@@ -1425,6 +1463,11 @@ public class ProcedureExecutor<TEnvironment> {
return LockState.LOCK_ACQUIRED;
}
+ private void yieldProcedure(Procedure<TEnvironment> proc) {
+ releaseLock(proc, false);
+ scheduler.yield(proc);
+ }
+
/**
* Executes <code>procedure</code>
* <ul>
@@ -1454,10 +1497,10 @@ public class ProcedureExecutor<TEnvironment> {
* </li>
* </ul>
*/
- private void execProcedure(final RootProcedureState procStack,
- final Procedure<TEnvironment> procedure) {
+ private void execProcedure(RootProcedureState<TEnvironment> procStack,
+ Procedure<TEnvironment> procedure) {
Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
- "NOT RUNNABLE! " + procedure.toString());
+ "NOT RUNNABLE! " + procedure.toString());
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
// The exception is caught below and then we hurry to the exit without disturbing state. The
@@ -1477,22 +1520,16 @@ public class ProcedureExecutor<TEnvironment> {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Suspend " + procedure);
- }
+ LOG.trace("Suspend {}", procedure);
suspended = true;
} catch (ProcedureYieldException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Yield " + procedure + ": " + e.getMessage(), e);
- }
- scheduler.yield(procedure);
+ LOG.trace("Yield {}", procedure, e);
+ yieldProcedure(procedure);
return;
} catch (InterruptedException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e);
- }
+ LOG.trace("Yield interrupt {}", procedure, e);
handleInterruptedException(procedure, e);
- scheduler.yield(procedure);
+ yieldProcedure(procedure);
return;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
@@ -1508,9 +1545,7 @@ public class ProcedureExecutor<TEnvironment> {
// i.e. we go around this loop again rather than go back out on the scheduler queue.
subprocs = null;
reExecute = true;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId());
- }
+ LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
} else {
// Yield the current procedure, and make the subprocedure runnable
// subprocs may come back 'null'.
@@ -1521,9 +1556,7 @@ public class ProcedureExecutor<TEnvironment> {
collect(Collectors.toList()).toString()));
}
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Added to timeoutExecutor " + procedure);
- }
+ LOG.trace("Added to timeoutExecutor {}", procedure);
timeoutExecutor.add(procedure);
} else if (!suspended) {
// No subtask, so we are done
@@ -1537,9 +1570,10 @@ public class ProcedureExecutor<TEnvironment> {
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) {
- LOG.debug("TESTING: Kill before store update: " + procedure);
+ String msg = "TESTING: Kill before store update: " + procedure;
+ LOG.debug(msg);
stop();
- return;
+ throw new RuntimeException(msg);
}
// TODO: The code here doesn't check if store is running before persisting to the store as
@@ -1553,11 +1587,13 @@ public class ProcedureExecutor<TEnvironment> {
updateStoreOnExec(procStack, procedure, subprocs);
// if the store is not running we are aborting
- if (!store.isRunning()) return;
+ if (!store.isRunning()) {
+ return;
+ }
// if the procedure is kind enough to pass the slot to someone else, yield
if (procedure.isRunnable() && !suspended &&
procedure.isYieldAfterExecutionStep(getEnvironment())) {
- scheduler.yield(procedure);
+ yieldProcedure(procedure);
return;
}
@@ -1568,6 +1604,11 @@ public class ProcedureExecutor<TEnvironment> {
submitChildrenProcedures(subprocs);
}
+ // we need to log the release lock operation before waking up the parent procedure, as there
+ // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
+ // the sub procedures from store and cause problems...
+ releaseLock(procedure, false);
+
// if the procedure is complete and has a parent, count down the children latch.
// If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
if (!suspended && procedure.isFinished() && procedure.hasParent()) {
@@ -1575,12 +1616,12 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- private Procedure[] initializeChildren(final RootProcedureState procStack,
- final Procedure procedure, final Procedure[] subprocs) {
+ private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
+ Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
assert subprocs != null : "expected subprocedures";
final long rootProcId = getRootProcedureId(procedure);
for (int i = 0; i < subprocs.length; ++i) {
- final Procedure subproc = subprocs[i];
+ Procedure<TEnvironment> subproc = subprocs[i];
if (subproc == null) {
String msg = "subproc[" + i + "] is null, aborting the procedure";
procedure.setFailure(new RemoteProcedureException(msg,
@@ -1611,9 +1652,9 @@ public class ProcedureExecutor<TEnvironment> {
return subprocs;
}
- private void submitChildrenProcedures(final Procedure[] subprocs) {
+ private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
for (int i = 0; i < subprocs.length; ++i) {
- final Procedure subproc = subprocs[i];
+ Procedure<TEnvironment> subproc = subprocs[i];
subproc.updateMetricsOnSubmit(getEnvironment());
assert !procedures.containsKey(subproc.getProcId());
procedures.put(subproc.getProcId(), subproc);
@@ -1621,8 +1662,9 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- private void countDownChildren(final RootProcedureState procStack, final Procedure procedure) {
- final Procedure parent = procedures.get(procedure.getParentProcId());
+ private void countDownChildren(RootProcedureState<TEnvironment> procStack,
+ Procedure<TEnvironment> procedure) {
+ Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
if (parent == null) {
assert procStack.isRollingback();
return;
@@ -1639,17 +1681,15 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- private void updateStoreOnExec(final RootProcedureState procStack,
- final Procedure procedure, final Procedure[] subprocs) {
+ private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
+ Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
if (subprocs != null && !procedure.isFailed()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
}
store.insert(procedure, subprocs);
} else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Store update " + procedure);
- }
+ LOG.trace("Store update {}", procedure);
if (procedure.isFinished() && !procedure.hasParent()) {
// remove child procedures
final long[] childProcIds = procStack.getSubprocedureIds();
@@ -1667,11 +1707,8 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e);
- }
-
+ private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
+ LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
// NOTE: We don't call Thread.currentThread().interrupt()
// because otherwise all the subsequent calls e.g. Thread.sleep() will throw
// the InterruptedException. If the master is going down, we will be notified
@@ -1679,9 +1716,13 @@ public class ProcedureExecutor<TEnvironment> {
// (The interrupted procedure will be retried on the next run)
}
- private void execCompletionCleanup(final Procedure proc) {
+ private void execCompletionCleanup(Procedure<TEnvironment> proc) {
final TEnvironment env = getEnvironment();
- if (proc.holdLock(env) && proc.hasLock(env)) {
+ if (proc.hasLock()) {
+ LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
+ " is finished, even if the holdLock is true, arrive here means we have some holes where" +
+ " we do not release the lock. And the releaseLock below may fail since the procedure may" +
+ " have already been deleted from the procedure store.");
releaseLock(proc, true);
}
try {
@@ -1692,11 +1733,11 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- private void procedureFinished(final Procedure proc) {
+ private void procedureFinished(Procedure<TEnvironment> proc) {
// call the procedure completion cleanup handler
execCompletionCleanup(proc);
- CompletedProcedureRetainer retainer = new CompletedProcedureRetainer(proc);
+ CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
// update the executor internal state maps
if (!proc.shouldWaitClientAck(getEnvironment())) {
@@ -1712,14 +1753,14 @@ public class ProcedureExecutor<TEnvironment> {
scheduler.completionCleanup(proc);
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
- LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e);
+ LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
}
// Notify the listeners
sendProcedureFinishedNotification(proc.getProcId());
}
- RootProcedureState getProcStack(long rootProcId) {
+ RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
return rollbackStack.get(rootProcId);
}
@@ -1728,7 +1769,7 @@ public class ProcedureExecutor<TEnvironment> {
// ==========================================================================
private class WorkerThread extends StoppableThread {
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
- private volatile Procedure<?> activeProcedure;
+ private volatile Procedure<TEnvironment> activeProcedure;
public WorkerThread(ThreadGroup group) {
this(group, "PEWorker-");
@@ -1749,7 +1790,7 @@ public class ProcedureExecutor<TEnvironment> {
long lastUpdate = EnvironmentEdgeManager.currentTime();
try {
while (isRunning() && keepAlive(lastUpdate)) {
- Procedure<?> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
+ Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (proc == null) {
continue;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f3c010da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
index c42dfc4..1215008 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
@@ -202,6 +202,9 @@ public final class ProcedureUtil {
builder.setNonce(proc.getNonceKey().getNonce());
}
+ if (proc.hasLock()) {
+ builder.setLocked(true);
+ }
return builder.build();
}
@@ -255,6 +258,10 @@ public final class ProcedureUtil {
proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
}
+ if (proto.getLocked()) {
+ proc.lockedWhenLoading();
+ }
+
ProcedureStateSerializer serializer = null;
if (proto.getStateMessageCount() > 0) {