You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/03 18:07:54 UTC
[4/5] flink git commit: [FLINK-6340] [flip-1] Add a termination future to the Execution
[FLINK-6340] [flip-1] Add a termination future to the Execution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0061272
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0061272
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0061272
Branch: refs/heads/master
Commit: e00612726991a05058168e5a4fbfb53853e645a5
Parents: aadfe45
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 22:49:54 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 26 ++-
.../runtime/executiongraph/ExecutionGraph.java | 182 ++++++++++------
.../executiongraph/ExecutionJobVertex.java | 116 +++-------
.../runtime/executiongraph/ExecutionVertex.java | 56 +++--
.../ExecutionGraphRestartTest.java | 78 ++++---
.../executiongraph/ExecutionGraphTestUtils.java | 8 +-
.../ExecutionStateProgressTest.java | 94 --------
.../TerminalStateDeadlockTest.java | 216 -------------------
8 files changed, 250 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 729e161..2680849 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -121,6 +121,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
+ /** A future that completes once the Execution reaches a terminal ExecutionState */
+ private final FlinkCompletableFuture<ExecutionState> terminationFuture;
+
private volatile ExecutionState state = CREATED;
private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived
@@ -161,6 +164,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
markTimestamp(ExecutionState.CREATED, startTimestamp);
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
+ this.terminationFuture = new FlinkCompletableFuture<>();
}
// --------------------------------------------------------------------------------------------
@@ -234,6 +238,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
this.taskState = checkpointStateHandles;
}
+ /**
+ * Gets a future that completes once the task execution reaches a terminal state.
+ * The future will be completed with specific state that the execution reached.
+ *
+ * @return A future for the execution's termination
+ */
+ public Future<ExecutionState> getTerminationFuture() {
+ return terminationFuture;
+ }
+
// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
@@ -473,7 +487,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
finally {
- vertex.executionCanceled();
+ vertex.executionCanceled(this);
+ terminationFuture.complete(CANCELED);
}
return;
}
@@ -741,7 +756,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.executionFinished();
+ vertex.executionFinished(this);
+ terminationFuture.complete(FINISHED);
}
return;
}
@@ -793,7 +809,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.executionCanceled();
+ vertex.executionCanceled(this);
+ terminationFuture.complete(CANCELED);
}
return;
}
@@ -886,7 +903,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.executionFailed(t);
+ vertex.executionFailed(this, t);
+ terminationFuture.complete(FAILED);
}
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 23ed99d..fff1ea2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -89,6 +90,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -188,6 +190,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Registered KvState instances reported by the TaskManagers. */
private final KvStateLocationRegistry kvStateLocationRegistry;
+ private int numVerticesTotal;
+
// ------ Configuration of the Execution -------
/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -203,6 +207,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
+ private final AtomicInteger verticesFinished;
+
/** Current status of the job execution */
private volatile JobStatus state = JobStatus.CREATED;
@@ -210,9 +216,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* that was not recoverable and triggered job failure */
private volatile Throwable failureCause;
- /** The number of job vertices that have reached a terminal state */
- private volatile int numFinishedJobVertices;
-
// ------ Fields that are relevant to the execution and need to be cleared before archiving -------
/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
@@ -317,6 +320,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
this.restartStrategy = restartStrategy;
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
+
+ this.verticesFinished = new AtomicInteger();
}
// --------------------------------------------------------------------------------------------
@@ -454,7 +459,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return jv.getTaskVertices();
}
else {
- ArrayList<ExecutionVertex> all = new ArrayList<ExecutionVertex>();
+ ArrayList<ExecutionVertex> all = new ArrayList<>();
for (ExecutionJobVertex jv : jobVertices) {
if (jv.getGraph() != this) {
throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
@@ -586,6 +591,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
};
}
+ public int getTotalNumberOfVertices() {
+ return numVerticesTotal;
+ }
+
public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
return Collections.unmodifiableMap(this.intermediateResults);
}
@@ -620,7 +629,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*/
public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
- Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+ Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
for (ExecutionVertex vertex : getAllExecutionVertices()) {
Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
@@ -657,7 +666,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
- Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
+ Map<String, SerializedValue<Object>> result = new HashMap<>();
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
}
@@ -713,6 +722,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
this.verticesInCreationOrder.add(ejv);
+ this.numVerticesTotal += ejv.getParallelism();
}
}
@@ -878,9 +888,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
if (transitionState(current, JobStatus.CANCELLING)) {
+
+ final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+ // cancel all tasks (that still need cancelling)
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
- ejv.cancel();
+ futures.add(ejv.cancelWithFuture());
}
+
+ // we build a future that is complete once all vertices have reached a terminal state
+ final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+ allTerminal.thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {
+ allVerticesInTerminalState();
+ }
+ });
+
return;
}
}
@@ -968,26 +992,33 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) {
return;
- } else if (current == JobStatus.RESTARTING) {
+ }
+ else if (current == JobStatus.RESTARTING) {
this.failureCause = t;
if (tryRestartOrFail()) {
return;
}
- // concurrent job status change, let's check again
- } else if (transitionState(current, JobStatus.FAILING, t)) {
+ }
+ else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;
- if (!verticesInCreationOrder.isEmpty()) {
- // cancel all. what is failed will not cancel but stay failed
- for (ExecutionJobVertex ejv : verticesInCreationOrder) {
- ejv.cancel();
- }
- } else {
- // set the state of the job to failed
- transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
+ // we build a future that is complete once all vertices have reached a terminal state
+ final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+ // cancel all tasks (that still need cancelling)
+ for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+ futures.add(ejv.cancelWithFuture());
}
+ final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+ allTerminal.thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {
+ allVerticesInTerminalState();
+ }
+ });
+
return;
}
@@ -1039,7 +1070,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
stateTimestamps[i] = 0;
}
}
- numFinishedJobVertices = 0;
+
transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
// if we have checkpointed state, reload it into the executions
@@ -1097,9 +1128,24 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* For testing: This waits until the job execution has finished.
*/
public void waitUntilFinished() throws InterruptedException {
- synchronized (progressLock) {
- while (!state.isTerminalState()) {
- progressLock.wait();
+ // we may need multiple attempts in the presence of failures / recovery
+ while (true) {
+ for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+ for (ExecutionVertex ev : ejv.getTaskVertices()) {
+ try {
+ ev.getCurrentExecutionAttempt().getTerminationFuture().get();
+ }
+ catch (ExecutionException e) {
+ // this should never happen
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // now that all vertices have been (at some point) in a terminal state,
+ // we need to check if the job as a whole has entered a final state
+ if (state.isTerminalState()) {
+ return;
}
}
}
@@ -1129,59 +1175,57 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
- void jobVertexInFinalState() {
- synchronized (progressLock) {
- if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
- throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
- }
-
- numFinishedJobVertices++;
+ void vertexFinished() {
+ int numFinished = verticesFinished.incrementAndGet();
+ if (numFinished == numVerticesTotal) {
+ // done :-)
+ allVerticesInTerminalState();
+ }
+ }
- if (numFinishedJobVertices == verticesInCreationOrder.size()) {
+ void vertexUnFinished() {
+ verticesFinished.getAndDecrement();
+ }
- // we are done, transition to the final state
- JobStatus current;
- while (true) {
- current = this.state;
+ private void allVerticesInTerminalState() {
+ // we are done, transition to the final state
+ JobStatus current;
+ while (true) {
+ current = this.state;
- if (current == JobStatus.RUNNING) {
- if (transitionState(current, JobStatus.FINISHED)) {
- postRunCleanup();
- break;
- }
- }
- else if (current == JobStatus.CANCELLING) {
- if (transitionState(current, JobStatus.CANCELED)) {
- postRunCleanup();
- break;
- }
- }
- else if (current == JobStatus.FAILING) {
- if (tryRestartOrFail()) {
- break;
- }
- // concurrent job status change, let's check again
- }
- else if (current == JobStatus.SUSPENDED) {
- // we've already cleaned up when entering the SUSPENDED state
- break;
- }
- else if (current.isGloballyTerminalState()) {
- LOG.warn("Job has entered globally terminal state without waiting for all " +
- "job vertices to reach final state.");
- break;
- }
- else {
- fail(new Exception("ExecutionGraph went into final state from state " + current));
- break;
- }
+ if (current == JobStatus.RUNNING) {
+ if (transitionState(current, JobStatus.FINISHED)) {
+ postRunCleanup();
+ break;
}
- // done transitioning the state
-
- // also, notify waiters
- progressLock.notifyAll();
+ }
+ else if (current == JobStatus.CANCELLING) {
+ if (transitionState(current, JobStatus.CANCELED)) {
+ postRunCleanup();
+ break;
+ }
+ }
+ else if (current == JobStatus.FAILING) {
+ if (tryRestartOrFail()) {
+ break;
+ }
+ // concurrent job status change, let's check again
+ }
+ else if (current == JobStatus.SUSPENDED) {
+ // we've already cleaned up when entering the SUSPENDED state
+ break;
+ }
+ else if (current.isGloballyTerminalState()) {
+ LOG.warn("Job has entered globally terminal state without waiting for all " +
+ "job vertices to reach final state.");
+ break;
+ }
+ else {
+ fail(new Exception("ExecutionGraph went into final state from state " + current));
+ break;
}
}
+ // done transitioning the state
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2e5de64..3197e65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,9 +66,9 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
public static final int VALUE_NOT_SET = -1;
private final Object stateMonitor = new Object();
-
+
private final ExecutionGraph graph;
-
+
private final JobVertex jobVertex;
/**
@@ -91,12 +92,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private final ExecutionVertex[] taskVertices;
private final IntermediateResult[] producedDataSets;
-
+
private final List<IntermediateResult> inputs;
-
- private final int parallelism;
- private final boolean[] finishedSubtasks;
+ private final int parallelism;
private final SlotSharingGroup slotSharingGroup;
@@ -108,8 +107,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private int maxParallelism;
- private volatile int numSubtasksInFinalState;
-
/**
* Serialized task information which is for all sub tasks the same. Thus, it avoids to
* serialize the same information multiple times in order to create the
@@ -231,8 +228,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
catch (Throwable t) {
throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
}
-
- finishedSubtasks = new boolean[parallelism];
}
/**
@@ -360,10 +355,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return serializedTaskInformation;
}
- public boolean isInFinalState() {
- return numSubtasksInFinalState == parallelism;
- }
-
@Override
public ExecutionState getAggregateState() {
int[] num = new int[ExecutionState.values().length];
@@ -484,51 +475,51 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return slots;
}
+ /**
+ * Cancels all currently running vertex executions.
+ */
public void cancel() {
for (ExecutionVertex ev : getTaskVertices()) {
ev.cancel();
}
}
-
- public void fail(Throwable t) {
+
+ /**
+ * Cancels all currently running vertex executions.
+ *
+ * @return A future that is complete once all tasks have canceled.
+ */
+ public Future<Void> cancelWithFuture() {
+ // we collect all futures from the task cancellations
+ ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+
+ // cancel each vertex
for (ExecutionVertex ev : getTaskVertices()) {
- ev.fail(t);
+ futures.add(ev.cancel());
}
+
+ // return a conjunct future, which is complete once all individual tasks are canceled
+ return FutureUtils.combineAll(futures);
}
-
- public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
- synchronized (stateMonitor) {
- while (numSubtasksInFinalState < parallelism) {
- stateMonitor.wait();
- }
+
+ public void fail(Throwable t) {
+ for (ExecutionVertex ev : getTaskVertices()) {
+ ev.fail(t);
}
}
-
+
public void resetForNewExecution() {
- if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
- throw new IllegalStateException("Cannot reset vertex that is not in final state");
- }
-
+
synchronized (stateMonitor) {
// check and reset the sharing groups with scheduler hints
if (slotSharingGroup != null) {
slotSharingGroup.clearTaskAssignment();
}
-
- // reset vertices one by one. if one reset fails, the "vertices in final state"
- // fields will be consistent to handle triggered cancel calls
+
for (int i = 0; i < parallelism; i++) {
taskVertices[i].resetForNewExecution();
- if (finishedSubtasks[i]) {
- finishedSubtasks[i] = false;
- numSubtasksInFinalState--;
- }
- }
-
- if (numSubtasksInFinalState != 0) {
- throw new RuntimeException("Bug: resetting the execution job vertex failed.");
}
-
+
// set up the input splits again
try {
if (this.inputSplits != null) {
@@ -548,51 +539,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
}
}
-
- //---------------------------------------------------------------------------------------------
- // Notifications
- //---------------------------------------------------------------------------------------------
-
- void vertexFinished(int subtask) {
- subtaskInFinalState(subtask);
- }
-
- void vertexCancelled(int subtask) {
- subtaskInFinalState(subtask);
- }
-
- void vertexFailed(int subtask, Throwable error) {
- subtaskInFinalState(subtask);
- }
-
- private void subtaskInFinalState(int subtask) {
- synchronized (stateMonitor) {
- if (!finishedSubtasks[subtask]) {
- finishedSubtasks[subtask] = true;
-
- if (numSubtasksInFinalState+1 == parallelism) {
-
- // call finalizeOnMaster hook
- try {
- getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
- }
- catch (Throwable t) {
- getGraph().fail(t);
- }
-
- numSubtasksInFinalState++;
-
- // we are in our final state
- stateMonitor.notifyAll();
-
- // tell the graph
- graph.jobVertexInFinalState();
- } else {
- numSubtasksInFinalState++;
- }
- }
- }
- }
// --------------------------------------------------------------------------------------------
// Accumulators / Metrics
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3f6ce88..bcf7a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -60,8 +61,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
/**
@@ -509,30 +508,41 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Actions
// --------------------------------------------------------------------------------------------
- public void resetForNewExecution() {
+ public Execution resetForNewExecution() {
LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
synchronized (priorExecutions) {
- Execution execution = currentExecution;
- ExecutionState state = execution.getState();
+ final Execution oldExecution = currentExecution;
+ final ExecutionState oldState = oldExecution.getState();
- if (state == FINISHED || state == CANCELED || state == FAILED) {
- priorExecutions.add(execution);
- currentExecution = new Execution(
+ if (oldState.isTerminal()) {
+ priorExecutions.add(oldExecution);
+
+ final Execution newExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
this,
- execution.getAttemptNumber()+1,
+ oldExecution.getAttemptNumber()+1,
System.currentTimeMillis(),
timeout);
+ this.currentExecution = newExecution;
+
CoLocationGroup grp = jobVertex.getCoLocationGroup();
if (grp != null) {
this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
}
+
+ // if the execution was 'FINISHED' before, tell the ExecutionGraph that
+ // we take one step back on the road to reaching global FINISHED
+ if (oldState == FINISHED) {
+ getExecutionGraph().vertexUnFinished();
+ }
+
+ return newExecution;
}
else {
- throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
+ throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + oldState);
}
}
}
@@ -545,8 +555,16 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.currentExecution.deployToSlot(slot);
}
- public void cancel() {
- this.currentExecution.cancel();
+ /**
+ *
+ * @return A future that completes once the execution has reached its final state.
+ */
+ public Future<ExecutionState> cancel() {
+ // to avoid any case of mixup in the presence of concurrent calls,
+ // we copy a reference to the stack to make sure both calls go to the same Execution
+ final Execution exec = this.currentExecution;
+ exec.cancel();
+ return exec.getTerminationFuture();
}
public void stop() {
@@ -621,16 +639,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Notifications from the Execution Attempt
// --------------------------------------------------------------------------------------------
- void executionFinished() {
- jobVertex.vertexFinished(subTaskIndex);
+ void executionFinished(Execution execution) {
+ if (execution == currentExecution) {
+ getExecutionGraph().vertexFinished();
+ }
}
- void executionCanceled() {
- jobVertex.vertexCancelled(subTaskIndex);
+ void executionCanceled(Execution execution) {
+ // nothing to do
}
- void executionFailed(Throwable t) {
- jobVertex.vertexFailed(subTaskIndex, t);
+ void executionFailed(Execution execution, Throwable cause) {
+ // nothing to do
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1729582..1ebfcac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -60,12 +60,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
+
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -288,48 +288,58 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testCancelWhileFailing() throws Exception {
- // We want to manually control the restart and delay
- RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(restartStrategy);
- ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
- Instance instance = executionGraphInstanceTuple.f1;
- doNothing().when(executionGraph).jobVertexInFinalState();
+ final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
+ final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
- // Kill the instance...
- instance.markDead();
+ assertEquals(JobStatus.RUNNING, graph.getState());
- Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+ // switch all tasks to running
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
- // ...and wait for all vertices to be in state FAILED. The
- // jobVertexInFinalState does nothing, that's why we don't wait on the
- // job status.
- boolean success = false;
- while (deadline.hasTimeLeft() && !success) {
- success = true;
- for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
- ExecutionState state = vertex.getExecutionState();
- if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
- success = false;
- Thread.sleep(100);
- break;
- }
- }
+ graph.fail(new Exception("test"));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+
+ graph.cancel();
+
+ assertEquals(JobStatus.CANCELLING, graph.getState());
+
+ // let all tasks finish cancelling
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
}
- // Still in failing
- assertEquals(JobStatus.FAILING, executionGraph.getState());
+ assertEquals(JobStatus.CANCELED, graph.getState());
+ }
- // The cancel call needs to change the state to CANCELLING
- executionGraph.cancel();
+ @Test
+ public void testFailWhileCanceling() throws Exception {
+ final RestartStrategy restartStrategy = new NoRestartStrategy();
+ final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
- assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+ assertEquals(JobStatus.RUNNING, graph.getState());
- // Unspy and finalize the job state
- doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+ // switch all tasks to running
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
- executionGraph.jobVertexInFinalState();
+ graph.cancel();
- assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ assertEquals(JobStatus.CANCELLING, graph.getState());
+
+ graph.fail(new Exception("test"));
+
+ assertEquals(JobStatus.FAILING, graph.getState());
+
+ // let all tasks finish cancelling
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().cancelingComplete();
+ }
+
+ assertEquals(JobStatus.FAILED, graph.getState());
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b0137fa..73838dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -52,9 +51,10 @@ import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
-import org.mockito.Matchers;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
@@ -198,10 +198,6 @@ public class ExecutionGraphTestUtils {
}
};
- doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
- doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
- doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-
return ejv;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
deleted file mode 100644
index bd51c81..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Collections;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
- @Test
- public void testAccumulatedStateFinished() {
- try {
- final JobID jid = new JobID();
- final JobVertexID vid = new JobVertexID();
-
- JobVertex ajv = new JobVertex("TestVertex", vid);
- ajv.setParallelism(3);
- ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
- ExecutionGraph graph = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jid,
- "test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
- graph.attachJobGraph(Collections.singletonList(ajv));
-
- setGraphStatus(graph, JobStatus.RUNNING);
-
- ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
- // mock resources and mock taskmanager
- for (ExecutionVertex ee : ejv.getTaskVertices()) {
- SimpleSlot slot = getInstance(
- new ActorTaskManagerGateway(
- new SimpleActorGateway(
- TestingUtils.defaultExecutionContext()))
- ).allocateSimpleSlot(jid);
- ee.deployToSlot(slot);
- }
-
- // finish all
- for (ExecutionVertex ee : ejv.getTaskVertices()) {
- ee.executionFinished();
- }
-
- assertTrue(ejv.isInFinalState());
- assertEquals(JobStatus.FINISHED, graph.getState());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
deleted file mode 100644
index d717986..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.*;
-
-public class TerminalStateDeadlockTest {
-
- private final Field stateField;
- private final Field resourceField;
- private final Field execGraphStateField;
- private final Field execGraphSlotProviderField;
-
- private final SimpleSlot resource;
-
-
- public TerminalStateDeadlockTest() {
- try {
- // the reflection fields to access the private fields
- this.stateField = Execution.class.getDeclaredField("state");
- this.stateField.setAccessible(true);
-
- this.resourceField = Execution.class.getDeclaredField("assignedResource");
- this.resourceField.setAccessible(true);
-
- this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
- this.execGraphStateField.setAccessible(true);
-
- this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider");
- this.execGraphSlotProviderField.setAccessible(true);
-
- // the dummy resource
- ResourceID resourceId = ResourceID.generate();
- InetAddress address = InetAddress.getByName("127.0.0.1");
- TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
-
- HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
- Instance instance = new Instance(
- new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), ci, new InstanceID(), resources, 4);
-
- this.resource = instance.allocateSimpleSlot(new JobID());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
-
- // silence the compiler
- throw new RuntimeException();
- }
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testProvokeDeadlock() {
- try {
- final JobID jobId = resource.getJobID();
- final JobVertexID vid1 = new JobVertexID();
- final JobVertexID vid2 = new JobVertexID();
-
- final List<JobVertex> vertices;
- {
- JobVertex v1 = new JobVertex("v1", vid1);
- JobVertex v2 = new JobVertex("v2", vid2);
- v1.setParallelism(1);
- v2.setParallelism(1);
- v1.setInvokableClass(DummyInvokable.class);
- v2.setInvokableClass(DummyInvokable.class);
- vertices = Arrays.asList(v1, v2);
- }
-
- final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
- final Executor executor = Executors.newFixedThreadPool(4);
-
- // try a lot!
- for (int i = 0; i < 20000; i++) {
- final TestExecGraph eg = new TestExecGraph(jobId);
- eg.attachJobGraph(vertices);
-
- final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
- final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
-
- initializeExecution(e1);
- initializeExecution(e2);
-
- execGraphStateField.set(eg, JobStatus.FAILING);
- execGraphSlotProviderField.set(eg, scheduler);
-
- Runnable r1 = new Runnable() {
- @Override
- public void run() {
- e1.cancelingComplete();
- }
- };
- Runnable r2 = new Runnable() {
- @Override
- public void run() {
- e2.cancelingComplete();
- }
- };
-
- executor.execute(r1);
- executor.execute(r2);
-
- eg.waitTillDone();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private void initializeExecution(Execution exec) throws IllegalAccessException {
- // set state to canceling
- stateField.set(exec, ExecutionState.CANCELING);
-
- // assign a resource
- resourceField.set(exec, resource);
- }
-
-
- static class TestExecGraph extends ExecutionGraph {
-
- private static final Configuration EMPTY_CONFIG = new Configuration();
-
- private static final Time TIMEOUT = Time.seconds(30L);
-
- private volatile boolean done;
-
- TestExecGraph(JobID jobId) throws IOException {
- super(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- "test graph",
- EMPTY_CONFIG,
- new SerializedValue<>(new ExecutionConfig()),
- TIMEOUT,
- new FixedDelayRestartStrategy(1, 0),
- new Scheduler(TestingUtils.defaultExecutionContext()));
- }
-
- @Override
- public void scheduleForExecution() {
- // notify that we are done with the "restarting"
- synchronized (this) {
- done = true;
- this.notifyAll();
- }
- }
-
- public void waitTillDone() {
- try {
- synchronized (this) {
- while (!done) {
- this.wait();
- }
- }
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-}