You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:27 UTC

[1/6] flink git commit: [FLINK-5499] [JobManager] Reuse the resource location of prior execution attempt in allocating slot

Repository: flink
Updated Branches:
  refs/heads/master d52d006c6 -> 7b1857d84


[FLINK-5499] [JobManager] Reuse the resource location of prior execution attempt in allocating slot

This closes #3125


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e107b1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e107b1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e107b1c

Branch: refs/heads/master
Commit: 2e107b1cfaa6e31fe478191c74aa25d53ab49943
Parents: d52d006
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Mon Jan 16 17:28:19 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:22 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java    | 17 +++++++++++++++++
 .../apache/flink/runtime/instance/SlotPool.java    |  4 ++--
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6084ad6..d840d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -264,6 +264,23 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 	}
 
+	/**
+	 * Just return the last assigned resource location if found
+	 *
+	 * @return The collection of TaskManagerLocation
+	 */
+	public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
+		List<TaskManagerLocation> list = new ArrayList<>();
+		for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
+			Execution prior = priorExecutions.get(i) ;
+			if (prior.getAssignedResourceLocation() != null) {
+				list.add(prior.getAssignedResourceLocation());
+				break;
+			}
+		}
+		return list;
+	}
+
 	EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
 		synchronized (priorExecutions) {
 			return new EvictingBoundedList<>(priorExecutions);

http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 2a9aca7..6fac3c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		@Override
 		public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-			return gateway.allocateSlot(
-					task, ResourceProfile.UNKNOWN, Collections.<TaskManagerLocation>emptyList(), timeout);
+			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN,
+					task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout);
 		}
 	}
 


[4/6] flink git commit: [hotfix] Minor code cleanups in the ExecutionGraph's Execution

Posted by se...@apache.org.
[hotfix] Minor code cleanups in the ExecutionGraph's Execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e4e321
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e4e321
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e4e321

Branch: refs/heads/master
Commit: 10e4e321b335b6f9376501f90715e31b71b02da8
Parents: 2e107b1
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 17:21:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 24 ++++++++------------
 .../taskmanager/TaskManagerLocation.java        |  2 +-
 2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 18a4445..c2fe5ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -70,6 +71,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -112,7 +114,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private final Time timeout;
 
-	private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
+	private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
 	private volatile ExecutionState state = CREATED;
 
@@ -120,8 +122,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
-	private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution
-
 	private TaskStateHandles taskStateHandles;
 
 	/** The executor which is used to execute futures. */
@@ -189,7 +189,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
-		return assignedResourceLocation;
+		// returns non-null only when a location is already assigned
+		return assignedResource != null ? assignedResource.getTaskManagerLocation() : null;
 	}
 
 	public Throwable getFailureCause() {
@@ -226,11 +227,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param checkpointStateHandles all checkpointed operator state
 	 */
 	public void setInitialState(TaskStateHandles checkpointStateHandles) {
-
-		if (state != ExecutionState.CREATED) {
-			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
-		}
-
+		checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
 		this.taskStateHandles = checkpointStateHandles;
 	}
 
@@ -343,7 +340,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
 			this.assignedResource = slot;
-			this.assignedResourceLocation = slot.getTaskManagerLocation();
 
 			// race double check, did we fail/cancel and do we need to release the slot?
 			if (this.state != DEPLOYING) {
@@ -353,7 +349,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
-						attemptNumber, assignedResourceLocation.getHostname()));
+						attemptNumber, getAssignedResourceLocation().getHostname()));
 			}
 
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
@@ -373,12 +369,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				@Override
 				public Void apply(Throwable failure) {
 					if (failure instanceof TimeoutException) {
-						String taskname = vertex.getTaskName() + '(' +
-							(getParallelSubtaskIndex() + 1) + '/' +
-							vertex.getTotalNumberOfParallelSubtasks() + ") (" + attemptId + ')';
+						String taskname = vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')';
 
 						markFailed(new Exception(
-							"Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
+							"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
 								+ ") not responding after a timeout of " + timeout, failure));
 					}
 					else {

http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index 01d0654..956a2a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -219,7 +219,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
 
 	@Override
 	public int compareTo(@Nonnull TaskManagerLocation o) {
-		// decide based on address first
+		// decide based on resource ID first
 		int resourceIdCmp = this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString());
 		if (resourceIdCmp != 0) {
 			return resourceIdCmp;


[2/6] flink git commit: [hotfix] [jobmanager] Cleanups in the ExecutionGraph

Posted by se...@apache.org.
[hotfix] [jobmanager] Cleanups in the ExecutionGraph

  - Making fields final where possible
  - Making fields volatile where needed or advisable
  - Remove some dead code/functionality


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4fe587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4fe587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4fe587

Branch: refs/heads/master
Commit: fe4fe5872883f3de362c4d6864b21a66bcbf5d4e
Parents: 4820b41
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:55:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 33 +++++++++++---------
 .../runtime/executiongraph/ExecutionVertex.java | 19 ++---------
 .../runtime/jobmanager/scheduler/Scheduler.java |  5 +--
 3 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c2fe5ea..e29e5b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -104,8 +104,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	// --------------------------------------------------------------------------------------------
 
+	/** The executor which is used to execute futures. */
+	private final Executor executor;
+
+	/** The execution vertex whose task this execution executes */ 
 	private final ExecutionVertex vertex;
 
+	/** The unique ID marking the specific execution instant of the task */
 	private final ExecutionAttemptID attemptId;
 
 	private final long[] stateTimestamps;
@@ -122,41 +127,39 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
-	private TaskStateHandles taskStateHandles;
+	/** The handle to the state that the task gets on restore */
+	private volatile TaskStateHandles taskState;
 
-	/** The executor which is used to execute futures. */
-	private Executor executor;
+	// ------------------------ Accumulators & Metrics ------------------------
 
-	// ------------------------- Accumulators ---------------------------------
-	
-	/* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
-	* by partial accumulators on a late heartbeat*/
+	/** Lock for updating the accumulators atomically.
+	 * Prevents final accumulators to be overwritten by partial accumulators on a late heartbeat */
 	private final Object accumulatorLock = new Object();
 
 	/* Continuously updated map of user-defined accumulators */
 	private volatile Map<String, Accumulator<?, ?>> userAccumulators;
-	private IOMetrics ioMetrics;
+
+	private volatile IOMetrics ioMetrics;
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public Execution(
 			Executor executor,
 			ExecutionVertex vertex,
 			int attemptNumber,
 			long startTimestamp,
 			Time timeout) {
-		this.executor = checkNotNull(executor);
 
+		this.executor = checkNotNull(executor);
 		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
+		this.timeout = checkNotNull(timeout);
 
 		this.attemptNumber = attemptNumber;
 
 		this.stateTimestamps = new long[ExecutionState.values().length];
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 
-		this.timeout = checkNotNull(timeout);
-
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
 	}
 
@@ -217,7 +220,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	public TaskStateHandles getTaskStateHandles() {
-		return taskStateHandles;
+		return taskState;
 	}
 
 	/**
@@ -228,7 +231,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 */
 	public void setInitialState(TaskStateHandles checkpointStateHandles) {
 		checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
-		this.taskStateHandles = checkpointStateHandles;
+		this.taskState = checkpointStateHandles;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -355,7 +358,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
 				attemptId,
 				slot,
-				taskStateHandles,
+				taskState,
 				attemptNumber);
 
 			// register this execution at the execution graph, to receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d840d89..0bb3514 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -77,9 +77,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private final ExecutionJobVertex jobVertex;
 
-	private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
+	private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
 
-	private ExecutionEdge[][] inputEdges;
+	private final ExecutionEdge[][] inputEdges;
 
 	private final int subTaskIndex;
 
@@ -92,10 +92,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private volatile CoLocationConstraint locationConstraint;
 
+	/** The current or latest execution attempt of this vertex's task */
 	private volatile Execution currentExecution;	// this field must never be null
 
-	private volatile boolean scheduleLocalOnly;
-
 	// --------------------------------------------------------------------------------------------
 
 	public ExecutionVertex(
@@ -398,18 +397,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 	}
 
-	public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
-		if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
-			throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
-		}
-
-		this.scheduleLocalOnly = scheduleLocalOnly;
-	}
-
-	public boolean isScheduleLocalOnly() {
-		return scheduleLocalOnly;
-	}
-
 	/**
 	 * Gets the location preferences of this task, determined by the locations of the predecessors from which
 	 * it receives input data.

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index aa09314..466a148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -153,9 +153,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			return FlinkCompletableFuture.completed((SimpleSlot) ret);
 		}
 		else if (ret instanceof Future) {
-			return (Future) ret;
+			return (Future<SimpleSlot>) ret;
 		}
 		else {
+			// this should never happen, simply guard this case with an exception
 			throw new RuntimeException();
 		}
 	}
@@ -174,7 +175,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 		
 		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
-		final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
+		final boolean forceExternalLocation = false &&
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
 		synchronized (globalLock) {


[3/6] flink git commit: [hotfix] [jobmanager] Reduce complexity when archiving ExecutionVertex

Posted by se...@apache.org.
[hotfix] [jobmanager] Reduce complexity when archiving ExecutionVertex

This fixes the inefficiency where the archiving operation iterated over the entire
evicted history of prior execution attempts when converting them to
archived executions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4820b413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4820b413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4820b413

Branch: refs/heads/master
Commit: 4820b413a55a3bbb1853251ecdb94b4c70dc5e2b
Parents: 10e4e32
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:52:57 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../executiongraph/ArchivedExecutionVertex.java | 22 ++++-
 .../flink/runtime/util/EvictingBoundedList.java | 73 ++++++++++++++-
 .../runtime/util/EvictingBoundedListTest.java   | 97 +++++++++++++++++++-
 3 files changed, 182 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 56fc7a6..5053cae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
 
 	private static final long serialVersionUID = -6708241535015028576L;
+
 	private final int subTaskIndex;
 
 	private final EvictingBoundedList<ArchivedExecution> priorExecutions;
@@ -35,13 +36,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
 	private final ArchivedExecution currentExecution;    // this field must never be null
 
+	// ------------------------------------------------------------------------
+
 	public ArchivedExecutionVertex(ExecutionVertex vertex) {
 		this.subTaskIndex = vertex.getParallelSubtaskIndex();
-		EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
-		priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
-		for (Execution priorExecution : copyOfPriorExecutionsList) {
-			priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
-		}
+		this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
 		this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
 		this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
 	}
@@ -93,4 +92,17 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 			throw new IllegalArgumentException("attempt does not exist");
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER =
+			new EvictingBoundedList.Function<Execution, ArchivedExecution>() {
+
+		@Override
+		public ArchivedExecution apply(Execution value) {
+			return value.archive();
+		}
+	};
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index f4c155a..2c5b6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -29,8 +29,9 @@ import java.util.NoSuchElementException;
  * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
  * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
  * order). If dropped elements are accessed, a default element is returned instead.
- * <p>
- * TODO this class could eventually implement the whole actual List interface.
+ * 
+ * <p>The list by itself is serializable, but a full list can only be serialized if the values
+ * are also serializable.
  *
  * @param <T> type of the list elements
  */
@@ -38,12 +39,25 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 
 	private static final long serialVersionUID = -1863961980953613146L;
 
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	/** the default element returned for positions that were evicted */
 	private final T defaultElement;
+
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	/** the array (viewed as a circular buffer) that holds the latest (= non-evicted) elements */
 	private final Object[] elements;
+
+	/** The next index to put an element in the array */
 	private int idx;
+
+	/** The current number of (virtual) elements in the list */
 	private int count;
+
+	/** Modification count for fail-fast iterators */
 	private long modCount;
 
+	// ------------------------------------------------------------------------
+
 	public EvictingBoundedList(int sizeLimit) {
 		this(sizeLimit, null);
 	}
@@ -65,6 +79,8 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 		this.modCount = 0L;
 	}
 
+	// ------------------------------------------------------------------------
+
 	public int size() {
 		return count;
 	}
@@ -93,8 +109,11 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 	}
 
 	public T get(int index) {
-		Preconditions.checkArgument(index >= 0 && index < count);
-		return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+		if (index >= 0 && index < count) {
+			return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+		} else {
+			throw new IndexOutOfBoundsException(String.valueOf(index));
+		}
 	}
 
 	public int getSizeLimit() {
@@ -157,4 +176,50 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
 			}
 		};
 	}
+
+	/**
+	 * Creates a new list that replaces its elements with transformed elements.
+	 * The list retains the same size and position-to-element mapping.
+	 * 
+	 * <p>Note that null values are automatically mapped to null values.
+	 * 
+	 * @param transform The function used to transform each element
+	 * @param <R> The type of the elements in the result list.
+	 * 
+	 * @return The list with the mapped elements
+	 */
+	public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
+		// map the default element
+		final R newDefault = defaultElement == null ? null : transform.apply(defaultElement);
+
+		// copy the list with the new default
+		final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault);
+		result.count = count;
+		result.idx = idx;
+
+		// map all the entries in the list
+		final int numElements = Math.min(elements.length, count);
+		for (int i = 0; i < numElements; i++) {
+			result.elements[i] = transform.apply(accessInternal(i));
+		}
+
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A simple unary function that can be used to transform elements via the
+	 * {@link EvictingBoundedList#map(Function)} method.
+	 */
+	public interface Function<I, O> {
+
+		/**
+		 * Transforms the value.
+		 * 
+		 * @param value The value to transform
+		 * @return The transformed value
+		 */
+		O apply(I value);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index e0a1c70..7109dac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,9 +28,12 @@ import java.util.NoSuchElementException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public class EvictingBoundedListTest {
 
 	@Test
@@ -114,7 +117,7 @@ public class EvictingBoundedListTest {
 		try {
 			list.get(0);
 			fail();
-		} catch (IllegalArgumentException ignore) {
+		} catch (IndexOutOfBoundsException ignore) {
 		}
 	}
 
@@ -161,4 +164,96 @@ public class EvictingBoundedListTest {
 
 		}
 	}
+
+	@Test
+	public void testMapWithHalfFullList() {
+		final Object[] originals = { new Object(), new Object(), new Object() };
+		final Object defaultValue = new Object();
+
+		final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue);
+		for (Object o : originals) {
+			original.add(o);
+		}
+
+		final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+		assertEquals(original.size(), transformed.size());
+		assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+		assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+		int i = 0;
+		for (TransformedObject to : transformed) {
+			assertEquals(originals[i++], to.original); 
+		}
+
+		try {
+			transformed.get(originals.length);
+			fail("should have failed with an exception");
+		} catch (IndexOutOfBoundsException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testMapWithEvictedElements() {
+		final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() };
+		final Object defaultValue = new Object();
+
+		final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue);
+		for (Object o : originals) {
+			original.add(o);
+		}
+
+		final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+		assertEquals(originals.length, transformed.size());
+		assertEquals(original.size(), transformed.size());
+		assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+		assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+		for (int i = 0; i < originals.length; i++) {
+			if (i < originals.length - transformed.getSizeLimit()) {
+				assertEquals(transformed.getDefaultElement(), transformed.get(i));
+			} else {
+				assertEquals(originals[i], transformed.get(i).original);
+			}
+		}
+
+		try {
+			transformed.get(originals.length);
+			fail("should have failed with an exception");
+		} catch (IndexOutOfBoundsException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testMapWithNullDefault() {
+		final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null);
+		final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+		assertEquals(original.size(), transformed.size());
+		assertNull(transformed.getDefaultElement());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TransformedObject {
+
+		final Object original;
+
+		TransformedObject(Object original) {
+			this.original = checkNotNull(original);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> {
+
+		@Override
+		public TransformedObject apply(Object value) {
+			return new TransformedObject(value);
+		}
+	}
 }


[6/6] flink git commit: [hotfix] [tests] Increase timeout for AkkaRpcActorTest to mitigate occasional CI test timeouts

Posted by se...@apache.org.
[hotfix] [tests] Increase timeout for AkkaRpcActorTest to mitigate occasional CI test timeouts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b1857d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b1857d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b1857d8

Branch: refs/heads/master
Commit: 7b1857d849efd3611be797679b11e8c7b7ed61fb
Parents: b9ed4ff
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 3 11:25:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 12:46:14 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b1857d8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index c73240c..3c40bc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -172,7 +172,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	 * @throws ExecutionException
 	 * @throws InterruptedException
 	 */
-	@Test(timeout=1000)
+	@Test(timeout=5000)
 	public void testRpcEndpointTerminationFuture() throws Exception {
 		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 		rpcEndpoint.start();


[5/6] flink git commit: [FLINK-5499] [JobManager] Make the location preferences combined by state and inputs.

Posted by se...@apache.org.
[FLINK-5499] [JobManager] Make the location preferences combined by state and inputs.

Reusing the prior location (for state locality) takes precedence over input locality.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9ed4ff1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9ed4ff1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9ed4ff1

Branch: refs/heads/master
Commit: b9ed4ff151c5d3a64be395c660160b5619e32c7f
Parents: fe4fe58
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 20:34:33 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 12:46:14 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java |  83 +++++--
 .../apache/flink/runtime/instance/SlotPool.java |   6 +-
 .../instance/SlotSharingGroupAssignment.java    |   6 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |   4 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |   2 +-
 .../ExecutionVertexLocalityTest.java            | 244 +++++++++++++++++++
 .../scheduler/SchedulerTestUtils.java           |   6 +-
 7 files changed, 323 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0bb3514..cb2e177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -264,20 +265,21 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	}
 
 	/**
-	 * Just return the last assigned resource location if found
-	 *
-	 * @return The collection of TaskManagerLocation
+	 * Gets the location where the latest completed/canceled/failed execution of the vertex's
+	 * task happened.
+	 * 
+	 * @return The latest prior execution location, or null, if there is none, yet.
 	 */
-	public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
-		List<TaskManagerLocation> list = new ArrayList<>();
-		for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
-			Execution prior = priorExecutions.get(i) ;
-			if (prior.getAssignedResourceLocation() != null) {
-				list.add(prior.getAssignedResourceLocation());
-				break;
+	public TaskManagerLocation getLatestPriorLocation() {
+		synchronized (priorExecutions) {
+			final int size = priorExecutions.size();
+			if (size > 0) {
+				return priorExecutions.get(size - 1).getAssignedResourceLocation();
+			}
+			else {
+				return null;
 			}
 		}
-		return list;
 	}
 
 	EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
@@ -398,14 +400,61 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	}
 
 	/**
-	 * Gets the location preferences of this task, determined by the locations of the predecessors from which
-	 * it receives input data.
+	 * Gets the overall preferred execution location for this vertex's current execution.
+	 * The preference is determined as follows:
+	 * 
+	 * <ol>
+	 *     <li>If the task execution has state to load (from a checkpoint), then the location preference
+	 *         is the location of the previous execution (if there is a previous execution attempt).
+	 *     <li>If the task execution has no state or no previous location, then the location preference
+	 *         is based on the task's inputs.
+	 * </ol>
+	 * 
+	 * These rules should result in the following behavior:
+	 * 
+	 * <ul>
+	 *     <li>Stateless tasks are always scheduled based on co-location with inputs.
+	 *     <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
+	 *     <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
+	 * </ul>
+	 * 
+	 * @return The preferred execution locations for the execution attempt.
+	 * 
+	 * @see #getPreferredLocationsBasedOnState()
+	 * @see #getPreferredLocationsBasedOnInputs() 
+	 */
+	public Iterable<TaskManagerLocation> getPreferredLocations() {
+		Iterable<TaskManagerLocation> basedOnState = getPreferredLocationsBasedOnState();
+		return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
+	}
+	
+	/**
+	 * Gets the preferred location to execute the current task execution attempt, based on the state
+	 * that the execution attempt will resume.
+	 * 
+	 * @return A size-one iterable with the location preference, or null, if there is no
+	 *         location preference based on the state.
+	 */
+	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() {
+		TaskManagerLocation priorLocation;
+		if (currentExecution.getTaskStateHandles() != null && (priorLocation = getLatestPriorLocation()) != null) {
+			return Collections.singleton(priorLocation);
+		}
+		else {
+			return null;
+		}
+	}
+
+	/**
+	 * Gets the location preferences of the vertex's current task execution, as determined by the locations
+	 * of the predecessors from which it receives input data.
 	 * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
 	 * method returns {@code null} to indicate no location preference.
 	 *
-	 * @return The preferred locations for this vertex execution, or null, if there is no preference.
+	 * @return The preferred locations based on input streams, or an empty iterable,
+	 *         if there is no input-based preference.
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocations() {
+	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
 		// otherwise, base the preferred locations on the input connections
 		if (inputEdges == null) {
 			return Collections.emptySet();
@@ -435,7 +484,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					}
 				}
 				// keep the locations of the input with the least preferred locations
-				if(locations.isEmpty() || // nothing assigned yet
+				if (locations.isEmpty() || // nothing assigned yet
 						(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
 					// current input has fewer preferred locations
 					locations.clear();
@@ -443,7 +492,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				}
 			}
 
-			return locations;
+			return locations.isEmpty() ? Collections.<TaskManagerLocation>emptyList() : locations;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6fac3c8..672431e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		@Override
 		public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN,
-					task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout);
+			Iterable<TaskManagerLocation> locationPreferences = 
+					task.getTaskToExecute().getVertex().getPreferredLocations();
+
+			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 346cc77..88fbc10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -262,7 +262,7 @@ public class SlotSharingGroupAssignment {
 
 	/**
 	 * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
-	 * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local
+	 * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local
 	 * slots if no local slot is available. The method returns null, when this sharing group has
 	 * no slot is available for the given JobVertexID. 
 	 *
@@ -271,7 +271,7 @@ public class SlotSharingGroupAssignment {
 	 * @return A slot to execute the given ExecutionVertex in, or null, if none is available.
 	 */
 	public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
-		return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocations());
+		return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
 	}
 
 	/**
@@ -313,7 +313,7 @@ public class SlotSharingGroupAssignment {
 	 *         shared slot is available.
 	 */
 	public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
-		return getSlotForTask(constraint, vertex.getPreferredLocations());
+		return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs());
 	}
 	
 	SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 466a148..dc82440 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -174,7 +174,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 		
-		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
+		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();
 		final boolean forceExternalLocation = false &&
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
@@ -240,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						localOnly = true;
 					}
 					else {
-						locations = vertex.getPreferredLocations();
+						locations = vertex.getPreferredLocationsBasedOnInputs();
 						localOnly = forceExternalLocation;
 					}
 					

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 269a8f3..4910862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -65,7 +65,7 @@ public class AllocatedSlot {
 			JobID jobID,
 			TaskManagerLocation location,
 			int slotNumber,
-			ResourceProfile resourceProfile,		
+			ResourceProfile resourceProfile,
 			TaskManagerGateway taskManagerGateway) {
 		this.slotAllocationId = checkNotNull(slotAllocationId);
 		this.jobID = checkNotNull(jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
new file mode 100644
index 0000000..36b7575
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the execution vertex handles locality preferences well.
+ */
+public class ExecutionVertexLocalityTest extends TestLogger {
+
+	private final JobID jobId = new JobID();
+
+	private final JobVertexID sourceVertexId = new JobVertexID();
+	private final JobVertexID targetVertexId = new JobVertexID();
+
+	/**
+	 * This test validates that vertices that have only one input stream try to
+	 * co-locate their tasks with the producer.
+	 */
+	@Test
+	public void testLocalityInputBasedForward() throws Exception {
+		final int parallelism = 10;
+		final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+		final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+		// set the location for all sources to a distinct location
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+
+			TaskManagerLocation location = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+
+			locations[i] = location;
+			initializeLocation(source, location);
+		}
+
+		// validate that each target vertex prefers exactly the location of its source
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+			assertTrue(preference.hasNext());
+			assertEquals(locations[i], preference.next());
+			assertFalse(preference.hasNext());
+		}
+	}
+
+	/**
+	 * This test validates that vertices with too many input streams do not have a location
+	 * preference any more.
+	 */
+	@Test
+	public void testNoLocalityInputLargeAllToAll() throws Exception {
+		final int parallelism = 100;
+
+		final ExecutionGraph graph = createTestGraph(parallelism, true);
+
+		// set the location for all sources to a distinct location
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+			TaskManagerLocation location = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+			initializeLocation(source, location);
+		}
+
+		// validate that the target vertices have no location preference
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			assertFalse(preference.hasNext());
+		}
+	}
+
+	/**
+	 * This test validates that stateful vertices schedule based on the state's location
+	 * (which is the prior execution's location).
+	 */
+	@Test
+	public void testLocalityBasedOnState() throws Exception {
+		final int parallelism = 10;
+		final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+		final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+		// set the location for all sources and targets
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+			TaskManagerLocation randomLocation = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+			
+			TaskManagerLocation location = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 20000 + i);
+
+			locations[i] = location;
+			initializeLocation(source, randomLocation);
+			initializeLocation(target, location);
+
+			setState(source.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+			setState(target.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+		}
+
+		// mimic a restart: all vertices get re-initialized without actually being executed
+		for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+			ejv.resetForNewExecution();
+		}
+
+		// set new location for the sources and some state for the targets
+		for (int i = 0; i < parallelism; i++) {
+			// source location
+			ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+			TaskManagerLocation randomLocation = new TaskManagerLocation(
+					ResourceID.generate(), InetAddress.getLoopbackAddress(), 30000 + i);
+			initializeLocation(source, randomLocation);
+
+			// target state
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+			target.getCurrentExecutionAttempt().setInitialState(mock(TaskStateHandles.class));
+		}
+
+		// validate that the target vertices have the state's location as the location preference
+		for (int i = 0; i < parallelism; i++) {
+			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+			assertTrue(preference.hasNext());
+			assertEquals(locations[i], preference.next());
+			assertFalse(preference.hasNext());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a simple 2 vertex graph with a parallel source and a parallel target.
+	 */
+	private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws Exception {
+
+		JobVertex source = new JobVertex("source", sourceVertexId);
+		source.setParallelism(parallelism);
+		source.setInvokableClass(NoOpInvokable.class);
+
+		JobVertex target = new JobVertex("source", targetVertexId);
+		target.setParallelism(parallelism);
+		target.setInvokableClass(NoOpInvokable.class);
+
+		DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
+		target.connectNewDataSetAsInput(source, connectionPattern);
+
+		JobGraph testJob = new JobGraph(jobId, "test job", source, target);
+
+		return ExecutionGraphBuilder.buildGraph(
+				null,
+				testJob,
+				new Configuration(),
+				Executors.directExecutor(),
+				Executors.directExecutor(),
+				getClass().getClassLoader(),
+				new StandaloneCheckpointRecoveryFactory(),
+				Time.of(10, TimeUnit.SECONDS),
+				new FixedDelayRestartStrategy(10, 0L),
+				new UnregisteredMetricsGroup(),
+				1,
+				log);
+	}
+
+	private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception {
+		// we need a bit of reflection magic to initialize the location without going through
+		// scheduling paths. we choose to do that, rather than the alternatives:
+		//  - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
+		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
+
+		AllocatedSlot slot = new AllocatedSlot(
+				new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+
+		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
+
+		final Field locationField = Execution.class.getDeclaredField("assignedResource");
+		locationField.setAccessible(true);
+
+		locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot);
+	}
+
+	private void setState(Execution execution, ExecutionState state) throws Exception {
+		final Field stateField = Execution.class.getDeclaredField("state");
+		stateField.setAccessible(true);
+
+		stateField.set(execution, state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index b36de77..9e692ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -106,7 +106,7 @@ public class SchedulerTestUtils {
 	public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
-		when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
@@ -119,7 +119,7 @@ public class SchedulerTestUtils {
 	public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
-		when(vertex.getPreferredLocations()).thenReturn(null);
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +139,7 @@ public class SchedulerTestUtils {
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 
-		when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations));
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);