You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:31 UTC

[5/6] flink git commit: [FLINK-5499] [JobManager] Make the location preferences combined by state and inputs.

[FLINK-5499] [JobManager] Make the location preferences combined by state and inputs.

Reusing the prior location (for state locality) takes precedence over input locality.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9ed4ff1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9ed4ff1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9ed4ff1

Branch: refs/heads/master
Commit: b9ed4ff151c5d3a64be395c660160b5619e32c7f
Parents: fe4fe58
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 20:34:33 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 12:46:14 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java |  83 +++++--
 .../apache/flink/runtime/instance/SlotPool.java |   6 +-
 .../instance/SlotSharingGroupAssignment.java    |   6 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |   4 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |   2 +-
 .../ExecutionVertexLocalityTest.java            | 244 +++++++++++++++++++
 .../scheduler/SchedulerTestUtils.java           |   6 +-
 7 files changed, 323 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0bb3514..cb2e177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -264,20 +265,21 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	}
 
 	/**
-	 * Just return the last assigned resource location if found
-	 *
-	 * @return The collection of TaskManagerLocation
+	 * Gets the location where the latest completed/canceled/failed execution of the vertex's
+	 * task happened.
+	 * 
+	 * @return The latest prior execution location, or null, if there is none, yet.
 	 */
-	public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
-		List<TaskManagerLocation> list = new ArrayList<>();
-		for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
-			Execution prior = priorExecutions.get(i) ;
-			if (prior.getAssignedResourceLocation() != null) {
-				list.add(prior.getAssignedResourceLocation());
-				break;
+	public TaskManagerLocation getLatestPriorLocation() {
+		synchronized (priorExecutions) {
+			final int size = priorExecutions.size();
+			if (size > 0) {
+				return priorExecutions.get(size - 1).getAssignedResourceLocation();
+			}
+			else {
+				return null;
 			}
 		}
-		return list;
 	}
 
 	EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
@@ -398,14 +400,61 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	}
 
 	/**
-	 * Gets the location preferences of this task, determined by the locations of the predecessors from which
-	 * it receives input data.
+	 * Gets the overall preferred execution location for this vertex's current execution.
+	 * The preference is determined as follows:
+	 * 
+	 * <ol>
+	 *     <li>If the task execution has state to load (from a checkpoint), then the location preference
+	 *         is the location of the previous execution (if there is a previous execution attempt).
+	 *     <li>If the task execution has no state or no previous location, then the location preference
+	 *         is based on the task's inputs.
+	 * </ol>
+	 * 
+	 * These rules should result in the following behavior:
+	 * 
+	 * <ul>
+	 *     <li>Stateless tasks are always scheduled based on co-location with inputs.
+	 *     <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
+	 *     <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
+	 * </ul>
+	 * 
+	 * @return The preferred excution locations for the execution attempt.
+	 * 
+	 * @see #getPreferredLocationsBasedOnState()
+	 * @see #getPreferredLocationsBasedOnInputs() 
+	 */
+	public Iterable<TaskManagerLocation> getPreferredLocations() {
+		Iterable<TaskManagerLocation> basedOnState = getPreferredLocationsBasedOnState();
+		return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
+	}
+	
+	/**
+	 * Gets the preferred location to execute the current task execution attempt, based on the state
+	 * that the execution attempt will resume.
+	 * 
+	 * @return A size-one iterable with the location preference, or null, if there is no
+	 *         location preference based on the state.
+	 */
+	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() {
+		TaskManagerLocation priorLocation;
+		if (currentExecution.getTaskStateHandles() != null && (priorLocation = getLatestPriorLocation()) != null) {
+			return Collections.singleton(priorLocation);
+		}
+		else {
+			return null;
+		}
+	}
+
+	/**
+	 * Gets the location preferences of the vertex's current task execution, as determined by the locations
+	 * of the predecessors from which it receives input data.
 	 * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
 	 * method returns {@code null} to indicate no location preference.
 	 *
-	 * @return The preferred locations for this vertex execution, or null, if there is no preference.
+	 * @return The preferred locations based in input streams, or an empty iterable,
+	 *         if there is no input-based preference.
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocations() {
+	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
 		// otherwise, base the preferred locations on the input connections
 		if (inputEdges == null) {
 			return Collections.emptySet();
@@ -435,7 +484,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					}
 				}
 				// keep the locations of the input with the least preferred locations
-				if(locations.isEmpty() || // nothing assigned yet
+				if (locations.isEmpty() || // nothing assigned yet
 						(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
 					// current input has fewer preferred locations
 					locations.clear();
@@ -443,7 +492,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				}
 			}
 
-			return locations;
+			return locations.isEmpty() ? Collections.<TaskManagerLocation>emptyList() : locations;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6fac3c8..672431e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		@Override
 		public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN,
-					task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout);
+			Iterable<TaskManagerLocation> locationPreferences = 
+					task.getTaskToExecute().getVertex().getPreferredLocations();
+
+			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 346cc77..88fbc10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -262,7 +262,7 @@ public class SlotSharingGroupAssignment {
 
 	/**
 	 * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
-	 * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local
+	 * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local
 	 * slots if no local slot is available. The method returns null, when this sharing group has
 	 * no slot is available for the given JobVertexID. 
 	 *
@@ -271,7 +271,7 @@ public class SlotSharingGroupAssignment {
 	 * @return A slot to execute the given ExecutionVertex in, or null, if none is available.
 	 */
 	public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
-		return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocations());
+		return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
 	}
 
 	/**
@@ -313,7 +313,7 @@ public class SlotSharingGroupAssignment {
 	 *         shared slot is available.
 	 */
 	public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
-		return getSlotForTask(constraint, vertex.getPreferredLocations());
+		return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs());
 	}
 	
 	SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 466a148..dc82440 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -174,7 +174,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 		
-		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
+		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();
 		final boolean forceExternalLocation = false &&
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
@@ -240,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						localOnly = true;
 					}
 					else {
-						locations = vertex.getPreferredLocations();
+						locations = vertex.getPreferredLocationsBasedOnInputs();
 						localOnly = forceExternalLocation;
 					}
 					

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 269a8f3..4910862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -65,7 +65,7 @@ public class AllocatedSlot {
 			JobID jobID,
 			TaskManagerLocation location,
 			int slotNumber,
-			ResourceProfile resourceProfile,		
+			ResourceProfile resourceProfile,
 			TaskManagerGateway taskManagerGateway) {
 		this.slotAllocationId = checkNotNull(slotAllocationId);
 		this.jobID = checkNotNull(jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
new file mode 100644
index 0000000..36b7575
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the execution vertex handles locality preferences well.
+ */
+public class ExecutionVertexLocalityTest extends TestLogger {
+
+	private final JobID jobId = new JobID();
+
+	private final JobVertexID sourceVertexId = new JobVertexID();
+	private final JobVertexID targetVertexId = new JobVertexID();
+
+	/**
+	 * This test validates that vertices that have only one input stream try to
+	 * co-locate their tasks with the producer.
+	 */
+	@Test
+	public void testLocalityInputBasedForward() throws Exception {
+		final int parallelism = 10;
+		final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+		final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+		// set the location for all sources to a distinct location
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+
+			TaskManagerLocation location = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+
+			locations[i] = location;
+			initializeLocation(source, location);
+		}
+
+		// validate that the target vertices have no location preference
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+			assertTrue(preference.hasNext());
+			assertEquals(locations[i], preference.next());
+			assertFalse(preference.hasNext());
+		}
+	}
+
+	/**
+	 * This test validates that vertices with too many input streams do not have a location
+	 * preference any more.
+	 */
+	@Test
+	public void testNoLocalityInputLargeAllToAll() throws Exception {
+		final int parallelism = 100;
+
+		final ExecutionGraph graph = createTestGraph(parallelism, true);
+
+		// set the location for all sources to a distinct location
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+			TaskManagerLocation location = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+			initializeLocation(source, location);
+		}
+
+		// validate that the target vertices have no location preference
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			assertFalse(preference.hasNext());
+		}
+	}
+
+	/**
+	 * This test validates that stateful vertices schedule based in the state's location
+	 * (which is the prior execution's location).
+	 */
+	@Test
+	public void testLocalityBasedOnState() throws Exception {
+		final int parallelism = 10;
+		final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+		final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+		// set the location for all sources and targets
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+			TaskManagerLocation randomLocation = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+			
+			TaskManagerLocation location = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 20000 + i);
+
+			locations[i] = location;
+			initializeLocation(source, randomLocation);
+			initializeLocation(target, location);
+
+			setState(source.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+			setState(target.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+		}
+
+		// mimic a restart: all vertices get re-initialized without actually being executed
+		for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+			ejv.resetForNewExecution();
+		}
+
+		// set new location for the sources and some state for the targets
+		for (int i = 0; i < parallelism; i++) {
+			// source location
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+			TaskManagerLocation randomLocation = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 30000 + i);
+			initializeLocation(source, randomLocation);
+
+			// target state
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+			target.getCurrentExecutionAttempt().setInitialState(mock(TaskStateHandles.class));
+		}
+
+		// validate that the target vertices have the state's location as the location preference
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+			assertTrue(preference.hasNext());
+			assertEquals(locations[i], preference.next());
+			assertFalse(preference.hasNext());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a simple 2 vertex graph with a parallel source and a parallel target.
+	 */
+	private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws Exception {
+
+		JobVertex source = new JobVertex("source", sourceVertexId);
+		source.setParallelism(parallelism);
+		source.setInvokableClass(NoOpInvokable.class);
+
+		JobVertex target = new JobVertex("source", targetVertexId);
+		target.setParallelism(parallelism);
+		target.setInvokableClass(NoOpInvokable.class);
+
+		DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
+		target.connectNewDataSetAsInput(source, connectionPattern);
+
+		JobGraph testJob = new JobGraph(jobId, "test job", source, target);
+
+		return ExecutionGraphBuilder.buildGraph(
+				null,
+				testJob,
+				new Configuration(),
+				Executors.directExecutor(),
+				Executors.directExecutor(),
+				getClass().getClassLoader(),
+				new StandaloneCheckpointRecoveryFactory(),
+				Time.of(10, TimeUnit.SECONDS),
+				new FixedDelayRestartStrategy(10, 0L),
+				new UnregisteredMetricsGroup(),
+				1,
+				log);
+	}
+
+	private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception {
+		// we need a bit of reflection magic to initialize the location without going through
+		// scheduling paths. we choose to do that, rather than the alternatives:
+		//  - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
+		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
+
+		AllocatedSlot slot = new AllocatedSlot(
+				new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+
+		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
+
+		final Field locationField = Execution.class.getDeclaredField("assignedResource");
+		locationField.setAccessible(true);
+
+		locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot);
+	}
+
+	private void setState(Execution execution, ExecutionState state) throws Exception {
+		final Field stateField = Execution.class.getDeclaredField("state");
+		stateField.setAccessible(true);
+
+		stateField.set(execution, state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index b36de77..9e692ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -106,7 +106,7 @@ public class SchedulerTestUtils {
 	public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
-		when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
@@ -119,7 +119,7 @@ public class SchedulerTestUtils {
 	public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
-		when(vertex.getPreferredLocations()).thenReturn(null);
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +139,7 @@ public class SchedulerTestUtils {
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 
-		when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations));
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);