You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/05 17:57:46 UTC
[4/4] flink git commit: [FLINK-4256] [distributed runtime] Clean up
serializability of ExecutionGraph
[FLINK-4256] [distributed runtime] Clean up serializability of ExecutionGraph
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9b0dad8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9b0dad8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9b0dad8
Branch: refs/heads/master
Commit: c9b0dad89b79f11ce13fe78f8a0823b8266f930f
Parents: 2a4b222
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 4 16:48:37 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 5 19:57:01 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 36 +++++++++++---------
.../runtime/executiongraph/ExecutionGraph.java | 22 ++++--------
.../executiongraph/ExecutionJobVertex.java | 9 +++--
.../runtime/executiongraph/ExecutionVertex.java | 22 ++++++------
.../api/graph/StreamingJobGraphGenerator.java | 1 -
5 files changed, 39 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index fd296c3..5bab780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import akka.dispatch.OnComplete;
import akka.dispatch.OnFailure;
+
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -56,7 +57,6 @@ import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -87,23 +87,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
* or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
*
- * NOTE ABOUT THE DESIGN RATIONAL:
+ * <p>NOTE ABOUT THE DESIGN RATIONALE:
*
- * In several points of the code, we need to deal with possible concurrent state changes and actions.
+ * <p>In several points of the code, we need to deal with possible concurrent state changes and actions.
* For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
*
- * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
+ * <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
* it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel
* command" call will never overtake the deploying call.
*
- * This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it
+ * <p>This blocks the threads big time, because the remote calls may take long. Depending on their locking behavior, it
* may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and
* occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
* actions if it is not. Many actions are also idempotent (like canceling).
*/
-public class Execution implements Serializable {
-
- private static final long serialVersionUID = 42L;
+public class Execution {
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -136,15 +134,16 @@ public class Execution implements Serializable {
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
+ /** The state with which the execution attempt should start */
private SerializedValue<StateHandle<?>> operatorState;
- private Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState;
-
/** The execution context which is used to execute futures. */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
private ExecutionContext executionContext;
- /* Lock for updating the accumulators atomically. */
+ // ------------------------- Accumulators ---------------------------------
+
+ /* Lock for updating the accumulators atomically. Prevents final accumulators from being
+ * overwritten by partial accumulators on a late heartbeat. */
private final SerializableObject accumulatorLock = new SerializableObject();
/* Continuously updated map of user-defined accumulators */
@@ -217,7 +216,7 @@ public class Execution implements Serializable {
}
public boolean isFinished() {
- return state == FINISHED || state == FAILED || state == CANCELED;
+ return state.isTerminal();
}
/**
@@ -236,14 +235,18 @@ public class Execution implements Serializable {
}
public void setInitialState(
- SerializedValue<StateHandle<?>> initialState,
- Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
+ SerializedValue<StateHandle<?>> initialState,
+ Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
+
+ if (initialKvState != null && initialKvState.size() > 0) {
+ throw new UnsupportedOperationException("Error: inconsistent handling of key/value state snapshots");
+ }
if (state != ExecutionState.CREATED) {
throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
}
+
this.operatorState = initialState;
- this.operatorKvState = initialKvState;
}
// --------------------------------------------------------------------------------------------
@@ -371,7 +374,6 @@ public class Execution implements Serializable {
attemptId,
slot,
operatorState,
- operatorKvState,
attemptNumber);
// register this execution at the execution graph, to receive call backs
http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4229105..b778fa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import akka.actor.ActorSystem;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -58,13 +59,14 @@ import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
-import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
@@ -104,15 +106,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
* address the message receiver.</li>
* </ul>
- *
- * <p>The ExecutionGraph implements {@link java.io.Serializable}, because it can be archived by
- * sending it to an archive actor via an actor message. The execution graph does contain some
- * non-serializable fields. These fields are not required in the archived form and are cleared
- * in the {@link #prepareForArchiving()} method.</p>
*/
-public class ExecutionGraph implements Serializable {
-
- private static final long serialVersionUID = 42L;
+public class ExecutionGraph {
private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
@@ -179,7 +174,7 @@ public class ExecutionGraph implements Serializable {
// ------ Configuration of the Execution -------
/** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */
- private SerializedValue<ExecutionConfig> serializedExecutionConfig;
+ private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
* to deploy them immediately. */
@@ -208,30 +203,25 @@ public class ExecutionGraph implements Serializable {
// ------ Fields that are relevant to the execution and need to be cleared before archiving -------
/** The scheduler to use for scheduling new tasks as they are needed */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
private Scheduler scheduler;
/** Strategy to use for restarts */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
private RestartStrategy restartStrategy;
/** The classloader for the user code. Needed for calls into user code classes */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
private ClassLoader userClassLoader;
/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
private CheckpointCoordinator checkpointCoordinator;
/** The coordinator for savepoints, if snapshot checkpoints are enabled */
private transient SavepointCoordinator savepointCoordinator;
- /** Checkpoint stats tracker seperate from the coordinator in order to be
+ /** Checkpoint stats tracker separate from the coordinator in order to be
* available after archiving. */
private CheckpointStatsTracker checkpointStatsTracker;
/** The execution context which is used to execute futures. */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
private ExecutionContext executionContext;
// ------ Fields that are only relevant for archived execution graphs ------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7d4be79..7b28b31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -42,20 +42,19 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.util.SerializableObject;
+
import org.slf4j.Logger;
+
import scala.concurrent.duration.FiniteDuration;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ExecutionJobVertex implements Serializable {
-
- private static final long serialVersionUID = 42L;
-
+public class ExecutionJobVertex {
+
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e20f466..08bf57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -42,9 +42,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
@@ -69,9 +69,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
* The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
* which time it spawns an {@link Execution}.
*/
-public class ExecutionVertex implements Serializable {
-
- private static final long serialVersionUID = 42L;
+public class ExecutionVertex {
private static final Logger LOG = ExecutionGraph.LOG;
@@ -91,6 +89,9 @@ public class ExecutionVertex implements Serializable {
private final FiniteDuration timeout;
+ /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+ private final String taskNameWithSubtask;
+
private volatile CoLocationConstraint locationConstraint;
private volatile Execution currentExecution; // this field must never be null
@@ -115,8 +116,11 @@ public class ExecutionVertex implements Serializable {
IntermediateResult[] producedDataSets,
FiniteDuration timeout,
long createTimestamp) {
+
this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
+ this.taskNameWithSubtask = String.format("%s (%d/%d)",
+ jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());
this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);
@@ -172,11 +176,7 @@ public class ExecutionVertex implements Serializable {
}
public String getTaskNameWithSubtaskIndex() {
- return String.format(
- "%s (%d/%d)",
- jobVertex.getJobVertex().getName(),
- subTaskIndex + 1,
- getTotalNumberOfParallelSubtasks());
+ return this.taskNameWithSubtask;
}
public int getTotalNumberOfParallelSubtasks() {
@@ -547,10 +547,9 @@ public class ExecutionVertex implements Serializable {
*/
public void prepareForArchiving() throws IllegalStateException {
Execution execution = currentExecution;
- ExecutionState state = execution.getState();
// sanity check
- if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
+ if (!execution.isFinished()) {
throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
}
@@ -637,7 +636,6 @@ public class ExecutionVertex implements Serializable {
ExecutionAttemptID executionId,
SimpleSlot targetSlot,
SerializedValue<StateHandle<?>> operatorState,
- Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState,
int attemptNumber) {
// Produced intermediate results
http://git-wip-us.apache.org/repos/asf/flink/blob/c9b0dad8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 71cc7f2..3abecc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -469,7 +469,6 @@ public class StreamingJobGraphGenerator {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
}
- // TODO: add check whether the user function implements the checkpointing interface
commitVertices.add(vertex.getID());
ackVertices.add(vertex.getID());
}