Posted to commits@flink.apache.org by se...@apache.org on 2016/08/05 17:57:46 UTC

[4/4] flink git commit: [FLINK-4256] [distributed runtime] Clean up serializability of ExecutionGraph

[FLINK-4256] [distributed runtime] Clean up serializability of ExecutionGraph


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9b0dad8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9b0dad8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9b0dad8

Branch: refs/heads/master
Commit: c9b0dad89b79f11ce13fe78f8a0823b8266f930f
Parents: 2a4b222
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 4 16:48:37 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 5 19:57:01 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 36 +++++++++++---------
 .../runtime/executiongraph/ExecutionGraph.java  | 22 ++++--------
 .../executiongraph/ExecutionJobVertex.java      |  9 +++--
 .../runtime/executiongraph/ExecutionVertex.java | 22 ++++++------
 .../api/graph/StreamingJobGraphGenerator.java   |  1 -
 5 files changed, 39 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index fd296c3..5bab780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
+
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -56,7 +57,6 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -87,23 +87,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
  * or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
  * 
- * NOTE ABOUT THE DESIGN RATIONAL:
+ * <p>NOTE ABOUT THE DESIGN RATIONAL:
  * 
- * In several points of the code, we need to deal with possible concurrent state changes and actions.
+ * <p>In several points of the code, we need to deal with possible concurrent state changes and actions.
  * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
  * 
- * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
+ * <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
  * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel
  * command" call will never overtake the deploying call.
  * 
- * This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it
+ * <p>This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it
  * may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and
  * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
  * actions if it is not. Many actions are also idempotent (like canceling).
  */
-public class Execution implements Serializable {
-
-	private static final long serialVersionUID = 42L;
+public class Execution {
 
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -136,15 +134,16 @@ public class Execution implements Serializable {
 
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 
+	/** The state with which the execution attempt should start */
 	private SerializedValue<StateHandle<?>> operatorState;
 
-	private Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState;
-
 	/** The execution context which is used to execute futures. */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private ExecutionContext executionContext;
 
-	/* Lock for updating the accumulators atomically. */
+	// ------------------------- Accumulators ---------------------------------
+	
+	/* Lock for updating the accumulators atomically. Prevents final accumulators from being
+	 * overwritten by partial accumulators on a late heartbeat. */
 	private final SerializableObject accumulatorLock = new SerializableObject();
 
 	/* Continuously updated map of user-defined accumulators */
@@ -217,7 +216,7 @@ public class Execution implements Serializable {
 	}
 
 	public boolean isFinished() {
-		return state == FINISHED || state == FAILED || state == CANCELED;
+		return state.isTerminal();
 	}
 
 	/**
@@ -236,14 +235,18 @@ public class Execution implements Serializable {
 	}
 
 	public void setInitialState(
-		SerializedValue<StateHandle<?>> initialState,
-		Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
+			SerializedValue<StateHandle<?>> initialState,
+			Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
+
+		if (initialKvState != null && initialKvState.size() > 0) {
+			throw new UnsupportedOperationException("Error: inconsistent handling of key/value state snapshots");
+		}
 
 		if (state != ExecutionState.CREATED) {
 			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
 		}
+
 		this.operatorState = initialState;
-		this.operatorKvState = initialKvState;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -371,7 +374,6 @@ public class Execution implements Serializable {
 				attemptId,
 				slot,
 				operatorState,
-				operatorKvState,
 				attemptNumber);
 
 			// register this execution at the execution graph, to receive call backs
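
A minimal sketch (not the actual Flink code) of the lock-free state-update pattern that the
class Javadoc above describes: transitions happen via AtomicReferenceFieldUpdater and
compareAndSet, and after a slow remote call the state is double-checked so that a concurrent
cancel that overtook the deployment triggers an idempotent correcting action. All class and
method names below are illustrative only.

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class StateMachineSketch {

    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED }

    private static final AtomicReferenceFieldUpdater<StateMachineSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StateMachineSketch.class, State.class, "state");

    // the field must be volatile for the updater to be usable
    private volatile State state = State.CREATED;

    /** Atomically moves from 'expected' to 'next'; returns false if another thread won the race. */
    boolean transition(State expected, State next) {
        return STATE_UPDATER.compareAndSet(this, expected, next);
    }

    void deploy() {
        if (!transition(State.CREATED, State.DEPLOYING)) {
            return; // another thread (e.g. a concurrent cancel) changed the state first
        }

        sendDeployCallToTaskManager(); // potentially slow remote call, done without holding a lock

        // double-check after the call: a cancel may have arrived while we were deploying
        if (!transition(State.DEPLOYING, State.RUNNING) && state == State.CANCELING) {
            sendCancelCallToTaskManager(); // idempotent correcting action, safe to repeat
            transition(State.CANCELING, State.CANCELED);
        }
    }

    private void sendDeployCallToTaskManager() { /* remote call, omitted in this sketch */ }

    private void sendCancelCallToTaskManager() { /* remote call, omitted in this sketch */ }
}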

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4229105..b778fa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -58,13 +59,14 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -104,15 +106,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
  *         address the message receiver.</li>
  * </ul>
- * 
- * <p>The ExecutionGraph implements {@link java.io.Serializable}, because it can be archived by
- * sending it to an archive actor via an actor message. The execution graph does contain some
- * non-serializable fields. These fields are not required in the archived form and are cleared
- * in the {@link #prepareForArchiving()} method.</p>
  */
-public class ExecutionGraph implements Serializable {
-
-	private static final long serialVersionUID = 42L;
+public class ExecutionGraph {
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
@@ -179,7 +174,7 @@ public class ExecutionGraph implements Serializable {
 	// ------ Configuration of the Execution -------
 
 	/** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */
-	private SerializedValue<ExecutionConfig> serializedExecutionConfig;
+	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
 	 * to deploy them immediately. */
@@ -208,30 +203,25 @@ public class ExecutionGraph implements Serializable {
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
 	/** The scheduler to use for scheduling new tasks as they are needed */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private Scheduler scheduler;
 
 	/** Strategy to use for restarts */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private RestartStrategy restartStrategy;
 
 	/** The classloader for the user code. Needed for calls into user code classes */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private ClassLoader userClassLoader;
 
 	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private CheckpointCoordinator checkpointCoordinator;
 
 	/** The coordinator for savepoints, if snapshot checkpoints are enabled */
 	private transient SavepointCoordinator savepointCoordinator;
 
-	/** Checkpoint stats tracker seperate from the coordinator in order to be
+	/** Checkpoint stats tracker separate from the coordinator in order to be
 	 * available after archiving. */
 	private CheckpointStatsTracker checkpointStatsTracker;
 
 	/** The execution context which is used to execute futures. */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private ExecutionContext executionContext;
 
 	// ------ Fields that are only relevant for archived execution graphs ------------
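
The hunk above drops Serializable from the ExecutionGraph but keeps the execution configuration
wrapped as a (now final) SerializedValue<ExecutionConfig>, i.e. the config is held in serialized
byte form and can be materialized on demand against a given class loader. Below is a minimal,
illustrative sketch of what such a wrapper does; the class and method names are hypothetical and
this is not the actual org.apache.flink.util.SerializedValue implementation.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerializedValueSketch<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** The wrapped value, kept only in its serialized byte form. */
    private final byte[] bytes;

    SerializedValueSketch(T value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        this.bytes = bos.toByteArray();
    }

    /** Materializes the value on demand, resolving classes against the given class loader. */
    @SuppressWarnings("unchecked")
    T deserializeValue(final ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
                return Class.forName(desc.getName(), false, userCodeClassLoader);
            }
        }) {
            return (T) ois.readObject();
        }
    }
}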

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7d4be79..7b28b31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -42,20 +42,19 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.util.SerializableObject;
+
 import org.slf4j.Logger;
+
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ExecutionJobVertex implements Serializable {
-	
-	private static final long serialVersionUID = 42L;
-	
+public class ExecutionJobVertex {
+
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e20f466..08bf57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -42,9 +42,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -69,9 +69,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
  * which time it spawns an {@link Execution}.
  */
-public class ExecutionVertex implements Serializable {
-
-	private static final long serialVersionUID = 42L;
+public class ExecutionVertex {
 
 	private static final Logger LOG = ExecutionGraph.LOG;
 
@@ -91,6 +89,9 @@ public class ExecutionVertex implements Serializable {
 
 	private final FiniteDuration timeout;
 
+	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+	private final String taskNameWithSubtask;
+
 	private volatile CoLocationConstraint locationConstraint;
 
 	private volatile Execution currentExecution;	// this field must never be null
@@ -115,8 +116,11 @@ public class ExecutionVertex implements Serializable {
 			IntermediateResult[] producedDataSets,
 			FiniteDuration timeout,
 			long createTimestamp) {
+
 		this.jobVertex = jobVertex;
 		this.subTaskIndex = subTaskIndex;
+		this.taskNameWithSubtask = String.format("%s (%d/%d)",
+				jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());
 
 		this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);
 
@@ -172,11 +176,7 @@ public class ExecutionVertex implements Serializable {
 	}
 
 	public String getTaskNameWithSubtaskIndex() {
-		return String.format(
-				"%s (%d/%d)",
-				jobVertex.getJobVertex().getName(),
-				subTaskIndex + 1,
-				getTotalNumberOfParallelSubtasks());
+		return this.taskNameWithSubtask;
 	}
 
 	public int getTotalNumberOfParallelSubtasks() {
@@ -547,10 +547,9 @@ public class ExecutionVertex implements Serializable {
 	 */
 	public void prepareForArchiving() throws IllegalStateException {
 		Execution execution = currentExecution;
-		ExecutionState state = execution.getState();
 
 		// sanity check
-		if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
+		if (!execution.isFinished()) {
 			throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
 		}
 
@@ -637,7 +636,6 @@ public class ExecutionVertex implements Serializable {
 			ExecutionAttemptID executionId,
 			SimpleSlot targetSlot,
 			SerializedValue<StateHandle<?>> operatorState,
-			Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState,
 			int attemptNumber) {
 
 		// Produced intermediate results
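
The ExecutionVertex change above caches the "myTask (2/7)"-style name in a final field instead
of rebuilding it with String.format on every call to getTaskNameWithSubtaskIndex(). A minimal
sketch of that caching pattern, with illustrative names only:

final class SubtaskNameSketch {

    /** Cached "myTask (2/7)"-style name; all of its inputs are immutable, so it is built once. */
    private final String taskNameWithSubtask;

    SubtaskNameSketch(String taskName, int subtaskIndex, int parallelism) {
        // subtaskIndex is zero-based, the displayed index is one-based
        this.taskNameWithSubtask =
                String.format("%s (%d/%d)", taskName, subtaskIndex + 1, parallelism);
    }

    String getTaskNameWithSubtaskIndex() {
        return taskNameWithSubtask; // no per-call formatting or string concatenation
    }
}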

http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 71cc7f2..3abecc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -469,7 +469,6 @@ public class StreamingJobGraphGenerator {
 				if (vertex.isInputVertex()) {
 					triggerVertices.add(vertex.getID());
 				}
-				// TODO: add check whether the user function implements the checkpointing interface
 				commitVertices.add(vertex.getID());
 				ackVertices.add(vertex.getID());
 			}