You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:27 UTC
[1/6] flink git commit: [FLINK-5499] [JobManager] Reuse the resource
location of prior execution attempt in allocating slot
Repository: flink
Updated Branches:
refs/heads/master d52d006c6 -> 7b1857d84
[FLINK-5499] [JobManager] Reuse the resource location of prior execution attempt in allocating slot
This closes #3125
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e107b1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e107b1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e107b1c
Branch: refs/heads/master
Commit: 2e107b1cfaa6e31fe478191c74aa25d53ab49943
Parents: d52d006
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Mon Jan 16 17:28:19 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:22 2017 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionVertex.java | 17 +++++++++++++++++
.../apache/flink/runtime/instance/SlotPool.java | 4 ++--
2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6084ad6..d840d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -264,6 +264,23 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
+ /**
+ * Just return the last assigned resource location if found
+ *
+ * @return The collection of TaskManagerLocation
+ */
+ public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
+ List<TaskManagerLocation> list = new ArrayList<>();
+ for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
+ Execution prior = priorExecutions.get(i) ;
+ if (prior.getAssignedResourceLocation() != null) {
+ list.add(prior.getAssignedResourceLocation());
+ break;
+ }
+ }
+ return list;
+ }
+
EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
synchronized (priorExecutions) {
return new EvictingBoundedList<>(priorExecutions);
http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 2a9aca7..6fac3c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
- return gateway.allocateSlot(
- task, ResourceProfile.UNKNOWN, Collections.<TaskManagerLocation>emptyList(), timeout);
+ return gateway.allocateSlot(task, ResourceProfile.UNKNOWN,
+ task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout);
}
}
[4/6] flink git commit: [hotfix] Minor code cleanups in the
ExecutionGraph's Execution
Posted by se...@apache.org.
[hotfix] Minor code cleanups in the ExecutionGraph's Execution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e4e321
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e4e321
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e4e321
Branch: refs/heads/master
Commit: 10e4e321b335b6f9376501f90715e31b71b02da8
Parents: 2e107b1
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 17:21:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 24 ++++++++------------
.../taskmanager/TaskManagerLocation.java | 2 +-
2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 18a4445..c2fe5ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -70,6 +71,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -112,7 +114,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final Time timeout;
- private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
+ private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
private volatile ExecutionState state = CREATED;
@@ -120,8 +122,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private volatile Throwable failureCause; // once assigned, never changes
- private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution
-
private TaskStateHandles taskStateHandles;
/** The executor which is used to execute futures. */
@@ -189,7 +189,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public TaskManagerLocation getAssignedResourceLocation() {
- return assignedResourceLocation;
+ // returns non-null only when a location is already assigned
+ return assignedResource != null ? assignedResource.getTaskManagerLocation() : null;
}
public Throwable getFailureCause() {
@@ -226,11 +227,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @param checkpointStateHandles all checkpointed operator state
*/
public void setInitialState(TaskStateHandles checkpointStateHandles) {
-
- if (state != ExecutionState.CREATED) {
- throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
- }
-
+ checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
this.taskStateHandles = checkpointStateHandles;
}
@@ -343,7 +340,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
}
this.assignedResource = slot;
- this.assignedResourceLocation = slot.getTaskManagerLocation();
// race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) {
@@ -353,7 +349,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
- attemptNumber, assignedResourceLocation.getHostname()));
+ attemptNumber, getAssignedResourceLocation().getHostname()));
}
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
@@ -373,12 +369,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public Void apply(Throwable failure) {
if (failure instanceof TimeoutException) {
- String taskname = vertex.getTaskName() + '(' +
- (getParallelSubtaskIndex() + 1) + '/' +
- vertex.getTotalNumberOfParallelSubtasks() + ") (" + attemptId + ')';
+ String taskname = vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')';
markFailed(new Exception(
- "Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
+ "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ ") not responding after a timeout of " + timeout, failure));
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index 01d0654..956a2a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -219,7 +219,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
@Override
public int compareTo(@Nonnull TaskManagerLocation o) {
- // decide based on address first
+ // decide based on resource ID first
int resourceIdCmp = this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString());
if (resourceIdCmp != 0) {
return resourceIdCmp;
[2/6] flink git commit: [hotfix] [jobmanager] Cleanups in the
ExecutionGraph
Posted by se...@apache.org.
[hotfix] [jobmanager] Cleanups in the ExecutionGraph
- Making fields final where possible
- Making fields volatile where needed or advisable
- Remove some dead code/functionality
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4fe587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4fe587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4fe587
Branch: refs/heads/master
Commit: fe4fe5872883f3de362c4d6864b21a66bcbf5d4e
Parents: 4820b41
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:55:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 33 +++++++++++---------
.../runtime/executiongraph/ExecutionVertex.java | 19 ++---------
.../runtime/jobmanager/scheduler/Scheduler.java | 5 +--
3 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c2fe5ea..e29e5b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -104,8 +104,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// --------------------------------------------------------------------------------------------
+ /** The executor which is used to execute futures. */
+ private final Executor executor;
+
+ /** The execution vertex whose task this execution executes */
private final ExecutionVertex vertex;
+ /** The unique ID marking the specific execution instant of the task */
private final ExecutionAttemptID attemptId;
private final long[] stateTimestamps;
@@ -122,41 +127,39 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private volatile Throwable failureCause; // once assigned, never changes
- private TaskStateHandles taskStateHandles;
+ /** The handle to the state that the task gets on restore */
+ private volatile TaskStateHandles taskState;
- /** The executor which is used to execute futures. */
- private Executor executor;
+ // ------------------------ Accumulators & Metrics ------------------------
- // ------------------------- Accumulators ---------------------------------
-
- /* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
- * by partial accumulators on a late heartbeat*/
+ /** Lock for updating the accumulators atomically.
+ * Prevents final accumulators to be overwritten by partial accumulators on a late heartbeat */
private final Object accumulatorLock = new Object();
/* Continuously updated map of user-defined accumulators */
private volatile Map<String, Accumulator<?, ?>> userAccumulators;
- private IOMetrics ioMetrics;
+
+ private volatile IOMetrics ioMetrics;
// --------------------------------------------------------------------------------------------
-
+
public Execution(
Executor executor,
ExecutionVertex vertex,
int attemptNumber,
long startTimestamp,
Time timeout) {
- this.executor = checkNotNull(executor);
+ this.executor = checkNotNull(executor);
this.vertex = checkNotNull(vertex);
this.attemptId = new ExecutionAttemptID();
+ this.timeout = checkNotNull(timeout);
this.attemptNumber = attemptNumber;
this.stateTimestamps = new long[ExecutionState.values().length];
markTimestamp(ExecutionState.CREATED, startTimestamp);
- this.timeout = checkNotNull(timeout);
-
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
}
@@ -217,7 +220,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
public TaskStateHandles getTaskStateHandles() {
- return taskStateHandles;
+ return taskState;
}
/**
@@ -228,7 +231,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
*/
public void setInitialState(TaskStateHandles checkpointStateHandles) {
checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
- this.taskStateHandles = checkpointStateHandles;
+ this.taskState = checkpointStateHandles;
}
// --------------------------------------------------------------------------------------------
@@ -355,7 +358,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
- taskStateHandles,
+ taskState,
attemptNumber);
// register this execution at the execution graph, to receive call backs
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d840d89..0bb3514 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -77,9 +77,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private final ExecutionJobVertex jobVertex;
- private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
+ private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
- private ExecutionEdge[][] inputEdges;
+ private final ExecutionEdge[][] inputEdges;
private final int subTaskIndex;
@@ -92,10 +92,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private volatile CoLocationConstraint locationConstraint;
+ /** The current or latest execution attempt of this vertex's task */
private volatile Execution currentExecution; // this field must never be null
- private volatile boolean scheduleLocalOnly;
-
// --------------------------------------------------------------------------------------------
public ExecutionVertex(
@@ -398,18 +397,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
- public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
- if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
- throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
- }
-
- this.scheduleLocalOnly = scheduleLocalOnly;
- }
-
- public boolean isScheduleLocalOnly() {
- return scheduleLocalOnly;
- }
-
/**
* Gets the location preferences of this task, determined by the locations of the predecessors from which
* it receives input data.
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index aa09314..466a148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -153,9 +153,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
return FlinkCompletableFuture.completed((SimpleSlot) ret);
}
else if (ret instanceof Future) {
- return (Future) ret;
+ return (Future<SimpleSlot>) ret;
}
else {
+ // this should never happen, simply guard this case with an exception
throw new RuntimeException();
}
}
@@ -174,7 +175,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
- final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
+ final boolean forceExternalLocation = false &&
preferredLocations != null && preferredLocations.iterator().hasNext();
synchronized (globalLock) {
[3/6] flink git commit: [hotfix] [jobmanager] Reduce complexity when
archiving ExecutionVertex
Posted by se...@apache.org.
[hotfix] [jobmanager] Reduce complexity when archiving ExecutionVertex
This fixes the inefficiency where the archiving operation iterated over the entire
evicted history of prior execution attempts when converting them to
archived executions.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4820b413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4820b413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4820b413
Branch: refs/heads/master
Commit: 4820b413a55a3bbb1853251ecdb94b4c70dc5e2b
Parents: 10e4e32
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:52:57 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100
----------------------------------------------------------------------
.../executiongraph/ArchivedExecutionVertex.java | 22 ++++-
.../flink/runtime/util/EvictingBoundedList.java | 73 ++++++++++++++-
.../runtime/util/EvictingBoundedListTest.java | 97 +++++++++++++++++++-
3 files changed, 182 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 56fc7a6..5053cae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
private static final long serialVersionUID = -6708241535015028576L;
+
private final int subTaskIndex;
private final EvictingBoundedList<ArchivedExecution> priorExecutions;
@@ -35,13 +36,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
private final ArchivedExecution currentExecution; // this field must never be null
+ // ------------------------------------------------------------------------
+
public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex();
- EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList();
- priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
- for (Execution priorExecution : copyOfPriorExecutionsList) {
- priorExecutions.add(priorExecution != null ? priorExecution.archive() : null);
- }
+ this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
}
@@ -93,4 +92,17 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
throw new IllegalArgumentException("attempt does not exist");
}
}
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER =
+ new EvictingBoundedList.Function<Execution, ArchivedExecution>() {
+
+ @Override
+ public ArchivedExecution apply(Execution value) {
+ return value.archive();
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index f4c155a..2c5b6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -29,8 +29,9 @@ import java.util.NoSuchElementException;
* This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
* the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
* order). If dropped elements are accessed, a default element is returned instead.
- * <p>
- * TODO this class could eventually implement the whole actual List interface.
+ *
+ * <p>The list by itself is serializable, but a full list can only be serialized if the values
+ * are also serializable.
*
* @param <T> type of the list elements
*/
@@ -38,12 +39,25 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
private static final long serialVersionUID = -1863961980953613146L;
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ /** the default element returned for positions that were evicted */
private final T defaultElement;
+
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ /** the array (viewed as a circular buffer) that holds the latest (= non-evicted) elements */
private final Object[] elements;
+
+ /** The next index to put an element in the array */
private int idx;
+
+ /** The current number of (virtual) elements in the list */
private int count;
+
+ /** Modification count for fail-fast iterators */
private long modCount;
+ // ------------------------------------------------------------------------
+
public EvictingBoundedList(int sizeLimit) {
this(sizeLimit, null);
}
@@ -65,6 +79,8 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
this.modCount = 0L;
}
+ // ------------------------------------------------------------------------
+
public int size() {
return count;
}
@@ -93,8 +109,11 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
}
public T get(int index) {
- Preconditions.checkArgument(index >= 0 && index < count);
- return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+ if (index >= 0 && index < count) {
+ return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
+ } else {
+ throw new IndexOutOfBoundsException(String.valueOf(index));
+ }
}
public int getSizeLimit() {
@@ -157,4 +176,50 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
}
};
}
+
+ /**
+ * Creates a new list that replaces its elements with transformed elements.
+ * The list retains the same size and position-to-element mapping.
+ *
+ * <p>Note that null values are automatically mapped to null values.
+ *
+ * @param transform The function used to transform each element
+ * @param <R> The type of the elements in the result list.
+ *
+ * @return The list with the mapped elements
+ */
+ public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
+ // map the default element
+ final R newDefault = defaultElement == null ? null : transform.apply(defaultElement);
+
+ // copy the list with the new default
+ final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault);
+ result.count = count;
+ result.idx = idx;
+
+ // map all the entries in the list
+ final int numElements = Math.min(elements.length, count);
+ for (int i = 0; i < numElements; i++) {
+ result.elements[i] = transform.apply(accessInternal(i));
+ }
+
+ return result;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A simple unary function that can be used to transform elements via the
+ * {@link EvictingBoundedList#map(Function)} method.
+ */
+ public interface Function<I, O> {
+
+ /**
+ * Transforms the value.
+ *
+ * @param value The value to transform
+ * @return The transformed value
+ */
+ O apply(I value);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index e0a1c70..7109dac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,9 +28,12 @@ import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
public class EvictingBoundedListTest {
@Test
@@ -114,7 +117,7 @@ public class EvictingBoundedListTest {
try {
list.get(0);
fail();
- } catch (IllegalArgumentException ignore) {
+ } catch (IndexOutOfBoundsException ignore) {
}
}
@@ -161,4 +164,96 @@ public class EvictingBoundedListTest {
}
}
+
+ @Test
+ public void testMapWithHalfFullList() {
+ final Object[] originals = { new Object(), new Object(), new Object() };
+ final Object defaultValue = new Object();
+
+ final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue);
+ for (Object o : originals) {
+ original.add(o);
+ }
+
+ final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+ assertEquals(original.size(), transformed.size());
+ assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+ assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+ int i = 0;
+ for (TransformedObject to : transformed) {
+ assertEquals(originals[i++], to.original);
+ }
+
+ try {
+ transformed.get(originals.length);
+ fail("should have failed with an exception");
+ } catch (IndexOutOfBoundsException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMapWithEvictedElements() {
+ final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() };
+ final Object defaultValue = new Object();
+
+ final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue);
+ for (Object o : originals) {
+ original.add(o);
+ }
+
+ final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+ assertEquals(originals.length, transformed.size());
+ assertEquals(original.size(), transformed.size());
+ assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
+ assertEquals(defaultValue, transformed.getDefaultElement().original);
+
+ for (int i = 0; i < originals.length; i++) {
+ if (i < originals.length - transformed.getSizeLimit()) {
+ assertEquals(transformed.getDefaultElement(), transformed.get(i));
+ } else {
+ assertEquals(originals[i], transformed.get(i).original);
+ }
+ }
+
+ try {
+ transformed.get(originals.length);
+ fail("should have failed with an exception");
+ } catch (IndexOutOfBoundsException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMapWithNullDefault() {
+ final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null);
+ final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
+
+ assertEquals(original.size(), transformed.size());
+ assertNull(transformed.getDefaultElement());
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class TransformedObject {
+
+ final Object original;
+
+ TransformedObject(Object original) {
+ this.original = checkNotNull(original);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> {
+
+ @Override
+ public TransformedObject apply(Object value) {
+ return new TransformedObject(value);
+ }
+ }
}
[6/6] flink git commit: [hotfix] [tests] Increase timeout for
AkkaRpcActorTest to mitigate occasional CI test timeouts
Posted by se...@apache.org.
[hotfix] [tests] Increase timeout for AkkaRpcActorTest to mitigate occasional CI test timeouts
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b1857d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b1857d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b1857d8
Branch: refs/heads/master
Commit: 7b1857d849efd3611be797679b11e8c7b7ed61fb
Parents: b9ed4ff
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 3 11:25:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 12:46:14 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1857d8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index c73240c..3c40bc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -172,7 +172,7 @@ public class AkkaRpcActorTest extends TestLogger {
* @throws ExecutionException
* @throws InterruptedException
*/
- @Test(timeout=1000)
+ @Test(timeout=5000)
public void testRpcEndpointTerminationFuture() throws Exception {
final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
rpcEndpoint.start();
[5/6] flink git commit: [FLINK-5499] [JobManager] Make the location
preferences combined by state and inputs.
Posted by se...@apache.org.
[FLINK-5499] [JobManager] Make the location preferences combined by state and inputs.
Reusing the prior location (for state locality) takes precedence over input locality.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9ed4ff1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9ed4ff1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9ed4ff1
Branch: refs/heads/master
Commit: b9ed4ff151c5d3a64be395c660160b5619e32c7f
Parents: fe4fe58
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 20:34:33 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 12:46:14 2017 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionVertex.java | 83 +++++--
.../apache/flink/runtime/instance/SlotPool.java | 6 +-
.../instance/SlotSharingGroupAssignment.java | 6 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 4 +-
.../runtime/jobmanager/slots/AllocatedSlot.java | 2 +-
.../ExecutionVertexLocalityTest.java | 244 +++++++++++++++++++
.../scheduler/SchedulerTestUtils.java | 6 +-
7 files changed, 323 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0bb3514..cb2e177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import java.io.IOException;
@@ -264,20 +265,21 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
/**
- * Just return the last assigned resource location if found
- *
- * @return The collection of TaskManagerLocation
+ * Gets the location where the latest completed/canceled/failed execution of the vertex's
+ * task happened.
+ *
+ * @return The latest prior execution location, or null, if there is none, yet.
*/
- public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
- List<TaskManagerLocation> list = new ArrayList<>();
- for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
- Execution prior = priorExecutions.get(i) ;
- if (prior.getAssignedResourceLocation() != null) {
- list.add(prior.getAssignedResourceLocation());
- break;
+ public TaskManagerLocation getLatestPriorLocation() {
+ synchronized (priorExecutions) {
+ final int size = priorExecutions.size();
+ if (size > 0) {
+ return priorExecutions.get(size - 1).getAssignedResourceLocation();
+ }
+ else {
+ return null;
}
}
- return list;
}
EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
@@ -398,14 +400,61 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
/**
- * Gets the location preferences of this task, determined by the locations of the predecessors from which
- * it receives input data.
+ * Gets the overall preferred execution location for this vertex's current execution.
+ * The preference is determined as follows:
+ *
+ * <ol>
+ * <li>If the task execution has state to load (from a checkpoint), then the location preference
+ * is the location of the previous execution (if there is a previous execution attempt).
+ * <li>If the task execution has no state or no previous location, then the location preference
+ * is based on the task's inputs.
+ * </ol>
+ *
+ * These rules should result in the following behavior:
+ *
+ * <ul>
+ * <li>Stateless tasks are always scheduled based on co-location with inputs.
+ * <li>On their initial attempt, stateful tasks are scheduled based on co-location with inputs.
+ * <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
+ * </ul>
+ *
+ * @return The preferred execution locations for the execution attempt.
+ *
+ * @see #getPreferredLocationsBasedOnState()
+ * @see #getPreferredLocationsBasedOnInputs()
+ */
+ public Iterable<TaskManagerLocation> getPreferredLocations() {
+ Iterable<TaskManagerLocation> basedOnState = getPreferredLocationsBasedOnState();
+ return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
+ }
+
+ /**
+ * Gets the preferred location to execute the current task execution attempt, based on the state
+ * that the execution attempt will resume.
+ *
+ * @return A size-one iterable with the location preference, or null, if there is no
+ * location preference based on the state.
+ */
+ public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() {
+ TaskManagerLocation priorLocation;
+ if (currentExecution.getTaskStateHandles() != null && (priorLocation = getLatestPriorLocation()) != null) {
+ return Collections.singleton(priorLocation);
+ }
+ else {
+ return null;
+ }
+ }
+
+ /**
+ * Gets the location preferences of the vertex's current task execution, as determined by the locations
+ * of the predecessors from which it receives input data.
* If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
* method returns {@code null} to indicate no location preference.
*
- * @return The preferred locations for this vertex execution, or null, if there is no preference.
+ * @return The preferred locations based on input streams, or an empty iterable,
+ * if there is no input-based preference.
*/
- public Iterable<TaskManagerLocation> getPreferredLocations() {
+ public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
@@ -435,7 +484,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
// keep the locations of the input with the least preferred locations
- if(locations.isEmpty() || // nothing assigned yet
+ if (locations.isEmpty() || // nothing assigned yet
(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
// current input has fewer preferred locations
locations.clear();
@@ -443,7 +492,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
- return locations;
+ return locations.isEmpty() ? Collections.<TaskManagerLocation>emptyList() : locations;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6fac3c8..672431e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
- return gateway.allocateSlot(task, ResourceProfile.UNKNOWN,
- task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout);
+ Iterable<TaskManagerLocation> locationPreferences =
+ task.getTaskToExecute().getVertex().getPreferredLocations();
+
+ return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 346cc77..88fbc10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -262,7 +262,7 @@ public class SlotSharingGroupAssignment {
/**
* Gets a slot suitable for the given task vertex. This method will prefer slots that are local
- * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local
+ * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local
* slots if no local slot is available. The method returns null, when this sharing group has
* no slot is available for the given JobVertexID.
*
@@ -271,7 +271,7 @@ public class SlotSharingGroupAssignment {
* @return A slot to execute the given ExecutionVertex in, or null, if none is available.
*/
public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
- return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocations());
+ return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
}
/**
@@ -313,7 +313,7 @@ public class SlotSharingGroupAssignment {
* shared slot is available.
*/
public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
- return getSlotForTask(constraint, vertex.getPreferredLocations());
+ return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs());
}
SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 466a148..dc82440 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -174,7 +174,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
- final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
+ final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();
final boolean forceExternalLocation = false &&
preferredLocations != null && preferredLocations.iterator().hasNext();
@@ -240,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
localOnly = true;
}
else {
- locations = vertex.getPreferredLocations();
+ locations = vertex.getPreferredLocationsBasedOnInputs();
localOnly = forceExternalLocation;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 269a8f3..4910862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -65,7 +65,7 @@ public class AllocatedSlot {
JobID jobID,
TaskManagerLocation location,
int slotNumber,
- ResourceProfile resourceProfile,
+ ResourceProfile resourceProfile,
TaskManagerGateway taskManagerGateway) {
this.slotAllocationId = checkNotNull(slotAllocationId);
this.jobID = checkNotNull(jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
new file mode 100644
index 0000000..36b7575
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the execution vertex handles locality preferences well.
+ */
+public class ExecutionVertexLocalityTest extends TestLogger {
+
+ private final JobID jobId = new JobID();
+
+ private final JobVertexID sourceVertexId = new JobVertexID();
+ private final JobVertexID targetVertexId = new JobVertexID();
+
+ /**
+ * This test validates that vertices that have only one input stream try to
+ * co-locate their tasks with the producer.
+ */
+ @Test
+ public void testLocalityInputBasedForward() throws Exception {
+ final int parallelism = 10;
+ final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+ final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+ // set the location for all sources to a distinct location
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+
+ TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+
+ locations[i] = location;
+ initializeLocation(source, location);
+ }
+
+ // validate that each target vertex prefers exactly the location of its source
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+ Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+ assertTrue(preference.hasNext());
+ assertEquals(locations[i], preference.next());
+ assertFalse(preference.hasNext());
+ }
+ }
+
+ /**
+ * This test validates that vertices with too many input streams do not have a location
+ * preference any more.
+ */
+ @Test
+ public void testNoLocalityInputLargeAllToAll() throws Exception {
+ final int parallelism = 100;
+
+ final ExecutionGraph graph = createTestGraph(parallelism, true);
+
+ // set the location for all sources to a distinct location
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+ TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+ initializeLocation(source, location);
+ }
+
+ // validate that the target vertices have no location preference
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+ Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+ assertFalse(preference.hasNext());
+ }
+ }
+
+ /**
+ * This test validates that stateful vertices are scheduled based on the state's location
+ * (which is the prior execution's location).
+ */
+ @Test
+ public void testLocalityBasedOnState() throws Exception {
+ final int parallelism = 10;
+ final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
+
+ final ExecutionGraph graph = createTestGraph(parallelism, false);
+
+ // set the location for all sources and targets
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+
+ TaskManagerLocation randomLocation = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
+
+ TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 20000 + i);
+
+ locations[i] = location;
+ initializeLocation(source, randomLocation);
+ initializeLocation(target, location);
+
+ setState(source.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+ setState(target.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
+ }
+
+ // mimic a restart: all vertices get re-initialized without actually being executed
+ for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+ ejv.resetForNewExecution();
+ }
+
+ // set new location for the sources and some state for the targets
+ for (int i = 0; i < parallelism; i++) {
+ // source location
+ ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
+ TaskManagerLocation randomLocation = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 30000 + i);
+ initializeLocation(source, randomLocation);
+
+ // target state
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+ target.getCurrentExecutionAttempt().setInitialState(mock(TaskStateHandles.class));
+ }
+
+ // validate that the target vertices have the state's location as the location preference
+ for (int i = 0; i < parallelism; i++) {
+ ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
+ Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+
+ assertTrue(preference.hasNext());
+ assertEquals(locations[i], preference.next());
+ assertFalse(preference.hasNext());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a simple 2 vertex graph with a parallel source and a parallel target.
+ */
+ private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws Exception {
+
+ JobVertex source = new JobVertex("source", sourceVertexId);
+ source.setParallelism(parallelism);
+ source.setInvokableClass(NoOpInvokable.class);
+
+ JobVertex target = new JobVertex("source", targetVertexId);
+ target.setParallelism(parallelism);
+ target.setInvokableClass(NoOpInvokable.class);
+
+ DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
+ target.connectNewDataSetAsInput(source, connectionPattern);
+
+ JobGraph testJob = new JobGraph(jobId, "test job", source, target);
+
+ return ExecutionGraphBuilder.buildGraph(
+ null,
+ testJob,
+ new Configuration(),
+ Executors.directExecutor(),
+ Executors.directExecutor(),
+ getClass().getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.of(10, TimeUnit.SECONDS),
+ new FixedDelayRestartStrategy(10, 0L),
+ new UnregisteredMetricsGroup(),
+ 1,
+ log);
+ }
+
+ private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception {
+ // we need a bit of reflection magic to initialize the location without going through
+ // scheduling paths. we choose to do that, rather than the alternatives:
+ // - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
+ // - exposing test methods in the ExecutionVertex leads to undesirable setters
+
+ AllocatedSlot slot = new AllocatedSlot(
+ new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+
+ SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
+
+ final Field locationField = Execution.class.getDeclaredField("assignedResource");
+ locationField.setAccessible(true);
+
+ locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot);
+ }
+
+ private void setState(Execution execution, ExecutionState state) throws Exception {
+ final Field stateField = Execution.class.getDeclaredField("state");
+ stateField.setAccessible(true);
+
+ stateField.set(execution, state);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index b36de77..9e692ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -106,7 +106,7 @@ public class SchedulerTestUtils {
public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
ExecutionVertex vertex = mock(ExecutionVertex.class);
- when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
+ when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.toString()).thenReturn("TEST-VERTEX");
@@ -119,7 +119,7 @@ public class SchedulerTestUtils {
public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
ExecutionVertex vertex = mock(ExecutionVertex.class);
- when(vertex.getPreferredLocations()).thenReturn(null);
+ when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.getJobvertexId()).thenReturn(jid);
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +139,7 @@ public class SchedulerTestUtils {
ExecutionVertex vertex = mock(ExecutionVertex.class);
- when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations));
+ when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.getJobvertexId()).thenReturn(jid);
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);