You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/02 17:01:12 UTC

[1/4] flink git commit: [FLINK-7153] Re-introduce preferred locations for scheduling

Repository: flink
Updated Branches:
  refs/heads/master 8afadd459 -> 3ff91be1d


[FLINK-7153] Re-introduce preferred locations for scheduling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c73b2fe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c73b2fe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c73b2fe1

Branch: refs/heads/master
Commit: c73b2fe1f93f9ff2f05eb9130051729320634448
Parents: 8afadd4
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 16 14:04:13 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 2 17:04:44 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 162 ++++++++++++++-----
 .../ExecutionAndAllocationFuture.java           |  45 ++++++
 .../executiongraph/ExecutionAndSlot.java        |  47 ------
 .../runtime/executiongraph/ExecutionGraph.java  | 146 ++++++-----------
 .../executiongraph/ExecutionGraphUtils.java     | 106 ------------
 .../executiongraph/ExecutionJobVertex.java      |  32 +---
 .../runtime/executiongraph/ExecutionVertex.java |  51 +++---
 .../apache/flink/runtime/instance/SlotPool.java |   9 +-
 .../instance/SlotSharingGroupAssignment.java    |  20 +--
 .../runtime/jobmanager/scheduler/Scheduler.java |  50 +++---
 .../ExecutionGraphSchedulingTest.java           | 151 +----------------
 .../executiongraph/ExecutionGraphStopTest.java  |  15 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  23 +--
 .../executiongraph/ExecutionGraphUtilsTest.java | 124 --------------
 .../ExecutionVertexCancelTest.java              |   8 +-
 .../ExecutionVertexLocalityTest.java            |  19 +--
 .../ScheduleWithCoLocationHintTest.java         |   3 +-
 .../scheduler/SchedulerSlotSharingTest.java     |   3 +-
 .../scheduler/SchedulerTestUtils.java           |  21 ++-
 19 files changed, 352 insertions(+), 683 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c1f423b..9d3e128 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -44,6 +44,8 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 
@@ -52,9 +54,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
@@ -126,9 +130,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	/** A future that completes once the Execution reaches a terminal ExecutionState */
 	private final CompletableFuture<ExecutionState> terminationFuture;
 
+	private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
+
 	private volatile ExecutionState state = CREATED;
 
-	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
+	private final AtomicReference<SimpleSlot> assignedResource;
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
@@ -185,6 +191,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
 		this.terminationFuture = new CompletableFuture<>();
+		this.taskManagerLocationFuture = new CompletableFuture<>();
+
+		this.assignedResource = new AtomicReference<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -220,14 +229,53 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return globalModVersion;
 	}
 
+	public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
+		return taskManagerLocationFuture;
+	}
+
 	public SimpleSlot getAssignedResource() {
-		return assignedResource;
+		return assignedResource.get();
+	}
+
+	/**
+	 * Tries to assign the given slot to the execution. The assignment works only if the
+	 * Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
+	 *
+	 * @param slot to assign to this execution
+	 * @return true if the slot could be assigned to the execution, otherwise false
+	 */
+	boolean tryAssignResource(final SimpleSlot slot) {
+		Preconditions.checkNotNull(slot);
+
+		// only allow to set the assigned resource in state SCHEDULED or CREATED
+		// note: we also accept resource assignment when being in state CREATED for testing purposes
+		if (state == SCHEDULED || state == CREATED) {
+			if (assignedResource.compareAndSet(null, slot)) {
+				// check for concurrent modification (e.g. cancelling call)
+				if (state == SCHEDULED || state == CREATED) {
+					Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
+					taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
+
+					return true;
+				} else {
+					// free assigned resource and return false
+					assignedResource.set(null);
+					return false;
+				}
+			} else {
+				// the slot already has another slot assigned
+				return false;
+			}
+		} else {
+			// do not allow resource assignment if we are not in state SCHEDULED
+			return false;
+		}
 	}
 
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		// returns non-null only when a location is already assigned
-		return assignedResource != null ? assignedResource.getTaskManagerLocation() : null;
+		return assignedResource.get() != null ? assignedResource.get().getTaskManagerLocation() : null;
 	}
 
 	public Throwable getFailureCause() {
@@ -301,27 +349,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 */
 	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
 		try {
-			final CompletableFuture<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
+			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(slotProvider, queued);
 
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
 			// that we directly deploy the tasks if the slot allocation future is completed. This is
 			// necessary for immediate deployment.
-			final CompletableFuture<Void> deploymentFuture = slotAllocationFuture.handle(
-				(simpleSlot, throwable) ->  {
-					if (simpleSlot != null) {
+			final CompletableFuture<Void> deploymentFuture = allocationFuture.handle(
+				(Execution ignored, Throwable throwable) ->  {
+					if (throwable != null) {
+						markFailed(ExceptionUtils.stripCompletionException(throwable));
+					}
+					else {
 						try {
-							deployToSlot(simpleSlot);
+							deploy();
 						} catch (Throwable t) {
-							try {
-								simpleSlot.releaseSlot();
-							} finally {
-								markFailed(t);
-							}
+							markFailed(ExceptionUtils.stripCompletionException(t));
 						}
 					}
-					else {
-						markFailed(throwable);
-					}
 					return null;
 				}
 			);
@@ -338,8 +382,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	public CompletableFuture<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued)
-			throws IllegalExecutionStateException {
+	/**
+	 * Allocates and assigns a slot obtained from the slot provider to the execution.
+	 *
+	 * @param slotProvider to obtain a new slot from
+	 * @param queued if the allocation can be queued
+	 * @return Future which is completed with this execution once the slot has been assigned
+	 * 			or with an exception if an error occurred.
+	 * @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state
+	 */
+	public CompletableFuture<Execution> allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) throws IllegalExecutionStateException {
 
 		checkNotNull(slotProvider);
 
@@ -359,7 +411,18 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					new ScheduledUnit(this, sharingGroup) :
 					new ScheduledUnit(this, sharingGroup, locationConstraint);
 
-			return slotProvider.allocateSlot(toSchedule, queued);
+			CompletableFuture<SimpleSlot> slotFuture = slotProvider.allocateSlot(toSchedule, queued);
+
+			return slotFuture.thenApply(slot -> {
+				if (tryAssignResource(slot)) {
+					return this;
+				} else {
+					// release the slot
+					slot.releaseSlot();
+
+					throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
+				}
+			});
 		}
 		else {
 			// call race, already deployed, or already done
@@ -367,8 +430,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	public void deployToSlot(final SimpleSlot slot) throws JobException {
-		checkNotNull(slot);
+	/**
+	 * Deploys the execution to the previously assigned resource.
+	 *
+	 * @throws JobException if the execution cannot be deployed to the assigned resource
+	 */
+	public void deploy() throws JobException {
+		final SimpleSlot slot  = assignedResource.get();
+
+		checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
 
 		// Check if the TaskManager died in the meantime
 		// This only speeds up the response to TaskManagers failing concurrently to deployments.
@@ -397,7 +467,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			if (!slot.setExecutedVertex(this)) {
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
-			this.assignedResource = slot;
 
 			// race double check, did we fail/cancel and do we need to release the slot?
 			if (this.state != DEPLOYING) {
@@ -447,7 +516,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Sends stop RPC call.
 	 */
 	public void stop() {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -504,10 +573,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					// we skip the canceling state. set the timestamp, for a consistent appearance
 					markTimestamp(CANCELING, getStateTimestamp(CANCELED));
 
+					// cancel the future in order to fail depending scheduling operations
+					taskManagerLocationFuture.cancel(false);
+
 					try {
 						vertex.getExecutionGraph().deregisterExecution(this);
-						if (assignedResource != null) {
-							assignedResource.releaseSlot();
+
+						final SimpleSlot slot = assignedResource.get();
+
+						if (slot != null) {
+							slot.releaseSlot();
 						}
 					}
 					finally {
@@ -673,7 +748,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			int maxStrackTraceDepth,
 			Time timeout) {
 
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -697,7 +772,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param timestamp of the completed checkpoint
 	 */
 	public void notifyCheckpointComplete(long checkpointId, long timestamp) {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -717,7 +792,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param checkpointOptions of the checkpoint to trigger
 	 */
 	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -775,7 +850,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 						updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-						assignedResource.releaseSlot();
+						final SimpleSlot slot = assignedResource.get();
+
+						if (slot != null) {
+							slot.releaseSlot();
+						}
+
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
@@ -828,7 +908,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				if (transitionState(current, CANCELED)) {
 					try {
-						assignedResource.releaseSlot();
+						final SimpleSlot slot = assignedResource.get();
+
+						if (slot != null) {
+							slot.releaseSlot();
+						}
+
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
@@ -920,8 +1005,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				try {
-					if (assignedResource != null) {
-						assignedResource.releaseSlot();
+					final SimpleSlot slot = assignedResource.get();
+					if (slot != null) {
+						slot.releaseSlot();
 					}
 					vertex.getExecutionGraph().deregisterExecution(this);
 				}
@@ -936,7 +1022,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					}
 
 					try {
-						if (assignedResource != null) {
+						if (assignedResource.get() != null) {
 							sendCancelRpcCall();
 						}
 					} catch (Throwable tt) {
@@ -1003,7 +1089,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
 	 */
 	private void sendCancelRpcCall() {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1024,7 +1110,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	private void sendFailIntermediateResultPartitionsRpcCall() {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1042,7 +1128,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private void sendUpdatePartitionInfoRpcCall(
 			final Iterable<PartitionInfo> partitionInfos) {
 
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1162,8 +1248,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	
 	@Override
 	public String toString() {
+		final SimpleSlot slot = assignedResource.get();
+
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
-				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
+				(slot == null ? "(unassigned)" : slot), state);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
new file mode 100644
index 0000000..1022dbc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A pair of an {@link Execution} together with an allocation future.
+ */
+public class ExecutionAndAllocationFuture {
+
+	public final Execution executionAttempt;
+
+	public final CompletableFuture<Void> allocationFuture;
+
+	public ExecutionAndAllocationFuture(Execution executionAttempt, CompletableFuture<Void> allocationFuture) {
+		this.executionAttempt = checkNotNull(executionAttempt);
+		this.allocationFuture = checkNotNull(allocationFuture);
+	}
+
+	// -----------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return super.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
deleted file mode 100644
index 123ff0c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A pair of an {@link Execution} together with a slot future.
- */
-public class ExecutionAndSlot {
-
-	public final Execution executionAttempt;
-
-	public final CompletableFuture<SimpleSlot> slotFuture;
-
-	public ExecutionAndSlot(Execution executionAttempt, CompletableFuture<SimpleSlot> slotFuture) {
-		this.executionAttempt = checkNotNull(executionAttempt);
-		this.slotFuture = checkNotNull(slotFuture);
-	}
-
-	// -----------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return super.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dca6c44..62c6e99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -50,7 +50,6 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -90,7 +89,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -878,113 +876,67 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// that way we do not have any operation that can fail between allocating the slots
 		// and adding them to the list. If we had a failure in between there, that would
 		// cause the slots to get lost
-		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
 		final boolean queued = allowQueuedScheduling;
 
-		// we use this flag to handle failures in a 'finally' clause
-		// that allows us to not go through clumsy cast-and-rethrow logic
-		boolean successful = false;
+		// collecting all the slots may resize and fail in that operation without slots getting lost
+		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-		try {
-			// collecting all the slots may resize and fail in that operation without slots getting lost
-			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+		// allocate the slots (obtain all their futures
+		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+			// these calls are not blocking, they only return futures
+			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued);
 
-			// allocate the slots (obtain all their futures
-			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
-				// these calls are not blocking, they only return futures
-				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+			allAllocationFutures.addAll(allocationFutures);
+		}
 
-				// we need to first add the slots to this list, to be safe on release
-				resources.add(slots);
+		// this future is complete once all slot futures are complete.
+		// the future fails once one slot future fails.
+		final ConjunctFuture<Collection<Execution>> allAllocationsComplete = FutureUtils.combineAll(allAllocationFutures);
 
-				for (ExecutionAndSlot ens : slots) {
-					slotFutures.add(ens.slotFuture);
-				}
+		// make sure that we fail if the allocation timeout was exceeded
+		final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
+			@Override
+			public void run() {
+				// When the timeout triggers, we try to complete the conjunct future with an exception.
+				// Note that this is a no-op if the future is already completed
+				int numTotal = allAllocationsComplete.getNumFuturesTotal();
+				int numComplete = allAllocationsComplete.getNumFuturesCompleted();
+				String message = "Could not allocate all requires slots within timeout of " +
+						timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
+
+				allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
 			}
+		}, timeout.getSize(), timeout.getUnit());
 
-			// this future is complete once all slot futures are complete.
-			// the future fails once one slot future fails.
-			final ConjunctFuture<Void> allAllocationsComplete = FutureUtils.waitForAll(slotFutures);
-
-			// make sure that we fail if the allocation timeout was exceeded
-			final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
-				@Override
-				public void run() {
-					// When the timeout triggers, we try to complete the conjunct future with an exception.
-					// Note that this is a no-op if the future is already completed
-					int numTotal = allAllocationsComplete.getNumFuturesTotal();
-					int numComplete = allAllocationsComplete.getNumFuturesCompleted();
-					String message = "Could not allocate all requires slots within timeout of " +
-							timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
-
-					allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
-				}
-			}, timeout.getSize(), timeout.getUnit());
-
-
-			allAllocationsComplete.handleAsync(
-				(Void slots, Throwable throwable) -> {
-					try {
-						// we do not need the cancellation timeout any more
-						timeoutCancelHandle.cancel(false);
-
-						if (throwable == null) {
-							// successfully obtained all slots, now deploy
-
-							for (ExecutionAndSlot[] jobVertexTasks : resources) {
-								for (ExecutionAndSlot execAndSlot : jobVertexTasks) {
-
-									// the futures must all be ready - this is simply a sanity check
-									final SimpleSlot slot;
-									try {
-										slot = execAndSlot.slotFuture.getNow(null);
-										checkNotNull(slot);
-									}
-									catch (CompletionException | NullPointerException e) {
-										throw new IllegalStateException("SlotFuture is incomplete " +
-												"or erroneous even though all futures completed", e);
-									}
-
-									// actual deployment
-									execAndSlot.executionAttempt.deployToSlot(slot);
-								}
-							}
-						}
-						else {
-							// let the exception handler deal with this
-							throw throwable;
-						}
-					}
-					catch (Throwable t) {
-						// we catch everything here to make sure cleanup happens and the
-						// ExecutionGraph notices the error
 
-						// we need to to release all slots before going into recovery!
-						try {
-							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-						}
-						finally {
-							failGlobal(t);
+		allAllocationsComplete.handleAsync(
+			(Collection<Execution> executions, Throwable throwable) -> {
+				try {
+					// we do not need the cancellation timeout any more
+					timeoutCancelHandle.cancel(false);
+
+					if (throwable == null) {
+						// successfully obtained all slots, now deploy
+						for (Execution execution : executions) {
+							execution.deploy();
 						}
 					}
+					else {
+						// let the exception handler deal with this
+						throw throwable;
+					}
+				}
+				catch (Throwable t) {
+					// we catch everything here to make sure cleanup happens and the
+					// ExecutionGraph notices the error
+					failGlobal(ExceptionUtils.stripCompletionException(t));
+				}
 
-					// Wouldn't it be nice if we could return an actual Void object?
-					// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
-					return null;
-				},
-				futureExecutor);
-
-			// from now on, slots will be rescued by the futures and their completion, or by the timeout
-			successful = true;
-		}
-		finally {
-			if (!successful) {
-				// we come here only if the 'try' block finished with an exception
-				// we release the slots (possibly failing some executions on the way) and
-				// let the exception bubble up
-				ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-			}
-		}
+				// Wouldn't it be nice if we could return an actual Void object?
+				// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
+				return null;
+			},
+			futureExecutor);
 	}
 
 	public void cancel() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
deleted file mode 100644
index f1d793d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
-
-/**
- * Utilities for dealing with the execution graphs and scheduling.
- */
-public class ExecutionGraphUtils {
-
-	/**
-	 * Releases the slot represented by the given future. If the future is complete, the
-	 * slot is immediately released. Otherwise, the slot is released as soon as the future
-	 * is completed.
-	 * 
-	 * <p>Note that releasing the slot means cancelling any task execution currently
-	 * associated with that slot.
-	 * 
-	 * @param slotFuture The future for the slot to release.
-	 */
-	public static void releaseSlotFuture(CompletableFuture<SimpleSlot> slotFuture) {
-		slotFuture.handle(ReleaseSlotFunction.INSTANCE::apply);
-	}
-
-	/**
-	 * Releases the all the slots in the list of arrays of {@code ExecutionAndSlot}.
-	 * For each future in that collection holds: If the future is complete, its slot is
-	 * immediately released. Otherwise, the slot is released as soon as the future
-	 * is completed.
-	 * 
-	 * <p>This methods never throws any exceptions (except for fatal exceptions) and continues
-	 * to release the remaining slots if one slot release failed.
-	 *
-	 * <p>Note that releasing the slot means cancelling any task execution currently
-	 * associated with that slot.
-	 * 
-	 * @param resources The collection of ExecutionAndSlot whose slots should be released.
-	 */
-	public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> resources) {
-		try {
-			for (ExecutionAndSlot[] jobVertexResources : resources) {
-				if (jobVertexResources != null) {
-					for (ExecutionAndSlot execAndSlot : jobVertexResources) {
-						if (execAndSlot != null) {
-							try {
-								releaseSlotFuture(execAndSlot.slotFuture);
-							}
-							catch (Throwable t) {
-								ExceptionUtils.rethrowIfFatalError(t);
-							}
-						}
-					}
-				}
-			}
-		}
-		catch (Throwable t) {
-			ExceptionUtils.rethrowIfFatalError(t);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A function to be applied into a future, releasing the slot immediately upon completion.
-	 * Completion here refers to both the successful and exceptional completion.
-	 */
-	private static final class ReleaseSlotFunction implements BiFunction<SimpleSlot, Throwable, Void> {
-
-		static final ReleaseSlotFunction INSTANCE = new ReleaseSlotFunction();
-
-		@Override
-		public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-			if (simpleSlot != null) {
-				simpleSlot.releaseSlot();
-			}
-			return null;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/** Utility class is not meant to be instantiated */
-	private ExecutionGraphUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 90224b0..98191d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -57,6 +56,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -475,37 +475,21 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * 
 	 * @param resourceProvider The resource provider from whom the slots are requested.
 	 */
-	public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
+	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
 		final ExecutionVertex[] vertices = this.taskVertices;
-		final ExecutionAndSlot[] slots = new ExecutionAndSlot[vertices.length];
+		final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
 
 		// try to acquire a slot future for each execution.
 		// we store the execution with the future just to be on the safe side
 		for (int i = 0; i < vertices.length; i++) {
-
-			// we use this flag to handle failures in a 'finally' clause
-			// that allows us to not go through clumsy cast-and-rethrow logic
-			boolean successful = false;
-
-			try {
-				// allocate the next slot (future)
-				final Execution exec = vertices[i].getCurrentExecutionAttempt();
-				final CompletableFuture<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
-				slots[i] = new ExecutionAndSlot(exec, future);
-				successful = true;
-			}
-			finally {
-				if (!successful) {
-					// this is the case if an exception was thrown
-					for (int k = 0; k < i; k++) {
-						ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture);
-					}
-				}
-			}
+			// allocate the next slot (future)
+			final Execution exec = vertices[i].getCurrentExecutionAttempt();
+			final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(resourceProvider, queued);
+			slots[i] = allocationFuture;
 		}
 
 		// all good, we acquired all slots
-		return slots;
+		return Arrays.asList(slots);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6b9d481..e87a5a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -55,6 +55,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -266,6 +267,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return currentExecution.getFailureCause();
 	}
 
+	public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
+		return currentExecution.getTaskManagerLocationFuture();
+	}
+
 	public SimpleSlot getCurrentAssignedResource() {
 		return currentExecution.getAssignedResource();
 	}
@@ -445,8 +450,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 * @see #getPreferredLocationsBasedOnState()
 	 * @see #getPreferredLocationsBasedOnInputs() 
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocations() {
-		Iterable<TaskManagerLocation> basedOnState = getPreferredLocationsBasedOnState();
+	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
+		Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState();
 		return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
 	}
 	
@@ -454,13 +459,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 * Gets the preferred location to execute the current task execution attempt, based on the state
 	 * that the execution attempt will resume.
 	 * 
-	 * @return A size-one iterable with the location preference, or null, if there is no
+	 * @return A size-one collection with the location preference, or null, if there is no
 	 *         location preference based on the state.
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() {
+	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() {
 		TaskManagerLocation priorLocation;
 		if (currentExecution.getTaskStateSnapshot() != null && (priorLocation = getLatestPriorLocation()) != null) {
-			return Collections.singleton(priorLocation);
+			return Collections.singleton(CompletableFuture.completedFuture(priorLocation));
 		}
 		else {
 			return null;
@@ -476,14 +481,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 * @return The preferred locations based in input streams, or an empty iterable,
 	 *         if there is no input-based preference.
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
+	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
 		// otherwise, base the preferred locations on the input connections
 		if (inputEdges == null) {
 			return Collections.emptySet();
 		}
 		else {
-			Set<TaskManagerLocation> locations = new HashSet<>();
-			Set<TaskManagerLocation> inputLocations = new HashSet<>();
+			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
 
 			// go over all inputs
 			for (int i = 0; i < inputEdges.length; i++) {
@@ -493,28 +497,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					// go over all input sources
 					for (int k = 0; k < sources.length; k++) {
 						// look-up assigned slot of input source
-						SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
-						if (sourceSlot != null) {
-							// add input location
-							inputLocations.add(sourceSlot.getTaskManagerLocation());
-							// inputs which have too many distinct sources are not considered
-							if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-								inputLocations.clear();
-								break;
-							}
+						CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
+						inputLocations.add(taskManagerLocationFuture);
+
+						if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+							return Collections.emptyList();
 						}
 					}
 				}
-				// keep the locations of the input with the least preferred locations
-				if (locations.isEmpty() || // nothing assigned yet
-						(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
-					// current input has fewer preferred locations
-					locations.clear();
-					locations.addAll(inputLocations);
-				}
 			}
 
-			return locations.isEmpty() ? Collections.<TaskManagerLocation>emptyList() : locations;
+			return inputLocations.isEmpty() ? Collections.emptyList() : inputLocations;
 		}
 	}
 
@@ -598,8 +591,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return this.currentExecution.scheduleForExecution(slotProvider, queued);
 	}
 
+	@VisibleForTesting
 	public void deployToSlot(SimpleSlot slot) throws JobException {
-		this.currentExecution.deployToSlot(slot);
+		if (this.currentExecution.tryAssignResource(slot)) {
+			this.currentExecution.deploy();
+		} else {
+			throw new IllegalStateException("Could not assign resource " + slot + " to current execution " +
+				currentExecution + '.');
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6397043..fcf7d40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1003,10 +1003,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		@Override
 		public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-			Iterable<TaskManagerLocation> locationPreferences = 
-					task.getTaskToExecute().getVertex().getPreferredLocations();
+			Collection<CompletableFuture<TaskManagerLocation>> locationPreferenceFutures =
+				task.getTaskToExecute().getVertex().getPreferredLocations();
 
-			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
+			CompletableFuture<Collection<TaskManagerLocation>> locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures);
+
+			return locationPreferencesFuture.thenCompose(
+				locationPreferences -> gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 88fbc10..a071e50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -266,18 +266,12 @@ public class SlotSharingGroupAssignment {
 	 * slots if no local slot is available. The method returns null, when this sharing group has
 	 * no slot is available for the given JobVertexID. 
 	 *
-	 * @param vertex The vertex to allocate a slot for.
+	 * @param vertexID the vertex id
+	 * @param locationPreferences location preferences
 	 *
 	 * @return A slot to execute the given ExecutionVertex in, or null, if none is available.
 	 */
-	public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
-		return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
-	}
-
-	/**
-	 * 
-	 */
-	SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
+	public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
 		synchronized (lock) {
 			Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);
 
@@ -306,17 +300,13 @@ public class SlotSharingGroupAssignment {
 	 * shared slot and returns it. If no suitable shared slot could be found, this method
 	 * returns null.</p>
 	 * 
-	 * @param vertex The execution vertex to find a slot for.
 	 * @param constraint The co-location constraint for the placement of the execution vertex.
+	 * @param locationPreferences location preferences
 	 * 
 	 * @return A simple slot allocate within a suitable shared slot, or {@code null}, if no suitable
 	 *         shared slot is available.
 	 */
-	public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
-		return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs());
-	}
-	
-	SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
+	public SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
 		synchronized (lock) {
 			if (constraint.isAssignedAndAlive()) {
 				// the shared slot of the co-location group is initialized and set we allocate a sub-slot

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 5a7e819..9b1ffbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -134,31 +136,36 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 	@Override
 	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-		try {
-			final Object ret = scheduleTask(task, allowQueued);
+		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs();
 
-			if (ret instanceof SimpleSlot) {
-				return CompletableFuture.completedFuture((SimpleSlot) ret);
-			}
-			else if (ret instanceof CompletableFuture) {
-				@SuppressWarnings("unchecked")
-				CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
-				return typed;
-			}
-			else {
-				// this should never happen, simply guard this case with an exception
-				throw new RuntimeException();
+		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+
+		return preferredLocationsFuture.thenCompose(
+			preferredLocations -> {
+				try {
+					final Object ret = scheduleTask(task, allowQueued, preferredLocations);
+
+					if (ret instanceof SimpleSlot) {
+						return CompletableFuture.completedFuture((SimpleSlot) ret);
+					} else if (ret instanceof CompletableFuture) {
+						@SuppressWarnings("unchecked")
+						CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
+						return typed;
+					} else {
+						// this should never happen, simply guard this case with an exception
+						throw new RuntimeException();
+					}
+				} catch (NoResourceAvailableException e) {
+					throw new CompletionException(e);
+				}
 			}
-		}
-		catch (NoResourceAvailableException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
+		);
 	}
 
 	/**
 	 * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}.
 	 */
-	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
+	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws NoResourceAvailableException {
 		if (task == null) {
 			throw new NullPointerException();
 		}
@@ -168,7 +175,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 		
-		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();
 		final boolean forceExternalLocation = false &&
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
@@ -197,10 +203,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				// get a slot from the group, if the group has one for us (and can fulfill the constraint)
 				final SimpleSlot slotFromGroup;
 				if (constraint == null) {
-					slotFromGroup = assignment.getSlotForTask(vertex);
+					slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
 				}
 				else {
-					slotFromGroup = assignment.getSlotForTask(vertex, constraint);
+					slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);
 				}
 
 				SimpleSlot newSlot = null;
@@ -234,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						localOnly = true;
 					}
 					else {
-						locations = vertex.getPreferredLocationsBasedOnInputs();
+						locations = preferredLocations;
 						localOnly = forceExternalLocation;
 					}
 					

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index b90c306..69a679a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -49,21 +48,22 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
-
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.verification.Timeout;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the scheduling of the execution graph. This tests that
@@ -399,141 +399,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that the {@link ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method
-	 * releases partially acquired resources upon exception.
-	 */
-	@Test
-	public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception {
-		final int parallelism = 8;
-
-		final JobVertex vertex = new JobVertex("vertex");
-		vertex.setParallelism(parallelism);
-		vertex.setInvokableClass(NoOpInvokable.class);
-
-		final JobID jobId = new JobID();
-		final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
-
-		// set up some available slots and some slot owner that accepts released slots back
-		final List<SimpleSlot> returnedSlots = new ArrayList<>();
-		final SlotOwner recycler = new SlotOwner() {
-			@Override
-			public boolean returnAllocatedSlot(Slot slot) {
-				returnedSlots.add((SimpleSlot) slot);
-				return true;
-			}
-		};
-
-		// slot provider that hand out parallelism / 3 slots, then throws an exception
-		final SlotProvider slotProvider = mock(SlotProvider.class);
-
-		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
-		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler)));
-
-		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-			(InvocationOnMock invocation) -> {
-					if (availableSlots.isEmpty()) {
-						throw new TestRuntimeException();
-					} else {
-						return CompletableFuture.completedFuture(availableSlots.remove(0));
-					}
-				});
-
-		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-		final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
-
-		// acquire resources and check that all are back after the failure
-
-		final int numSlotsToExpectBack = availableSlots.size();
-
-		try {
-			ejv.allocateResourcesForAll(slotProvider, false);
-			fail("should have failed with an exception");
-		}
-		catch (TestRuntimeException e) {
-			// expected
-		}
-
-		assertEquals(numSlotsToExpectBack, returnedSlots.size());
-	}
-
-	/**
-	 * Tests that the {@link ExecutionGraph#scheduleForExecution()} method
-	 * releases partially acquired resources upon exception.
-	 */
-	@Test
-	public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception {
-
-		//                                            [pipelined]
-		//  we construct a simple graph    (source) ----------------> (target)
-
-		final int parallelism = 3;
-
-		final JobVertex sourceVertex = new JobVertex("source");
-		sourceVertex.setParallelism(parallelism);
-		sourceVertex.setInvokableClass(NoOpInvokable.class);
-
-		final JobVertex targetVertex = new JobVertex("target");
-		targetVertex.setParallelism(parallelism);
-		targetVertex.setInvokableClass(NoOpInvokable.class);
-
-		targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
-		final JobID jobId = new JobID();
-		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
-
-		// set up some available slots and some slot owner that accepts released slots back
-		final List<SimpleSlot> returnedSlots = new ArrayList<>();
-		final SlotOwner recycler = new SlotOwner() {
-			@Override
-			public boolean returnAllocatedSlot(Slot slot) {
-				returnedSlots.add((SimpleSlot) slot);
-				return true;
-			}
-		};
-
-		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
-		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler)));
-
-
-		// slot provider that hand out parallelism / 3 slots, then throws an exception
-		final SlotProvider slotProvider = mock(SlotProvider.class);
-
-		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-			(InvocationOnMock invocation) -> {
-					if (availableSlots.isEmpty()) {
-						throw new TestRuntimeException();
-					} else {
-						return CompletableFuture.completedFuture(availableSlots.remove(0));
-					}
-				});
-
-		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-
-		// acquire resources and check that all are back after the failure
-
-		final int numSlotsToExpectBack = availableSlots.size();
-
-		try {
-			eg.setScheduleMode(ScheduleMode.EAGER);
-			eg.scheduleForExecution();
-			fail("should have failed with an exception");
-		}
-		catch (TestRuntimeException e) {
-			// expected
-		}
-
-		assertEquals(numSlotsToExpectBack, returnedSlots.size());
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index de9081b..4ce3f9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -112,25 +112,29 @@ public class ExecutionGraphStopTest extends TestLogger {
 		// deploy source 1
 		for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy source 2
 		for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 1
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 2
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		eg.stop();
@@ -162,7 +166,8 @@ public class ExecutionGraphStopTest extends TestLogger {
 
 		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
 
-		exec.deployToSlot(slot);
+		exec.tryAssignResource(slot);
+		exec.deploy();
 		exec.switchToRunning();
 		assertEquals(ExecutionState.RUNNING, exec.getState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b534ade..5feeabc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.Status;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -63,20 +61,20 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 
+import akka.actor.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -231,15 +229,10 @@ public class ExecutionGraphTestUtils {
 	}
 	
 	public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) {
-		try {
-			Execution exec = vertex.getCurrentExecutionAttempt();
-			
-			Field f = Execution.class.getDeclaredField("assignedResource");
-			f.setAccessible(true);
-			f.set(exec, slot);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the slot failed", e);
+		Execution exec = vertex.getCurrentExecutionAttempt();
+
+		if(!exec.tryAssignResource(slot)) {
+			throw new RuntimeException("Could not assign resource.");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
deleted file mode 100644
index c616501..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the utility methods in the class {@link ExecutionGraphUtils}.
- */
-public class ExecutionGraphUtilsTest {
-
-	@Test
-	public void testReleaseSlots() {
-		final JobID jid = new JobID();
-		final SlotOwner owner = mock(SlotOwner.class);
-
-		final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
-		final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
-		final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
-
-		final CompletableFuture<SimpleSlot> incompleteFuture = new CompletableFuture<>();
-
-		final CompletableFuture<SimpleSlot> completeFuture = new CompletableFuture<>();
-		completeFuture.complete(slot2);
-
-		final CompletableFuture<SimpleSlot> disposedSlotFuture = new CompletableFuture<>();
-		slot3.releaseSlot();
-		disposedSlotFuture.complete(slot3);
-
-		// release all futures
-		ExecutionGraphUtils.releaseSlotFuture(incompleteFuture);
-		ExecutionGraphUtils.releaseSlotFuture(completeFuture);
-		ExecutionGraphUtils.releaseSlotFuture(disposedSlotFuture);
-
-		// only now complete the incomplete future
-		incompleteFuture.complete(slot1);
-
-		// verify that each slot was returned once to the owner
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
-	}
-
-	@Test
-	public void testReleaseSlotsWithNulls() {
-		final JobID jid = new JobID();
-		final SlotOwner owner = mock(SlotOwner.class);
-
-		final Execution mockExecution = mock(Execution.class);
-
-		final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
-		final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
-		final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
-		final SimpleSlot slot4 = new SimpleSlot(createAllocatedSlot(jid, 3), owner, 3);
-		final SimpleSlot slot5 = new SimpleSlot(createAllocatedSlot(jid, 4), owner, 4);
-
-		ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] {
-				null,
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot1)),
-				null,
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot2)),
-				null
-		};
-
-		ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] {
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot3)),
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot4)),
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot5))
-		};
-
-		List<ExecutionAndSlot[]> resources = Arrays.asList(null, slots1, new ExecutionAndSlot[0], null, slots2);
-
-		ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot4));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot5));
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static AllocatedSlot createAllocatedSlot(JobID jid, int num) {
-		TaskManagerLocation loc = new TaskManagerLocation(
-				ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + num);
-	
-		return new AllocatedSlot(new AllocationID(), jid, loc, num,
-				ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 44e1794..9908dae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -263,8 +263,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
@@ -303,8 +303,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
@@ -351,8 +351,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
@@ -384,8 +384,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 5f12646..15d021a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -50,6 +51,7 @@ import org.junit.Test;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -92,10 +94,10 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		// validate that the target vertices have no location preference
 		for (int i = 0; i < parallelism; i++) {
 			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator();
 
 			assertTrue(preference.hasNext());
-			assertEquals(locations[i], preference.next());
+			assertEquals(locations[i], preference.next().get());
 			assertFalse(preference.hasNext());
 		}
 	}
@@ -122,7 +124,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		for (int i = 0; i < parallelism; i++) {
 			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
 
-			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator();
 			assertFalse(preference.hasNext());
 		}
 	}
@@ -178,10 +180,10 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		// validate that the target vertices have the state's location as the location preference
 		for (int i = 0; i < parallelism; i++) {
 			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator();
 
 			assertTrue(preference.hasNext());
-			assertEquals(locations[i], preference.next());
+			assertEquals(locations[i], preference.next().get());
 			assertFalse(preference.hasNext());
 		}
 	}
@@ -236,10 +238,9 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 
 		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
 
-		final Field locationField = Execution.class.getDeclaredField("assignedResource");
-		locationField.setAccessible(true);
-
-		locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot);
+		if (!vertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) {
+			throw new FlinkException("Could not assign resource.");
+		}
 	}
 
 	private void setState(Execution execution, ExecutionState state) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index afb9dac..9f4a675 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -33,12 +33,13 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.util.concurrent.ExecutionException;
 
-public class ScheduleWithCoLocationHintTest {
+public class ScheduleWithCoLocationHintTest extends TestLogger {
 
 	@Test
 	public void scheduleAllSharedAndCoLocated() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index c049593..1f88dd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -47,7 +48,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for the scheduler when scheduling tasks in slot sharing groups.
  */
-public class SchedulerSlotSharingTest {
+public class SchedulerSlotSharingTest extends TestLogger {
 
 	@Test
 	public void scheduleSingleVertexType() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 4312b0f..c7d0f09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -25,9 +25,11 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -105,8 +107,13 @@ public class SchedulerTestUtils {
 	
 	public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
-		
-		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
+
+		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4);
+
+		for (TaskManagerLocation preferredLocation : preferredLocations) {
+			preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
+		}
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
@@ -119,7 +126,7 @@ public class SchedulerTestUtils {
 	public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
-		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Collections.emptyList());
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +146,13 @@ public class SchedulerTestUtils {
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 
-		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
+		Collection<CompletableFuture<TaskManagerLocation>> preferrecLocationFutures = new ArrayList<>(locations.length);
+
+		for (TaskManagerLocation location : locations) {
+			preferrecLocationFutures.add(CompletableFuture.completedFuture(location));
+		}
+
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferrecLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);


[2/4] flink git commit: [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 9f4a675..2d35ce2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 
 public class ScheduleWithCoLocationHintTest extends TestLogger {
@@ -66,18 +67,18 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint c6 = new CoLocationConstraint(ccg);
 
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get();
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get();
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get();
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get();
-			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get();
-			SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get();
-			SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get();
-			SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get();
-			SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
+			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
+			SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
+			SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
+			SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
+			SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
 
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -140,7 +141,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
 
 			SimpleSlot single = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get();
+					new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false, Collections.emptyList()).get();
 			assertNotNull(single);
 
 			s1.releaseSlot();
@@ -188,11 +189,11 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 
-			SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get();
+			SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false, Collections.emptyList()).get();
 
 			ResourceID taskManager = s1.getTaskManagerID();
 
@@ -201,7 +202,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			sSolo.releaseSlot();
 
 			SimpleSlot sNew = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 			assertEquals(taskManager, sNew.getTaskManagerID());
 
 			assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
@@ -235,14 +236,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 			s1.releaseSlot();
 
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get();
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get();
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false, Collections.emptyList()).get();
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false, Collections.emptyList()).get();
 
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 				fail("Scheduled even though no resource was available.");
 			} catch (ExecutionException e) {
 				assertTrue(e.getCause() instanceof NoResourceAvailableException);
@@ -283,35 +284,35 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			SlotSharingGroup shareGroup = new SlotSharingGroup();
 
 			// first wave
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false, Collections.emptyList());
 			
 			// second wave
 			SimpleSlot s21 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
 			SimpleSlot s22 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false, Collections.emptyList()).get();
 			SimpleSlot s23 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false, Collections.emptyList()).get();
 			SimpleSlot s24 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
 			
 			// third wave
 			SimpleSlot s31 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false, Collections.emptyList()).get();
 			SimpleSlot s32 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false, Collections.emptyList()).get();
 			SimpleSlot s33 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
 			SimpleSlot s34 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
 			
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false, Collections.emptyList());
 			
 			assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID());
 			assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID());
@@ -357,25 +358,25 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 
 			// schedule something into the shared group so that both instances are in the sharing group
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			
 			// schedule one locally to instance 1
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 
 			// schedule with co location constraint (yet unassigned) and a preference for
 			// instance 1, but it can only get instance 2
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 			
 			// schedule something into the assigned co-location constraints and check that they override the
 			// other preferences
 			SimpleSlot s5 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
 			SimpleSlot s6 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 			
 			// check that each slot got three
 			assertEquals(3, s1.getRoot().getNumberLeaves());
@@ -434,9 +435,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
 
 			s1.releaseSlot();
 			s2.releaseSlot();
@@ -445,9 +446,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 
 			// still preserves the previous instance mapping)
 			assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
@@ -495,9 +496,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
 
 			s1.releaseSlot();
 			s2.releaseSlot();
@@ -506,13 +507,13 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
 			SimpleSlot sa = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false, Collections.emptyList()).get();
 			SimpleSlot sb = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false, Collections.emptyList()).get();
 
 			try {
 				scheduler.allocateSlot(
-						new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+						new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
 				fail("should not be able to find a resource");
 			}
 			catch (ExecutionException e) {
@@ -565,14 +566,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			// and give locality preferences that hint at using the same shared slot for both
 			// co location constraints (which we seek to prevent)
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 
 			// check that each slot got three
 			assertEquals(2, s1.getRoot().getNumberLeaves());
@@ -631,14 +632,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.emptyList()).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.emptyList()).get();
 
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
 
 			// check that each slot got two
 			assertEquals(2, s1.getRoot().getNumberLeaves());

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index a05c1a3..7882f4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -123,17 +125,17 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(5, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule something into all slots
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 			
 			// the slots should all be different
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 				fail("Scheduler accepted scheduling request without available resource.");
 			}
 			catch (ExecutionException e) {
@@ -146,8 +148,8 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			// now we can schedule some more slots
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 			
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
 			
@@ -235,7 +237,7 @@ public class SchedulerIsolatedTasksTest {
 			disposeThread.start();
 
 			for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-				CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+				CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList());
 				future.thenAcceptAsync(
 					(SimpleSlot slot) -> {
 						synchronized (toRelease) {
@@ -284,11 +286,11 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
 			
 			i2.markDead();
 			
@@ -309,7 +311,7 @@ public class SchedulerIsolatedTasksTest {
 			
 			// cannot get another slot, since all instances are dead
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 				fail("Scheduler served a slot from a dead instance");
 			}
 			catch (ExecutionException e) {
@@ -344,7 +346,7 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			// schedule something on an arbitrary instance
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList()).get();
 			
 			// figure out how we use the location hints
 			Instance first = (Instance) s1.getOwner();
@@ -352,28 +354,28 @@ public class SchedulerIsolatedTasksTest {
 			Instance third = first == i3 ? i2 : i3;
 			
 			// something that needs to go to the first instance again
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get();
 			assertEquals(first, s2.getOwner());
 
 			// first or second --> second, because first is full
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get();
 			assertEquals(second, s3.getOwner());
 			
 			// first or third --> third (because first is full)
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
 			assertEquals(third, s4.getOwner());
 			assertEquals(third, s5.getOwner());
 			
 			// first or third --> second, because all others are full
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
 			assertEquals(second, s6.getOwner());
 			
 			// release something on the first and second instance
 			s2.releaseSlot();
 			s6.releaseSlot();
 			
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
 			assertEquals(first, s7.getOwner());
 			
 			assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index 1f88dd8..a478eb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -64,10 +65,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -78,7 +79,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -92,7 +93,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s3.releaseSlot();
 			
 			// allocate another slot from that group
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s5);
 			
 			// release all old slots
@@ -100,9 +101,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s2.releaseSlot();
 			s4.releaseSlot();
 			
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get();
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get();
-			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s6);
 			assertNotNull(s7);
@@ -149,10 +150,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -163,7 +164,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -174,10 +175,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get();
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -186,7 +187,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -207,7 +208,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we can still not schedule anything from the second group of vertices
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -218,7 +219,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			// we can schedule something from the first vertex group
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s5);
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -228,7 +229,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// now we release a slot from the second vertex group and schedule another task from that group
 			s2_2.releaseSlot();
-			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s5_2);
 			
 			// release all slots
@@ -269,10 +270,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -288,10 +289,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -334,10 +335,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -347,10 +348,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
 			
 			// schedule 4 tasks from the second vertex group
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get();
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -360,10 +361,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
 			
 			// schedule 4 tasks from the third vertex group
-			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
@@ -375,7 +376,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -391,9 +392,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s3_2.releaseSlot();
 			s4_2.releaseSlot();
 			
-			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get();
-			SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get();
-			SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get();
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s5_2);
 			assertNotNull(s6_2);
@@ -444,9 +445,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 1 tasks from the first vertex group and 2 from the second
-			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get();
-			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -462,7 +463,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			
 			// this should free one slot so we can allocate one non-shared
-			SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get();
+			SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false, Collections.emptyList()).get();
 			assertNotNull(sx);
 			
 			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -497,28 +498,28 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule some individual vertices
-			SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get();
-			SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get();
+			SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false, Collections.emptyList()).get();
+			SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false, Collections.emptyList()).get();
 			assertNotNull(sA1);
 			assertNotNull(sA2);
 			
 			// schedule some vertices in the sharing group
-			SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
-			SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+			SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s1_0);
 			assertNotNull(s1_1);
 			assertNotNull(s2_0);
 			assertNotNull(s2_1);
 			
 			// schedule another isolated vertex
-			SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get();
+			SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false, Collections.emptyList()).get();
 			assertNotNull(sB1);
 			
 			// should not be able to schedule more vertices
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -529,7 +530,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -540,7 +541,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -551,7 +552,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -564,8 +565,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			// release some isolated task and check that the sharing group may grow
 			sA1.releaseSlot();
 			
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
 			
@@ -577,19 +578,19 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(1, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule one more no-shared task
-			SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+			SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
 			assertNotNull(sB0);
 			
 			// release the last of the original shared slots and allocate one more non-shared slot
 			s2_1.releaseSlot();
-			SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get();
+			SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false, Collections.emptyList()).get();
 			assertNotNull(sB2);
 			
 			
 			// release on non-shared and add some shared slots
 			sA2.releaseSlot();
-			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
-			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
 			
@@ -599,8 +600,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s1_3.releaseSlot();
 			s2_3.releaseSlot();
 			
-			SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get();
-			SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get();
+			SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false, Collections.emptyList()).get();
+			SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false, Collections.emptyList()).get();
 			assertNotNull(sC0);
 			assertNotNull(sC1);
 			
@@ -648,8 +649,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -658,8 +659,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(1, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -701,8 +702,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -711,8 +712,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(2, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -752,14 +753,14 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule until the one instance is full
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 
 			// schedule two more with preference of same instance --> need to go to other instance
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get();
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -805,19 +806,19 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
-			SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 
-			SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
-			SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 			
-			SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+			SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
 			
-			SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
-			SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
-			SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			// release groups 1 and 2
 			
@@ -833,10 +834,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// allocate group 4
 			
-			SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
-			SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
-			SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
-			SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			// release groups 3 and 4
 			
@@ -887,7 +888,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get();
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
 
 							sleepUninterruptibly(rnd.nextInt(5));
 							slot.releaseSlot();
@@ -910,7 +911,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					public void run() {
 						try {
 							if (flag3.compareAndSet(false, true)) {
-								SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+								SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
 								
 								sleepUninterruptibly(5);
 								
@@ -939,7 +940,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get();
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -966,7 +967,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get();
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -1041,27 +1042,27 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// schedule one task for the first and second vertex
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertTrue(  s1.getParent() == s2.getParent() );
 			assertEquals(3, scheduler.getNumberOfAvailableSlots());
 			
-			SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get();
-			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get();
-			SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
-			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
 			
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get();
-			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("should throw an exception");
 			}
 			catch (ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index c7d0f09..98dca03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -106,20 +107,26 @@ public class SchedulerTestUtils {
 	
 	
 	public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
-		ExecutionVertex vertex = mock(ExecutionVertex.class);
-
 		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4);
 
 		for (TaskManagerLocation preferredLocation : preferredLocations) {
 			preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
 		}
+
+		return getTestVertex(preferredLocationFutures);
+	}
+
+	public static Execution getTestVertex(Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) {
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+
 		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
-		
+
 		Execution execution = mock(Execution.class);
 		when(execution.getVertex()).thenReturn(vertex);
-		
+		when(execution.calculatePreferredLocations(any(LocationPreferenceConstraint.class))).thenCallRealMethod();
+
 		return execution;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
new file mode 100644
index 0000000..60dddbb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.net.InetAddress;
+
+/**
+ * Dummy local task manager location for testing purposes.
+ */
+public class LocalTaskManagerLocation extends TaskManagerLocation {
+
+	private static final long serialVersionUID = 2396142513336559461L;
+
+	public LocalTaskManagerLocation() {
+		super(ResourceID.generate(), InetAddress.getLoopbackAddress(), -1);
+	}
+}


[3/4] flink git commit: [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

Posted by tr...@apache.org.
[FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

The LocationPreferenceConstraint defines whether all or any preferred locations
have to be taken into consideration when scheduling tasks. Especially for batch
jobs where we do lazy scheduling not all input locations might be known for a
consumer task. Therefore, we set the location preference constraint to any which
means that only those location are taken into consideration which are known at
scheduling time.

[FLINK-7153] Add test cases

Replace AtomicReference with AtomicReferenceUpdater

Fix

Use static imports Preconditions.checkNotNull

Initialize ANY array with number of returned futures

Revert formatting changes in Scheduler

Set flink-runtime log level in log4j-test.properties to OFF

Revert changes to ExecutionVertex#getPreferredLocationsBasedOnInputs

Fix failing FailoverRegionTest

This closes #4916.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b0fb26b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b0fb26b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b0fb26b

Branch: refs/heads/master
Commit: 3b0fb26bfb779a98f8dcadbb4a7ba206e11f9c2c
Parents: c73b2fe
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 27 09:47:03 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 2 17:04:45 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 152 +++++++---
 .../runtime/executiongraph/ExecutionGraph.java  |  13 +-
 .../executiongraph/ExecutionJobVertex.java      |  29 +-
 .../runtime/executiongraph/ExecutionVertex.java |  41 ++-
 .../executiongraph/failover/FailoverRegion.java |   6 +-
 .../apache/flink/runtime/instance/SlotPool.java |  12 +-
 .../flink/runtime/instance/SlotProvider.java    |   8 +-
 .../scheduler/LocationPreferenceConstraint.java |  32 +++
 .../runtime/jobmanager/scheduler/Scheduler.java |  84 +++---
 .../ExecutionGraphDeploymentTest.java           | 107 +++++++
 .../ExecutionGraphMetricsTest.java              |   3 +-
 .../ExecutionGraphSchedulingTest.java           |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  14 +-
 .../runtime/executiongraph/ExecutionTest.java   | 286 +++++++++++++++++++
 .../ExecutionVertexCancelTest.java              |   5 +-
 .../ExecutionVertexDeploymentTest.java          |   4 +-
 .../ExecutionVertexSchedulingTest.java          |  15 +-
 .../executiongraph/FailoverRegionTest.java      |   9 +-
 .../executiongraph/ProgrammedSlotProvider.java  |  29 +-
 .../utils/SimpleSlotProvider.java               |   6 +-
 .../ScheduleWithCoLocationHintTest.java         | 121 ++++----
 .../scheduler/SchedulerIsolatedTasksTest.java   |  46 +--
 .../scheduler/SchedulerSlotSharingTest.java     | 223 ++++++++-------
 .../scheduler/SchedulerTestUtils.java           |  15 +-
 .../taskmanager/LocalTaskManagerLocation.java   |  35 +++
 25 files changed, 972 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 9d3e128..38c3821 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -45,11 +47,12 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +61,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
@@ -96,6 +98,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
 
+	private static final AtomicReferenceFieldUpdater<Execution, SimpleSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+		Execution.class,
+		SimpleSlot.class,
+		"assignedResource");
+
 	private static final Logger LOG = ExecutionGraph.LOG;
 
 	private static final int NUM_CANCEL_CALL_TRIES = 3;
@@ -134,7 +141,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile ExecutionState state = CREATED;
 
-	private final AtomicReference<SimpleSlot> assignedResource;
+	private volatile SimpleSlot assignedResource;
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
@@ -193,7 +200,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		this.terminationFuture = new CompletableFuture<>();
 		this.taskManagerLocationFuture = new CompletableFuture<>();
 
-		this.assignedResource = new AtomicReference<>();
+		this.assignedResource = null;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -234,7 +241,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	public SimpleSlot getAssignedResource() {
-		return assignedResource.get();
+		return assignedResource;
 	}
 
 	/**
@@ -244,22 +251,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param slot to assign to this execution
 	 * @return true if the slot could be assigned to the execution, otherwise false
 	 */
+	@VisibleForTesting
 	boolean tryAssignResource(final SimpleSlot slot) {
-		Preconditions.checkNotNull(slot);
+		checkNotNull(slot);
 
 		// only allow to set the assigned resource in state SCHEDULED or CREATED
 		// note: we also accept resource assignment when being in state CREATED for testing purposes
 		if (state == SCHEDULED || state == CREATED) {
-			if (assignedResource.compareAndSet(null, slot)) {
+			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot)) {
 				// check for concurrent modification (e.g. cancelling call)
 				if (state == SCHEDULED || state == CREATED) {
-					Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
+					checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
 					taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
 
 					return true;
 				} else {
 					// free assigned resource and return false
-					assignedResource.set(null);
+					ASSIGNED_SLOT_UPDATER.set(this, null);
 					return false;
 				}
 			} else {
@@ -275,7 +283,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		// returns non-null only when a location is already assigned
-		return assignedResource.get() != null ? assignedResource.get().getTaskManagerLocation() : null;
+		final SimpleSlot currentAssignedResource = assignedResource;
+		return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null;
 	}
 
 	public Throwable getFailureCause() {
@@ -333,7 +342,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	public boolean scheduleForExecution() {
 		SlotProvider resourceProvider = getVertex().getExecutionGraph().getSlotProvider();
 		boolean allowQueued = getVertex().getExecutionGraph().isQueuedSchedulingAllowed();
-		return scheduleForExecution(resourceProvider, allowQueued);
+		return scheduleForExecution(resourceProvider, allowQueued, LocationPreferenceConstraint.ANY);
 	}
 
 	/**
@@ -344,12 +353,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param slotProvider The slot provider to use to allocate slot for this execution attempt.
 	 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
 	 *               immediately deploy it.
+	 * @param locationPreferenceConstraint constraint for the location preferences
 	 * 
 	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
 	 */
-	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
+	public boolean scheduleForExecution(
+		SlotProvider slotProvider,
+		boolean queued,
+		LocationPreferenceConstraint locationPreferenceConstraint) {
 		try {
-			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(slotProvider, queued);
+			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
+				slotProvider,
+				queued,
+				locationPreferenceConstraint);
 
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
 			// that we directly deploy the tasks if the slot allocation future is completed. This is
@@ -387,11 +403,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 *
 	 * @param slotProvider to obtain a new slot from
 	 * @param queued if the allocation can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
 	 * @return Future which is completed with this execution once the slot has been assigned
 	 * 			or with an exception if an error occurred.
 	 * @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state
 	 */
-	public CompletableFuture<Execution> allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) throws IllegalExecutionStateException {
+	public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
+			SlotProvider slotProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) throws IllegalExecutionStateException {
 
 		checkNotNull(slotProvider);
 
@@ -411,18 +431,27 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					new ScheduledUnit(this, sharingGroup) :
 					new ScheduledUnit(this, sharingGroup, locationConstraint);
 
-			CompletableFuture<SimpleSlot> slotFuture = slotProvider.allocateSlot(toSchedule, queued);
-
-			return slotFuture.thenApply(slot -> {
-				if (tryAssignResource(slot)) {
-					return this;
-				} else {
-					// release the slot
-					slot.releaseSlot();
+			// calculate the preferred locations
+			final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint);
+
+			return preferredLocationsFuture
+				.thenCompose(
+					(Collection<TaskManagerLocation> preferredLocations) ->
+						slotProvider.allocateSlot(
+							toSchedule,
+							queued,
+							preferredLocations))
+				.thenApply(
+					(SimpleSlot slot) -> {
+						if (tryAssignResource(slot)) {
+							return this;
+						} else {
+							// release the slot
+							slot.releaseSlot();
 
-					throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
-				}
-			});
+							throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
+						}
+					});
 		}
 		else {
 			// call race, already deployed, or already done
@@ -436,7 +465,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @throws JobException if the execution cannot be deployed to the assigned resource
 	 */
 	public void deploy() throws JobException {
-		final SimpleSlot slot  = assignedResource.get();
+		final SimpleSlot slot  = assignedResource;
 
 		checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
 
@@ -516,7 +545,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Sends stop RPC call.
 	 */
 	public void stop() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -579,7 +608,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					try {
 						vertex.getExecutionGraph().deregisterExecution(this);
 
-						final SimpleSlot slot = assignedResource.get();
+						final SimpleSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -640,8 +669,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					() -> {
 						try {
 							consumerVertex.scheduleForExecution(
-									consumerVertex.getExecutionGraph().getSlotProvider(),
-									consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
+								consumerVertex.getExecutionGraph().getSlotProvider(),
+								consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed(),
+								LocationPreferenceConstraint.ANY); // there must be at least one known location
 						} catch (Throwable t) {
 							consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
 									"vertex " + consumerVertex, t));
@@ -748,7 +778,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			int maxStrackTraceDepth,
 			Time timeout) {
 
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -772,7 +802,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param timestamp of the completed checkpoint
 	 */
 	public void notifyCheckpointComplete(long checkpointId, long timestamp) {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -792,7 +822,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param checkpointOptions of the checkpoint to trigger
 	 */
 	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -850,7 +880,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 						updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-						final SimpleSlot slot = assignedResource.get();
+						final SimpleSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -908,7 +938,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				if (transitionState(current, CANCELED)) {
 					try {
-						final SimpleSlot slot = assignedResource.get();
+						final SimpleSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -1005,7 +1035,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				try {
-					final SimpleSlot slot = assignedResource.get();
+					final SimpleSlot slot = assignedResource;
 					if (slot != null) {
 						slot.releaseSlot();
 					}
@@ -1022,7 +1052,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					}
 
 					try {
-						if (assignedResource.get() != null) {
+						if (assignedResource != null) {
 							sendCancelRpcCall();
 						}
 					} catch (Throwable tt) {
@@ -1089,7 +1119,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
 	 */
 	private void sendCancelRpcCall() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1110,7 +1140,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	private void sendFailIntermediateResultPartitionsRpcCall() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1128,7 +1158,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private void sendUpdatePartitionInfoRpcCall(
 			final Iterable<PartitionInfo> partitionInfos) {
 
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1151,6 +1181,46 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	//  Miscellaneous
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Calculates the preferred locations based on the location preference constraint.
+	 *
+	 * @param locationPreferenceConstraint constraint for the location preference
+	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
+	 * 		have been a resource assigned.
+	 */
+	@VisibleForTesting
+	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
+		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
+
+		switch(locationPreferenceConstraint) {
+			case ALL:
+				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+				break;
+			case ANY:
+				final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());
+
+				for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
+					if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
+						final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);
+
+						if (taskManagerLocation == null) {
+							throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
+						}
+
+						completedTaskManagerLocations.add(taskManagerLocation);
+					}
+				}
+
+				preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
+				break;
+			default:
+				throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
+		}
+
+		return preferredLocationsFuture;
+	}
+
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
 		return transitionState(currentState, targetState, null);
 	}
@@ -1248,7 +1318,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	
 	@Override
 	public String toString() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
 				(slot == null ? "(unassigned)" : slot), state);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 62c6e99..8a74001 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -853,11 +854,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+	private void scheduleLazy(SlotProvider slotProvider) {
 		// simply take the vertices without inputs.
 		for (ExecutionJobVertex ejv : verticesInCreationOrder) {
 			if (ejv.getJobVertex().isInputVertex()) {
-				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+				ejv.scheduleAll(
+					slotProvider,
+					allowQueuedScheduling,
+					LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty
 			}
 		}
 	}
@@ -884,7 +888,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// allocate the slots (obtain all their futures
 		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 			// these calls are not blocking, they only return futures
-			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued);
+			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
+				slotProvider,
+				queued,
+				LocationPreferenceConstraint.ALL);
 
 			allAllocationFutures.addAll(allocationFutures);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 98191d0..fff7ce1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.types.Either;
@@ -455,14 +456,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	//---------------------------------------------------------------------------------------------
 	//  Actions
 	//---------------------------------------------------------------------------------------------
-	
-	public void scheduleAll(SlotProvider slotProvider, boolean queued) {
+
+	/**
+	 * Schedules all execution vertices of this ExecutionJobVertex.
+	 *
+	 * @param slotProvider to allocate the slots from
+	 * @param queued if the allocations can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
+	 */
+	public void scheduleAll(
+			SlotProvider slotProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) {
 		
 		final ExecutionVertex[] vertices = this.taskVertices;
 
 		// kick off the tasks
 		for (ExecutionVertex ev : vertices) {
-			ev.scheduleForExecution(slotProvider, queued);
+			ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint);
 		}
 	}
 
@@ -474,8 +485,13 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * <p>If this method throws an exception, it makes sure to release all so far requested slots.
 	 * 
 	 * @param resourceProvider The resource provider from whom the slots are requested.
+	 * @param queued if the allocation can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
 	 */
-	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
+	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
+			SlotProvider resourceProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) {
 		final ExecutionVertex[] vertices = this.taskVertices;
 		final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
 
@@ -484,7 +500,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		for (int i = 0; i < vertices.length; i++) {
 			// allocate the next slot (future)
 			final Execution exec = vertices[i].getCurrentExecutionAttempt();
-			final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(resourceProvider, queued);
+			final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
+				resourceProvider,
+				queued,
+				locationPreferenceConstraint);
 			slots[i] = allocationFuture;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e87a5a0..6d45d06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -487,7 +488,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			return Collections.emptySet();
 		}
 		else {
-			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
+			Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
+			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());
 
 			// go over all inputs
 			for (int i = 0; i < inputEdges.length; i++) {
@@ -497,17 +499,26 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					// go over all input sources
 					for (int k = 0; k < sources.length; k++) {
 						// look-up assigned slot of input source
-						CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
-						inputLocations.add(taskManagerLocationFuture);
-
+						CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
+						// add input location
+						inputLocations.add(locationFuture);
+						// inputs which have too many distinct sources are not considered
 						if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-							return Collections.emptyList();
+							inputLocations.clear();
+							break;
 						}
 					}
 				}
+				// keep the locations of the input with the least preferred locations
+				if (locations.isEmpty() || // nothing assigned yet
+						(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
+					// current input has fewer preferred locations
+					locations.clear();
+					locations.addAll(inputLocations);
+				}
 			}
 
-			return inputLocations.isEmpty() ? Collections.emptyList() : inputLocations;
+			return locations.isEmpty() ? Collections.emptyList() : locations;
 		}
 	}
 
@@ -587,8 +598,22 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 	}
 
-	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
-		return this.currentExecution.scheduleForExecution(slotProvider, queued);
+	/**
+	 * Schedules the current execution of this ExecutionVertex.
+	 *
+	 * @param slotProvider to allocate the slots from
+	 * @param queued if the allocation can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @return
+	 */
+	public boolean scheduleForExecution(
+			SlotProvider slotProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) {
+		return this.currentExecution.scheduleForExecution(
+			slotProvider,
+			queued,
+			locationPreferenceConstraint);
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 1919c61..0b00c0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 
@@ -216,8 +217,9 @@ public class FailoverRegion {
 				for (ExecutionVertex ev : connectedExecutionVertexes) {
 					try {
 						ev.scheduleForExecution(
-								executionGraph.getSlotProvider(),
-								executionGraph.isQueuedSchedulingAllowed());
+							executionGraph.getSlotProvider(),
+							executionGraph.isQueuedSchedulingAllowed(),
+							LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover region might have failed concurrently
 					}
 					catch (Throwable e) {
 						failover(globalModVersionOfFailover);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index fcf7d40..1944b38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1002,14 +1002,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@Override
-		public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-			Collection<CompletableFuture<TaskManagerLocation>> locationPreferenceFutures =
-				task.getTaskToExecute().getVertex().getPreferredLocations();
+		public CompletableFuture<SimpleSlot> allocateSlot(
+				ScheduledUnit task,
+				boolean allowQueued,
+				Collection<TaskManagerLocation> preferredLocations) {
 
-			CompletableFuture<Collection<TaskManagerLocation>> locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures);
-
-			return locationPreferencesFuture.thenCompose(
-				locationPreferences -> gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
+			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index 23e6749..ef988b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -40,7 +42,11 @@ public interface SlotProvider {
 	 *
 	 * @param task         The task to allocate the slot for
 	 * @param allowQueued  Whether allow the task be queued if we do not have enough resource
+	 * @param preferredLocations preferred locations for the slot allocation
 	 * @return The future of the allocation
 	 */
-	CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
+	CompletableFuture<SimpleSlot> allocateSlot(
+		ScheduledUnit task,
+		boolean allowQueued,
+		Collection<TaskManagerLocation> preferredLocations);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
new file mode 100644
index 0000000..e890512
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+/**
+ * Defines the location preference constraint.
+ *
+ * <p> Currently, we support that all input locations have to be taken into consideration
+ * and only those which are known at scheduling time. Note that if all input locations
+ * are considered, then the scheduling operation can potentially take a while until all
+ * inputs have locations assigned.
+ */
+public enum LocationPreferenceConstraint {
+	ALL, // wait for all inputs to have a location assigned
+	ANY // only consider those inputs who already have a location assigned
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 9b1ffbe..1995c12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -18,6 +18,26 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceDiedException;
+import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,31 +52,9 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.instance.SharedSlot;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceDiedException;
-import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
  * 
@@ -135,31 +133,29 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs();
+	public CompletableFuture<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			boolean allowQueued,
+			Collection<TaskManagerLocation> preferredLocations) {
 
-		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+		try {
+			final Object ret = scheduleTask(task, allowQueued, preferredLocations);
 
-		return preferredLocationsFuture.thenCompose(
-			preferredLocations -> {
-				try {
-					final Object ret = scheduleTask(task, allowQueued, preferredLocations);
-
-					if (ret instanceof SimpleSlot) {
-						return CompletableFuture.completedFuture((SimpleSlot) ret);
-					} else if (ret instanceof CompletableFuture) {
-						@SuppressWarnings("unchecked")
-						CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
-						return typed;
-					} else {
-						// this should never happen, simply guard this case with an exception
-						throw new RuntimeException();
-					}
-				} catch (NoResourceAvailableException e) {
-					throw new CompletionException(e);
-				}
+			if (ret instanceof SimpleSlot) {
+				return CompletableFuture.completedFuture((SimpleSlot) ret);
+			}
+			else if (ret instanceof CompletableFuture) {
+				@SuppressWarnings("unchecked")
+				CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
+				return typed;
 			}
-		);
+			else {
+				// this should never happen, simply guard this case with an exception
+				throw new RuntimeException();
+			}
+		} catch (NoResourceAvailableException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 5c80405..de91d78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -46,14 +47,19 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
 
@@ -67,14 +73,18 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for {@link ExecutionGraph} deployment.
@@ -555,6 +565,103 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
+	/**
+	 * Tests that eager scheduling will wait until all input locations have been set before
+	 * scheduling a task.
+	 */
+	@Test
+	public void testEagerSchedulingWaitsOnAllInputPreferredLocations() throws Exception {
+		final int parallelism = 2;
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+		final Time timeout = Time.hours(1L);
+		final JobVertexID sourceVertexId = new JobVertexID();
+		final JobVertex sourceVertex = new JobVertex("Test source", sourceVertexId);
+		sourceVertex.setInvokableClass(NoOpInvokable.class);
+		sourceVertex.setParallelism(parallelism);
+
+		final JobVertexID sinkVertexId = new JobVertexID();
+		final JobVertex sinkVertex = new JobVertex("Test sink", sinkVertexId);
+		sinkVertex.setInvokableClass(NoOpInvokable.class);
+		sinkVertex.setParallelism(parallelism);
+
+		sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>(2);
+
+		for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, sinkVertexId)) {
+			CompletableFuture<SimpleSlot>[] slotFutureArray = new CompletableFuture[parallelism];
+
+			for (int i = 0; i < parallelism; i++) {
+				slotFutureArray[i] = new CompletableFuture<>();
+			}
+
+			slotFutures.put(jobVertexID, slotFutureArray);
+			slotProvider.addSlots(jobVertexID, slotFutureArray);
+		}
+
+		final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(3);
+
+		final ExecutionGraph executionGraph = ExecutionGraphTestUtils.createExecutionGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			scheduledExecutorService,
+			timeout,
+			sourceVertex,
+			sinkVertex);
+
+		executionGraph.setScheduleMode(ScheduleMode.EAGER);
+		executionGraph.scheduleForExecution();
+
+		// all tasks should be in state SCHEDULED
+		for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+			assertEquals(ExecutionState.SCHEDULED, executionVertex.getCurrentExecutionAttempt().getState());
+		}
+
+		// wait until the source vertex slots have been requested
+		assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 0).get());
+		assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 1).get());
+
+		// check that the sinks have not requested their slots because they need the location
+		// information of the sources
+		assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 0).isDone());
+		assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 1).isDone());
+
+		final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
+
+		final SimpleSlot sourceSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+
+		final SimpleSlot sourceSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+
+		final SimpleSlot sinkSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+
+		final SimpleSlot sinkSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+
+		slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
+		slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
+
+		// wait until the sink vertex slots have been requested after we completed the source slots
+		assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 0).get());
+		assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 1).get());
+
+		slotFutures.get(sinkVertexId)[0].complete(sinkSlot1);
+		slotFutures.get(sinkVertexId)[1].complete(sinkSlot2);
+
+		for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+			ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 5000L);
+		}
+	}
+
+	private SimpleSlot createSlot(JobID jobId, TaskManagerLocation taskManagerLocation, int index) {
+		return new SimpleSlot(
+			jobId,
+			mock(SlotOwner.class),
+			taskManagerLocation,
+			index,
+			new SimpleAckingTaskManagerGateway());
+	}
+
 	@SuppressWarnings("serial")
 	public static class FailingFinalizeJobVertex extends JobVertex {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index f4e8b30..d3cec30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -53,6 +53,7 @@ import org.mockito.Matchers;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -115,7 +116,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(simpleSlot);
-			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			when(rootSlot.getSlotNumber()).thenReturn(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 69a679a..90136a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -306,7 +306,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		for (int i = 0; i < parallelism; i += 2) {
 			sourceFutures[i].complete(sourceSlots[i]);
-			targetFutures[i + 1].complete(targetSlots[i + 1]);
+			targetFutures[i].complete(targetSlots[i]);
 		}
 
 		//
@@ -331,7 +331,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		// all completed futures must have been returns
 		for (int i = 0; i < parallelism; i += 2) {
 			assertTrue(sourceSlots[i].isCanceled());
-			assertTrue(targetSlots[i + 1].isCanceled());
+			assertTrue(targetSlots[i].isCanceled());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 5feeabc..42a63ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -320,9 +320,21 @@ public class ExecutionGraphTestUtils {
 			ScheduledExecutorService executor,
 			JobVertex... vertices) throws Exception {
 
+			return createExecutionGraph(jid, slotProvider, restartStrategy, executor, Time.seconds(10L), vertices);
+	}
+
+	public static ExecutionGraph createExecutionGraph(
+			JobID jid,
+			SlotProvider slotProvider,
+			RestartStrategy restartStrategy,
+			ScheduledExecutorService executor,
+			Time timeout,
+			JobVertex... vertices) throws Exception {
+
 		checkNotNull(jid);
 		checkNotNull(restartStrategy);
 		checkNotNull(vertices);
+		checkNotNull(timeout);
 
 		return ExecutionGraphBuilder.buildGraph(
 			null,
@@ -333,7 +345,7 @@ public class ExecutionGraphTestUtils {
 			slotProvider,
 			ExecutionGraphTestUtils.class.getClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(10),
+			timeout,
 			restartStrategy,
 			new UnregisteredMetricsGroup(),
 			1,

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
new file mode 100644
index 0000000..fa845cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link Execution}.
+ */
+public class ExecutionTest extends TestLogger {
+
+	/**
+	 * Tests that slots are released if we cannot assign the allocated resource to the
+	 * Execution. In this case, a concurrent cancellation precedes the assignment.
+	 */
+	@Test
+	public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
+		final JobVertexID jobVertexId = new JobVertexID();
+		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final CompletableFuture<SimpleSlot> slotFuture = new CompletableFuture<>();
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+		slotProvider.addSlot(jobVertexId, 0, slotFuture);
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+		final SimpleSlot slot = new SimpleSlot(
+			new JobID(),
+			slotOwner,
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway());
+
+		CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
+			slotProvider,
+			false,
+			LocationPreferenceConstraint.ALL);
+
+		assertFalse(allocationFuture.isDone());
+
+		assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+		// cancelling the execution should move it into state CANCELED; this happens before
+		// the slot future has been completed
+		execution.cancel();
+
+		assertEquals(ExecutionState.CANCELED, execution.getState());
+
+		// completing now the future should cause the slot to be released
+		slotFuture.complete(slot);
+
+		assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+	}
+
+	/**
+	 * Tests that the slot is released in case of a execution cancellation when having
+	 * a slot assigned and being in state SCHEDULED.
+	 */
+	@Test
+	public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception {
+		final JobVertexID jobVertexId = new JobVertexID();
+		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+		final SimpleSlot slot = new SimpleSlot(
+			new JobID(),
+			slotOwner,
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway());
+
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+		CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
+			slotProvider,
+			false,
+			LocationPreferenceConstraint.ALL);
+
+		assertTrue(allocationFuture.isDone());
+
+		assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+		assertEquals(slot, execution.getAssignedResource());
+
+		// cancelling the execution should move it into state CANCELED
+		execution.cancel();
+		assertEquals(ExecutionState.CANCELED, execution.getState());
+
+		assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+	}
+
+	/**
+	 * Tests that the slot is released in case of a execution cancellation when being in state
+	 * RUNNING.
+	 */
+	@Test
+	public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
+		final JobVertexID jobVertexId = new JobVertexID();
+		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+		final SimpleSlot slot = new SimpleSlot(
+			new JobID(),
+			slotOwner,
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway());
+
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+		CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
+			slotProvider,
+			false,
+			LocationPreferenceConstraint.ALL);
+
+		assertTrue(allocationFuture.isDone());
+
+		assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+		assertEquals(slot, execution.getAssignedResource());
+
+		execution.deploy();
+
+		execution.switchToRunning();
+
+		// cancelling the execution should move it into state CANCELING
+		execution.cancel();
+		assertEquals(ExecutionState.CANCELING, execution.getState());
+
+		execution.cancelingComplete();
+
+		assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+	}
+
+	/**
+	 * Tests that all preferred locations are calculated.
+	 */
+	@Test
+	public void testAllPreferredLocationCalculation() throws ExecutionException, InterruptedException {
+		final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
+		final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
+		final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();
+
+		final CompletableFuture<TaskManagerLocation> locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1);
+		final CompletableFuture<TaskManagerLocation> locationFuture2 = new CompletableFuture<>();
+		final CompletableFuture<TaskManagerLocation> locationFuture3 = new CompletableFuture<>();
+
+		final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
+
+		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ALL);
+
+		assertFalse(preferredLocationsFuture.isDone());
+
+		locationFuture3.complete(taskManagerLocation3);
+
+		assertFalse(preferredLocationsFuture.isDone());
+
+		locationFuture2.complete(taskManagerLocation2);
+
+		assertTrue(preferredLocationsFuture.isDone());
+
+		final Collection<TaskManagerLocation> preferredLocations = preferredLocationsFuture.get();
+
+		assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation2, taskManagerLocation3));
+	}
+
+	/**
+	 * Tests that any preferred locations are calculated.
+	 */
+	@Test
+	public void testAnyPreferredLocationCalculation() throws ExecutionException, InterruptedException {
+		final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
+		final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();
+
+		final CompletableFuture<TaskManagerLocation> locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1);
+		final CompletableFuture<TaskManagerLocation> locationFuture2 = new CompletableFuture<>();
+		final CompletableFuture<TaskManagerLocation> locationFuture3 = CompletableFuture.completedFuture(taskManagerLocation3);
+
+		final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
+
+		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ANY);
+
+		assertTrue(preferredLocationsFuture.isDone());
+
+		final Collection<TaskManagerLocation> preferredLocations = preferredLocationsFuture.get();
+
+		assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
+	}
+
+	/**
+	 * Slot owner which records the first returned slot.
+	 */
+	public static final class TestingSlotOwner implements SlotOwner {
+
+		final CompletableFuture<Slot> returnedSlot = new CompletableFuture<>();
+
+		public CompletableFuture<Slot> getReturnedSlotFuture() {
+			return returnedSlot;
+		}
+
+		@Override
+		public boolean returnAllocatedSlot(Slot slot) {
+			return returnedSlot.complete(slot);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9908dae..cb31d15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -447,7 +448,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			// it can occur as the result of races
 			{
 				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false);
+				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 
 				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			}
@@ -486,7 +487,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 				setVertexState(vertex, ExecutionState.CANCELING);
 
 				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false);
+				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 			}
 			catch (Exception e) {
 				fail("should not throw an exception");

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 26cb3f1..cf08687 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.Collection;
@@ -52,7 +54,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class ExecutionVertexDeploymentTest {
+public class ExecutionVertexDeploymentTest extends TestLogger {
 
 	@Test
 	public void testDeployCall() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 4eac4aa..27f7f51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Matchers;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
@@ -39,6 +41,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -62,11 +65,11 @@ public class ExecutionVertexSchedulingTest {
 			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(slot);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false);
+			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -94,11 +97,11 @@ public class ExecutionVertexSchedulingTest {
 			final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 
 			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, true);
+			vertex.scheduleForExecution(scheduler, true, LocationPreferenceConstraint.ALL);
 
 			// future has not yet a slot
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -128,12 +131,12 @@ public class ExecutionVertexSchedulingTest {
 			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(slot);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false);
+			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 4e89d43..f1e0f7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -156,7 +157,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
 
-		ev21.scheduleForExecution(slotProvider, true);
+		ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
 		ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
 		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
@@ -169,7 +170,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		ev11.getCurrentExecutionAttempt().markFinished();
 		ev21.getCurrentExecutionAttempt().markFinished();
-		ev22.scheduleForExecution(slotProvider, true);
+		ev22.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
 		ev22.getCurrentExecutionAttempt().markFinished();
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
@@ -209,11 +210,11 @@ public class FailoverRegionTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that two failover regions failover at the same time, they will not influence each orther
+	 * Tests that two failover regions failover at the same time, they will not influence each other
 	 * @throws Exception
 	 */
 	@Test
-	public void testMutilRegionFailoverAtSameTime() throws Exception {
+	public void testMultiRegionFailoverAtSameTime() throws Exception {
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 				new ActorTaskManagerGateway(
 						new SimpleActorGateway(TestingUtils.directExecutionContext())),

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index fef6aaa..5d7fa1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -22,7 +22,9 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +40,8 @@ class ProgrammedSlotProvider implements SlotProvider {
 
 	private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>();
 
+	private final Map<JobVertexID, CompletableFuture<Boolean>[]> slotFutureRequested = new HashMap<>();
+
 	private final int parallelism;
 
 	public ProgrammedSlotProvider(int parallelism) {
@@ -51,14 +55,20 @@ class ProgrammedSlotProvider implements SlotProvider {
 		checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
 
 		CompletableFuture<SimpleSlot>[] futures = slotFutures.get(vertex);
+		CompletableFuture<Boolean>[] requestedFutures = slotFutureRequested.get(vertex);
+
 		if (futures == null) {
 			@SuppressWarnings("unchecked")
 			CompletableFuture<SimpleSlot>[] newArray = (CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
 			futures = newArray;
 			slotFutures.put(vertex, futures);
+
+			requestedFutures = new CompletableFuture[parallelism];
+			slotFutureRequested.put(vertex, requestedFutures);
 		}
 
 		futures[subtaskIndex] = future;
+		requestedFutures[subtaskIndex] = new CompletableFuture<>();
 	}
 
 	public void addSlots(JobVertexID vertex, CompletableFuture<SimpleSlot>[] futures) {
@@ -67,10 +77,25 @@ class ProgrammedSlotProvider implements SlotProvider {
 		checkArgument(futures.length == parallelism);
 
 		slotFutures.put(vertex, futures);
+
+		CompletableFuture<Boolean>[] requestedFutures = new CompletableFuture[futures.length];
+
+		for (int i = 0; i < futures.length; i++) {
+			requestedFutures[i] = new CompletableFuture<>();
+		}
+
+		slotFutureRequested.put(vertex, requestedFutures);
+	}
+
+	public CompletableFuture<Boolean> getSlotRequestedFuture(JobVertexID jobVertexId, int subtaskIndex) {
+		return slotFutureRequested.get(jobVertexId)[subtaskIndex];
 	}
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+	public CompletableFuture<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			boolean allowQueued,
+			Collection<TaskManagerLocation> preferredLocations) {
 		JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
 		int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
 
@@ -78,6 +103,8 @@ class ProgrammedSlotProvider implements SlotProvider {
 		if (forTask != null) {
 			CompletableFuture<SimpleSlot> future = forTask[subtask];
 			if (future != null) {
+				slotFutureRequested.get(vertexId)[subtask].complete(true);
+
 				return future;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index be5282a..a2323bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.net.InetAddress;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -70,7 +71,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	}
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+	public CompletableFuture<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			boolean allowQueued,
+			Collection<TaskManagerLocation> preferredLocations) {
 		final AllocatedSlot slot;
 
 		synchronized (slots) {


[4/4] flink git commit: [hotfix] Make failover region topological sorted

Posted by tr...@apache.org.
[hotfix] Make failover region topological sorted


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ff91be1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ff91be1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ff91be1

Branch: refs/heads/master
Commit: 3ff91be1d49cf9f972ad5f1c556af173d97d102e
Parents: 3b0fb26
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 26 18:22:43 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 2 17:04:45 2017 +0100

----------------------------------------------------------------------
 .../executiongraph/failover/RestartPipelinedRegionStrategy.java | 5 +++--
 .../apache/flink/runtime/executiongraph/FailoverRegionTest.java | 1 +
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ff91be1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
index 1884d1c..b8f6964 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -166,8 +166,9 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy {
 									if (predecessorRegion != thisRegion) {
 
 										// we need to merge our region and the predecessor's region
-										thisRegion.addAll(predecessorRegion);
-										distinctRegions.remove(predecessorRegion);
+										predecessorRegion.addAll(thisRegion);
+										distinctRegions.remove(thisRegion);
+										thisRegion = predecessorRegion;
 
 										// remap the vertices from that merged region
 										for (ExecutionVertex inPredRegion: predecessorRegion) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff91be1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index f1e0f7c..4d53e67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -422,6 +422,7 @@ public class FailoverRegionTest extends TestLogger {
 		v2.setInvokableClass(AbstractInvokable.class);
 		v3.setInvokableClass(AbstractInvokable.class);
 
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);