You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:28 UTC

[2/6] flink git commit: [hotfix] [jobmanager] Cleanups in the ExecutionGraph

[hotfix] [jobmanager] Cleanups in the ExecutionGraph

  - Making fields final where possible
  - Making fields volatile where needed or advisable
  - Removing some dead code/functionality


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4fe587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4fe587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4fe587

Branch: refs/heads/master
Commit: fe4fe5872883f3de362c4d6864b21a66bcbf5d4e
Parents: 4820b41
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 19:55:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 33 +++++++++++---------
 .../runtime/executiongraph/ExecutionVertex.java | 19 ++---------
 .../runtime/jobmanager/scheduler/Scheduler.java |  5 +--
 3 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c2fe5ea..e29e5b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -104,8 +104,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	// --------------------------------------------------------------------------------------------
 
+	/** The executor which is used to execute futures. */
+	private final Executor executor;
+
+	/** The execution vertex whose task this execution executes */ 
 	private final ExecutionVertex vertex;
 
+	/** The unique ID marking the specific execution instant of the task */
 	private final ExecutionAttemptID attemptId;
 
 	private final long[] stateTimestamps;
@@ -122,41 +127,39 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
-	private TaskStateHandles taskStateHandles;
+	/** The handle to the state that the task gets on restore */
+	private volatile TaskStateHandles taskState;
 
-	/** The executor which is used to execute futures. */
-	private Executor executor;
+	// ------------------------ Accumulators & Metrics ------------------------
 
-	// ------------------------- Accumulators ---------------------------------
-	
-	/* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
-	* by partial accumulators on a late heartbeat*/
+	/** Lock for updating the accumulators atomically.
+	 * Prevents final accumulators to be overwritten by partial accumulators on a late heartbeat */
 	private final Object accumulatorLock = new Object();
 
 	/* Continuously updated map of user-defined accumulators */
 	private volatile Map<String, Accumulator<?, ?>> userAccumulators;
-	private IOMetrics ioMetrics;
+
+	private volatile IOMetrics ioMetrics;
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public Execution(
 			Executor executor,
 			ExecutionVertex vertex,
 			int attemptNumber,
 			long startTimestamp,
 			Time timeout) {
-		this.executor = checkNotNull(executor);
 
+		this.executor = checkNotNull(executor);
 		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
+		this.timeout = checkNotNull(timeout);
 
 		this.attemptNumber = attemptNumber;
 
 		this.stateTimestamps = new long[ExecutionState.values().length];
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 
-		this.timeout = checkNotNull(timeout);
-
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
 	}
 
@@ -217,7 +220,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	public TaskStateHandles getTaskStateHandles() {
-		return taskStateHandles;
+		return taskState;
 	}
 
 	/**
@@ -228,7 +231,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 */
 	public void setInitialState(TaskStateHandles checkpointStateHandles) {
 		checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
-		this.taskStateHandles = checkpointStateHandles;
+		this.taskState = checkpointStateHandles;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -355,7 +358,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
 				attemptId,
 				slot,
-				taskStateHandles,
+				taskState,
 				attemptNumber);
 
 			// register this execution at the execution graph, to receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d840d89..0bb3514 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -77,9 +77,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private final ExecutionJobVertex jobVertex;
 
-	private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
+	private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
 
-	private ExecutionEdge[][] inputEdges;
+	private final ExecutionEdge[][] inputEdges;
 
 	private final int subTaskIndex;
 
@@ -92,10 +92,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private volatile CoLocationConstraint locationConstraint;
 
+	/** The current or latest execution attempt of this vertex's task */
 	private volatile Execution currentExecution;	// this field must never be null
 
-	private volatile boolean scheduleLocalOnly;
-
 	// --------------------------------------------------------------------------------------------
 
 	public ExecutionVertex(
@@ -398,18 +397,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 	}
 
-	public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
-		if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
-			throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
-		}
-
-		this.scheduleLocalOnly = scheduleLocalOnly;
-	}
-
-	public boolean isScheduleLocalOnly() {
-		return scheduleLocalOnly;
-	}
-
 	/**
 	 * Gets the location preferences of this task, determined by the locations of the predecessors from which
 	 * it receives input data.

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index aa09314..466a148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -153,9 +153,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			return FlinkCompletableFuture.completed((SimpleSlot) ret);
 		}
 		else if (ret instanceof Future) {
-			return (Future) ret;
+			return (Future<SimpleSlot>) ret;
 		}
 		else {
+			// this should never happen, simply guard this case with an exception
 			throw new RuntimeException();
 		}
 	}
@@ -174,7 +175,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 		
 		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
-		final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
+		final boolean forceExternalLocation = false &&
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
 		synchronized (globalLock) {