You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:30 UTC

[16/19] flink git commit: [FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk

[FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk

That way, strictly topological deployment can be guaranteed.

Also, many quick deploy/not-enough-resources/fail/recover cycles can be
avoided in the cases where resources need some time to appear.

This closes #3295


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f113d794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f113d794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f113d794

Branch: refs/heads/master
Commit: f113d79451ba88c487358861cc3e20aac3d19257
Parents: 5902ea0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 3 20:26:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  12 +
 .../apache/flink/util/ExceptionUtilsTest.java   |  60 ++
 .../flink/runtime/concurrent/Executors.java     |   3 +-
 .../flink/runtime/concurrent/FutureUtils.java   | 115 ++++
 .../flink/runtime/executiongraph/Execution.java |  73 ++-
 .../executiongraph/ExecutionAndSlot.java        |  46 ++
 .../runtime/executiongraph/ExecutionGraph.java  | 170 +++++-
 .../executiongraph/ExecutionGraphUtils.java     | 106 ++++
 .../executiongraph/ExecutionJobVertex.java      |  46 +-
 .../runtime/executiongraph/ExecutionVertex.java |   3 +-
 .../IllegalExecutionStateException.java         |  53 ++
 .../apache/flink/runtime/instance/SlotPool.java |   9 +-
 .../runtime/concurrent/FutureUtilsTest.java     | 194 ++++++
 .../ExecutionGraphSchedulingTest.java           | 610 +++++++++++++++++++
 .../executiongraph/ExecutionGraphUtilsTest.java | 124 ++++
 .../ExecutionVertexCancelTest.java              |   2 +-
 .../ExecutionVertexSchedulingTest.java          |   3 -
 .../executiongraph/PointwisePatternTest.java    |  12 +-
 .../executiongraph/ProgrammedSlotProvider.java  |  87 +++
 .../TerminalJobStatusListener.java              |  45 ++
 .../LeaderChangeJobRecoveryTest.java            |  23 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  28 +-
 .../Flip6LocalStreamEnvironment.java            |   4 +-
 23 files changed, 1735 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6ba9ef6..69c2692 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -101,6 +101,18 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM.
+	 * See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a definition of fatal errors.
+	 * 
+	 * @param t The Throwable to check and rethrow.
+	 */
+	public static void rethrowIfFatalError(Throwable t) {
+		if (isJvmFatalError(t)) {
+			throw (Error) t;
+		}
+	}
+
+	/**
 	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
 	 * to a prior exception, or returns the new exception, if no prior exception exists.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
new file mode 100644
index 0000000..343b9d6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the utility methods in {@link ExceptionUtils}.
+ */
+public class ExceptionUtilsTest {
+
+	@Test
+	public void testStringifyNullException() {
+		assertNotNull(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION);
+		assertEquals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION, ExceptionUtils.stringifyException(null));
+	}
+
+	@Test
+	public void testJvmFatalError() {
+		// not all errors are fatal
+		assertFalse(ExceptionUtils.isJvmFatalError(new Error()));
+
+		// linkage errors are not fatal
+		assertFalse(ExceptionUtils.isJvmFatalError(new LinkageError()));
+
+		// some errors are fatal
+		assertTrue(ExceptionUtils.isJvmFatalError(new InternalError()));
+		assertTrue(ExceptionUtils.isJvmFatalError(new UnknownError()));
+	}
+
+	@Test
+	public void testRethrowFatalError() {
+		// fatal error is rethrown
+		try {
+			ExceptionUtils.rethrowIfFatalError(new InternalError());
+			fail();
+		} catch (InternalError ignored) {}
+
+		// non-fatal error is not rethrown
+		ExceptionUtils.rethrowIfFatalError(new NoClassDefFoundError());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 391f233..63b6a25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -52,7 +53,7 @@ public class Executors {
 		private DirectExecutor() {}
 
 		@Override
-		public void execute(Runnable command) {
+		public void execute(@Nonnull Runnable command) {
 			command.run();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index a404c98..4948147 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -20,11 +20,22 @@ package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A collection of utilities that expand the usage of {@link Future} and {@link CompletableFuture}.
+ */
 public class FutureUtils {
 
+	// ------------------------------------------------------------------------
+	//  retrying operations
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Retry the given operation the given number of times in case of a failure.
 	 *
@@ -88,4 +99,108 @@ public class FutureUtils {
 			super(cause);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  composing futures
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a future that is complete once multiple other futures completed. 
+	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+	 * conjunction fails.
+	 *
+	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
+	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
+	 * 
+	 * @param futures The futures that make up the conjunction. No null entries are allowed.
+	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
+	 */
+	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+		checkNotNull(futures, "futures");
+
+		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+
+		if (futures.isEmpty()) {
+			conjunct.complete(null);
+		}
+		else {
+			for (Future<?> future : futures) {
+				future.handle(conjunct.completionHandler);
+			}
+		}
+
+		return conjunct;
+	}
+
+	/**
+	 * A future that is complete once multiple other futures completed. The futures are not
+	 * necessarily of the same type, which is why the type of this Future is {@code Void}.
+	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+	 * conjunction fails.
+	 * 
+	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
+	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
+	 * many of the Futures are already complete.
+	 */
+	public interface ConjunctFuture extends CompletableFuture<Void> {
+
+		/**
+		 * Gets the total number of Futures in the conjunction.
+		 * @return The total number of Futures in the conjunction.
+		 */
+		int getNumFuturesTotal();
+
+		/**
+		 * Gets the number of Futures in the conjunction that are already complete.
+		 * @return The number of Futures in the conjunction that are already complete
+		 */
+		int getNumFuturesCompleted();
+	}
+
+	/**
+	 * The implementation of the {@link ConjunctFuture}.
+	 * 
+	 * <p>Implementation notice: The member fields all have package-private access, because they are
+	 * either accessed by an inner subclass or by the enclosing class.
+	 */
+	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
+
+		/** The total number of futures in the conjunction */
+		final int numTotal;
+
+		/** The number of futures in the conjunction that are already complete */
+		final AtomicInteger numCompleted = new AtomicInteger();
+
+		/** The function that is attached to all futures in the conjunction. Once a future
+		 * is complete, this function tracks the completion or fails the conjunct.  
+		 */
+		final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
+
+			@Override
+			public Void apply(Object o, Throwable throwable) {
+				if (throwable != null) {
+					completeExceptionally(throwable);
+				}
+				else if (numTotal == numCompleted.incrementAndGet()) {
+					complete(null);
+				}
+
+				return null;
+			}
+		};
+
+		ConjunctFutureImpl(int numTotal) {
+			this.numTotal = numTotal;
+		}
+
+		@Override
+		public int getNumFuturesTotal() {
+			return numTotal;
+		}
+
+		@Override
+		public int getNumFuturesCompleted() {
+			return numCompleted.get();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 60e5575..b3fe443 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -249,27 +249,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
 	 */
 	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
-		if (slotProvider == null) {
-			throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
-		}
-
-		final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
-		final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
-
-		// sanity check
-		if (locationConstraint != null && sharingGroup == null) {
-			throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
-		}
-
-		if (transitionState(CREATED, SCHEDULED)) {
-
-			ScheduledUnit toSchedule = locationConstraint == null ?
-				new ScheduledUnit(this, sharingGroup) :
-				new ScheduledUnit(this, sharingGroup, locationConstraint);
-
-			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
-			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
-			final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);
+		try {
+			final Future<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
 
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
 			// that we directly deploy the tasks if the slot allocation future is completed. This is
@@ -296,28 +277,54 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			});
 
 			// if tasks have to scheduled immediately check that the task has been deployed
-			// TODO: This might be problematic if the future is not completed right away
-			if (!queued) {
-				if (!deploymentFuture.isDone()) {
-					markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet."));
-				}
+			if (!queued && !deploymentFuture.isDone()) {
+				markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet."));
 			}
-			
+
 			return true;
 		}
+		catch (IllegalExecutionStateException e) {
+			return false;
+		}
+	}
+
+	public Future<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued) 
+			throws IllegalExecutionStateException {
+
+		checkNotNull(slotProvider);
+
+		final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
+		final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
+
+		// sanity check
+		if (locationConstraint != null && sharingGroup == null) {
+			throw new IllegalStateException(
+					"Trying to schedule with co-location constraint but without slot sharing allowed.");
+		}
+
+		// this method only works if the execution is in the state 'CREATED'
+		if (transitionState(CREATED, SCHEDULED)) {
+
+			ScheduledUnit toSchedule = locationConstraint == null ?
+					new ScheduledUnit(this, sharingGroup) :
+					new ScheduledUnit(this, sharingGroup, locationConstraint);
+
+			return slotProvider.allocateSlot(toSchedule, queued);
+		}
 		else {
 			// call race, already deployed, or already done
-			return false;
+			throw new IllegalExecutionStateException(this, CREATED, state);
 		}
 	}
 
 	public void deployToSlot(final SimpleSlot slot) throws JobException {
-		// sanity checks
-		if (slot == null) {
-			throw new NullPointerException();
-		}
+		checkNotNull(slot);
+
+		// Check if the TaskManager died in the meantime
+		// This only speeds up the response to TaskManagers failing concurrently to deployments.
+		// The more general check is the timeout of the deployment call
 		if (!slot.isAlive()) {
-			throw new JobException("Target slot for deployment is not alive.");
+			throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
 		}
 
 		// make sure exactly one deployment call happens from the correct state

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
new file mode 100644
index 0000000..ea6186e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A pair of an {@link Execution} together with a slot future.
+ */
+public class ExecutionAndSlot {
+
+	public final Execution executionAttempt;
+
+	public final Future<SimpleSlot> slotFuture;
+
+	public ExecutionAndSlot(Execution executionAttempt, Future<SimpleSlot> slotFuture) {
+		this.executionAttempt = checkNotNull(executionAttempt);
+		this.slotFuture = checkNotNull(slotFuture);
+	}
+
+	// -----------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return super.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f25120c..ad4347d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -40,9 +41,14 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -53,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -60,6 +67,7 @@ import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,11 +85,14 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The execution graph is the central data structure that coordinates the distributed
@@ -158,7 +169,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	private final long[] stateTimestamps;
 
 	/** The timeout for all messages that require a response/acknowledgement */
-	private final Time timeout;
+	private final Time rpcCallTimeout;
 
 	// ------ Configuration of the Execution -------
 
@@ -171,6 +182,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
+	private final Time scheduleAllocationTimeout;
+
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	/** Current status of the job execution */
@@ -292,7 +305,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		this.stateTimestamps = new long[JobStatus.values().length];
 		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
 
-		this.timeout = timeout;
+		this.rpcCallTimeout = checkNotNull(timeout);
+		this.scheduleAllocationTimeout = checkNotNull(timeout);
 
 		this.restartStrategy = restartStrategy;
 
@@ -695,7 +709,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			// create the execution job vertex and attach it to the graph
 			ExecutionJobVertex ejv =
-					new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
+					new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
 			ejv.connectToPredecessors(this.intermediateResults);
 
 			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
@@ -717,9 +731,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
-		if (slotProvider == null) {
-			throw new IllegalArgumentException("Scheduler must not be null.");
-		}
+		checkNotNull(slotProvider);
 
 		if (this.slotProvider != null && this.slotProvider != slotProvider) {
 			throw new IllegalArgumentException("Cannot use different slot providers for the same job");
@@ -731,18 +743,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			switch (scheduleMode) {
 
 				case LAZY_FROM_SOURCES:
-					// simply take the vertices without inputs.
-					for (ExecutionJobVertex ejv : this.tasks.values()) {
-						if (ejv.getJobVertex().isInputVertex()) {
-							ejv.scheduleAll(slotProvider, allowQueuedScheduling);
-						}
-					}
+					scheduleLazy(slotProvider);
 					break;
 
 				case EAGER:
-					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
-						ejv.scheduleAll(slotProvider, allowQueuedScheduling);
-					}
+					scheduleEager(slotProvider, scheduleAllocationTimeout);
 					break;
 
 				default:
@@ -754,6 +759,139 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+		// simply take the vertices without inputs.
+		for (ExecutionJobVertex ejv : this.tasks.values()) {
+			if (ejv.getJobVertex().isInputVertex()) {
+				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+			}
+		}
+	}
+
+	/**
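+	 * Allocates the slots for all executions of all job vertices up front and deploys the
+	 * tasks in topological order only once every slot future has completed successfully.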
+	 * @param slotProvider  The resource provider from which the slots are allocated
+	 * @param timeout       The maximum time that the slot allocation may take, before the
+	 *                      job is failed with a NoResourceAvailableException.
+	 */
+	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+		checkState(state == JobStatus.RUNNING, "job is not running currently");
+
+		// Important: reserve all the space we need up front.
+		// that way we do not have any operation that can fail between allocating the slots
+		// and adding them to the list. If we had a failure in between there, that would
+		// cause the slots to get lost
+		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
+		final boolean queued = allowQueuedScheduling;
+
+		// we use this flag to handle failures in a 'finally' clause
+		// that allows us to not go through clumsy cast-and-rethrow logic
+		boolean successful = false;
+
+		try {
+			// collecting all the slots may resize and fail in that operation without slots getting lost
+			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+
+			// allocate the slots (obtain all their futures)
+			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+				// these calls are not blocking, they only return futures
+				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+
+				// we need to first add the slots to this list, to be safe on release
+				resources.add(slots);
+
+				for (ExecutionAndSlot ens : slots) {
+					slotFutures.add(ens.slotFuture);
+				}
+			}
+
+			// this future is complete once all slot futures are complete.
+			// the future fails once one slot future fails.
+			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
+
+			// make sure that we fail if the allocation timeout was exceeded
+			final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
+				@Override
+				public void run() {
+					// When the timeout triggers, we try to complete the conjunct future with an exception.
+					// Note that this is a no-op if the future is already completed
+					int numTotal = allAllocationsComplete.getNumFuturesTotal();
+					int numComplete = allAllocationsComplete.getNumFuturesCompleted();
+					String message = "Could not allocate all requires slots within timeout of " +
+							timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
+
+					allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
+				}
+			}, timeout.getSize(), timeout.getUnit());
+
+
+			allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() {
+
+				@Override
+				public Void apply(Void ignored, Throwable throwable) {
+					try {
+						// we do not need the cancellation timeout any more
+						timeoutCancelHandle.cancel(false);
+
+						if (throwable == null) {
+							// successfully obtained all slots, now deploy
+
+							for (ExecutionAndSlot[] jobVertexTasks : resources) {
+								for (ExecutionAndSlot execAndSlot : jobVertexTasks) {
+
+									// the futures must all be ready - this is simply a sanity check
+									final SimpleSlot slot;
+									try {
+										slot = execAndSlot.slotFuture.getNow(null);
+										checkNotNull(slot);
+									}
+									catch (ExecutionException | NullPointerException e) {
+										throw new IllegalStateException("SlotFuture is incomplete " +
+												"or erroneous even though all futures completed");
+									}
+
+									// actual deployment
+									execAndSlot.executionAttempt.deployToSlot(slot);
+								}
+							}
+						}
+						else {
+							// let the exception handler deal with this
+							throw throwable;
+						}
+					}
+					catch (Throwable t) {
+						// we catch everything here to make sure cleanup happens and the
+						// ExecutionGraph notices
+						// we need to go into recovery and make sure to release all slots
+						try {
+							fail(t);
+						}
+						finally {
+							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+						}
+					}
+
+					// Wouldn't it be nice if we could return an actual Void object?
+					// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
+					return null; 
+				}
+			}, futureExecutor);
+
+			// from now on, slots will be rescued by the futures and their completion, or by the timeout
+			successful = true;
+		}
+		finally {
+			if (!successful) {
+				// we come here only if the 'try' block finished with an exception
+				// we release the slots (possibly failing some executions on the way) and
+				// let the exception bubble up
+				ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+			}
+		}
+	}
+
 	public void cancel() {
 		while (true) {
 			JobStatus current = state;
@@ -971,7 +1109,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}
 		} catch (IOException | ClassNotFoundException e) {
 			LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), e);
-		};
+		}
 		return null;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
new file mode 100644
index 0000000..cd6d6aa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.List;
+
+/**
+ * Utilities for dealing with the execution graphs and scheduling.
+ */
+public class ExecutionGraphUtils {
+
+	/**
+	 * Releases the slot represented by the given future. If the future is complete, the
+	 * slot is immediately released. Otherwise, the slot is released as soon as the future
+	 * is completed.
+	 * 
+	 * <p>Note that releasing the slot means cancelling any task execution currently
+	 * associated with that slot.
+	 * 
+	 * @param slotFuture The future for the slot to release.
+	 */
+	public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
+		slotFuture.handle(ReleaseSlotFunction.INSTANCE);
+	}
+
+	/**
+	 * Releases all the slots in the list of arrays of {@code ExecutionAndSlot}.
+	 * For each future in that collection, the following holds: If the future is complete, its
+	 * slot is immediately released. Otherwise, the slot is released as soon as the future
+	 * is completed.
+	 * 
+	 * <p>This method never throws any exceptions (except for fatal exceptions) and continues
+	 * to release the remaining slots if one slot release failed.
+	 *
+	 * <p>Note that releasing the slot means cancelling any task execution currently
+	 * associated with that slot.
+	 * 
+	 * @param resources The list of ExecutionAndSlot arrays to release; null arrays and null entries are skipped.
+	 */
+	public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> resources) {
+		try {
+			for (ExecutionAndSlot[] jobVertexResources : resources) {
+				if (jobVertexResources != null) {
+					for (ExecutionAndSlot execAndSlot : jobVertexResources) {
+						if (execAndSlot != null) {
+							try {
+								releaseSlotFuture(execAndSlot.slotFuture);
+							}
+							catch (Throwable t) {
+								ExceptionUtils.rethrowIfFatalError(t); // caught per slot, so the loop moves on to the next slot
+							}
+						}
+					}
+				}
+			}
+		}
+		catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalError(t); // covers failures of the iteration itself, outside a single release
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A function to be applied to a future, releasing the slot immediately upon completion.
+	 * It runs on successful and exceptional completion alike, but releases only a non-null slot.
+	 */
+	private static final class ReleaseSlotFunction implements BiFunction<SimpleSlot, Throwable, Void> {
+
+		static final ReleaseSlotFunction INSTANCE = new ReleaseSlotFunction();
+
+		@Override
+		public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+			if (simpleSlot != null) {
+				simpleSlot.releaseSlot();
+			}
+			return null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Utility class is not meant to be instantiated */
+	private ExecutionGraphUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3828fc9..754148e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -30,7 +30,9 @@ import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -388,7 +390,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	
 	public void scheduleAll(SlotProvider slotProvider, boolean queued) {
 		
-		ExecutionVertex[] vertices = this.taskVertices;
+		final ExecutionVertex[] vertices = this.taskVertices;
 
 		// kick off the tasks
 		for (ExecutionVertex ev : vertices) {
@@ -396,6 +398,48 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 	}
 
+	/**
+	 * Acquires a slot future for the current execution attempt of every execution vertex of this
+	 * ExecutionJobVertex. Each future is returned paired with its execution attempt.
+	 *
+	 * <p>If this method throws an exception, it makes sure to release all so far requested slots.
+	 * @param resourceProvider The resource provider from whom the slots are requested.
+	 * @param queued Forwarded to the slot allocation; presumably whether queued scheduling is allowed.
+	 * @return One entry per execution vertex, at the same index as in the task vertex array.
+	 */
+	public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
+		final ExecutionVertex[] vertices = this.taskVertices;
+		final ExecutionAndSlot[] slots = new ExecutionAndSlot[vertices.length];
+
+		// try to acquire a slot future for each execution.
+		// we store the execution with the future just to be on the safe side
+		for (int i = 0; i < vertices.length; i++) {
+
+			// we use this flag to handle failures in a 'finally' clause
+			// that allows us to not go through clumsy cast-and-rethrow logic
+			boolean successful = false;
+
+			try {
+				// allocate the next slot (future)
+				final Execution exec = vertices[i].getCurrentExecutionAttempt();
+				final Future<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
+				slots[i] = new ExecutionAndSlot(exec, future);
+				successful = true;
+			}
+			finally {
+				if (!successful) {
+					// an exception was thrown for vertex i: release the slot futures of vertices 0 .. i-1
+					for (int k = 0; k < i; k++) {
+						ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture);
+					}
+				}
+			}
+		}
+
+		// all good, we acquired all slots
+		return slots;
+	}
+
 	public void cancel() {
 		for (ExecutionVertex ev : getTaskVertices()) {
 			ev.cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 92327fd..ca8e07c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -102,6 +102,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
 			Time timeout) {
+
 		this(
 				jobVertex,
 				subTaskIndex,
@@ -133,7 +134,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		this.taskNameWithSubtask = String.format("%s (%d/%d)",
 				jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());
 
-		this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);
+		this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);
 
 		for (IntermediateResult result : producedDataSets) {
 			IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
new file mode 100644
index 0000000..44162ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+
+/**
+ * A special {@link IllegalStateException} indicating a mismatch in the expected and actual
+ * {@link ExecutionState} of an {@link Execution}.
+ */
+public class IllegalExecutionStateException extends IllegalStateException {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new IllegalExecutionStateException with the error message indicating
+	 * the expected and actual state.
+	 * 
+	 * @param expected The expected state 
+	 * @param actual   The actual state
+	 */
+	public IllegalExecutionStateException(ExecutionState expected, ExecutionState actual) {
+		super("Invalid execution state: Expected " + expected + " , found " + actual);
+	}
+
+	/**
+	 * Creates a new IllegalExecutionStateException whose message names the execution and both states.
+	 *
+	 * @param execution The execution found in the unexpected state
+	 * @param expected The expected state 
+	 * @param actual   The actual state
+	 */
+	public IllegalExecutionStateException(Execution execution, ExecutionState expected, ExecutionState actual) {
+		super(execution.getVertexWithAttempt() + " is no longer in expected state " + expected + 
+				" but in state " + actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 4da6c7b..8ba5040 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1048,9 +1048,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		private final long timestamp;
 
-		SlotAndTimestamp(
-				AllocatedSlot slot,
-				long timestamp) {
+		SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
 			this.slot = slot;
 			this.timestamp = timestamp;
 		}
@@ -1062,5 +1060,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		public long timestamp() {
 			return timestamp;
 		}
+
+		@Override
+		public String toString() {
+			return slot + " @ " + timestamp;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
new file mode 100644
index 0000000..43710cb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the utility methods in {@link FutureUtils}
+ */
+public class FutureUtilsTest {
+
+	@Test
+	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
+		try {
+			FutureUtils.combineAll(null);
+			fail();
+		} catch (NullPointerException ignored) {}
+
+		try {
+			FutureUtils.combineAll(Arrays.asList(
+					new FlinkCompletableFuture<Object>(),
+					null,
+					new FlinkCompletableFuture<Object>()));
+			fail();
+		} catch (NullPointerException ignored) {}
+	}
+
+	@Test
+	public void testConjunctFutureCompletion() throws Exception {
+		// some futures that we combine
+		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+
+		// some future is initially completed
+		future2.complete(new Object());
+
+		// build the conjunct future
+		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+
+		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+			@Override
+			public void accept(Void value) {}
+		});
+
+		assertEquals(4, result.getNumFuturesTotal());
+		assertEquals(1, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		// complete two more futures
+		future4.complete(new Object());
+		assertEquals(2, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		future1.complete(new Object());
+		assertEquals(3, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		// complete one future again
+		future1.complete(new Object());
+		assertEquals(3, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		// complete the final future
+		future3.complete(new Object());
+		assertEquals(4, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+		assertTrue(resultMapped.isDone());
+	}
+
+	@Test
+	public void testConjunctFutureFailureOnFirst() throws Exception {
+
+		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+
+		// build the conjunct future
+		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+
+		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+			@Override
+			public void accept(Void value) {}
+		});
+
+		assertEquals(4, result.getNumFuturesTotal());
+		assertEquals(0, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		future2.completeExceptionally(new IOException());
+
+		assertEquals(0, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+		assertTrue(resultMapped.isDone());
+
+		try {
+			result.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+
+		try {
+			resultMapped.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+	}
+
+	@Test
+	public void testConjunctFutureFailureOnSuccessive() throws Exception {
+
+		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
+		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+
+		// build the conjunct future
+		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+		assertEquals(4, result.getNumFuturesTotal());
+
+		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+			@Override
+			public void accept(Void value) {}
+		});
+
+		future1.complete(new Object());
+		future3.complete(new Object());
+		future4.complete(new Object());
+
+		future2.completeExceptionally(new IOException());
+
+		assertEquals(3, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+		assertTrue(resultMapped.isDone());
+
+		try {
+			result.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+
+		try {
+			resultMapped.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+	}
+
+	@Test
+	public void testConjunctOfNone() throws Exception {
+		final ConjunctFuture result = FutureUtils.combineAll(Collections.<Future<Object>>emptyList());
+
+		assertEquals(0, result.getNumFuturesTotal());
+		assertEquals(0, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
new file mode 100644
index 0000000..9834dc6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.Timeout;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the scheduling of the execution graph. This tests that
+ * for example the order of deployments is correct and that bulk slot allocation
+ * works properly.
+ */
+public class ExecutionGraphSchedulingTest extends TestLogger {
+
+	private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+	@After
+	public void shutdown() {
+		executor.shutdownNow();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	
+	/**
+	 * Tests that with scheduling futures and pipelined deployment, the target vertex will
+	 * not deploy its task before the source vertex does.
+	 */
+	@Test
+	public void testScheduleSourceBeforeTarget() throws Exception {
+
+		//                                            [pipelined]
+		//  we construct a simple graph    (source) ----------------> (target)
+
+		final int parallelism = 1;
+
+		final JobVertex sourceVertex = new JobVertex("source");
+		sourceVertex.setParallelism(parallelism);
+		sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+		final JobVertex targetVertex = new JobVertex("target");
+		targetVertex.setParallelism(parallelism);
+		targetVertex.setInvokableClass(NoOpInvokable.class);
+
+		targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		//
+		//  set up two TaskManager gateways and slots
+
+		final TaskManagerGateway gatewaySource = createTaskManager();
+		final TaskManagerGateway gatewayTarget = createTaskManager();
+
+		final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
+		final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
+
+		final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
+		final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();
+
+		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+		slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
+		slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.setQueuedSchedulingAllowed(true);
+		eg.scheduleForExecution(slotProvider);
+
+		// job should be running
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// we fulfill the target slot before the source slot
+		// that should not cause a deployment or deployment related failure
+		targetFuture.complete(targetSlot);
+
+		verify(gatewayTarget, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// now supply the source slot
+		sourceFuture.complete(sourceSlot);
+
+		// by now, all deployments should have happened
+		verify(gatewaySource, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+		verify(gatewayTarget, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+	}
+
+	/**
+	 * This test verifies that a pipelined connected component is deployed only once the
+	 * full set of slots is available, so that it cannot happen that some tasks are deployed
+	 * and the system only later realizes that not enough resources are available.
+	 */
+	@Test
+	public void testDeployPipelinedConnectedComponentsTogether() throws Exception {
+
+		//                                            [pipelined]
+		//  we construct a simple graph    (source) ----------------> (target)
+
+		final int parallelism = 8;
+
+		final JobVertex sourceVertex = new JobVertex("source");
+		sourceVertex.setParallelism(parallelism);
+		sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+		final JobVertex targetVertex = new JobVertex("target");
+		targetVertex.setParallelism(parallelism);
+		targetVertex.setInvokableClass(NoOpInvokable.class);
+
+		targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		//
+		//  Create the slots, futures, and the slot provider
+
+		final TaskManagerGateway[] sourceTaskManagers = new TaskManagerGateway[parallelism];
+		final TaskManagerGateway[] targetTaskManagers = new TaskManagerGateway[parallelism];
+
+		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
+		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+
+		for (int i = 0; i < parallelism; i++) {
+			sourceTaskManagers[i] = createTaskManager();
+			targetTaskManagers[i] = createTaskManager();
+
+			sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId);
+			targetSlots[i] = createSlot(targetTaskManagers[i], jobId);
+
+			sourceFutures[i] = new FlinkCompletableFuture<>();
+			targetFutures[i] = new FlinkCompletableFuture<>();
+		}
+
+		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+		slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
+		slotProvider.addSlots(targetVertex.getID(), targetFutures);
+
+		//
+		//  we complete some of the futures
+
+		for (int i = 0; i < parallelism; i += 2) {
+			sourceFutures[i].complete(sourceSlots[i]);
+		}
+
+		//
+		//  kick off the scheduling
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.setQueuedSchedulingAllowed(true);
+		eg.scheduleForExecution(slotProvider);
+
+		verifyNothingDeployed(eg, sourceTaskManagers);
+
+		//  complete the remaining sources
+		for (int i = 1; i < parallelism; i += 2) {
+			sourceFutures[i].complete(sourceSlots[i]);
+		}
+		verifyNothingDeployed(eg, sourceTaskManagers);
+
+		//  complete the targets except for one
+		for (int i = 1; i < parallelism; i++) {
+			targetFutures[i].complete(targetSlots[i]);
+		}
+		verifyNothingDeployed(eg, targetTaskManagers);
+
+		//  complete the last target slot future
+		targetFutures[0].complete(targetSlots[0]);
+
+		//
+		//  verify that all deployments have happened
+
+		for (TaskManagerGateway gateway : sourceTaskManagers) {
+			verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+		}
+		for (TaskManagerGateway gateway : targetTaskManagers) {
+			verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+		}
+	}
+
+	/**
+	 * This test verifies that if one slot future fails, the deployment will be aborted.
+	 */
+	@Test
+	public void testOneSlotFailureAbortsDeploy() throws Exception {
+
+		//                                            [pipelined]
+		//  we construct a simple graph    (source) ----------------> (target)
+
+		final int parallelism = 6;
+
+		final JobVertex sourceVertex = new JobVertex("source");
+		sourceVertex.setParallelism(parallelism);
+		sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+		final JobVertex targetVertex = new JobVertex("target");
+		targetVertex.setParallelism(parallelism);
+		targetVertex.setInvokableClass(NoOpInvokable.class);
+
+		targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+		TerminalJobStatusListener testListener = new TerminalJobStatusListener();
+		eg.registerJobStatusListener(testListener);
+
+		//
+		//  Create the slots, futures, and the slot provider
+
+		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+		final SlotOwner slotOwner = mock(SlotOwner.class);
+
+		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
+		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+
+		for (int i = 0; i < parallelism; i++) {
+			sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
+			targetSlots[i] = createSlot(taskManager, jobId, slotOwner);
+
+			sourceFutures[i] = new FlinkCompletableFuture<>();
+			targetFutures[i] = new FlinkCompletableFuture<>();
+		}
+
+		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+		slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
+		slotProvider.addSlots(targetVertex.getID(), targetFutures);
+
+		//
+		//  we complete every other future: even-indexed sources and odd-indexed targets
+
+		for (int i = 0; i < parallelism; i += 2) {
+			sourceFutures[i].complete(sourceSlots[i]);
+			targetFutures[i + 1].complete(targetSlots[i + 1]);
+		}
+
+		//
+		//  kick off the scheduling
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.setQueuedSchedulingAllowed(true);
+		eg.scheduleForExecution(slotProvider);
+
+		// fail one of the source slot futures that is still pending
+		sourceFutures[1].completeExceptionally(new TestRuntimeException());
+
+		// wait until the job failed as a whole
+		testListener.waitForTerminalState(2000);
+
+		// wait until all completed slots (3 sources + 3 targets) are back at the slot owner
+		verify(slotOwner, new Timeout(2000, times(6))).returnAllocatedSlot(any(Slot.class));
+
+		// no deployment calls must have happened
+		verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+		// the slots of all completed futures must have been released (canceled)
+		for (int i = 0; i < parallelism; i += 2) {
+			assertTrue(sourceSlots[i].isCanceled());
+			assertTrue(targetSlots[i + 1].isCanceled());
+		}
+	}
+
+	/**
+	 * This test verifies that the slot allocation times out after a certain time, and that
+	 * all slots obtained until then are released in that case.
+	 */
+	@Test
+	public void testTimeoutForSlotAllocation() throws Exception {
+
+		//  we construct a simple graph:    (task)
+
+		final int parallelism = 3;
+
+		final JobVertex vertex = new JobVertex("task");
+		vertex.setParallelism(parallelism);
+		vertex.setInvokableClass(NoOpInvokable.class);
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, Time.milliseconds(20));
+		final TerminalJobStatusListener statusListener = new TerminalJobStatusListener();
+		eg.registerJobStatusListener(statusListener);
+
+		final SlotOwner slotOwner = mock(SlotOwner.class);
+
+		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+		final SimpleSlot[] slots = new SimpleSlot[parallelism];
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final FlinkCompletableFuture<SimpleSlot>[] slotFutures = new FlinkCompletableFuture[parallelism];
+
+		for (int i = 0; i < parallelism; i++) {
+			slots[i] = createSlot(taskManager, jobId, slotOwner);
+			slotFutures[i] = new FlinkCompletableFuture<>();
+		}
+
+		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+		slotProvider.addSlots(vertex.getID(), slotFutures);
+
+		//  we complete one future before scheduling starts
+		slotFutures[1].complete(slots[1]);
+
+		//  kick off the scheduling
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.setQueuedSchedulingAllowed(true);
+		eg.scheduleForExecution(slotProvider);
+
+		//  we complete another future after scheduling has started
+		slotFutures[2].complete(slots[2]);
+
+		// since future[0] is still missing, the whole operation must time out
+		// we have no restarts allowed, so the job will go terminal
+		statusListener.waitForTerminalState(2000);
+
+		// wait until the two slots that were handed out are back at the slot owner
+		verify(slotOwner, new Timeout(2000, times(2))).returnAllocatedSlot(any(Slot.class));
+
+		//  verify that no deployments have happened
+		verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+		for (Future<SimpleSlot> future : slotFutures) {
+			if (future.isDone()) {
+				assertTrue(future.get().isCanceled());
+			}
+		}
+	}
+
+	/**
+	 * Tests that the {@link ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method
+	 * releases partially acquired resources upon exception.
+	 */
+	@Test
+	public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception {
+		final int parallelism = 8;
+
+		final JobVertex vertex = new JobVertex("vertex");
+		vertex.setParallelism(parallelism);
+		vertex.setInvokableClass(NoOpInvokable.class);
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+		final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
+
+		// set up some available slots and a slot owner that records the slots released back to it
+		final List<SimpleSlot> returnedSlots = new ArrayList<>();
+		final SlotOwner recycler = new SlotOwner() {
+			@Override
+			public boolean returnAllocatedSlot(Slot slot) {
+				returnedSlots.add((SimpleSlot) slot);
+				return true;
+			}
+		};
+
+		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
+				createSlot(taskManager, jobId, recycler),
+				createSlot(taskManager, jobId, recycler),
+				createSlot(taskManager, jobId, recycler)));
+
+
+		// slot provider that hands out the available slots (fewer than the parallelism), then throws an exception
+		final SlotProvider slots = mock(SlotProvider.class);
+		
+		when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+				new Answer<Future<SimpleSlot>>() {
+
+					@Override
+					public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+						if (availableSlots.isEmpty()) {
+							throw new TestRuntimeException();
+						} else {
+							return FlinkCompletableFuture.completed(availableSlots.remove(0));
+						}
+					}
+				});
+
+		// acquire resources and check that all handed-out slots are back after the failure
+
+		final int numSlotsToExpectBack = availableSlots.size();
+
+		try {
+			ejv.allocateResourcesForAll(slots, false);
+			fail("should have failed with an exception");
+		}
+		catch (TestRuntimeException e) {
+			// expected
+		}
+
+		assertEquals(numSlotsToExpectBack, returnedSlots.size());
+	}
+
+	/**
+	 * Tests that the {@link ExecutionGraph#scheduleForExecution(SlotProvider)} method
+	 * releases partially acquired resources upon exception.
+	 */
+	@Test
+	public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception {
+
+		//                                            [pipelined]
+		//  we construct a simple graph    (source) ----------------> (target)
+
+		final int parallelism = 3;
+
+		final JobVertex sourceVertex = new JobVertex("source");
+		sourceVertex.setParallelism(parallelism);
+		sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+		final JobVertex targetVertex = new JobVertex("target");
+		targetVertex.setParallelism(parallelism);
+		targetVertex.setInvokableClass(NoOpInvokable.class);
+
+		targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		// set up some available slots and a slot owner that records the slots released back to it
+		final List<SimpleSlot> returnedSlots = new ArrayList<>();
+		final SlotOwner recycler = new SlotOwner() {
+			@Override
+			public boolean returnAllocatedSlot(Slot slot) {
+				returnedSlots.add((SimpleSlot) slot);
+				return true;
+			}
+		};
+
+		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
+				createSlot(taskManager, jobId, recycler),
+				createSlot(taskManager, jobId, recycler),
+				createSlot(taskManager, jobId, recycler),
+				createSlot(taskManager, jobId, recycler),
+				createSlot(taskManager, jobId, recycler)));
+
+
+		// slot provider that hands out the five available slots (fewer than the six tasks), then throws an exception
+		final SlotProvider slots = mock(SlotProvider.class);
+
+		when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+				new Answer<Future<SimpleSlot>>() {
+
+					@Override
+					public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+						if (availableSlots.isEmpty()) {
+							throw new TestRuntimeException();
+						} else {
+							return FlinkCompletableFuture.completed(availableSlots.remove(0));
+						}
+					}
+				});
+
+		// acquire resources and check that all handed-out slots are back after the failure
+
+		final int numSlotsToExpectBack = availableSlots.size();
+
+		try {
+			eg.setScheduleMode(ScheduleMode.EAGER);
+			eg.scheduleForExecution(slots);
+			fail("should have failed with an exception");
+		}
+		catch (TestRuntimeException e) {
+			// expected
+		}
+
+		assertEquals(numSlotsToExpectBack, returnedSlots.size());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
+		return createExecutionGraph(jobGraph, Time.minutes(10)); // delegates with a default timeout of 10 minutes
+	}
+
+	private ExecutionGraph createExecutionGraph(JobGraph jobGraph, Time timeout) throws Exception {
+		return ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				new Configuration(),
+				executor, // the same test executor is passed for both executor arguments
+				executor,
+				getClass().getClassLoader(),
+				new StandaloneCheckpointRecoveryFactory(),
+				timeout, // NOTE(review): testTimeoutForSlotAllocation relies on this bounding slot allocation - confirm
+				new NoRestartStrategy(), // no restarts allowed, so a failing job goes terminal
+				new UnregisteredMetricsGroup(),
+				1,
+				log);
+	}
+
+	private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {
+		return createSlot(taskManager, jobId, mock(SlotOwner.class)); // slot owner is a fresh mock
+	}
+
+	private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) {
+		TaskManagerLocation location = new TaskManagerLocation(
+				ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345); // fresh resource ID per slot, loopback address
+
+		AllocatedSlot slot = new AllocatedSlot(
+				new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager); // fresh allocation ID per slot
+
+		return new SimpleSlot(slot, slotOwner, 0); // wraps the allocated slot together with the given owner
+	}
+
+	private static TaskManagerGateway createTaskManager() {
+		TaskManagerGateway tm = mock(TaskManagerGateway.class);
+		when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
+				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get())); // every submission is acknowledged via a completed future
+
+		return tm;
+	}
+
+	private static void verifyNothingDeployed(ExecutionGraph eg, TaskManagerGateway[] taskManagers) {
+		// job should still be running
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// none of the TaskManagers should have gotten a deployment call, yet
+		for (TaskManagerGateway gateway : taskManagers) {
+			verify(gateway, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+		}
+	}
+
+	private static class TestRuntimeException extends RuntimeException { // thrown on purpose by the tests and caught as the expected failure
+		private static final long serialVersionUID = 1L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
new file mode 100644
index 0000000..2e6da98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the utility methods in the class {@link ExecutionGraphUtils}.
+ */
+public class ExecutionGraphUtilsTest {
+
+	@Test
+	public void testReleaseSlots() {
+		final JobID jid = new JobID();
+		final SlotOwner owner = mock(SlotOwner.class);
+
+		final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
+		final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
+		final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
+
+		final FlinkCompletableFuture<SimpleSlot> incompleteFuture = new FlinkCompletableFuture<>();
+
+		final FlinkCompletableFuture<SimpleSlot> completeFuture = new FlinkCompletableFuture<>();
+		completeFuture.complete(slot2);
+
+		final FlinkCompletableFuture<SimpleSlot> disposedSlotFuture = new FlinkCompletableFuture<>();
+		slot3.releaseSlot();
+		disposedSlotFuture.complete(slot3);
+
+		// release all futures: one still pending, one completed, one whose slot was already released
+		ExecutionGraphUtils.releaseSlotFuture(incompleteFuture);
+		ExecutionGraphUtils.releaseSlotFuture(completeFuture);
+		ExecutionGraphUtils.releaseSlotFuture(disposedSlotFuture);
+
+		// only now complete the incomplete future
+		incompleteFuture.complete(slot1);
+
+		// verify that each slot was returned exactly once to the owner
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
+	}
+
+	@Test
+	public void testReleaseSlotsWithNulls() {
+		final JobID jid = new JobID();
+		final SlotOwner owner = mock(SlotOwner.class);
+
+		final Execution mockExecution = mock(Execution.class);
+
+		final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
+		final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
+		final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
+		final SimpleSlot slot4 = new SimpleSlot(createAllocatedSlot(jid, 3), owner, 3);
+		final SimpleSlot slot5 = new SimpleSlot(createAllocatedSlot(jid, 4), owner, 4);
+		// an array in which null entries are interleaved with the real slots
+		ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] {
+				null,
+				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot1)),
+				null,
+				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot2)),
+				null
+		};
+		// an array without any null entries
+		ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] {
+				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot3)),
+				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot4)),
+				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot5))
+		};
+		// the list itself additionally contains null entries and an empty array
+		List<ExecutionAndSlot[]> resources = Arrays.asList(null, slots1, new ExecutionAndSlot[0], null, slots2);
+
+		ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+		// every slot must have been returned exactly once to the owner
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot4));
+		verify(owner, times(1)).returnAllocatedSlot(eq(slot5));
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static AllocatedSlot createAllocatedSlot(JobID jid, int num) {
+		TaskManagerLocation loc = new TaskManagerLocation(
+				ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + num);
+	
+		return new AllocatedSlot(new AllocationID(), jid, loc, num,
+				ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 7b6c6ea..82561b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -457,7 +457,7 @@ public class ExecutionVertexCancelTest {
 			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
 			// 1)
-			// scheduling after being created should be tolerated (no exception) because
+			// scheduling after being canceled should be tolerated (no exception) because
 			// it can occur as the result of races
 			{
 				Scheduler scheduler = mock(Scheduler.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 9132aee..1b029e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -104,9 +104,6 @@ public class ExecutionVertexSchedulingTest {
 
 			future.complete(slot);
 
-			// wait a second for future's future action be executed
-			Thread.sleep(1000);
-
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 3a7e759..006f894 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -348,9 +348,9 @@ public class PointwisePatternTest {
 			
 			timesUsed[inEdges[0].getSource().getPartitionNumber()]++;
 		}
-		
-		for (int i = 0; i < timesUsed.length; i++) {
-			assertTrue(timesUsed[i] >= factor && timesUsed[i] <= factor + delta);
+
+		for (int used : timesUsed) {
+			assertTrue(used >= factor && used <= factor + delta);
 		}
 	}
 	
@@ -406,9 +406,9 @@ public class PointwisePatternTest {
 				timesUsed[ee.getSource().getPartitionNumber()]++;
 			}
 		}
-		
-		for (int i = 0; i < timesUsed.length; i++) {
-			assertEquals(1, timesUsed[i]);
+
+		for (int used : timesUsed) {
+			assertEquals(1, used);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
new file mode 100644
index 0000000..3acb2eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A slot provider for tests that hands out pre-programmed slot futures, looked up by
+ * job vertex ID and subtask index. Requests for tasks without a programmed future fail.
+ */
+class ProgrammedSlotProvider implements SlotProvider {
+
+	private final Map<JobVertexID, Future<SimpleSlot>[]> slotFutures = new HashMap<>();
+
+	private final int parallelism;
+
+	public ProgrammedSlotProvider(int parallelism) {
+		checkArgument(parallelism > 0);
+		this.parallelism = parallelism;
+	}
+
+	public void addSlot(JobVertexID vertex, int subtaskIndex, Future<SimpleSlot> future) {
+		checkNotNull(vertex);
+		checkNotNull(future);
+		checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
+
+		Future<SimpleSlot>[] futures = slotFutures.get(vertex);
+		if (futures == null) {
+			@SuppressWarnings("unchecked")
+			Future<SimpleSlot>[] newArray = (Future<SimpleSlot>[]) new Future<?>[parallelism];
+			futures = newArray;
+			slotFutures.put(vertex, futures);
+		}
+
+		futures[subtaskIndex] = future;
+	}
+
+	public void addSlots(JobVertexID vertex, Future<SimpleSlot>[] futures) {
+		checkNotNull(vertex);
+		checkNotNull(futures);
+		checkArgument(futures.length == parallelism);
+
+		slotFutures.put(vertex, futures); // the given array is stored as-is (no copy) and replaces any earlier entry
+	}
+
+	@Override
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+		JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
+		int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
+		// look up the futures programmed for this vertex, then pick the one for the subtask
+		Future<SimpleSlot>[] forTask = slotFutures.get(vertexId);
+		if (forTask != null) {
+			Future<SimpleSlot> future = forTask[subtask];
+			if (future != null) {
+				return future;
+			}
+		}
+		// nothing was programmed for this vertex / subtask
+		throw new IllegalArgumentException("No registered slot future for task " + vertexId + " (" + subtask + ')');
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
new file mode 100644
index 0000000..c107d54
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A job status listener that lets one block until the job is in a globally terminal state or is suspended.
+ */
+public class TerminalJobStatusListener  implements JobStatusListener {
+
+	private final OneShotLatch terminalStateLatch = new OneShotLatch();
+
+	public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
+		terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+		if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
+			terminalStateLatch.trigger(); // releases any thread blocked in waitForTerminalState
+		}
+	}
+}