You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:28 UTC
[2/6] flink git commit: [hotfix] [jobmanager] Cleanups in the
ExecutionGraph
[hotfix] [jobmanager] Cleanups in the ExecutionGraph
- Making fields final where possible
- Making fields volatile where needed or advisable
- Remove some dead code/functionality
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4fe587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4fe587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4fe587
Branch: refs/heads/master
Commit: fe4fe5872883f3de362c4d6864b21a66bcbf5d4e
Parents: 4820b41
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:55:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 33 +++++++++++---------
.../runtime/executiongraph/ExecutionVertex.java | 19 ++---------
.../runtime/jobmanager/scheduler/Scheduler.java | 5 +--
3 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c2fe5ea..e29e5b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -104,8 +104,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// --------------------------------------------------------------------------------------------
+ /** The executor which is used to execute futures. */
+ private final Executor executor;
+
+ /** The execution vertex whose task this execution executes */
private final ExecutionVertex vertex;
+ /** The unique ID marking the specific execution instant of the task */
private final ExecutionAttemptID attemptId;
private final long[] stateTimestamps;
@@ -122,41 +127,39 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private volatile Throwable failureCause; // once assigned, never changes
- private TaskStateHandles taskStateHandles;
+ /** The handle to the state that the task gets on restore */
+ private volatile TaskStateHandles taskState;
- /** The executor which is used to execute futures. */
- private Executor executor;
+ // ------------------------ Accumulators & Metrics ------------------------
- // ------------------------- Accumulators ---------------------------------
-
- /* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
- * by partial accumulators on a late heartbeat*/
+ /** Lock for updating the accumulators atomically.
+ * Prevents final accumulators to be overwritten by partial accumulators on a late heartbeat */
private final Object accumulatorLock = new Object();
/* Continuously updated map of user-defined accumulators */
private volatile Map<String, Accumulator<?, ?>> userAccumulators;
- private IOMetrics ioMetrics;
+
+ private volatile IOMetrics ioMetrics;
// --------------------------------------------------------------------------------------------
-
+
public Execution(
Executor executor,
ExecutionVertex vertex,
int attemptNumber,
long startTimestamp,
Time timeout) {
- this.executor = checkNotNull(executor);
+ this.executor = checkNotNull(executor);
this.vertex = checkNotNull(vertex);
this.attemptId = new ExecutionAttemptID();
+ this.timeout = checkNotNull(timeout);
this.attemptNumber = attemptNumber;
this.stateTimestamps = new long[ExecutionState.values().length];
markTimestamp(ExecutionState.CREATED, startTimestamp);
- this.timeout = checkNotNull(timeout);
-
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
}
@@ -217,7 +220,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
public TaskStateHandles getTaskStateHandles() {
- return taskStateHandles;
+ return taskState;
}
/**
@@ -228,7 +231,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
*/
public void setInitialState(TaskStateHandles checkpointStateHandles) {
checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
- this.taskStateHandles = checkpointStateHandles;
+ this.taskState = checkpointStateHandles;
}
// --------------------------------------------------------------------------------------------
@@ -355,7 +358,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
- taskStateHandles,
+ taskState,
attemptNumber);
// register this execution at the execution graph, to receive call backs
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d840d89..0bb3514 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -77,9 +77,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private final ExecutionJobVertex jobVertex;
- private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
+ private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
- private ExecutionEdge[][] inputEdges;
+ private final ExecutionEdge[][] inputEdges;
private final int subTaskIndex;
@@ -92,10 +92,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private volatile CoLocationConstraint locationConstraint;
+ /** The current or latest execution attempt of this vertex's task */
private volatile Execution currentExecution; // this field must never be null
- private volatile boolean scheduleLocalOnly;
-
// --------------------------------------------------------------------------------------------
public ExecutionVertex(
@@ -398,18 +397,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
- public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
- if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
- throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
- }
-
- this.scheduleLocalOnly = scheduleLocalOnly;
- }
-
- public boolean isScheduleLocalOnly() {
- return scheduleLocalOnly;
- }
-
/**
* Gets the location preferences of this task, determined by the locations of the predecessors from which
* it receives input data.
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index aa09314..466a148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -153,9 +153,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
return FlinkCompletableFuture.completed((SimpleSlot) ret);
}
else if (ret instanceof Future) {
- return (Future) ret;
+ return (Future<SimpleSlot>) ret;
}
else {
+ // this should never happen, simply guard this case with an exception
throw new RuntimeException();
}
}
@@ -174,7 +175,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
- final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
+ final boolean forceExternalLocation = false &&
preferredLocations != null && preferredLocations.iterator().hasNext();
synchronized (globalLock) {