You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:31 UTC
[5/6] flink git commit: [FLINK-5499] [JobManager] Make the location
preferences combined by state and inputs.
[FLINK-5499] [JobManager] Make the location preferences combined by state and inputs.
Reusing the prior location (for state locality) takes precedence over input locality.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9ed4ff1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9ed4ff1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9ed4ff1
Branch: refs/heads/master
Commit: b9ed4ff151c5d3a64be395c660160b5619e32c7f
Parents: fe4fe58
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 20:34:33 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 12:46:14 2017 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionVertex.java | 83 +++++--
.../apache/flink/runtime/instance/SlotPool.java | 6 +-
.../instance/SlotSharingGroupAssignment.java | 6 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 4 +-
.../runtime/jobmanager/slots/AllocatedSlot.java | 2 +-
.../ExecutionVertexLocalityTest.java | 244 +++++++++++++++++++
.../scheduler/SchedulerTestUtils.java | 6 +-
7 files changed, 323 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0bb3514..cb2e177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import java.io.IOException;
@@ -264,20 +265,21 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
/**
- * Just return the last assigned resource location if found
- *
- * @return The collection of TaskManagerLocation
+ * Gets the location where the latest completed/canceled/failed execution of the vertex's
+ * task happened.
+ *
+ * @return The latest prior execution location, or null, if there is none, yet.
*/
- public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
- List<TaskManagerLocation> list = new ArrayList<>();
- for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
- Execution prior = priorExecutions.get(i) ;
- if (prior.getAssignedResourceLocation() != null) {
- list.add(prior.getAssignedResourceLocation());
- break;
+ public TaskManagerLocation getLatestPriorLocation() {
+ synchronized (priorExecutions) {
+ final int size = priorExecutions.size();
+ if (size > 0) {
+ return priorExecutions.get(size - 1).getAssignedResourceLocation();
+ }
+ else {
+ return null;
}
}
- return list;
}
EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
@@ -398,14 +400,61 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
/**
- * Gets the location preferences of this task, determined by the locations of the predecessors from which
- * it receives input data.
+ * Gets the overall preferred execution location for this vertex's current execution.
+ * The preference is determined as follows:
+ *
+ * <ol>
+ * <li>If the task execution has state to load (from a checkpoint), then the location preference
+ * is the location of the previous execution (if there is a previous execution attempt).
+ * <li>If the task execution has no state or no previous location, then the location preference
+ * is based on the task's inputs.
+ * </ol>
+ *
+ * These rules should result in the following behavior:
+ *
+ * <ul>
+ * <li>Stateless tasks are always scheduled based on co-location with inputs.
+ * <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
+ * <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
+ * </ul>
+ *
+ * @return The preferred execution locations for the execution attempt.
+ *
+ * @see #getPreferredLocationsBasedOnState()
+ * @see #getPreferredLocationsBasedOnInputs()
+ */
+ public Iterable<TaskManagerLocation> getPreferredLocations() {
+ Iterable<TaskManagerLocation> basedOnState = getPreferredLocationsBasedOnState();
+ return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
+ }
+
+ /**
+ * Gets the preferred location to execute the current task execution attempt, based on the state
+ * that the execution attempt will resume.
+ *
+ * @return A size-one iterable with the location preference, or null, if there is no
+ * location preference based on the state.
+ */
+ public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() {
+ TaskManagerLocation priorLocation;
+ if (currentExecution.getTaskStateHandles() != null && (priorLocation = getLatestPriorLocation()) != null) {
+ return Collections.singleton(priorLocation);
+ }
+ else {
+ return null;
+ }
+ }
+
+ /**
+ * Gets the location preferences of the vertex's current task execution, as determined by the locations
+ * of the predecessors from which it receives input data.
* If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
* method returns {@code null} to indicate no location preference.
*
- * @return The preferred locations for this vertex execution, or null, if there is no preference.
+ * @return The preferred locations based on input streams, or an empty iterable,
+ * if there is no input-based preference.
*/
- public Iterable<TaskManagerLocation> getPreferredLocations() {
+ public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
@@ -435,7 +484,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
// keep the locations of the input with the least preferred locations
- if(locations.isEmpty() || // nothing assigned yet
+ if (locations.isEmpty() || // nothing assigned yet
(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
// current input has fewer preferred locations
locations.clear();
@@ -443,7 +492,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
- return locations;
+ return locations.isEmpty() ? Collections.<TaskManagerLocation>emptyList() : locations;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6fac3c8..672431e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
- return gateway.allocateSlot(task, ResourceProfile.UNKNOWN,
- task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout);
+ Iterable<TaskManagerLocation> locationPreferences =
+ task.getTaskToExecute().getVertex().getPreferredLocations();
+
+ return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 346cc77..88fbc10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -262,7 +262,7 @@ public class SlotSharingGroupAssignment {
/**
* Gets a slot suitable for the given task vertex. This method will prefer slots that are local
- * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local
+ * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local
* slots if no local slot is available. The method returns null, when this sharing group has
* no slot available for the given JobVertexID.
*
@@ -271,7 +271,7 @@ public class SlotSharingGroupAssignment {
* @return A slot to execute the given ExecutionVertex in, or null, if none is available.
*/
public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
- return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocations());
+ return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
}
/**
@@ -313,7 +313,7 @@ public class SlotSharingGroupAssignment {
* shared slot is available.
*/
public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
- return getSlotForTask(constraint, vertex.getPreferredLocations());
+ return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs());
}
SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 466a148..dc82440 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -174,7 +174,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
- final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
+ final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();
final boolean forceExternalLocation = false &&
preferredLocations != null && preferredLocations.iterator().hasNext();
@@ -240,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
localOnly = true;
}
else {
- locations = vertex.getPreferredLocations();
+ locations = vertex.getPreferredLocationsBasedOnInputs();
localOnly = forceExternalLocation;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 269a8f3..4910862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -65,7 +65,7 @@ public class AllocatedSlot {
JobID jobID,
TaskManagerLocation location,
int slotNumber,
- ResourceProfile resourceProfile,
+ ResourceProfile resourceProfile,
TaskManagerGateway taskManagerGateway) {
this.slotAllocationId = checkNotNull(slotAllocationId);
this.jobID = checkNotNull(jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
new file mode 100644
index 0000000..36b7575
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the execution vertex handles locality preferences well.
+ */
+public class ExecutionVertexLocalityTest extends TestLogger {
+
+ private final JobID jobId = new JobID();
+
+ private final JobVertexID sourceVertexId = new JobVertexID();
+ private final JobVertexID targetVertexId = new JobVertexID();
+
+ /**
+ * This test validates that vertices that have only one input stream try to
+ * co-locate their tasks with the producer.
+ */
+ @Test
+ public void testLocalityInputBasedForward() throws Exception {
+ final int parallelism = 10;
+ final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+ final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+ // set the location for all sources to a distinct location
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+
+ TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+
+ locations[i] = location;
+ initializeLocation(source, location);
+ }
+
+ // validate that each target vertex prefers exactly the location of its source
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+ Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+ assertTrue(preference.hasNext());
+ assertEquals(locations[i], preference.next());
+ assertFalse(preference.hasNext());
+ }
+ }
+
+ /**
+ * This test validates that vertices with too many input streams do not have a location
+ * preference any more.
+ */
+ @Test
+ public void testNoLocalityInputLargeAllToAll() throws Exception {
+ final int parallelism = 100;
+
+ final ExecutionGraph graph = createTestGraph(parallelism, true);
+
+ // set the location for all sources to a distinct location
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+ TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+ initializeLocation(source, location);
+ }
+
+ // validate that the target vertices have no location preference
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+ Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+ assertFalse(preference.hasNext());
+ }
+ }
+
+ /**
+ * This test validates that stateful vertices schedule based on the state's location
+ * (which is the prior execution's location).
+ */
+ @Test
+ public void testLocalityBasedOnState() throws Exception {
+ final int parallelism = 10;
+ final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+ final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+ // set the location for all sources and targets
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+ TaskManagerLocation randomLocation = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+
+ TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 20000 + i);
+
+ locations[i] = location;
+ initializeLocation(source, randomLocation);
+ initializeLocation(target, location);
+
+ setState(source.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+ setState(target.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+ }
+
+ // mimic a restart: all vertices get re-initialized without actually being executed
+ for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+ ejv.resetForNewExecution();
+ }
+
+ // set new location for the sources and some state for the targets
+ for (int i = 0; i < parallelism; i++) {
+ // source location
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+ TaskManagerLocation randomLocation = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 30000 + i);
+ initializeLocation(source, randomLocation);
+
+ // target state
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+ target.getCurrentExecutionAttempt().setInitialState(mock(TaskStateHandles.class));
+ }
+
+ // validate that the target vertices have the state's location as the location preference
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+ Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+ assertTrue(preference.hasNext());
+ assertEquals(locations[i], preference.next());
+ assertFalse(preference.hasNext());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a simple 2 vertex graph with a parallel source and a parallel target.
+ */
+ private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws Exception {
+
+ JobVertex source = new JobVertex("source", sourceVertexId);
+ source.setParallelism(parallelism);
+ source.setInvokableClass(NoOpInvokable.class);
+
+ JobVertex target = new JobVertex("source", targetVertexId);
+ target.setParallelism(parallelism);
+ target.setInvokableClass(NoOpInvokable.class);
+
+ DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
+ target.connectNewDataSetAsInput(source, connectionPattern);
+
+ JobGraph testJob = new JobGraph(jobId, "test job", source, target);
+
+ return ExecutionGraphBuilder.buildGraph(
+ null,
+ testJob,
+ new Configuration(),
+ Executors.directExecutor(),
+ Executors.directExecutor(),
+ getClass().getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.of(10, TimeUnit.SECONDS),
+ new FixedDelayRestartStrategy(10, 0L),
+ new UnregisteredMetricsGroup(),
+ 1,
+ log);
+ }
+
+ private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception {
+ // we need a bit of reflection magic to initialize the location without going through
+ // scheduling paths. we choose to do that, rather than the alternatives:
+ // - mocking the scheduler creates fragile tests that break whenever the scheduler is adjusted
+ // - exposing test methods in the ExecutionVertex leads to undesirable setters
+
+ AllocatedSlot slot = new AllocatedSlot(
+ new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+
+ SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
+
+ final Field locationField = Execution.class.getDeclaredField("assignedResource");
+ locationField.setAccessible(true);
+
+ locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot);
+ }
+
+ private void setState(Execution execution, ExecutionState state) throws Exception {
+ final Field stateField = Execution.class.getDeclaredField("state");
+ stateField.setAccessible(true);
+
+ stateField.set(execution, state);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index b36de77..9e692ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -106,7 +106,7 @@ public class SchedulerTestUtils {
public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
ExecutionVertex vertex = mock(ExecutionVertex.class);
- when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
+ when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.toString()).thenReturn("TEST-VERTEX");
@@ -119,7 +119,7 @@ public class SchedulerTestUtils {
public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
ExecutionVertex vertex = mock(ExecutionVertex.class);
- when(vertex.getPreferredLocations()).thenReturn(null);
+ when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.getJobvertexId()).thenReturn(jid);
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +139,7 @@ public class SchedulerTestUtils {
ExecutionVertex vertex = mock(ExecutionVertex.class);
- when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations));
+ when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.getJobvertexId()).thenReturn(jid);
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);