You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:57 UTC
[06/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery
[FLINK-2354] [runtime] Add job graph and checkpoint recovery
This closes #1153.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73c73e92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73c73e92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73c73e92
Branch: refs/heads/master
Commit: 73c73e92750ab8fb068d0a3cb37afcb642084fc0
Parents: 3aaee1e
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Sep 1 17:25:46 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 20 00:16:51 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 21 +
.../checkpoint/CheckpointCoordinator.java | 223 +++---
.../runtime/checkpoint/CheckpointIDCounter.java | 43 ++
.../checkpoint/CheckpointRecoveryFactory.java | 61 ++
.../runtime/checkpoint/CompletedCheckpoint.java | 81 +++
.../checkpoint/CompletedCheckpointStore.java | 69 ++
.../runtime/checkpoint/PendingCheckpoint.java | 6 +-
.../StandaloneCheckpointIDCounter.java | 47 ++
.../StandaloneCheckpointRecoveryFactory.java | 52 ++
.../StandaloneCompletedCheckpointStore.java | 100 +++
.../flink/runtime/checkpoint/StateForTask.java | 21 +-
.../checkpoint/SuccessfulCheckpoint.java | 82 ---
.../ZooKeeperCheckpointIDCounter.java | 130 ++++
.../ZooKeeperCheckpointRecoveryFactory.java | 66 ++
.../ZooKeeperCompletedCheckpointStore.java | 293 ++++++++
.../runtime/executiongraph/ExecutionGraph.java | 51 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 5 +
.../flink/runtime/jobmanager/RecoveryMode.java | 13 +
.../StandaloneSubmittedJobGraphStore.java | 65 ++
.../runtime/jobmanager/SubmittedJobGraph.java | 77 ++
.../jobmanager/SubmittedJobGraphStore.java | 93 +++
.../ZooKeeperSubmittedJobGraphStore.java | 379 ++++++++++
.../leaderelection/LeaderElectionService.java | 1 +
.../ZooKeeperLeaderElectionService.java | 1 +
.../flink/runtime/state/StateBackend.java | 39 +
.../state/StateHandleProviderFactory.java | 61 ++
.../flink/runtime/util/LeaderElectionUtils.java | 67 --
.../flink/runtime/util/ZooKeeperUtils.java | 138 +++-
.../zookeeper/ZooKeeperStateHandleStore.java | 384 ++++++++++
.../flink/runtime/jobmanager/JobInfo.scala | 25 +-
.../flink/runtime/jobmanager/JobManager.scala | 594 ++++++++++-----
.../runtime/messages/JobManagerMessages.scala | 16 +
.../runtime/minicluster/FlinkMiniCluster.scala | 4 +-
.../checkpoint/CheckpointCoordinatorTest.java | 190 ++---
.../checkpoint/CheckpointIDCounterTest.java | 194 +++++
.../checkpoint/CheckpointStateRestoreTest.java | 19 +-
.../CompletedCheckpointStoreTest.java | 297 ++++++++
.../StandaloneCompletedCheckpointStoreTest.java | 33 +
...ZooKeeperCompletedCheckpointStoreITCase.java | 101 +++
.../BlobLibraryCacheManagerTest.java | 4 +
.../PartitionRequestClientFactoryTest.java | 2 +-
...ManagerSubmittedJobGraphsRecoveryITCase.java | 460 ++++++++++++
.../StandaloneSubmittedJobGraphStoreTest.java | 53 ++
.../ZooKeeperSubmittedJobGraphsStoreITCase.java | 283 ++++++++
.../JobManagerLeaderElectionTest.java | 27 +-
.../TestingLeaderElectionService.java | 4 +
.../ZooKeeperLeaderRetrievalTest.java | 21 +-
.../messages/CheckpointMessagesTest.java | 2 +-
.../runtime/taskmanager/TaskCancelTest.java | 67 +-
.../runtime/testutils/CommonTestUtils.java | 75 +-
.../testutils/JobManagerActorTestUtils.java | 166 +++++
.../runtime/testutils/JobManagerProcess.java | 226 ++++++
.../runtime/testutils/TaskManagerProcess.java | 133 ++++
.../flink/runtime/testutils/TestJvmProcess.java | 267 +++++++
.../runtime/testutils/ZooKeeperTestUtils.java | 94 +++
.../ZooKeeperStateHandleStoreITCase.java | 591 +++++++++++++++
.../zookeeper/ZooKeeperTestEnvironment.java | 133 ++++
.../ExecutionGraphRestartTest.scala | 21 +-
.../runtime/testingUtils/TestingCluster.scala | 44 +-
.../testingUtils/TestingJobManager.scala | 32 +-
.../streaming/runtime/tasks/StreamTask.java | 2 +-
.../test/util/ForkableFlinkMiniCluster.scala | 36 +-
.../checkpointing/StateCheckpoinedITCase.java | 391 ----------
.../checkpointing/StateCheckpointedITCase.java | 391 ++++++++++
...tJobManagerProcessFailureRecoveryITCase.java | 289 ++++++++
.../AbstractProcessFailureRecoveryTest.java | 444 ------------
...ctTaskManagerProcessFailureRecoveryTest.java | 397 +++++++++++
.../flink/test/recovery/ChaosMonkeyITCase.java | 713 +++++++++++++++++++
.../JobManagerCheckpointRecoveryITCase.java | 395 ++++++++++
...anagerProcessFailureBatchRecoveryITCase.java | 140 ++++
.../ProcessFailureBatchRecoveryITCase.java | 115 ---
.../recovery/ProcessFailureCancelingITCase.java | 4 +-
.../ProcessFailureStreamingRecoveryITCase.java | 234 ------
...anagerProcessFailureBatchRecoveryITCase.java | 115 +++
...erProcessFailureStreamingRecoveryITCase.java | 234 ++++++
.../ZooKeeperLeaderElectionITCase.java | 61 +-
.../org/apache/flink/yarn/YarnJobManager.scala | 11 +-
77 files changed, 8901 insertions(+), 1918 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5d6f1c7..be730a0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -407,6 +407,12 @@ public final class ConfigConstants {
*/
public static final String STATE_BACKEND = "state.backend";
+ /**
+ * File system state backend base path for recoverable state handles. Recovery state is written
+ * to this path and the file state handles are persisted for recovery.
+ */
+ public static final String STATE_BACKEND_FS_RECOVERY_PATH = "state.backend.fs.dir.recovery";
+
// ----------------------------- Miscellaneous ----------------------------
/**
@@ -433,6 +439,15 @@ public final class ConfigConstants {
public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+ /** ZooKeeper root path (ZNode) for job graphs. */
+ public static final String ZOOKEEPER_JOBGRAPHS_PATH = "ha.zookeeper.dir.jobgraphs";
+
+ /** ZooKeeper root path (ZNode) for completed checkpoints. */
+ public static final String ZOOKEEPER_CHECKPOINTS_PATH = "ha.zookeeper.dir.checkpoints";
+
+ /** ZooKeeper root path (ZNode) for checkpoint counters. */
+ public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "ha.zookeeper.dir.checkpoint-counter";
+
public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
@@ -699,6 +714,12 @@ public final class ConfigConstants {
public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
+ public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";
+
+ public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";
+
+ public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
+
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8f0b19b..fdb59d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,16 +21,16 @@ package org.apache.flink.runtime.checkpoint;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -48,13 +47,19 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* The checkpoint coordinator coordinates the distributed snapshots of operators and state.
* It triggers the checkpoint by sending the messages to the relevant tasks and collects the
* checkpoint acknowledgements. It also collects and maintains the overview of the state handles
* reported by the tasks that acknowledge the checkpoint.
+ *
+ * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
+ * CompletedCheckpointStore} and {@link CheckpointIDCounter} changes. The default standalone
+ * implementations don't support any recovery.
*/
public class CheckpointCoordinator {
@@ -79,12 +84,20 @@ public class CheckpointCoordinator {
private final ExecutionVertex[] tasksToCommitTo;
private final Map<Long, PendingCheckpoint> pendingCheckpoints;
-
- private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints;
+
+ /**
+ * Completed checkpoints. Implementations can be blocking. Make sure calls to methods
+ * accessing this don't block the job manager actor and run asynchronously.
+ */
+ private final CompletedCheckpointStore completedCheckpointStore;
private final ArrayDeque<Long> recentPendingCheckpoints;
- private final AtomicLong checkpointIdCounter = new AtomicLong(1);
+ /**
+ * Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
+ * need to be ascending across job managers.
+ */
+ private final CheckpointIDCounter checkpointIdCounter;
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
@@ -93,8 +106,6 @@ public class CheckpointCoordinator {
private final long checkpointTimeout;
- private final int numSuccessfulCheckpointsToRetain;
-
private TimerTask periodicScheduler;
private ActorGateway jobStatusListener;
@@ -110,61 +121,62 @@ public class CheckpointCoordinator {
public CheckpointCoordinator(
JobID job,
- int numSuccessfulCheckpointsToRetain,
long checkpointTimeout,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
- ClassLoader userClassLoader) {
+ ClassLoader userClassLoader,
+ CheckpointIDCounter checkpointIDCounter,
+ CompletedCheckpointStore completedCheckpointStore,
+ RecoveryMode recoveryMode) throws Exception {
- // some sanity checks
- if (job == null || tasksToTrigger == null ||
- tasksToWaitFor == null || tasksToCommitTo == null) {
- throw new NullPointerException();
- }
- if (numSuccessfulCheckpointsToRetain < 1) {
- throw new IllegalArgumentException("Must retain at least one successful checkpoint");
- }
- if (checkpointTimeout < 1) {
- throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
- }
+ // Sanity check; job, tasks, store and ID counter are null-checked on assignment below
+ checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
- this.job = job;
- this.numSuccessfulCheckpointsToRetain = numSuccessfulCheckpointsToRetain;
+ this.job = checkNotNull(job);
this.checkpointTimeout = checkpointTimeout;
- this.tasksToTrigger = tasksToTrigger;
- this.tasksToWaitFor = tasksToWaitFor;
- this.tasksToCommitTo = tasksToCommitTo;
+ this.tasksToTrigger = checkNotNull(tasksToTrigger);
+ this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
+ this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
- this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
+ this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
this.userClassLoader = userClassLoader;
+ this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
+ checkpointIDCounter.start(); // stopped again in shutdown()
- timer = new Timer("Checkpoint Timer", true);
+ this.timer = new Timer("Checkpoint Timer", true);
- // Add shutdown hook to clean up state handles
- shutdownHook = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- CheckpointCoordinator.this.shutdown();
- }
- catch (Throwable t) {
- LOG.error("Error during shutdown of blob service via JVM shutdown hook: " +
- t.getMessage(), t);
+ if (recoveryMode == RecoveryMode.STANDALONE) {
+ // Add shutdown hook to clean up state handles when no checkpoint recovery is
+ // possible. In case of another configured recovery mode, the checkpoints need to be
+ // available for the standby job managers.
+ this.shutdownHook = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ CheckpointCoordinator.this.shutdown();
+ }
+ catch (Throwable t) {
+ LOG.error("Error during shutdown of checkpoint coordinator via " +
+ "JVM shutdown hook: " + t.getMessage(), t);
+ }
}
- }
- });
+ });
- try {
- // Add JVM shutdown hook to call shutdown of service
- Runtime.getRuntime().addShutdownHook(shutdownHook);
- }
- catch (IllegalStateException ignored) {
- // JVM is already shutting down. No need to do anything.
+ try {
+ // Add JVM shutdown hook to call shutdown of service
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException ignored) {
+ // JVM is already shutting down. No need to do anything.
+ }
+ catch (Throwable t) {
+ LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
+ }
}
- catch (Throwable t) {
- LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
+ else {
+ this.shutdownHook = null;
}
}
@@ -178,41 +190,39 @@ public class CheckpointCoordinator {
* After this method has been called, the coordinator does not accept and further
* messages and cannot trigger any further checkpoints.
*/
- public void shutdown() {
+ public void shutdown() throws Exception {
synchronized (lock) {
- try {
- if (shutdown) {
- return;
- }
- shutdown = true;
- LOG.info("Stopping checkpoint coordinator for job " + job);
-
- // shut down the thread that handles the timeouts
- timer.cancel();
-
- // make sure that the actor does not linger
- if (jobStatusListener != null) {
- jobStatusListener.tell(PoisonPill.getInstance());
- jobStatusListener = null;
- }
-
- // the scheduling thread needs also to go away
- if (periodicScheduler != null) {
- periodicScheduler.cancel();
- periodicScheduler = null;
- }
-
- // clear and discard all pending checkpoints
- for (PendingCheckpoint pending : pendingCheckpoints.values()) {
- pending.discard(userClassLoader, true);
- }
- pendingCheckpoints.clear();
-
- // clean and discard all successful checkpoints
- for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
- checkpoint.discard(userClassLoader);
+ try {
+ if (!shutdown) {
+ shutdown = true;
+ LOG.info("Stopping checkpoint coordinator for job " + job);
+
+ // shut down the thread that handles the timeouts
+ timer.cancel();
+
+ // make sure that the actor does not linger
+ if (jobStatusListener != null) {
+ jobStatusListener.tell(PoisonPill.getInstance());
+ jobStatusListener = null;
+ }
+
+ // the scheduling thread needs also to go away
+ if (periodicScheduler != null) {
+ periodicScheduler.cancel();
+ periodicScheduler = null;
+ }
+
+ checkpointIdCounter.stop();
+
+ // clear and discard all pending checkpoints
+ for (PendingCheckpoint pending : pendingCheckpoints.values()) {
+ pending.discard(userClassLoader, true);
+ }
+ pendingCheckpoints.clear();
+
+ // clean and discard all successful checkpoints
+ completedCheckpointStore.discardAllCheckpoints();
}
- completedCheckpoints.clear();
}
finally {
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
@@ -244,7 +254,7 @@ public class CheckpointCoordinator {
* Triggers a new checkpoint and uses the current system time as the
* checkpoint time.
*/
- public void triggerCheckpoint() {
+ public void triggerCheckpoint() throws Exception {
+ // Uses the current wall-clock time; the boolean result of triggerCheckpoint(long) is dropped and its exceptions propagate.
triggerCheckpoint(System.currentTimeMillis());
}
@@ -254,7 +264,7 @@ public class CheckpointCoordinator {
*
* @param timestamp The timestamp for the checkpoint.
*/
- public boolean triggerCheckpoint(final long timestamp) {
+ public boolean triggerCheckpoint(final long timestamp) throws Exception {
if (shutdown) {
LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
return false;
@@ -354,7 +364,7 @@ public class CheckpointCoordinator {
}
}
- public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
+ public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
if (shutdown || message == null) {
return;
}
@@ -365,7 +375,7 @@ public class CheckpointCoordinator {
final long checkpointId = message.getCheckpointId();
- SuccessfulCheckpoint completed = null;
+ CompletedCheckpoint completed = null;
PendingCheckpoint checkpoint;
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
@@ -380,13 +390,13 @@ public class CheckpointCoordinator {
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
if (checkpoint.isFullyAcknowledged()) {
+ completed = checkpoint.toCompletedCheckpoint();
+
+ completedCheckpointStore.addCheckpoint(completed);
+
LOG.info("Completed checkpoint " + checkpointId);
+ LOG.debug(completed.getStates().toString());
- completed = checkpoint.toCompletedCheckpoint();
- completedCheckpoints.addLast(completed);
- if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
- completedCheckpoints.removeFirst().discard(userClassLoader);
- }
pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);
@@ -456,25 +466,30 @@ public class CheckpointCoordinator {
// Checkpoint State Restoring
// --------------------------------------------------------------------------------------------
- public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks,
- boolean errorIfNoCheckpoint,
- boolean allOrNothingState) throws Exception {
+ public void restoreLatestCheckpointedState(
+ Map<JobVertexID, ExecutionJobVertex> tasks,
+ boolean errorIfNoCheckpoint,
+ boolean allOrNothingState) throws Exception {
+
synchronized (lock) {
if (shutdown) {
throw new IllegalStateException("CheckpointCoordinator is shut down");
}
-
- if (completedCheckpoints.isEmpty()) {
+
+ // Recover the checkpoints
+ completedCheckpointStore.recover();
+
+ // restore from the latest checkpoint
+ CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
+
+ if (latest == null) {
if (errorIfNoCheckpoint) {
throw new IllegalStateException("No completed checkpoint available");
} else {
return;
}
}
-
- // restore from the latest checkpoint
- SuccessfulCheckpoint latest = completedCheckpoints.getLast();
-
+
if (allOrNothingState) {
Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();
@@ -519,7 +534,9 @@ public class CheckpointCoordinator {
}
+ /** Returns how many completed checkpoints the completed checkpoint store currently retains. */
public int getNumberOfRetainedSuccessfulCheckpoints() {
- return this.completedCheckpoints.size();
+ final int numRetained;
+ // Read under the coordinator lock, which also guards adding completed checkpoints.
+ synchronized (lock) {
+ numRetained = this.completedCheckpointStore.getNumberOfRetainedCheckpoints();
+ }
+ return numRetained;
}
public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
@@ -528,9 +545,9 @@ public class CheckpointCoordinator {
}
}
- public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() {
+ /** Returns all completed checkpoints, as reported by the completed checkpoint store. */
+ public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
synchronized (lock) {
- return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
+ final List<CompletedCheckpoint> checkpoints = this.completedCheckpointStore.getAllCheckpoints();
+ return checkpoints;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
new file mode 100644
index 0000000..34b7946
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Hands out checkpoint IDs for a single job.
+ *
+ * <p>IDs are expected to be ascending, also across job manager failures, so that IDs
+ * are not reused after a failover.
+ */
+public interface CheckpointIDCounter {
+
+ /** Starts the counter service. */
+ void start() throws Exception;
+
+ /** Stops the counter service. */
+ void stop() throws Exception;
+
+ /**
+ * Atomically increments the counter.
+ *
+ * @return The counter value before the increment
+ * @throws Exception If the implementation fails to increment the counter
+ */
+ long getAndIncrement() throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
new file mode 100644
index 0000000..aa6e94b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Factory for the per-job components used for checkpoint recovery, namely a
+ * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}, both created
+ * for a given {@link JobID}.
+ *
+ * <p>{@link #start()} and {@link #stop()} manage the lifecycle of the factory itself.
+ */
+public interface CheckpointRecoveryFactory {
+
+ /** How many {@link CompletedCheckpoint} instances to retain. */
+ int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
+
+ /** Starts this factory service. */
+ void start();
+
+ /** Stops this factory service. */
+ void stop();
+
+ /**
+ * Creates the {@link CompletedCheckpointStore} of a single job.
+ *
+ * @param jobId ID of the job whose checkpoints are stored and recovered
+ * @param userClassLoader User code class loader of the job
+ * @return The {@link CompletedCheckpointStore} for the job
+ * @throws Exception If the store cannot be created
+ */
+ CompletedCheckpointStore createCompletedCheckpoints(
+ JobID jobId, ClassLoader userClassLoader) throws Exception;
+
+ /**
+ * Creates the {@link CheckpointIDCounter} of a single job.
+ *
+ * @param jobId ID of the job that checkpoint IDs are generated for
+ * @return The {@link CheckpointIDCounter} for the job
+ * @throws Exception If the counter cannot be created
+ */
+ CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
new file mode 100644
index 0000000..ea3c26d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A checkpoint that all required tasks have acknowledged (together with their state), and
+ * that is therefore considered completed.
+ *
+ * <p>Uses default Java serialization, so field names are part of the serialized form.
+ */
+public class CompletedCheckpoint implements Serializable {
+
+ private static final long serialVersionUID = -8360248179615702014L;
+
+ private final JobID job;
+ private final long checkpointID;
+ private final long timestamp;
+
+ /** Collected task states; emptied by {@link #discard(ClassLoader)}. */
+ private final ArrayList<StateForTask> states;
+
+ public CompletedCheckpoint(
+ JobID job, long checkpointID, long timestamp, ArrayList<StateForTask> states) {
+ this.job = job;
+ this.checkpointID = checkpointID;
+ this.timestamp = timestamp;
+ this.states = states;
+ }
+
+ public JobID getJobId() {
+ return this.job;
+ }
+
+ public long getCheckpointID() {
+ return this.checkpointID;
+ }
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
+ /** Returns the internal (mutable) list of task states; no copy is made. */
+ public List<StateForTask> getStates() {
+ return this.states;
+ }
+
+ /** Discards the state of every task and then clears the list of states. */
+ public void discard(ClassLoader userClassLoader) {
+ for (StateForTask taskState : this.states) {
+ taskState.discard(userClassLoader);
+ }
+ this.states.clear();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Checkpoint %d @ %d for %s", this.checkpointID, this.timestamp, this.job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
new file mode 100644
index 0000000..d024aea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.List;
+
+/**
+ * A bounded store of {@link CompletedCheckpoint} instances. The most recently added
+ * checkpoint is returned by {@link #getLatestCheckpoint()}; when the bound is exceeded,
+ * the oldest checkpoint is evicted (FIFO eviction). Implementations may block.
+ */
+public interface CompletedCheckpointStore {
+
+ /**
+ * Recovers the available {@link CompletedCheckpoint} instances.
+ *
+ * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
+ * available checkpoint.
+ */
+ void recover() throws Exception;
+
+ /**
+ * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
+ *
+ * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
+ * retained checkpoints, the oldest one is discarded via {@link
+ * CompletedCheckpoint#discard(ClassLoader)}.
+ */
+ void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;
+
+ /**
+ * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if no
+ * checkpoint is available.
+ */
+ CompletedCheckpoint getLatestCheckpoint() throws Exception;
+
+ /**
+ * Discards all added {@link CompletedCheckpoint} instances via {@link
+ * CompletedCheckpoint#discard(ClassLoader)}.
+ */
+ void discardAllCheckpoints() throws Exception;
+
+ /**
+ * Returns all {@link CompletedCheckpoint} instances.
+ *
+ * <p>Returns an empty list if no checkpoint has been added yet.
+ */
+ List<CompletedCheckpoint> getAllCheckpoints() throws Exception;
+
+ /** Returns the current number of retained checkpoints. */
+ int getNumberOfRetainedCheckpoints();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 370ae50..81159f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.SerializedValue;
/**
* A pending checkpoint is a checkpoint that has been started, but has not been
* acknowledged by all tasks that need to acknowledge it. Once all tasks have
- * acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
+ * acknowledged it, it becomes a {@link CompletedCheckpoint}.
*
* <p>Note that the pending checkpoint, as well as the successful checkpoint keep the
* state handles always as serialized values, never as actual values.</p>
@@ -109,13 +109,13 @@ public class PendingCheckpoint {
return collectedStates;
}
- public SuccessfulCheckpoint toCompletedCheckpoint() {
+ public CompletedCheckpoint toCompletedCheckpoint() {
synchronized (lock) {
if (discarded) {
throw new IllegalStateException("pending checkpoint is discarded");
}
if (notYetAcknowledgedTasks.isEmpty()) {
- SuccessfulCheckpoint completed = new SuccessfulCheckpoint(jobId, checkpointId,
+ CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,
checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
discard(null, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
new file mode 100644
index 0000000..052d743
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link CheckpointIDCounter} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ *
+ * <p>The counter lives purely in memory as an {@link AtomicLong} that starts at 1. Nothing is
+ * persisted, which suffices because job managers are not recoverable in this recovery mode.
+ */
+public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
+
+ /** The next checkpoint ID to hand out. */
+ private final AtomicLong nextCheckpointId = new AtomicLong(1);
+
+ @Override
+ public void start() throws Exception {
+ // In-memory only: there are no resources to acquire.
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // In-memory only: there are no resources to release.
+ }
+
+ /**
+ * Atomically returns the current ID and advances the counter by one.
+ */
+ @Override
+ public long getAndIncrement() throws Exception {
+ final long issuedId = nextCheckpointId.getAndIncrement();
+ return issuedId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..324a0be
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+/**
+ * Creates the {@link CheckpointCoordinator} recovery components for
+ * {@link RecoveryMode#STANDALONE}.
+ *
+ * <p>The created store and counter are in-memory implementations, so this factory has no
+ * external resources to start or stop.
+ */
+public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+ @Override
+ public void start() {
+ // No external resources: nothing to start.
+ }
+
+ @Override
+ public void stop() {
+ // No external resources: nothing to stop.
+ }
+
+ @Override
+ public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+ throws Exception {
+
+ final int retained = CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN;
+ return new StandaloneCompletedCheckpointStore(retained, userClassLoader);
+ }
+
+ @Override
+ public CheckpointIDCounter createCheckpointIDCounter(JobID ignored) {
+ // The job ID is not needed: every call yields a fresh, independent in-memory counter.
+ return new StandaloneCheckpointIDCounter();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
new file mode 100644
index 0000000..c31606a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ *
+ * <p>Checkpoints are only kept in memory, so {@link #recover()} has nothing to restore.
+ */
+class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
+
+ /** The maximum number of checkpoints to retain (at least 1). */
+ private final int maxNumberOfCheckpointsToRetain;
+
+ /** User class loader for discarding {@link CompletedCheckpoint} instances. */
+ private final ClassLoader userClassLoader;
+
+ /** The completed checkpoints, ordered from oldest (first) to latest (last). */
+ private final ArrayDeque<CompletedCheckpoint> checkpoints;
+
+ /**
+ * Creates {@link StandaloneCompletedCheckpointStore}.
+ *
+ * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
+ * least 1). Adding more checkpoints than this results
+ * in older checkpoints being discarded.
+ * @param userClassLoader The user class loader used to discard checkpoints
+ */
+ public StandaloneCompletedCheckpointStore(
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader) {
+
+ checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
+
+ this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
+ this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
+
+ this.checkpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+ }
+
+ @Override
+ public void recover() throws Exception {
+ // Nothing to do: standalone checkpoints only live in memory
+ }
+
+ /**
+ * Adds the checkpoint as the latest one. If this exceeds the maximum number of retained
+ * checkpoints, the oldest retained checkpoint is removed and discarded.
+ *
+ * @param checkpoint Completed checkpoint to add
+ * @throws NullPointerException If the checkpoint is <code>null</code>
+ */
+ @Override
+ public void addCheckpoint(CompletedCheckpoint checkpoint) {
+ // ArrayDeque rejects null anyway; checking here gives a descriptive message
+ checkpoints.addLast(checkNotNull(checkpoint, "Checkpoint"));
+
+ if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
+ checkpoints.removeFirst().discard(userClassLoader);
+ }
+ }
+
+ @Override
+ public CompletedCheckpoint getLatestCheckpoint() {
+ return checkpoints.isEmpty() ? null : checkpoints.getLast();
+ }
+
+ @Override
+ public List<CompletedCheckpoint> getAllCheckpoints() {
+ // Defensive copy so callers cannot modify the retained checkpoints
+ return new ArrayList<>(checkpoints);
+ }
+
+ @Override
+ public int getNumberOfRetainedCheckpoints() {
+ return checkpoints.size();
+ }
+
+ /**
+ * Discards all retained checkpoints, oldest first.
+ *
+ * <p>Each checkpoint is removed from the store before it is discarded. If discarding one
+ * fails, the exception propagates. Checkpoints that were already handed to
+ * {@link CompletedCheckpoint#discard(ClassLoader)} are no longer retained, so they are never
+ * returned again or discarded twice. The not-yet-processed ones stay in the store.
+ */
+ @Override
+ public void discardAllCheckpoints() {
+ while (!checkpoints.isEmpty()) {
+ checkpoints.removeFirst().discard(userClassLoader);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 120c503..d1428f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -24,6 +24,11 @@ import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* Simple bean to describe the state belonging to a parallel operator.
* Since we hold the state across execution attempts, we identify a task by its
@@ -34,8 +39,10 @@ import org.slf4j.LoggerFactory;
* Furthermore, the state may involve user-defined classes that are not accessible without
* the respective classloader.
*/
-public class StateForTask {
-
+public class StateForTask implements Serializable {
+
+ private static final long serialVersionUID = -2394696997971923995L;
+
private static final Logger LOG = LoggerFactory.getLogger(StateForTask.class);
/** The state of the parallel operator */
@@ -48,12 +55,10 @@ public class StateForTask {
private final int subtask;
public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
- if (state == null || operatorId == null || subtask < 0) {
- throw new IllegalArgumentException();
- }
-
- this.state = state;
- this.operatorId = operatorId;
+ this.state = checkNotNull(state, "State");
+ this.operatorId = checkNotNull(operatorId, "Operator ID");
+
+ checkArgument(subtask >= 0, "Negative subtask index");
this.subtask = subtask;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
deleted file mode 100644
index be0b301..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
- * and that is considered completed.
- */
-public class SuccessfulCheckpoint {
-
- private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class);
-
- private final JobID job;
-
- private final long checkpointID;
-
- private final long timestamp;
-
- private final List<StateForTask> states;
-
-
- public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
- this.job = job;
- this.checkpointID = checkpointID;
- this.timestamp = timestamp;
- this.states = states;
- }
-
- public JobID getJobId() {
- return job;
- }
-
- public long getCheckpointID() {
- return checkpointID;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public List<StateForTask> getStates() {
- return states;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public void discard(ClassLoader userClassLoader) {
- for(StateForTask state: states){
- state.discard(userClassLoader);
- }
- states.clear();
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
new file mode 100644
index 0000000..6673050
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each counter creates a ZNode:
+ * <pre>
+ * +----O /flink/checkpoint-counter/<job-id> 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/checkpoint-counter/<job-id> N [persistent]
+ * </pre>
+ *
+ * <p>The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case
+ * of job manager failures we use ZooKeeper to have a shared counter across job manager instances.
+ */
+public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class);
+
+ /** Curator ZooKeeper client */
+ private final CuratorFramework client;
+
+ /** Path of the shared count */
+ private final String counterPath;
+
+ /** Curator recipe for shared counts */
+ private final SharedCount sharedCount;
+
+ /** Connection state listener to monitor the client connection */
+ private final SharedCountConnectionStateListener connStateListener =
+ new SharedCountConnectionStateListener();
+
+ /**
+ * Creates a {@link ZooKeeperCheckpointIDCounter} instance.
+ *
+ * @param client Curator ZooKeeper client
+ * @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
+ * @throws Exception
+ */
+ public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception {
+ this.client = checkNotNull(client, "Curator client");
+ this.counterPath = checkNotNull(counterPath, "Counter path");
+ this.sharedCount = new SharedCount(client, counterPath, 1);
+ }
+
+ @Override
+ public void start() throws Exception {
+ sharedCount.start();
+ client.getConnectionStateListenable().addListener(connStateListener);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ sharedCount.close();
+ client.getConnectionStateListenable().removeListener(connStateListener);
+
+ LOG.info("Removing {} from ZooKeeper", counterPath);
+ client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+ }
+
+ @Override
+ public long getAndIncrement() throws Exception {
+ while (true) {
+ ConnectionState connState = connStateListener.getLastState();
+
+ if (connState != null) {
+ throw new IllegalStateException("Connection state: " + connState);
+ }
+
+ VersionedValue<Integer> current = sharedCount.getVersionedValue();
+
+ Integer newCount = current.getValue() + 1;
+
+ if (sharedCount.trySetCount(current, newCount)) {
+ return current.getValue();
+ }
+ }
+ }
+
+ /**
+ * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
+ * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
+ */
+ private class SharedCountConnectionStateListener implements ConnectionStateListener {
+
+ private volatile ConnectionState lastState;
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
+ lastState = newState;
+ }
+ }
+
+ private ConnectionState getLastState() {
+ return lastState;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..2659e7e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Creates the {@link CheckpointCoordinator} recovery components for
+ * {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>The created checkpoint store and ID counter share the given Curator client, which is
+ * closed by {@link #stop()}.
+ */
+public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+ /** Curator ZooKeeper client handed to every created component. */
+ private final CuratorFramework client;
+
+ /** Configuration passed on to {@link ZooKeeperUtils} when creating components. */
+ private final Configuration config;
+
+ public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) {
+ this.client = checkNotNull(client, "Curator client");
+ this.config = checkNotNull(config, "Configuration");
+ }
+
+ @Override
+ public void start() {
+ // Nothing to start
+ }
+
+ @Override
+ public void stop() {
+ // Release the shared Curator client
+ client.close();
+ }
+
+ @Override
+ public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+ throws Exception {
+
+ final int retained = NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN;
+ return ZooKeeperUtils.createCompletedCheckpoints(
+ client, config, jobId, retained, userClassLoader);
+ }
+
+ @Override
+ public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
+ final CheckpointIDCounter counter = ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID);
+ return counter;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
new file mode 100644
index 0000000..62ab440
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Checkpoints are added under a ZNode per job:
+ * <pre>
+ * +----O /flink/checkpoints/<job-id> [persistent]
+ * . |
+ * . +----O /flink/checkpoints/<job-id>/1 [persistent]
+ * . . .
+ * . . .
+ * . . .
+ * . +----O /flink/checkpoints/<job-id>/N [persistent]
+ * </pre>
+ *
+ * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
+ * only the latest one is used and older ones are discarded (even if the maximum number
+ * of retained checkpoints is greater than one).
+ *
+ * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
+ * same program, it is OK to take any valid successful checkpoint as long as the "history" of
+ * checkpoints is consistent. Currently, after recovery we start out with only a single
+ * checkpoint to circumvent those situations.
+ */
+public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
+
+ /** Curator ZooKeeper client, namespaced to the checkpoints path */
+ private final CuratorFramework client;
+
+ /** Completed checkpoints in ZooKeeper */
+ private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
+
+ /** The maximum number of checkpoints to retain (at least 1). */
+ private final int maxNumberOfCheckpointsToRetain;
+
+ /** User class loader for discarding {@link CompletedCheckpoint} instances. */
+ private final ClassLoader userClassLoader;
+
+ /** Local completed checkpoints (state handle and ZooKeeper path), oldest first, latest last. */
+ private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
+
+ /**
+ * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
+ *
+ * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
+ * least 1). Adding more checkpoints than this results
+ * in older checkpoints being discarded. On recovery,
+ * we will only start with a single checkpoint.
+ * @param userClassLoader The user class loader used to discard checkpoints
+ * @param client The Curator ZooKeeper client
+ * @param checkpointsPath The ZooKeeper path for the checkpoints (needs to
+ * start with a '/')
+ * @param stateHandleProvider The state handle provider for checkpoints
+ * @throws Exception
+ */
+ public ZooKeeperCompletedCheckpointStore(
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader,
+ CuratorFramework client,
+ String checkpointsPath,
+ StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception {
+
+ checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
+
+ this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
+ this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
+
+ checkNotNull(client, "Curator client");
+ checkNotNull(checkpointsPath, "Checkpoints path");
+ checkNotNull(stateHandleProvider, "State handle provider");
+
+ // Ensure that the checkpoints path exists
+ client.newNamespaceAwareEnsurePath(checkpointsPath)
+ .ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+ this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
+
+ this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(
+ this.client, stateHandleProvider);
+
+ this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+
+ LOG.info("Initialized in '{}'.", checkpointsPath);
+ }
+
+ /**
+ * Gets the latest checkpoint from ZooKeeper and removes all others.
+ *
+ * <p><strong>Important</strong>: Even if there is more than one checkpoint in ZooKeeper,
+ * this will only recover the latest and discard the others. Otherwise, there is no guarantee
+ * that the history of checkpoints is consistent.
+ *
+ * <p>The local checkpoint handles are rebuilt from ZooKeeper, so calling this method more
+ * than once does not accumulate duplicate handles.
+ */
+ @Override
+ public void recover() throws Exception {
+ LOG.info("Recovering checkpoints from ZooKeeper.");
+
+ // Get all there is first
+ List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
+ while (true) {
+ try {
+ initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
+ break;
+ }
+ catch (ConcurrentModificationException e) {
+ LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+ }
+ }
+
+ // The local handles must reflect the state in ZooKeeper. Without clearing, a repeated
+ // recovery would add the latest handle a second time.
+ checkpointStateHandles.clear();
+
+ int numberOfInitialCheckpoints = initialCheckpoints.size();
+
+ LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
+
+ if (numberOfInitialCheckpoints > 0) {
+ // Take the last one. This is the latest checkpoint, because path names are strictly
+ // increasing (checkpoint ID).
+ Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
+ .get(numberOfInitialCheckpoints - 1);
+
+ CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);
+
+ checkpointStateHandles.add(latest);
+
+ LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);
+
+ for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
+ try {
+ removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+ }
+ catch (Exception e) {
+ LOG.error("Failed to discard checkpoint", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Synchronously writes the new checkpoint to ZooKeeper and asynchronously removes older ones.
+ *
+ * @param checkpoint Completed checkpoint to add.
+ */
+ @Override
+ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+ checkNotNull(checkpoint, "Checkpoint");
+
+ // First add the new one. If it fails, we don't want to lose existing data.
+ String path = String.format("/%s", checkpoint.getCheckpointID());
+
+ final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
+
+ checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
+
+ // Everything worked, let's remove a previous checkpoint if necessary.
+ if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
+ removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
+ }
+
+ LOG.debug("Added {} to {}.", checkpoint, path);
+ }
+
+ @Override
+ public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+ if (checkpointStateHandles.isEmpty()) {
+ return null;
+ }
+ else {
+ return checkpointStateHandles.getLast().f0.getState(userClassLoader);
+ }
+ }
+
+ @Override
+ public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+ List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
+
+ for (Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) {
+ checkpoints.add(stateHandle.f0.getState(userClassLoader));
+ }
+
+ return checkpoints;
+ }
+
+ @Override
+ public int getNumberOfRetainedCheckpoints() {
+ return checkpointStateHandles.size();
+ }
+
+ @Override
+ public void discardAllCheckpoints() throws Exception {
+ for (Tuple2<StateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+ try {
+ removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
+ }
+ catch (Exception e) {
+ LOG.error("Failed to discard checkpoint.", e);
+ }
+ }
+
+ checkpointStateHandles.clear();
+
+ // NOTE(review): the removals above complete in the background, while the children are
+ // deleted synchronously below. If this delete wins, the background callbacks presumably
+ // see a non-zero result code and skip discarding the checkpoint state. Verify against
+ // ZooKeeperStateHandleStore#remove.
+ String path = "/" + client.getNamespace();
+
+ LOG.info("Removing {} from ZooKeeper", path);
+ ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+ }
+
+ /**
+ * Asynchronously removes the state handle from ZooKeeper. Once the removal succeeds, the
+ * checkpoint and its state handle are discarded in the background callback.
+ */
+ private void removeFromZooKeeperAndDiscardCheckpoint(
+ final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+
+ final BackgroundCallback callback = new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ try {
+ if (event.getType() == CuratorEventType.DELETE) {
+ if (event.getResultCode() == 0) {
+ // Removed from ZooKeeper: now it is safe to discard the checkpoint
+ CompletedCheckpoint checkpoint = stateHandleAndPath
+ .f0.getState(userClassLoader);
+
+ checkpoint.discard(userClassLoader);
+
+ // Discard the state handle
+ stateHandleAndPath.f0.discardState();
+
+ LOG.debug("Discarded {}", checkpoint);
+ }
+ else {
+ throw new IllegalStateException("Unexpected result code " +
+ event.getResultCode() + " in '" + event + "' callback.");
+ }
+ }
+ else {
+ throw new IllegalStateException("Unexpected event type " +
+ event.getType() + " in '" + event + "' callback.");
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Failed to discard checkpoint.", e);
+ }
+ }
+ };
+
+ // Remove the state handle from ZooKeeper first. If discarding the state fails afterwards,
+ // we can still recover. If we discarded the state first and then failed to remove the
+ // handle from ZooKeeper, ZooKeeper would point to discarded state (inconsistent).
+ checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ef00484..9430d80 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -39,6 +41,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -110,8 +113,6 @@ public class ExecutionGraph implements Serializable {
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
- private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
-
// --------------------------------------------------------------------------------------------
/** The lock used to secure all access to mutable fields, especially the tracking of progress
@@ -347,7 +348,11 @@ public class ExecutionGraph implements Serializable {
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
ActorSystem actorSystem,
- UUID leaderSessionID) {
+ UUID leaderSessionID,
+ CheckpointIDCounter checkpointIDCounter,
+ CompletedCheckpointStore completedCheckpointStore,
+ RecoveryMode recoveryMode) throws Exception {
+
// simple sanity checks
if (interval < 10 || checkpointTimeout < 10) {
throw new IllegalArgumentException();
@@ -367,12 +372,14 @@ public class ExecutionGraph implements Serializable {
snapshotCheckpointsEnabled = true;
checkpointCoordinator = new CheckpointCoordinator(
jobID,
- NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
checkpointTimeout,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
- userClassLoader);
+ userClassLoader,
+ checkpointIDCounter,
+ completedCheckpointStore,
+ recoveryMode);
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
@@ -382,8 +389,14 @@ public class ExecutionGraph implements Serializable {
interval,
leaderSessionID));
}
-
- public void disableSnaphotCheckpointing() {
+
+ /**
+ * Disables checkpointing.
+ *
+ * <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this
+ * method don't block the job manager actor and run asynchronously.
+ */
+ public void disableSnaphotCheckpointing() throws Exception {
if (state != JobStatus.CREATED) {
throw new IllegalStateException("Job must be in CREATED state");
}
@@ -773,6 +786,20 @@ public class ExecutionGraph implements Serializable {
}
/**
+ * Restores the latest checkpointed state.
+ *
+ * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
+ * block the job manager actor and run asynchronously.
+ */
+    public void restoreLatestCheckpointedState() throws Exception {
+        synchronized (progressLock) {
+            // No checkpoint coordinator (snapshot checkpointing not enabled): nothing to restore.
+            if (checkpointCoordinator == null) {
+                return;
+            }
+
+            // Both flags are passed as false; their meaning is defined by
+            // CheckpointCoordinator#restoreLatestCheckpointedState.
+            checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+        }
+    }
+
+ /**
* This method cleans fields that are irrelevant for the archived execution attempt.
*/
public void prepareForArchiving() {
@@ -886,7 +913,13 @@ public class ExecutionGraph implements Serializable {
}
}, executionContext);
} else {
- restart();
+ future(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ restart();
+ return null;
+ }
+ }, executionContext);
}
break;
}
@@ -906,7 +939,7 @@ public class ExecutionGraph implements Serializable {
}
}
}
-
+
private void postRunCleanup() {
try {
CheckpointCoordinator coord = this.checkpointCoordinator;
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6b36e2d..a64d63c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -538,4 +538,9 @@ public class JobGraph implements Serializable {
}
}
}
+
+    /**
+     * Returns a short description of this job graph that contains only its job ID.
+     */
+    @Override
+    public String toString() {
+        return String.format("JobGraph(jobId: %s)", jobID);
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
index 2e75b19..17322d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -35,6 +35,19 @@ public enum RecoveryMode {
ZOOKEEPER;
/**
+ * Return the configured {@link RecoveryMode}.
+ *
+ * @param config The config to parse
+ * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not
+ * configured.
+ * @throws IllegalArgumentException If the configured value does not name a {@link RecoveryMode}.
+ */
+ public static RecoveryMode fromConfig(Configuration config) {
+ return RecoveryMode.valueOf(config.getString(
+ ConfigConstants.RECOVERY_MODE,
+ ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase());
+ }
+
+ /**
* Returns true if the defined recovery mode supports high availability.
*
* @param configuration Configuration which contains the recovery mode
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
new file mode 100644
index 0000000..db36f92
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.Option;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link SubmittedJobGraphStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ *
+ * <p>{@link JobGraph} instances cannot be recovered in this recovery mode, so nothing is
+ * persisted. All mutating operations are no-ops, and all recovery operations return empty
+ * results.
+ */
+public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+    @Override
+    public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+        // No-op: the listener is never notified in standalone mode.
+    }
+
+    @Override
+    public void stop() {
+        // No-op: start() acquired no resources.
+    }
+
+    @Override
+    public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+        // Nothing was stored, so there is nothing to recover.
+        return Collections.<SubmittedJobGraph>emptyList();
+    }
+
+    @Override
+    public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+        // Nothing was stored, so no job graph is ever found.
+        return Option.empty();
+    }
+
+    @Override
+    public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+        // No-op: job graphs are not persisted.
+    }
+
+    @Override
+    public void removeJobGraph(JobID jobId) throws Exception {
+        // No-op: job graphs are not persisted.
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
new file mode 100644
index 0000000..48da3b8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A recoverable {@link JobGraph} together with the {@link JobInfo} of the client that
+ * submitted it.
+ */
+public class SubmittedJobGraph implements Serializable {
+
+    private static final long serialVersionUID = 2836099271734771825L;
+
+    /** The submitted {@link JobGraph}. */
+    private final JobGraph jobGraph;
+
+    /** The {@link JobInfo} of the client that submitted {@link #jobGraph}. */
+    private final JobInfo jobInfo;
+
+    /**
+     * Creates a {@link SubmittedJobGraph}.
+     *
+     * @param jobGraph The submitted {@link JobGraph}
+     * @param jobInfo The {@link JobInfo} of the submitting client
+     * @throws NullPointerException If {@code jobGraph} or {@code jobInfo} is {@code null}
+     */
+    public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
+        this.jobGraph = checkNotNull(jobGraph, "Job graph");
+        this.jobInfo = checkNotNull(jobInfo, "Job info");
+    }
+
+    /**
+     * Returns the submitted {@link JobGraph}.
+     */
+    public JobGraph getJobGraph() {
+        return jobGraph;
+    }
+
+    /**
+     * Returns the {@link JobID} of the submitted {@link JobGraph}.
+     */
+    public JobID getJobId() {
+        return jobGraph.getJobID();
+    }
+
+    /**
+     * Returns the {@link JobInfo} of the client who submitted the {@link JobGraph}.
+     *
+     * <p>This is a plain accessor. It previously declared {@code throws Exception} without ever
+     * throwing, which forced callers to handle an impossible checked exception.
+     */
+    public JobInfo getJobInfo() {
+        return jobInfo;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
new file mode 100644
index 0000000..bd628cd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.Option;
+
+import java.util.List;
+
+/**
+ * Stores {@link SubmittedJobGraph} instances so that they can be recovered later.
+ *
+ * <p>Multiple store instances (on multiple job managers) may operate on the same underlying
+ * state; see {@link SubmittedJobGraphListener}.
+ */
+public interface SubmittedJobGraphStore {
+
+    /**
+     * Starts the {@link SubmittedJobGraphStore} service.
+     *
+     * @param jobGraphListener Listener to notify about job graphs added or removed by other
+     *                         store instances
+     * @throws Exception If the service cannot be started
+     */
+    void start(SubmittedJobGraphListener jobGraphListener) throws Exception;
+
+    /**
+     * Stops the {@link SubmittedJobGraphStore} service.
+     *
+     * @throws Exception If the service cannot be stopped cleanly
+     */
+    void stop() throws Exception;
+
+    /**
+     * Returns a list of all submitted {@link JobGraph} instances.
+     *
+     * @return All recoverable job graphs; empty if there are none
+     * @throws Exception If recovering the job graphs fails
+     */
+    List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
+
+    /**
+     * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
+     *
+     * @param jobId The {@link JobID} of the job graph to recover
+     * @return The recovered job graph, or an empty {@link Option} if no job graph with the
+     *         given ID is available
+     * @throws Exception If recovering the job graph fails
+     */
+    Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+
+    /**
+     * Adds the {@link SubmittedJobGraph} instance.
+     *
+     * <p>If a job graph with the same {@link JobID} exists, it is replaced.
+     *
+     * @param jobGraph The job graph to add
+     * @throws Exception If the job graph cannot be added
+     */
+    void putJobGraph(SubmittedJobGraph jobGraph) throws Exception;
+
+    /**
+     * Removes the {@link SubmittedJobGraph} with the given {@link JobID} if it exists.
+     *
+     * @param jobId The {@link JobID} of the job graph to remove
+     * @throws Exception If the job graph cannot be removed
+     */
+    void removeJobGraph(JobID jobId) throws Exception;
+
+    /**
+     * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
+     * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
+     */
+    interface SubmittedJobGraphListener {
+
+        /**
+         * Callback for {@link SubmittedJobGraph} instances added by a different {@link
+         * SubmittedJobGraphStore} instance.
+         *
+         * <p><strong>Important:</strong> False positives are possible: the listener may be
+         * notified about a job graph that was added by this instance.
+         *
+         * @param jobId The {@link JobID} of the added job graph
+         */
+        void onAddedJobGraph(JobID jobId);
+
+        /**
+         * Callback for {@link SubmittedJobGraph} instances removed by a different {@link
+         * SubmittedJobGraphStore} instance.
+         *
+         * @param jobId The {@link JobID} of the removed job graph
+         */
+        void onRemovedJobGraph(JobID jobId);
+
+    }
+
+}