You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/03 18:07:54 UTC

[4/5] flink git commit: [FLINK-6340] [flip-1] Add a termination future to the Execution

[FLINK-6340] [flip-1] Add a termination future to the Execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0061272
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0061272
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0061272

Branch: refs/heads/master
Commit: e00612726991a05058168e5a4fbfb53853e645a5
Parents: aadfe45
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 22:49:54 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  26 ++-
 .../runtime/executiongraph/ExecutionGraph.java  | 182 ++++++++++------
 .../executiongraph/ExecutionJobVertex.java      | 116 +++-------
 .../runtime/executiongraph/ExecutionVertex.java |  56 +++--
 .../ExecutionGraphRestartTest.java              |  78 ++++---
 .../executiongraph/ExecutionGraphTestUtils.java |   8 +-
 .../ExecutionStateProgressTest.java             |  94 --------
 .../TerminalStateDeadlockTest.java              | 216 -------------------
 8 files changed, 250 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 729e161..2680849 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -121,6 +121,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
+	/** A future that completes once the Execution reaches a terminal ExecutionState */
+	private final FlinkCompletableFuture<ExecutionState> terminationFuture;
+
 	private volatile ExecutionState state = CREATED;
 
 	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
@@ -161,6 +164,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
+		this.terminationFuture = new FlinkCompletableFuture<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -234,6 +238,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		this.taskState = checkpointStateHandles;
 	}
 
+	/**
+	 * Gets a future that completes once the task execution reaches a terminal state.
+	 * The future will be completed with the specific state that the execution reached.
+	 *
+	 * @return A future for the execution's termination
+	 */
+	public Future<ExecutionState> getTerminationFuture() {
+		return terminationFuture;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -473,7 +487,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						}
 					}
 					finally {
-						vertex.executionCanceled();
+						vertex.executionCanceled(this);
+						terminationFuture.complete(CANCELED);
 					}
 					return;
 				}
@@ -741,7 +756,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.executionFinished();
+						vertex.executionFinished(this);
+						terminationFuture.complete(FINISHED);
 					}
 					return;
 				}
@@ -793,7 +809,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.executionCanceled();
+						vertex.executionCanceled(this);
+						terminationFuture.complete(CANCELED);
 					}
 					return;
 				}
@@ -886,7 +903,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					vertex.getExecutionGraph().deregisterExecution(this);
 				}
 				finally {
-					vertex.executionFailed(t);
+					vertex.executionFailed(this, t);
+					terminationFuture.complete(FAILED);
 				}
 
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 23ed99d..fff1ea2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -89,6 +90,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -188,6 +190,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Registered KvState instances reported by the TaskManagers. */
 	private final KvStateLocationRegistry kvStateLocationRegistry;
 
+	private int numVerticesTotal;
+
 	// ------ Configuration of the Execution -------
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -203,6 +207,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
+	private final AtomicInteger verticesFinished;
+
 	/** Current status of the job execution */
 	private volatile JobStatus state = JobStatus.CREATED;
 
@@ -210,9 +216,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * that was not recoverable and triggered job failure */
 	private volatile Throwable failureCause;
 
-	/** The number of job vertices that have reached a terminal state */
-	private volatile int numFinishedJobVertices;
-
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
 	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
@@ -317,6 +320,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		this.restartStrategy = restartStrategy;
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
+
+		this.verticesFinished = new AtomicInteger();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -454,7 +459,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			return jv.getTaskVertices();
 		}
 		else {
-			ArrayList<ExecutionVertex> all = new ArrayList<ExecutionVertex>();
+			ArrayList<ExecutionVertex> all = new ArrayList<>();
 			for (ExecutionJobVertex jv : jobVertices) {
 				if (jv.getGraph() != this) {
 					throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
@@ -586,6 +591,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		};
 	}
 
+	public int getTotalNumberOfVertices() {
+		return numVerticesTotal;
+	}
+
 	public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
 		return Collections.unmodifiableMap(this.intermediateResults);
 	}
@@ -620,7 +629,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 */
 	public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
 
-		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
 
 		for (ExecutionVertex vertex : getAllExecutionVertices()) {
 			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
@@ -657,7 +666,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 
-		Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
+		Map<String, SerializedValue<Object>> result = new HashMap<>();
 		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
 			result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
 		}
@@ -713,6 +722,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}
 
 			this.verticesInCreationOrder.add(ejv);
+			this.numVerticesTotal += ejv.getParallelism();
 		}
 	}
 
@@ -878,9 +888,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
+
+					final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+					// cancel all tasks (that still need cancelling)
 					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-						ejv.cancel();
+						futures.add(ejv.cancelWithFuture());
 					}
+
+					// we build a future that is complete once all vertices have reached a terminal state
+					final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+					allTerminal.thenAccept(new AcceptFunction<Void>() {
+						@Override
+						public void accept(Void value) {
+							allVerticesInTerminalState();
+						}
+					});
+
 					return;
 				}
 			}
@@ -968,26 +992,33 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				current == JobStatus.SUSPENDED ||
 				current.isGloballyTerminalState()) {
 				return;
-			} else if (current == JobStatus.RESTARTING) {
+			}
+			else if (current == JobStatus.RESTARTING) {
 				this.failureCause = t;
 
 				if (tryRestartOrFail()) {
 					return;
 				}
-				// concurrent job status change, let's check again
-			} else if (transitionState(current, JobStatus.FAILING, t)) {
+			}
+			else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
-				if (!verticesInCreationOrder.isEmpty()) {
-					// cancel all. what is failed will not cancel but stay failed
-					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-						ejv.cancel();
-					}
-				} else {
-					// set the state of the job to failed
-					transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
+				// we build a future that is complete once all vertices have reached a terminal state
+				final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+				// cancel all tasks (that still need cancelling)
+				for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+					futures.add(ejv.cancelWithFuture());
 				}
 
+				final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+				allTerminal.thenAccept(new AcceptFunction<Void>() {
+					@Override
+					public void accept(Void value) {
+						allVerticesInTerminalState();
+					}
+				});
+
 				return;
 			}
 
@@ -1039,7 +1070,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 						stateTimestamps[i] = 0;
 					}
 				}
-				numFinishedJobVertices = 0;
+
 				transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
 
 				// if we have checkpointed state, reload it into the executions
@@ -1097,9 +1128,24 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * For testing: This waits until the job execution has finished.
 	 */
 	public void waitUntilFinished() throws InterruptedException {
-		synchronized (progressLock) {
-			while (!state.isTerminalState()) {
-				progressLock.wait();
+		// we may need multiple attempts in the presence of failures / recovery
+		while (true) {
+			for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+				for (ExecutionVertex ev : ejv.getTaskVertices()) {
+					try {
+						ev.getCurrentExecutionAttempt().getTerminationFuture().get();
+					}
+					catch (ExecutionException e) {
+						// this should never happen
+						throw new RuntimeException(e);
+					}
+				}
+			}
+
+			// now that all vertices have been (at some point) in a terminal state,
+			// we need to check if the job as a whole has entered a final state
+			if (state.isTerminalState()) {
+				return;
 			}
 		}
 	}
@@ -1129,59 +1175,57 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	void jobVertexInFinalState() {
-		synchronized (progressLock) {
-			if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
-				throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
-			}
-
-			numFinishedJobVertices++;
+	void vertexFinished() {
+		int numFinished = verticesFinished.incrementAndGet();
+		if (numFinished == numVerticesTotal) {
+			// done :-)
+			allVerticesInTerminalState();
+		}
+	}
 
-			if (numFinishedJobVertices == verticesInCreationOrder.size()) {
+	void vertexUnFinished() {
+		verticesFinished.getAndDecrement();
+	}
 
-				// we are done, transition to the final state
-				JobStatus current;
-				while (true) {
-					current = this.state;
+	private void allVerticesInTerminalState() {
+		// we are done, transition to the final state
+		JobStatus current;
+		while (true) {
+			current = this.state;
 
-					if (current == JobStatus.RUNNING) {
-						if (transitionState(current, JobStatus.FINISHED)) {
-							postRunCleanup();
-							break;
-						}
-					}
-					else if (current == JobStatus.CANCELLING) {
-						if (transitionState(current, JobStatus.CANCELED)) {
-							postRunCleanup();
-							break;
-						}
-					}
-					else if (current == JobStatus.FAILING) {
-						if (tryRestartOrFail()) {
-							break;
-						}
-						// concurrent job status change, let's check again
-					}
-					else if (current == JobStatus.SUSPENDED) {
-						// we've already cleaned up when entering the SUSPENDED state
-						break;
-					}
-					else if (current.isGloballyTerminalState()) {
-						LOG.warn("Job has entered globally terminal state without waiting for all " +
-							"job vertices to reach final state.");
-						break;
-					}
-					else {
-						fail(new Exception("ExecutionGraph went into final state from state " + current));
-						break;
-					}
+			if (current == JobStatus.RUNNING) {
+				if (transitionState(current, JobStatus.FINISHED)) {
+					postRunCleanup();
+					break;
 				}
-				// done transitioning the state
-
-				// also, notify waiters
-				progressLock.notifyAll();
+			}
+			else if (current == JobStatus.CANCELLING) {
+				if (transitionState(current, JobStatus.CANCELED)) {
+					postRunCleanup();
+					break;
+				}
+			}
+			else if (current == JobStatus.FAILING) {
+				if (tryRestartOrFail()) {
+					break;
+				}
+				// concurrent job status change, let's check again
+			}
+			else if (current == JobStatus.SUSPENDED) {
+				// we've already cleaned up when entering the SUSPENDED state
+				break;
+			}
+			else if (current.isGloballyTerminalState()) {
+				LOG.warn("Job has entered globally terminal state without waiting for all " +
+						"job vertices to reach final state.");
+				break;
+			}
+			else {
+				fail(new Exception("ExecutionGraph went into final state from state " + current));
+				break;
 			}
 		}
+		// done transitioning the state
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2e5de64..3197e65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,9 +66,9 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	public static final int VALUE_NOT_SET = -1;
 
 	private final Object stateMonitor = new Object();
-	
+
 	private final ExecutionGraph graph;
-	
+
 	private final JobVertex jobVertex;
 
 	/**
@@ -91,12 +92,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	private final ExecutionVertex[] taskVertices;
 
 	private final IntermediateResult[] producedDataSets;
-	
+
 	private final List<IntermediateResult> inputs;
-	
-	private final int parallelism;
 
-	private final boolean[] finishedSubtasks;
+	private final int parallelism;
 
 	private final SlotSharingGroup slotSharingGroup;
 
@@ -108,8 +107,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	private int maxParallelism;
 
-	private volatile int numSubtasksInFinalState;
-
 	/**
 	 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 	 * serialize the same information multiple times in order to create the
@@ -231,8 +228,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		catch (Throwable t) {
 			throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
 		}
-		
-		finishedSubtasks = new boolean[parallelism];
 	}
 
 	/**
@@ -360,10 +355,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return serializedTaskInformation;
 	}
 
-	public boolean isInFinalState() {
-		return numSubtasksInFinalState == parallelism;
-	}
-
 	@Override
 	public ExecutionState getAggregateState() {
 		int[] num = new int[ExecutionState.values().length];
@@ -484,51 +475,51 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return slots;
 	}
 
+	/**
+	 * Cancels all currently running vertex executions.
+	 */
 	public void cancel() {
 		for (ExecutionVertex ev : getTaskVertices()) {
 			ev.cancel();
 		}
 	}
-	
-	public void fail(Throwable t) {
+
+	/**
+	 * Cancels all currently running vertex executions.
+	 * 
+	 * @return A future that is complete once all task executions have reached a terminal state.
+	 */
+	public Future<Void> cancelWithFuture() {
+		// we collect all futures from the task cancellations
+		ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+
+		// cancel each vertex
 		for (ExecutionVertex ev : getTaskVertices()) {
-			ev.fail(t);
+			futures.add(ev.cancel());
 		}
+
+		// return a conjunct future, which is complete once all individual tasks are canceled
+		return FutureUtils.combineAll(futures);
 	}
-	
-	public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
-		synchronized (stateMonitor) {
-			while (numSubtasksInFinalState < parallelism) {
-				stateMonitor.wait();
-			}
+
+	public void fail(Throwable t) {
+		for (ExecutionVertex ev : getTaskVertices()) {
+			ev.fail(t);
 		}
 	}
-	
+
 	public void resetForNewExecution() {
-		if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
-			throw new IllegalStateException("Cannot reset vertex that is not in final state");
-		}
-		
+
 		synchronized (stateMonitor) {
 			// check and reset the sharing groups with scheduler hints
 			if (slotSharingGroup != null) {
 				slotSharingGroup.clearTaskAssignment();
 			}
-			
-			// reset vertices one by one. if one reset fails, the "vertices in final state"
-			// fields will be consistent to handle triggered cancel calls
+
 			for (int i = 0; i < parallelism; i++) {
 				taskVertices[i].resetForNewExecution();
-				if (finishedSubtasks[i]) {
-					finishedSubtasks[i] = false;
-					numSubtasksInFinalState--;
-				}
-			}
-			
-			if (numSubtasksInFinalState != 0) {
-				throw new RuntimeException("Bug: resetting the execution job vertex failed.");
 			}
-			
+
 			// set up the input splits again
 			try {
 				if (this.inputSplits != null) {
@@ -548,51 +539,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			}
 		}
 	}
-	
-	//---------------------------------------------------------------------------------------------
-	//  Notifications
-	//---------------------------------------------------------------------------------------------
-	
-	void vertexFinished(int subtask) {
-		subtaskInFinalState(subtask);
-	}
-	
-	void vertexCancelled(int subtask) {
-		subtaskInFinalState(subtask);
-	}
-	
-	void vertexFailed(int subtask, Throwable error) {
-		subtaskInFinalState(subtask);
-	}
-	
-	private void subtaskInFinalState(int subtask) {
-		synchronized (stateMonitor) {
-			if (!finishedSubtasks[subtask]) {
-				finishedSubtasks[subtask] = true;
-				
-				if (numSubtasksInFinalState+1 == parallelism) {
-					
-					// call finalizeOnMaster hook
-					try {
-						getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
-					}
-					catch (Throwable t) {
-						getGraph().fail(t);
-					}
-
-					numSubtasksInFinalState++;
-					
-					// we are in our final state
-					stateMonitor.notifyAll();
-					
-					// tell the graph
-					graph.jobVertexInFinalState();
-				} else {
-					numSubtasksInFinalState++;
-				}
-			}
-		}
-	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Accumulators / Metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3f6ce88..bcf7a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -60,8 +61,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
@@ -509,30 +508,41 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	//   Actions
 	// --------------------------------------------------------------------------------------------
 
-	public void resetForNewExecution() {
+	public Execution resetForNewExecution() {
 
 		LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
 
 		synchronized (priorExecutions) {
-			Execution execution = currentExecution;
-			ExecutionState state = execution.getState();
+			final Execution oldExecution = currentExecution;
+			final ExecutionState oldState = oldExecution.getState();
 
-			if (state == FINISHED || state == CANCELED || state == FAILED) {
-				priorExecutions.add(execution);
-				currentExecution = new Execution(
+			if (oldState.isTerminal()) {
+				priorExecutions.add(oldExecution);
+
+				final Execution newExecution = new Execution(
 					getExecutionGraph().getFutureExecutor(),
 					this,
-					execution.getAttemptNumber()+1,
+						oldExecution.getAttemptNumber()+1,
 					System.currentTimeMillis(),
 					timeout);
 
+				this.currentExecution = newExecution;
+
 				CoLocationGroup grp = jobVertex.getCoLocationGroup();
 				if (grp != null) {
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
 				}
+
+				// if the execution was 'FINISHED' before, tell the ExecutionGraph that
+				// we take one step back on the road to reaching global FINISHED
+				if (oldState == FINISHED) {
+					getExecutionGraph().vertexUnFinished();
+				}
+
+				return newExecution;
 			}
 			else {
-				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
+				throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + oldState);
 			}
 		}
 	}
@@ -545,8 +555,16 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		this.currentExecution.deployToSlot(slot);
 	}
 
-	public void cancel() {
-		this.currentExecution.cancel();
+	/**
+	 * Cancels the current execution attempt of this vertex.
+	 * @return A future that completes once the execution has reached its final state.
+	 */
+	public Future<ExecutionState> cancel() {
+		// to avoid any case of mixup in the presence of concurrent calls,
+		// we copy a reference to the stack to make sure both calls go to the same Execution 
+		final Execution exec = this.currentExecution;
+		exec.cancel();
+		return exec.getTerminationFuture();
 	}
 
 	public void stop() {
@@ -621,16 +639,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	//   Notifications from the Execution Attempt
 	// --------------------------------------------------------------------------------------------
 
-	void executionFinished() {
-		jobVertex.vertexFinished(subTaskIndex);
+	void executionFinished(Execution execution) {
+		if (execution == currentExecution) {
+			getExecutionGraph().vertexFinished();
+		}
 	}
 
-	void executionCanceled() {
-		jobVertex.vertexCancelled(subTaskIndex);
+	void executionCanceled(Execution execution) {
+		// nothing to do
 	}
 
-	void executionFailed(Throwable t) {
-		jobVertex.vertexFailed(subTaskIndex, t);
+	void executionFailed(Execution execution, Throwable cause) {
+		// nothing to do
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1729582..1ebfcac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -60,12 +60,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
+
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -288,48 +288,58 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testCancelWhileFailing() throws Exception {
-		// We want to manually control the restart and delay
-		RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(restartStrategy);
-		ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
-		Instance instance = executionGraphInstanceTuple.f1;
-		doNothing().when(executionGraph).jobVertexInFinalState();
+		final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
+		final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
 
-		// Kill the instance...
-		instance.markDead();
+		assertEquals(JobStatus.RUNNING, graph.getState());
 
-		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+		// switch all tasks to running
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().switchToRunning();
+		}
 
-		// ...and wait for all vertices to be in state FAILED. The
-		// jobVertexInFinalState does nothing, that's why we don't wait on the
-		// job status.
-		boolean success = false;
-		while (deadline.hasTimeLeft() && !success) {
-			success = true;
-			for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-				ExecutionState state = vertex.getExecutionState();
-				if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
-					success = false;
-					Thread.sleep(100);
-					break;
-				}
-			}
+		graph.fail(new Exception("test"));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+
+		graph.cancel();
+
+		assertEquals(JobStatus.CANCELLING, graph.getState());
+
+		// let all tasks finish cancelling
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
 		}
 
-		// Still in failing
-		assertEquals(JobStatus.FAILING, executionGraph.getState());
+		assertEquals(JobStatus.CANCELED, graph.getState());
+	}
 
-		// The cancel call needs to change the state to CANCELLING
-		executionGraph.cancel();
+	@Test
+	public void testFailWhileCanceling() throws Exception {
+		final RestartStrategy restartStrategy = new NoRestartStrategy();
+		final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
 
-		assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+		assertEquals(JobStatus.RUNNING, graph.getState());
 
-		// Unspy and finalize the job state
-		doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+		// switch all tasks to running
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().switchToRunning();
+		}
 
-		executionGraph.jobVertexInFinalState();
+		graph.cancel();
 
-		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+		assertEquals(JobStatus.CANCELLING, graph.getState());
+
+		graph.fail(new Exception("test"));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+
+		// let all tasks finish cancelling
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		assertEquals(JobStatus.FAILED, graph.getState());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b0137fa..73838dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -52,9 +51,10 @@ import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
-import org.mockito.Matchers;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 
@@ -198,10 +198,6 @@ public class ExecutionGraphTestUtils {
 			}
 		};
 
-		doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
-		doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
-		doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-
 		return ejv;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
deleted file mode 100644
index bd51c81..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Collections;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
-	@Test
-	public void testAccumulatedStateFinished() {
-		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-
-			JobVertex ajv = new JobVertex("TestVertex", vid);
-			ajv.setParallelism(3);
-			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
-			ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				jid, 
-				"test job", 
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy(),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
-			graph.attachJobGraph(Collections.singletonList(ajv));
-
-			setGraphStatus(graph, JobStatus.RUNNING);
-
-			ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
-			// mock resources and mock taskmanager
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				SimpleSlot slot = getInstance(
-					new ActorTaskManagerGateway(
-						new SimpleActorGateway(
-							TestingUtils.defaultExecutionContext()))
-				).allocateSimpleSlot(jid);
-				ee.deployToSlot(slot);
-			}
-
-			// finish all
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				ee.executionFinished();
-			}
-
-			assertTrue(ejv.isInFinalState());
-			assertEquals(JobStatus.FINISHED, graph.getState());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
deleted file mode 100644
index d717986..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.*;
-
-public class TerminalStateDeadlockTest {
-
-	private final Field stateField;
-	private final Field resourceField;
-	private final Field execGraphStateField;
-	private final Field execGraphSlotProviderField;
-
-	private final SimpleSlot resource;
-
-
-	public TerminalStateDeadlockTest() {
-		try {
-			// the reflection fields to access the private fields
-			this.stateField = Execution.class.getDeclaredField("state");
-			this.stateField.setAccessible(true);
-
-			this.resourceField = Execution.class.getDeclaredField("assignedResource");
-			this.resourceField.setAccessible(true);
-
-			this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
-			this.execGraphStateField.setAccessible(true);
-
-			this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider");
-			this.execGraphSlotProviderField.setAccessible(true);
-			
-			// the dummy resource
-			ResourceID resourceId = ResourceID.generate();
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
-				
-			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
-			Instance instance = new Instance(
-				new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), ci, new InstanceID(), resources, 4);
-
-			this.resource = instance.allocateSimpleSlot(new JobID());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-			
-			// silence the compiler
-			throw new RuntimeException();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testProvokeDeadlock() {
-		try {
-			final JobID jobId = resource.getJobID();
-			final JobVertexID vid1 = new JobVertexID();
-			final JobVertexID vid2 = new JobVertexID();
-			
-			final List<JobVertex> vertices;
-			{
-				JobVertex v1 = new JobVertex("v1", vid1);
-				JobVertex v2 = new JobVertex("v2", vid2);
-				v1.setParallelism(1);
-				v2.setParallelism(1);
-				v1.setInvokableClass(DummyInvokable.class);
-				v2.setInvokableClass(DummyInvokable.class);
-				vertices = Arrays.asList(v1, v2);
-			}
-			
-			final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-			
-			final Executor executor = Executors.newFixedThreadPool(4);
-			
-			// try a lot!
-			for (int i = 0; i < 20000; i++) {
-				final TestExecGraph eg = new TestExecGraph(jobId);
-				eg.attachJobGraph(vertices);
-
-				final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
-				final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
-
-				initializeExecution(e1);
-				initializeExecution(e2);
-
-				execGraphStateField.set(eg, JobStatus.FAILING);
-				execGraphSlotProviderField.set(eg, scheduler);
-				
-				Runnable r1 = new Runnable() {
-					@Override
-					public void run() {
-						e1.cancelingComplete();
-					}
-				};
-				Runnable r2 = new Runnable() {
-					@Override
-					public void run() {
-						e2.cancelingComplete();
-					}
-				};
-				
-				executor.execute(r1);
-				executor.execute(r2);
-				
-				eg.waitTillDone();
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private void initializeExecution(Execution exec) throws IllegalAccessException {
-		// set state to canceling
-		stateField.set(exec, ExecutionState.CANCELING);
-		
-		// assign a resource
-		resourceField.set(exec, resource);
-	}
-	
-	
-	static class TestExecGraph extends ExecutionGraph {
-
-		private static final Configuration EMPTY_CONFIG = new Configuration();
-
-		private static final Time TIMEOUT = Time.seconds(30L);
-
-		private volatile boolean done;
-
-		TestExecGraph(JobID jobId) throws IOException {
-			super(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				jobId,
-				"test graph",
-				EMPTY_CONFIG,
-				new SerializedValue<>(new ExecutionConfig()),
-				TIMEOUT,
-				new FixedDelayRestartStrategy(1, 0),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
-		}
-
-		@Override
-		public void scheduleForExecution() {
-			// notify that we are done with the "restarting"
-			synchronized (this) {
-				done = true;
-				this.notifyAll();
-			}
-		}
-
-		public void waitTillDone() {
-			try {
-				synchronized (this) {
-					while (!done) {
-						this.wait();
-					}
-				}
-			}
-			catch (InterruptedException e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-}