You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:13 UTC
[02/10] flink git commit: [FLINK-1953] [runtime] Integrate new
snapshot checkpoint coordinator with jobgraph and execution graph
[FLINK-1953] [runtime] Integrate new snapshot checkpoint coordinator with jobgraph and execution graph
This closes #651
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b7f8aa1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b7f8aa1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b7f8aa1
Branch: refs/heads/master
Commit: 9b7f8aa121e4a231632296d0809029aca9ebde6a
Parents: ff750e6
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Apr 30 19:59:36 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 21:35:57 2015 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 169 ++++++-
.../CheckpointCoordinatorDeActivator.java | 56 +++
.../runtime/checkpoint/PendingCheckpoint.java | 11 +-
.../flink/runtime/checkpoint/StateForTask.java | 12 +-
.../deployment/TaskDeploymentDescriptor.java | 32 +-
.../flink/runtime/execution/Environment.java | 24 +-
.../flink/runtime/executiongraph/Execution.java | 20 +-
.../runtime/executiongraph/ExecutionGraph.java | 484 +++++++++++--------
.../executiongraph/ExecutionJobVertex.java | 3 +-
.../runtime/executiongraph/ExecutionVertex.java | 25 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 44 +-
.../jobgraph/tasks/AbstractInvokable.java | 6 +-
.../tasks/CheckpointCommittingOperator.java | 24 +
.../jobgraph/tasks/CheckpointedOperator.java | 24 +
.../jobgraph/tasks/JobSnapshottingSettings.java | 97 ++++
.../jobgraph/tasks/OperatorStateCarrier.java | 16 +-
.../messages/checkpoint/AbortCheckpoint.java | 49 --
.../checkpoint/AcknowledgeCheckpoint.java | 8 +-
.../messages/checkpoint/ConfirmCheckpoint.java | 28 +-
.../flink/runtime/state/LocalStateHandle.java | 27 +-
.../apache/flink/runtime/state/StateHandle.java | 11 +-
.../apache/flink/runtime/state/StateUtils.java | 54 +++
.../runtime/taskmanager/RuntimeEnvironment.java | 25 +-
.../apache/flink/runtime/taskmanager/Task.java | 150 +++++-
.../flink/runtime/util/SerializableObject.java | 28 ++
.../flink/runtime/jobmanager/JobManager.scala | 108 +++--
.../StreamCheckpointCoordinator.scala | 151 ------
.../messages/CheckpointingMessages.scala | 52 --
.../flink/runtime/taskmanager/TaskManager.scala | 44 +-
.../checkpoint/CheckpointCoordinatorTest.java | 14 +-
.../checkpoint/CheckpointStateRestoreTest.java | 235 +++++++++
.../checkpoint/CoordinatorShutdownTest.java | 144 ++++++
.../messages/CheckpointMessagesTest.java | 17 +-
.../operators/testutils/MockEnvironment.java | 11 +-
.../api/graph/StreamingJobGraphGenerator.java | 70 ++-
.../streaming/runtime/tasks/StreamTask.java | 41 +-
.../StreamCheckpointingITCase.java | 4 +-
.../TaskManagerFailureRecoveryITCase.java | 2 +-
38 files changed, 1630 insertions(+), 690 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9647ca4..b3f6587 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,10 +18,17 @@
package org.apache.flink.runtime.checkpoint;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
@@ -41,7 +48,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
- *
+ * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
+ * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
+ * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
+ * reported by the tasks that acknowledge the checkpoint.
*/
public class CheckpointCoordinator {
@@ -76,13 +86,17 @@ public class CheckpointCoordinator {
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
- /** The timer that processes the checkpoint timeouts */
- private final Timer timeoutTimer;
+ /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
+ private final Timer timer;
private final long checkpointTimeout;
private final int numSuccessfulCheckpointsToRetain;
+ private TimerTask periodicScheduler;
+
+ private ActorRef jobStatusListener;
+
private boolean shutdown;
// --------------------------------------------------------------------------------------------
@@ -114,9 +128,13 @@ public class CheckpointCoordinator {
this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
- timeoutTimer = new Timer("Checkpoint Timeout Handler", true);
+ timer = new Timer("Checkpoint Timer", true);
}
+ // --------------------------------------------------------------------------------------------
+ // Clean shutdown
+ // --------------------------------------------------------------------------------------------
+
/**
* Shuts down the checkpoint coordinator.
*
@@ -129,9 +147,22 @@ public class CheckpointCoordinator {
return;
}
shutdown = true;
+ LOG.info("Stopping checkpoint coordinator for job " + job);
// shut down the thread that handles the timeouts
- timeoutTimer.cancel();
+ timer.cancel();
+
+ // make sure that the actor does not linger
+ if (jobStatusListener != null) {
+ jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ jobStatusListener = null;
+ }
+
+ // the scheduling thread needs also to go away
+ if (periodicScheduler != null) {
+ periodicScheduler.cancel();
+ periodicScheduler = null;
+ }
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
@@ -146,6 +177,10 @@ public class CheckpointCoordinator {
completedCheckpoints.clear();
}
}
+
+ public boolean isShutdown() {
+ return shutdown;
+ }
// --------------------------------------------------------------------------------------------
// Handling checkpoints and messages
@@ -235,7 +270,7 @@ public class CheckpointCoordinator {
throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
}
pendingCheckpoints.put(checkpointID, checkpoint);
- timeoutTimer.schedule(canceller, checkpointTimeout);
+ timer.schedule(canceller, checkpointTimeout);
}
// send the messages to the tasks that trigger their checkpoint
@@ -270,7 +305,8 @@ public class CheckpointCoordinator {
}
final long checkpointId = message.getCheckpointId();
- boolean checkpointCompleted = false;
+
+ SuccessfulCheckpoint completed = null;
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
@@ -286,7 +322,7 @@ public class CheckpointCoordinator {
if (checkpoint.isFullyAcknowledged()) {
LOG.info("Completed checkpoint " + checkpointId);
- SuccessfulCheckpoint completed = checkpoint.toCompletedCheckpoint();
+ completed = checkpoint.toCompletedCheckpoint();
completedCheckpoints.addLast(completed);
if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
completedCheckpoints.removeFirst();
@@ -295,8 +331,6 @@ public class CheckpointCoordinator {
rememberRecentCheckpointId(checkpointId);
dropSubsumedCheckpoints(completed.getTimestamp());
-
- checkpointCompleted = true;
}
}
else {
@@ -323,12 +357,13 @@ public class CheckpointCoordinator {
// send the confirmation messages to the necessary targets. we do this here
// to be outside the lock scope
- if (checkpointCompleted) {
+ if (completed != null) {
+ final long timestamp = completed.getTimestamp();
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
- ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId);
+ ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId, timestamp);
ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
}
}
@@ -355,6 +390,64 @@ public class CheckpointCoordinator {
}
// --------------------------------------------------------------------------------------------
+ // Checkpoint State Restoring
+ // --------------------------------------------------------------------------------------------
+
+ public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks,
+ boolean errorIfNoCheckpoint,
+ boolean allOrNothingState) throws Exception {
+ synchronized (lock) {
+ if (shutdown) {
+ throw new IllegalStateException("CheckpointCoordinator is shut down");
+ }
+
+ if (completedCheckpoints.isEmpty()) {
+ if (errorIfNoCheckpoint) {
+ throw new IllegalStateException("No completed checkpoint available");
+ } else {
+ return;
+ }
+ }
+
+ // restore from the latest checkpoint
+ SuccessfulCheckpoint latest = completedCheckpoints.getLast();
+
+ if (allOrNothingState) {
+ Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();
+
+ for (StateForTask state : latest.getStates()) {
+ ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
+ Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
+ exec.setInitialState(state.getState());
+
+ Integer count = stateCounts.get(vertex);
+ if (count != null) {
+ stateCounts.put(vertex, count+1);
+ } else {
+ stateCounts.put(vertex, 1);
+ }
+ }
+
+ // validate that either all task vertices have state, or none
+ for (Map.Entry<ExecutionJobVertex, Integer> entry : stateCounts.entrySet()) {
+ ExecutionJobVertex vertex = entry.getKey();
+ if (entry.getValue() != vertex.getParallelism()) {
+ throw new IllegalStateException(
+ "The checkpoint contained state only for a subset of tasks for vertex " + vertex);
+ }
+ }
+ }
+ else {
+ for (StateForTask state : latest.getStates()) {
+ ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
+ Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
+ exec.setInitialState(state.getState());
+ }
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
// Accessors
// --------------------------------------------------------------------------------------------
@@ -377,4 +470,56 @@ public class CheckpointCoordinator {
return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
}
}
+
+ // --------------------------------------------------------------------------------------------
+ // Periodic scheduling of checkpoints
+ // --------------------------------------------------------------------------------------------
+
+ public void startPeriodicCheckpointScheduler(long interval) {
+ synchronized (lock) {
+ if (shutdown) {
+ throw new IllegalArgumentException("Checkpoint coordinator is shut down");
+ }
+
+ // cancel any previous scheduler
+ stopPeriodicCheckpointScheduler();
+
+ // start a new scheduler
+ periodicScheduler = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ triggerCheckpoint();
+ }
+ catch (Exception e) {
+ LOG.error("Exception while triggering checkpoint", e);
+ }
+ }
+ };
+ timer.scheduleAtFixedRate(periodicScheduler, interval, interval);
+ }
+ }
+
+ public void stopPeriodicCheckpointScheduler() {
+ synchronized (lock) {
+ if (periodicScheduler != null) {
+ periodicScheduler.cancel();
+ periodicScheduler = null;
+ }
+ }
+ }
+
+ public ActorRef createJobStatusListener(ActorSystem actorSystem, long checkpointInterval) {
+ synchronized (lock) {
+ if (shutdown) {
+ throw new IllegalArgumentException("Checkpoint coordinator is shut down");
+ }
+
+ if (jobStatusListener == null) {
+ Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, checkpointInterval);
+ jobStatusListener = actorSystem.actorOf(props);
+ }
+ return jobStatusListener;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
new file mode 100644
index 0000000..a6c4d76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+
+/**
+ * This actor listens to changes in the JobStatus and activates or deactivates the periodic
+ * checkpoint scheduler.
+ */
+public class CheckpointCoordinatorDeActivator extends UntypedActor {
+
+ private final CheckpointCoordinator coordinator;
+ private final long interval;
+
+ public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator, long interval) {
+ this.coordinator = coordinator;
+ this.interval = interval;
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
+ JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
+
+ if (status == JobStatus.RUNNING) {
+ // start the checkpoint scheduler
+ coordinator.startPeriodicCheckpointScheduler(interval);
+ }
+ else {
+ // anything else should stop the trigger for now
+ coordinator.stopPeriodicCheckpointScheduler();
+ }
+ }
+
+ // we ignore all other messages
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index e221238..f25bff9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
import java.util.ArrayList;
import java.util.List;
@@ -31,6 +32,9 @@ import java.util.Map;
* A pending checkpoint is a checkpoint that has been started, but has not been
* acknowledged by all tasks that need to acknowledge it. Once all tasks have
* acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
+ *
+ * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the
+ * state handles always as serialized values, never as actual values.</p>
*/
public class PendingCheckpoint {
@@ -117,12 +121,12 @@ public class PendingCheckpoint {
return completed;
}
else {
- throw new IllegalStateException("Cannot complete checkpoint while nit all tasks are acknowledged");
+ throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
}
}
}
- public boolean acknowledgeTask(ExecutionAttemptID attemptID, StateHandle state) {
+ public boolean acknowledgeTask(ExecutionAttemptID attemptID, SerializedValue<StateHandle<?>> state) {
synchronized (lock) {
if (discarded) {
return false;
@@ -158,6 +162,7 @@ public class PendingCheckpoint {
@Override
public String toString() {
- return "";
+ return String.format("PendingCheckpoint %d @ %d - confirmed=%d, pending=%d",
+ checkpointId, checkpointTimestamp, getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 83a6dc8..26b3eb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -20,16 +20,22 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
/**
* Simple bean to describe the state belonging to a parallel operator.
* Since we hold the state across execution attempts, we identify a task by its
* JobVertexId and subtask index.
+ *
* The state itself is kept in serialized form, since the checkpoint coordinator itself
* never looks at it anyway and only sends it back out in case of a recovery.
+ * Furthermore, the state may involve user-defined classes that are not accessible without
+ * the respective classloader.
*/
public class StateForTask {
/** The state of the parallel operator */
- private final StateHandle state;
+ private final SerializedValue<StateHandle<?>> state;
/** The vertex id of the parallel operator */
private final JobVertexID operatorId;
@@ -37,7 +43,7 @@ public class StateForTask {
/** The index of the parallel subtask */
private final int subtask;
- public StateForTask(StateHandle state, JobVertexID operatorId, int subtask) {
+ public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
if (state == null || operatorId == null || subtask < 0) {
throw new IllegalArgumentException();
}
@@ -49,7 +55,7 @@ public class StateForTask {
// --------------------------------------------------------------------------------------------
- public StateHandle getState() {
+ public SerializedValue<StateHandle<?>> getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 5d96903..0a1268d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
import java.io.Serializable;
import java.util.Collection;
@@ -77,9 +78,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
/** The list of JAR files required to run this task. */
private final List<BlobKey> requiredJarFiles;
- private StateHandle operatorStates;
-
-
+ private final SerializedValue<StateHandle<?>> operatorState;
+
/**
* Constructs a task deployment descriptor.
*/
@@ -89,15 +89,18 @@ public final class TaskDeploymentDescriptor implements Serializable {
Configuration taskConfiguration, String invokableClassName,
List<ResultPartitionDeploymentDescriptor> producedPartitions,
List<InputGateDeploymentDescriptor> inputGates,
- List<BlobKey> requiredJarFiles, int targetSlotNumber) {
+ List<BlobKey> requiredJarFiles, int targetSlotNumber,
+ SerializedValue<StateHandle<?>> operatorState) {
+ checkArgument(indexInSubtaskGroup >= 0);
+ checkArgument(numberOfSubtasks > indexInSubtaskGroup);
+ checkArgument(targetSlotNumber >= 0);
+
this.jobID = checkNotNull(jobID);
this.vertexID = checkNotNull(vertexID);
this.executionId = checkNotNull(executionId);
this.taskName = checkNotNull(taskName);
- checkArgument(indexInSubtaskGroup >= 0);
this.indexInSubtaskGroup = indexInSubtaskGroup;
- checkArgument(numberOfSubtasks > indexInSubtaskGroup);
this.numberOfSubtasks = numberOfSubtasks;
this.jobConfiguration = checkNotNull(jobConfiguration);
this.taskConfiguration = checkNotNull(taskConfiguration);
@@ -105,8 +108,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
this.producedPartitions = checkNotNull(producedPartitions);
this.inputGates = checkNotNull(inputGates);
this.requiredJarFiles = checkNotNull(requiredJarFiles);
- checkArgument(targetSlotNumber >= 0);
this.targetSlotNumber = targetSlotNumber;
+ this.operatorState = operatorState;
}
public TaskDeploymentDescriptor(
@@ -115,14 +118,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
Configuration taskConfiguration, String invokableClassName,
List<ResultPartitionDeploymentDescriptor> producedPartitions,
List<InputGateDeploymentDescriptor> inputGates,
- List<BlobKey> requiredJarFiles, int targetSlotNumber,
- StateHandle operatorStates) {
+ List<BlobKey> requiredJarFiles, int targetSlotNumber) {
this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
- inputGates, requiredJarFiles, targetSlotNumber);
-
- setOperatorState(operatorStates);
+ inputGates, requiredJarFiles, targetSlotNumber, null);
}
/**
@@ -232,11 +232,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
return strBuilder.toString();
}
- public void setOperatorState(StateHandle operatorStates) {
- this.operatorStates = operatorStates;
- }
-
- public StateHandle getOperatorStates() {
- return operatorStates;
+ public SerializedValue<StateHandle<?>> getOperatorState() {
+ return operatorState;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 081e3ca..755f1ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.execution;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -31,6 +30,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
import java.util.Map;
import java.util.concurrent.Future;
@@ -148,6 +148,25 @@ public interface Environment {
*/
void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators);
+ /**
+ * Confirms that the invokable has successfully completed all steps it needed
+ * to perform for the checkpoint with the given checkpoint-ID. This method does not include
+ * any state in the checkpoint.
+ *
+ * @param checkpointId The ID of the checkpoint.
+ */
+ void acknowledgeCheckpoint(long checkpointId);
+
+ /**
+ * Confirms that the invokable has successfully completed all steps it needed
+ * to perform for the checkpoint with the given checkpoint-ID. This method does include
+ * the given state in the checkpoint.
+ *
+ * @param checkpointId The ID of the checkpoint.
+ * @param state A handle to the state to be included in the checkpoint.
+ */
+ void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
+
// --------------------------------------------------------------------------------------------
// Fields relevant to the I/O system. Should go into Task
// --------------------------------------------------------------------------------------------
@@ -159,7 +178,4 @@ public interface Environment {
InputGate getInputGate(int index);
InputGate[] getAllInputGates();
-
- // this should go away
- ActorRef getJobManager();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 4e046dd..731d70f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -129,7 +130,7 @@ public class Execution implements Serializable {
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
- private StateHandle operatorState;
+ private SerializedValue<StateHandle<?>> operatorState;
// --------------------------------------------------------------------------------------------
@@ -204,6 +205,13 @@ public class Execution implements Serializable {
partialInputChannelDeploymentDescriptors = null;
}
+ public void setInitialState(SerializedValue<StateHandle<?>> initialState) {
+ if (state != ExecutionState.CREATED) {
+ throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
+ }
+ this.operatorState = initialState;
+ }
+
// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
@@ -325,7 +333,7 @@ public class Execution implements Serializable {
attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
}
- final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot);
+ final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState);
// register this execution at the execution graph, to receive call backs
vertex.getExecutionGraph().registerExecution(this);
@@ -903,12 +911,4 @@ public class Execution implements Serializable {
return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
}
-
- public void setOperatorState(StateHandle operatorStates) {
- this.operatorState = operatorStates;
- }
-
- public StateHandle getOperatorState() {
- return operatorState;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d38913e..90cf42e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,12 +18,14 @@
package org.apache.flink.runtime.executiongraph;
-import akka.actor.ActorContext;
import akka.actor.ActorRef;
+
+import akka.actor.ActorSystem;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
@@ -32,20 +34,20 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple3;
-import scala.concurrent.duration.Duration;
+
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -54,7 +56,6 @@ import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static akka.dispatch.Futures.future;
@@ -80,8 +81,11 @@ import static akka.dispatch.Futures.future;
* about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
* address the message receiver.</li>
* </ul>
- *
- *
+ *
+ * <p>The ExecutionGraph implements {@link java.io.Serializable}, because it can be archived by
+ * sending it to an archive actor via an actor message. The execution graph does contain some
+ * non-serializable fields. These fields are not required in the archived form and are cleared
+ * in the {@link #prepareForArchiving()} method.</p>
*/
public class ExecutionGraph implements Serializable {
@@ -92,9 +96,15 @@ public class ExecutionGraph implements Serializable {
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
+
+ private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
// --------------------------------------------------------------------------------------------
+ /** The lock used to secure all access to mutable fields, especially the tracking of progress
+ * within the job. */
+ private final SerializableObject progressLock = new SerializableObject();
+
/** The ID of the job this graph has been built for. */
private final JobID jobID;
@@ -104,9 +114,6 @@ public class ExecutionGraph implements Serializable {
/** The job configuration that was originally attached to the JobGraph. */
private final Configuration jobConfiguration;
- /** The classloader for the user code. Needed for calls into user code classes */
- private ClassLoader userClassLoader;
-
/** All job vertices that are part of this graph */
private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
@@ -123,8 +130,11 @@ public class ExecutionGraph implements Serializable {
* inside the BlobService and are referenced via the BLOB keys. */
private final List<BlobKey> requiredJarFiles;
+ /** Listeners that receive messages when the entire job switches its status (such as from
+ * RUNNING to FINISHED) */
private final List<ActorRef> jobStatusListenerActors;
+ /** Listeners that receive messages whenever a single task execution changes its status */
private final List<ActorRef> executionListenerActors;
/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
@@ -133,10 +143,6 @@ public class ExecutionGraph implements Serializable {
* at {@code stateTimestamps[RUNNING.ordinal()]}. */
private final long[] stateTimestamps;
- /** The lock used to secure all access to mutable fields, especially the tracking of progress
- * within the job. */
- private final Object progressLock = new Object();
-
/** The timeout for all messages that require a response/acknowledgement */
private final FiniteDuration timeout;
@@ -158,8 +164,11 @@ public class ExecutionGraph implements Serializable {
* from results than need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
+ /** Flag that indicates whether the executed dataflow should be periodically snapshotted */
+ private boolean snapshotCheckpointsEnabled;
+
- // ------ Execution status and progress -------
+ // ------ Execution status and progress. These values are volatile, and accessed under the lock -------
/** Current status of the job execution */
private volatile JobStatus state = JobStatus.CREATED;
@@ -168,31 +177,36 @@ public class ExecutionGraph implements Serializable {
* that was not recoverable and triggered job failure */
private volatile Throwable failureCause;
- /** The scheduler to use for scheduling new tasks as they are needed */
- private Scheduler scheduler;
-
/** The position of the vertex that is next expected to finish.
* This is an index into the "verticesInCreationOrder" collection.
* Once this value has reached the number of vertices, the job is done. */
- private int nextVertexToFinish;
-
-
-
- private ActorContext parentContext;
-
- private ActorRef stateCheckpointerActor;
-
- private boolean checkpointingEnabled;
+ private volatile int nextVertexToFinish;
+
+
+ // ------ Fields that are relevant to the execution and need to be cleared before archiving -------
- private long checkpointingInterval = 5000;
+ /** The scheduler to use for scheduling new tasks as they are needed */
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private Scheduler scheduler;
- public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
- this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
- }
+ /** The classloader for the user code. Needed for calls into user code classes */
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private ClassLoader userClassLoader;
+
+ /** The coordinator for checkpoints, if snapshot checkpoints are enabled */
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private CheckpointCoordinator checkpointCoordinator;
+
+
+ // --------------------------------------------------------------------------------------------
+ // Constructors
+ // --------------------------------------------------------------------------------------------
- public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig,
- FiniteDuration timeout, List<BlobKey> requiredJarFiles) {
- this(jobId, jobName, jobConfig, timeout, requiredJarFiles, Thread.currentThread().getContextClassLoader());
+ /**
+ * This constructor is for tests only, because it does not include class loading information.
+ */
+ ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
+ this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>(), ExecutionGraph.class.getClassLoader());
}
public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout,
@@ -224,18 +238,8 @@ public class ExecutionGraph implements Serializable {
}
// --------------------------------------------------------------------------------------------
-
- public void setStateCheckpointerActor(ActorRef stateCheckpointerActor) {
- this.stateCheckpointerActor = stateCheckpointerActor;
- }
-
- public ActorRef getStateCheckpointerActor() {
- return stateCheckpointerActor;
- }
-
- public void setParentContext(ActorContext parentContext) {
- this.parentContext = parentContext;
- }
+ // Configuration of Data-flow wide execution settings
+ // --------------------------------------------------------------------------------------------
public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
if (numberOfRetriesLeft < -1) {
@@ -259,46 +263,97 @@ public class ExecutionGraph implements Serializable {
return delayBeforeRetrying;
}
- public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
- + "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
+ public boolean isQueuedSchedulingAllowed() {
+ return this.allowQueuedScheduling;
+ }
+
+ public void setQueuedSchedulingAllowed(boolean allowed) {
+ this.allowQueuedScheduling = allowed;
+ }
+
+ public void setScheduleMode(ScheduleMode scheduleMode) {
+ this.scheduleMode = scheduleMode;
+ }
+
+ public ScheduleMode getScheduleMode() {
+ return scheduleMode;
+ }
+
+ public void enableSnaphotCheckpointing(long interval, long checkpointTimeout,
+ List<ExecutionJobVertex> verticesToTrigger,
+ List<ExecutionJobVertex> verticesToWaitFor,
+ List<ExecutionJobVertex> verticesToCommitTo,
+ ActorSystem actorSystem)
+ {
+ // simple sanity checks
+ if (interval < 10 || checkpointTimeout < 10) {
+ throw new IllegalArgumentException();
+ }
+ if (state != JobStatus.CREATED) {
+ throw new IllegalStateException("Job must be in CREATED state");
}
+
+ ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
+ ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
+ ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
- final long createTimestamp = System.currentTimeMillis();
+ // disable to make sure existing checkpoint coordinators are cleared
+ disableSnaphotCheckpointing();
- for (AbstractJobVertex jobVertex : topologiallySorted) {
-
- // create the execution job vertex and attach it to the graph
- ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
- ejv.connectToPredecessors(this.intermediateResults);
-
- ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
- if (previousTask != null) {
- throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
- jobVertex.getID(), ejv, previousTask));
- }
-
- for (IntermediateResult res : ejv.getProducedDataSets()) {
- IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
- if (previousDataSet != null) {
- throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
- res.getId(), res, previousDataSet));
- }
- }
-
- this.verticesInCreationOrder.add(ejv);
+ // create the coordinator that triggers and commits checkpoints and holds the state
+ snapshotCheckpointsEnabled = true;
+ checkpointCoordinator = new CheckpointCoordinator(jobID, NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
+ checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo);
+
+ // the periodic checkpoint scheduler is activated and deactivated as a result of
+ // job status changes (running -> on, all other states -> off)
+ registerJobStatusListener(checkpointCoordinator.createJobStatusListener(actorSystem, interval));
+ }
+
+ public void disableSnaphotCheckpointing() {
+ if (state != JobStatus.CREATED) {
+ throw new IllegalStateException("Job must be in CREATED state");
+ }
+
+ snapshotCheckpointsEnabled = false;
+ if (checkpointCoordinator != null) {
+ checkpointCoordinator.shutdown();
+ checkpointCoordinator = null;
}
}
+
+ public boolean isSnapshotCheckpointsEnabled() {
+ return snapshotCheckpointsEnabled;
+ }
- public void setCheckpointingEnabled(boolean checkpointingEnabled) {
- this.checkpointingEnabled = checkpointingEnabled;
+ public CheckpointCoordinator getCheckpointCoordinator() {
+ return checkpointCoordinator;
}
- public void setCheckpointingInterval(long checkpointingInterval) {
- this.checkpointingInterval = checkpointingInterval;
+ private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
+ if (jobVertices.size() == 1) {
+ ExecutionJobVertex jv = jobVertices.get(0);
+ if (jv.getGraph() != this) {
+ throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
+ }
+ return jv.getTaskVertices();
+ }
+ else {
+ ArrayList<ExecutionVertex> all = new ArrayList<ExecutionVertex>();
+ for (ExecutionJobVertex jv : jobVertices) {
+ if (jv.getGraph() != this) {
+ throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
+ }
+ all.addAll(Arrays.asList(jv.getTaskVertices()));
+ }
+ return all.toArray(new ExecutionVertex[all.size()]);
+ }
}
+ // --------------------------------------------------------------------------------------------
+ // Properties and Status of the Execution Graph
+ // --------------------------------------------------------------------------------------------
+
/**
* Returns a list of BLOB keys referring to the JAR files required to run this job
* @return list of BLOB keys referring to the JAR files required to run this job
@@ -307,8 +362,6 @@ public class ExecutionGraph implements Serializable {
return this.requiredJarFiles;
}
- // --------------------------------------------------------------------------------------------
-
public Scheduler getScheduler() {
return scheduler;
}
@@ -396,26 +449,42 @@ public class ExecutionGraph implements Serializable {
return this.stateTimestamps[status.ordinal()];
}
- public boolean isQueuedSchedulingAllowed() {
- return this.allowQueuedScheduling;
- }
+ // --------------------------------------------------------------------------------------------
+ // Actions
+ // --------------------------------------------------------------------------------------------
- public void setQueuedSchedulingAllowed(boolean allowed) {
- this.allowQueuedScheduling = allowed;
- }
+ public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
+ + "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
+ }
- public void setScheduleMode(ScheduleMode scheduleMode) {
- this.scheduleMode = scheduleMode;
- }
+ final long createTimestamp = System.currentTimeMillis();
- public ScheduleMode getScheduleMode() {
- return scheduleMode;
- }
+ for (AbstractJobVertex jobVertex : topologiallySorted) {
- // --------------------------------------------------------------------------------------------
- // Actions
- // --------------------------------------------------------------------------------------------
+ // create the execution job vertex and attach it to the graph
+ ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
+ ejv.connectToPredecessors(this.intermediateResults);
+ ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
+ if (previousTask != null) {
+ throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
+ jobVertex.getID(), ejv, previousTask));
+ }
+
+ for (IntermediateResult res : ejv.getProducedDataSets()) {
+ IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
+ if (previousDataSet != null) {
+ throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
+ res.getId(), res, previousDataSet));
+ }
+ }
+
+ this.verticesInCreationOrder.add(ejv);
+ }
+ }
+
public void scheduleForExecution(Scheduler scheduler) throws JobException {
if (scheduler == null) {
throw new IllegalArgumentException("Scheduler must not be null.");
@@ -431,32 +500,24 @@ public class ExecutionGraph implements Serializable {
switch (scheduleMode) {
case FROM_SOURCES:
- // initially, we simply take the ones without inputs.
- // next, we implement the logic to go back from vertices that need computation
- // to the ones we need to start running
+ // simply take the vertices without inputs.
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
}
-
break;
case ALL:
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
-
break;
case BACKTRACKING:
+ // go back from vertices that need computation to the ones we need to run
throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
}
-
- if (checkpointingEnabled) {
- stateCheckpointerActor = StreamCheckpointCoordinator.spawn(parentContext, this,
- Duration.create(checkpointingInterval, TimeUnit.MILLISECONDS));
- }
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
@@ -508,6 +569,83 @@ public class ExecutionGraph implements Serializable {
}
}
+ public void restart() {
+ try {
+ if (state == JobStatus.FAILED) {
+ if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
+ throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
+ }
+ }
+
+ synchronized (progressLock) {
+ if (state != JobStatus.RESTARTING) {
+ throw new IllegalStateException("Can only restart job from state restarting.");
+ }
+ if (scheduler == null) {
+ throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
+ }
+
+ this.currentExecutions.clear();
+
+ for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
+ jv.resetForNewExecution();
+ }
+
+ for (int i = 0; i < stateTimestamps.length; i++) {
+ stateTimestamps[i] = 0;
+ }
+ nextVertexToFinish = 0;
+ transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
+
+ // if we have checkpointed state, reload it into the executions
+ if (checkpointCoordinator != null) {
+ checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+ }
+ }
+
+ scheduleForExecution(scheduler);
+ }
+ catch (Throwable t) {
+ fail(t);
+ }
+ }
+
+ /**
+ * This method cleans fields that are irrelevant for the archived execution attempt.
+ */
+ public void prepareForArchiving() {
+ if (!state.isTerminalState()) {
+ throw new IllegalStateException("Can only archive the job from a terminal state");
+ }
+
+ // clear the non-serializable fields
+ userClassLoader = null;
+ scheduler = null;
+ checkpointCoordinator = null;
+
+ for (ExecutionJobVertex vertex : verticesInCreationOrder) {
+ vertex.prepareForArchiving();
+ }
+
+ intermediateResults.clear();
+ currentExecutions.clear();
+ requiredJarFiles.clear();
+ jobStatusListenerActors.clear();
+ executionListenerActors.clear();
+ }
+
+ /**
+ * For testing: This waits until the job execution has finished.
+ * @throws InterruptedException if the waiting thread is interrupted while waiting
+ */
+ public void waitUntilFinished() throws InterruptedException {
+ synchronized (progressLock) {
+ while (nextVertexToFinish < verticesInCreationOrder.size()) {
+ progressLock.wait();
+ }
+ }
+ }
+
private boolean transitionState(JobStatus current, JobStatus newState) {
return transitionState(current, newState, null);
}
@@ -551,24 +689,32 @@ public class ExecutionGraph implements Serializable {
if (nextPos == verticesInCreationOrder.size()) {
// we are done, transition to the final state
-
+ JobStatus current;
while (true) {
- JobStatus current = this.state;
- if (current == JobStatus.RUNNING && transitionState(current, JobStatus.FINISHED)) {
- break;
+ current = this.state;
+
+ if (current == JobStatus.RUNNING) {
+ if (transitionState(current, JobStatus.FINISHED)) {
+ postRunCleanup();
+ break;
+ }
}
- if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
- break;
+ else if (current == JobStatus.CANCELLING) {
+ if (transitionState(current, JobStatus.CANCELED)) {
+ postRunCleanup();
+ break;
+ }
}
- if (current == JobStatus.FAILING) {
+ else if (current == JobStatus.FAILING) {
if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
numberOfRetriesLeft--;
future(new Callable<Object>() {
@Override
public Object call() throws Exception {
- try{
+ try {
Thread.sleep(delayBeforeRetrying);
- }catch(InterruptedException e){
+ }
+ catch(InterruptedException e){
// should only happen on shutdown
}
restart();
@@ -578,13 +724,15 @@ public class ExecutionGraph implements Serializable {
break;
}
else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+ postRunCleanup();
break;
}
}
- if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
+ else {
fail(new Exception("ExecutionGraph went into final state from state " + current));
}
}
+ // done transitioning the state
// also, notify waiters
progressLock.notifyAll();
@@ -592,6 +740,19 @@ public class ExecutionGraph implements Serializable {
}
}
}
+
+ private void postRunCleanup() {
+ try {
+ CheckpointCoordinator coord = this.checkpointCoordinator;
+ this.checkpointCoordinator = null;
+ if (coord != null) {
+ coord.shutdown();
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Error while cleaning up after execution", e);
+ }
+ }
// --------------------------------------------------------------------------------------------
// Callbacks and Callback Utilities
@@ -623,13 +784,6 @@ public class ExecutionGraph implements Serializable {
return false;
}
}
-
- public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states) {
- synchronized (this.progressLock) {
- for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, StateHandle> state : states.entrySet())
- tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
- }
- }
public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
@@ -670,21 +824,19 @@ public class ExecutionGraph implements Serializable {
// Listeners & Observers
// --------------------------------------------------------------------------------------------
- public void registerJobStatusListener(ActorRef listener){
- this.jobStatusListenerActors.add(listener);
- }
-
- public void registerExecutionListener(ActorRef listener){
- this.executionListenerActors.add(listener);
+ public void registerJobStatusListener(ActorRef listener) {
+ if (listener != null) {
+ this.jobStatusListenerActors.add(listener);
+ }
}
- public boolean containsJobStatusListener(ActorRef listener) {
- return this.jobStatusListenerActors.contains(listener);
+ public void registerExecutionListener(ActorRef listener) {
+ if (listener != null) {
+ this.executionListenerActors.add(listener);
+ }
}
-
- /**
- * NOTE: This method never throws an error, only logs errors caused by the notified listeners.
- */
+
+
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListenerActors.size() > 0) {
ExecutionGraphMessages.JobStatusChanged message =
@@ -695,10 +847,7 @@ public class ExecutionGraph implements Serializable {
}
}
}
-
- /**
- * NOTE: This method never throws an error, only logs errors caused by the notified listeners.
- */
+
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
newExecutionState, Throwable error)
{
@@ -722,65 +871,4 @@ public class ExecutionGraph implements Serializable {
fail(error);
}
}
-
- public void restart() {
- try {
- if (state == JobStatus.FAILED) {
- if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
- throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
- }
- }
-
- synchronized (progressLock) {
- if (state != JobStatus.RESTARTING) {
- throw new IllegalStateException("Can only restart job from state restarting.");
- }
- if (scheduler == null) {
- throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
- }
-
- this.currentExecutions.clear();
-
- for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
- jv.resetForNewExecution();
- }
-
- for (int i = 0; i < stateTimestamps.length; i++) {
- stateTimestamps[i] = 0;
- }
- nextVertexToFinish = 0;
- transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
- }
-
- scheduleForExecution(scheduler);
- }
- catch (Throwable t) {
- fail(t);
- }
- }
-
- /**
- * This method cleans fields that are irrelevant for the archived execution attempt.
- */
- public void prepareForArchiving() {
- if (!state.isTerminalState()) {
- throw new IllegalStateException("Can only archive the job from a terminal state");
- }
-
- userClassLoader = null;
-
- for (ExecutionJobVertex vertex : verticesInCreationOrder) {
- vertex.prepareForArchiving();
- }
-
- intermediateResults.clear();
- currentExecutions.clear();
- requiredJarFiles.clear();
- jobStatusListenerActors.clear();
- executionListenerActors.clear();
-
- scheduler = null;
- parentContext = null;
- stateCheckpointerActor = null;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index acbc17a..59b3bb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.util.SerializableObject;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
@@ -52,7 +53,7 @@ public class ExecutionJobVertex implements Serializable {
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
- private final Object stateMonitor = new Object();
+ private final SerializableObject stateMonitor = new SerializableObject();
private final ExecutionGraph graph;
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a44fc6f..2ad3a55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import akka.actor.ActorRef;
+
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -41,8 +42,11 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
import org.slf4j.Logger;
+
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
@@ -95,8 +99,6 @@ public class ExecutionVertex implements Serializable {
private volatile boolean scheduleLocalOnly;
- private StateHandle operatorState;
-
// --------------------------------------------------------------------------------------------
public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -212,14 +214,6 @@ public class ExecutionVertex implements Serializable {
public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
}
-
- public void setOperatorState(StateHandle operatorState) {
- this.operatorState = operatorState;
- }
-
- public StateHandle getOperatorState() {
- return operatorState;
- }
public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
@@ -421,11 +415,6 @@ public class ExecutionVertex implements Serializable {
if (grp != null) {
this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
}
-
- if (operatorState != null) {
- execution.setOperatorState(operatorState);
- }
-
}
else {
throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
@@ -524,6 +513,7 @@ public class ExecutionVertex implements Serializable {
// clear the unnecessary fields in this class
this.resultPartitions = null;
this.inputEdges = null;
+ this.locationConstraint = null;
this.locationConstraintInstances = null;
}
@@ -588,10 +578,13 @@ public class ExecutionVertex implements Serializable {
/**
* Creates a task deployment descriptor to deploy a subtask to the given target slot.
+ *
+ * TODO: This should actually be in the EXECUTION
*/
TaskDeploymentDescriptor createDeploymentDescriptor(
ExecutionAttemptID executionId,
- SimpleSlot targetSlot) {
+ SimpleSlot targetSlot,
+ SerializedValue<StateHandle<?>> operatorState) {
// Produced intermediate results
List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6d895f9..28fa78e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -38,9 +38,18 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
/**
- * A job graph represents an entire Flink runtime job.
+ * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
+ * All programs from higher level APIs are transformed into JobGraphs.
+ *
+ * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
+ * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
+ * but inside certain special vertices that establish the feedback channel amongst themselves.</p>
+ *
+ * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
+ * define the characteristics of the concrete operation and intermediate data.</p>
*/
public class JobGraph implements Serializable {
@@ -74,11 +83,12 @@ public class JobGraph implements Serializable {
/** flag to enable queued scheduling */
private boolean allowQueuedScheduling;
+ /** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
- private boolean checkpointingEnabled = false;
+ /** The settings for asynchronous snapshotting */
+ private JobSnapshottingSettings snapshotSettings;
- private long checkpointingInterval = 10000;
// --------------------------------------------------------------------------------------------
@@ -258,20 +268,24 @@ public class JobGraph implements Serializable {
return this.taskVertices.size();
}
- public void setCheckpointingEnabled(boolean checkpointingEnabled) {
- this.checkpointingEnabled = checkpointingEnabled;
- }
-
- public boolean isCheckpointingEnabled() {
- return checkpointingEnabled;
- }
-
- public void setCheckpointingInterval(long checkpointingInterval) {
- this.checkpointingInterval = checkpointingInterval;
+ /**
+ * Sets the settings for asynchronous snapshots. A value of {@code null} means that
+ * snapshotting is not enabled.
+ *
+ * @param settings The snapshot settings, or null, to disable snapshotting.
+ */
+ public void setSnapshotSettings(JobSnapshottingSettings settings) {
+ this.snapshotSettings = settings;
}
- public long getCheckpointingInterval() {
- return checkpointingInterval;
+ /**
+ * Gets the settings for asynchronous snapshots. This method returns null, when
+ * snapshotting is not enabled.
+ *
+ * @return The snapshot settings, or null, if snapshotting is not enabled.
+ */
+ public JobSnapshottingSettings getSnapshotSettings() {
+ return snapshotSettings;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 445c842..1cf5db2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -40,7 +40,7 @@ public abstract class AbstractInvokable {
/** The environment assigned to this invokable. */
- private volatile Environment environment;
+ private Environment environment;
/** The execution config, cached from the deserialization from the JobConfiguration */
private ExecutionConfig executionConfig;
@@ -66,14 +66,14 @@ public abstract class AbstractInvokable {
* @param environment
* the environment of this task
*/
- public final void setEnvironment(final Environment environment) {
+ public final void setEnvironment(Environment environment) {
this.environment = environment;
}
/**
* Returns the environment of this task.
*
- * @return the environment of this task or <code>null</code> if the environment has not yet been set
+ * @return The environment of this task.
*/
public Environment getEnvironment() {
return this.environment;
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
new file mode 100644
index 0000000..69cb1f8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+public interface CheckpointCommittingOperator {
+
+ void confirmCheckpoint(long checkpointId, long timestamp);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
new file mode 100644
index 0000000..d07b07e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+public interface CheckpointedOperator {
+
+ void triggerCheckpoint(long checkpointId, long timestamp);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
new file mode 100644
index 0000000..86c9b60
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.List;
+
+/**
+ * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
+ * for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices
+ * need to participate.
+ */
+public class JobSnapshottingSettings implements java.io.Serializable{
+
+ private static final long serialVersionUID = -2593319571078198180L;
+
+ /** The default time in which pending checkpoints need to be acknowledged before timing out */
+ public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes
+
+ private final List<JobVertexID> verticesToTrigger;
+
+ private final List<JobVertexID> verticesToAcknowledge;
+
+ private final List<JobVertexID> verticesToConfirm;
+
+ private final long checkpointInterval;
+
+ private final long checkpointTimeout;
+
+
+ public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
+ List<JobVertexID> verticesToAcknowledge,
+ List<JobVertexID> verticesToConfirm,
+ long checkpointInterval)
+ {
+ this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT);
+ }
+
+ public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
+ List<JobVertexID> verticesToAcknowledge,
+ List<JobVertexID> verticesToConfirm,
+ long checkpointInterval, long checkpointTimeout)
+ {
+ this.verticesToTrigger = verticesToTrigger;
+ this.verticesToAcknowledge = verticesToAcknowledge;
+ this.verticesToConfirm = verticesToConfirm;
+ this.checkpointInterval = checkpointInterval;
+ this.checkpointTimeout = checkpointTimeout;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public List<JobVertexID> getVerticesToTrigger() {
+ return verticesToTrigger;
+ }
+
+ public List<JobVertexID> getVerticesToAcknowledge() {
+ return verticesToAcknowledge;
+ }
+
+ public List<JobVertexID> getVerticesToConfirm() {
+ return verticesToConfirm;
+ }
+
+ public long getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public long getCheckpointTimeout() {
+ return checkpointTimeout;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s",
+ checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 670dc3f..576edb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -21,12 +21,18 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.state.StateHandle;
/**
- * This is an interface meant to be implemented by any invokable that has to support state recovery.
- * It is mainly used by the TaskManager to identify operators that support state recovery in order
- * to inject their initial state upon creation.
+ * This interface must be implemented by any invokable that has recoverable state.
+ * The method {@link #setInitialState(org.apache.flink.runtime.state.StateHandle)} is used
+ * to set the initial state of the operator, upon recovery.
*/
-public interface OperatorStateCarrier {
+public interface OperatorStateCarrier<T extends StateHandle<?>> {
- public void injectState(StateHandle stateHandle);
+ /**
+ * Sets the initial state of the operator, upon recovery. The initial state is typically
+ * a snapshot of the state from a previous execution.
+ *
+ * @param stateHandle The handle to the state.
+ */
+ public void setInitialState(T stateHandle);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
deleted file mode 100644
index 0493ba6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-/**
- * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
- * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint
- * has been confirmed and that the task can commit the checkpoint to the outside world.
- */
-public class AbortCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
-
- private static final long serialVersionUID = 2094094662279578953L;
-
- public AbortCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
- super(job, taskExecutionId, checkpointId);
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object o) {
- return this == o || ( (o instanceof AbortCheckpoint) && super.equals(o));
- }
-
- @Override
- public String toString() {
- return String.format("AbortCheckpoint %d for (%s/%s)",
- getCheckpointId(), getJob(), getTaskExecutionId());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index dd94e37..db12e0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
/**
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -33,18 +34,19 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
private static final long serialVersionUID = -7606214777192401493L;
- private final StateHandle state;
+ private final SerializedValue<StateHandle<?>> state;
public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
this(job, taskExecutionId, checkpointId, null);
}
- public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, StateHandle state) {
+ public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId,
+ SerializedValue<StateHandle<?>> state) {
super(job, taskExecutionId, checkpointId);
this.state = state;
}
- public StateHandle getState() {
+ public SerializedValue<StateHandle<?>> getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
index cdfd202..d3a4374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
@@ -30,15 +30,39 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java
private static final long serialVersionUID = 2094094662279578953L;
- public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+ /** The timestamp associated with the checkpoint */
+ private final long timestamp;
+
+ public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
super(job, taskExecutionId, checkpointId);
+ this.timestamp = timestamp;
}
// --------------------------------------------------------------------------------------------
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+ }
+
@Override
public boolean equals(Object o) {
- return this == o || ( (o instanceof ConfirmCheckpoint) && super.equals(o));
+ if (this == o) {
+ return true;
+ }
+ else if (o instanceof ConfirmCheckpoint) {
+ ConfirmCheckpoint that = (ConfirmCheckpoint) o;
+ return this.timestamp == that.timestamp && super.equals(o);
+ }
+ else {
+ return false;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index 98712b5..f47b054 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -18,36 +18,23 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
import java.util.Map;
/**
- * A StateHandle that includes a copy of the state itself. This state handle is recommended for
- * cases where the operatorState is lightweight enough to pass throughout the network.
- *
- * State is kept in a byte[] because it may contain userclasses, which akka is not able to handle.
+ * A StateHandle that includes a map of operator states directly.
*/
-public class LocalStateHandle implements StateHandle{
+public class LocalStateHandle implements StateHandle<Map<String, OperatorState<?>>> {
+
+ private static final long serialVersionUID = 2093619217898039610L;
- transient private Map<String, OperatorState<?>> stateMap;
- private final byte[] state;
+ private final Map<String, OperatorState<?>> stateMap;
- public LocalStateHandle(Map<String,OperatorState<?>> state) throws IOException {
+ public LocalStateHandle(Map<String,OperatorState<?>> state) {
this.stateMap = state;
- this.state = InstantiationUtil.serializeObject(state);
}
@Override
- public Map<String,OperatorState<?>> getState(ClassLoader usercodeClassloader) {
- if(stateMap == null) {
- try {
- stateMap = (Map<String, OperatorState<?>>) InstantiationUtil.deserializeObject(this.state, usercodeClassloader);
- } catch (Exception e) {
- throw new RuntimeException("Error while deserializing the state", e);
- }
- }
+ public Map<String,OperatorState<?>> getState() {
return stateMap;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
index 1852ce8..409383c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -19,20 +19,19 @@
package org.apache.flink.runtime.state;
import java.io.Serializable;
-import java.util.Map;
/**
* StateHandle is a general handle interface meant to abstract operator state fetching.
* A StateHandle implementation can for example include the state itself in cases where the state
* is lightweight or fetching it lazily from some external storage when the state is too large.
*/
-public interface StateHandle extends Serializable {
+public interface StateHandle<T> extends Serializable {
/**
- * getState should retrieve and return the state managed the handle.
+ * This retrieves and returns the state represented by the handle.
*
- * @return
+ * @return The state represented by the handle.
+ * @throws java.lang.Exception Thrown, if the state cannot be fetched.
*/
- public Map<String,OperatorState<?>> getState(ClassLoader userClassloader);
-
+ T getState() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
new file mode 100644
index 0000000..2cdfef3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+
+/**
+ * A collection of utility methods for dealing with operator state.
+ */
+public class StateUtils {
+
+ /**
+ * Utility method to define a common generic bound to be used for setting a generic state
+ * handle on a generic state carrier.
+ *
+ * This has no impact on runtime, since internally, it performs
+ * unchecked casts. The purpose is merely to allow the use of generic interfaces without resorting
+ * to raw types, by giving the compiler a common type bound.
+ *
+ * @param op The state carrier operator.
+ * @param state The state handle.
+ * @param <T> Type bound connecting the generic state carrier with the generic state handle.
+ */
+ public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op, StateHandle<?> state) {
+ @SuppressWarnings("unchecked")
+ OperatorStateCarrier<T> typedOp = (OperatorStateCarrier<T>) op;
+ @SuppressWarnings("unchecked")
+ T typedHandle = (T) state;
+
+ typedOp.setInitialState(typedHandle);
+ }
+
+
+ // ------------------------------------------------------------------------
+
+ /** Do not instantiate */
+ private StateUtils() {}
+}