You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/14 08:06:02 UTC
[4/7] flink git commit: [FLINK-4512] [FLIP-10] Add option to persist
periodic checkpoints
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
index faeac34..d21349c 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
@@ -50,6 +50,10 @@ limitations under the License.
<p><strong>Average:</strong><span> {{ jobCheckpointStats['size']['avg'] | humanizeBytes }}</span></p>
</td>
</tr>
+ <tr ng-if="jobCheckpointStats['external-path']">
+ <td colspan="4"><strong>Latest Checkpoint Path:</strong> {{ jobCheckpointStats['external-path'] }}
+ </td>
+ </tr>
</tbody>
</table>
<div ng-if="!showHistory && jobCheckpointStats && jobCheckpointStats['history'].length > 0"><a ng-click="toggleHistory()" class="btn btn-default"><strong>Show history</strong> ({{ jobCheckpointStats['history'].length }}) <i class="fa fa-chevron-down"></i></a></div>
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e95afe0..ab4bde7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,17 +18,20 @@
package org.apache.flink.runtime.checkpoint;
-import akka.dispatch.Futures;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -43,7 +46,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -70,7 +72,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class CheckpointCoordinator {
- protected static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
/** The number of recent checkpoints whose IDs are remembered */
private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
@@ -106,9 +108,9 @@ public class CheckpointCoordinator {
* accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
- /** Store for savepoints. */
- private final SavepointStore savepointStore;
-
+ /** Default directory for persistent checkpoints; <code>null</code> if none configured. */
+ private final String checkpointDirectory;
+
/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
private final ArrayDeque<Long> recentPendingCheckpoints;
@@ -157,6 +159,9 @@ public class CheckpointCoordinator {
/** Helper for tracking checkpoint statistics */
private final CheckpointStatsTracker statsTracker;
+ /** Default checkpoint properties **/
+ private final CheckpointProperties checkpointProperties;
+
// --------------------------------------------------------------------------------------------
public CheckpointCoordinator(
@@ -165,12 +170,13 @@ public class CheckpointCoordinator {
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
+ ExternalizedCheckpointSettings externalizeSettings,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
- SavepointStore savepointStore,
+ String checkpointDirectory,
CheckpointStatsTracker statsTracker) {
// sanity checks
@@ -179,6 +185,12 @@ public class CheckpointCoordinator {
checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
+ if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
+ throw new IllegalStateException("CheckpointConfig says to persist periodic " +
+ "checkpoints, but no checkpoint directory has been configured. You can " +
+ "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
+ }
+
// it does not make sense to schedule checkpoints more often then the desired
// time between checkpoints
if (baseInterval < minPauseBetweenCheckpoints) {
@@ -196,12 +208,19 @@ public class CheckpointCoordinator {
this.pendingCheckpoints = new LinkedHashMap<>();
this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
- this.savepointStore = checkNotNull(savepointStore);
+ this.checkpointDirectory = checkpointDirectory;
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.statsTracker = checkNotNull(statsTracker);
this.timer = new Timer("Checkpoint Timer", true);
+ if (externalizeSettings.externalizeCheckpoints()) {
+ LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
+ checkpointProperties = CheckpointProperties.forExternalizedCheckpoint(externalizeSettings.deleteOnCancellation());
+ } else {
+ checkpointProperties = CheckpointProperties.forStandardCheckpoint();
+ }
+
try {
// Make sure the checkpoint ID enumerator is running. Possibly
// issues a blocking call to ZooKeeper.
@@ -219,33 +238,9 @@ public class CheckpointCoordinator {
* Shuts down the checkpoint coordinator.
*
* <p>After this method has been called, the coordinator does not accept
- * and further messages and cannot trigger any further checkpoints. All
- * checkpoint state is discarded.
- */
- public void shutdown() throws Exception {
- shutdown(true);
- }
-
- /**
- * Suspends the checkpoint coordinator.
- *
- * <p>After this method has been called, the coordinator does not accept
* and further messages and cannot trigger any further checkpoints.
- *
- * <p>The difference to shutdown is that checkpoint state in the store
- * and counter is kept around if possible to recover later.
*/
- public void suspend() throws Exception {
- shutdown(false);
- }
-
- /**
- * Shuts down the checkpoint coordinator.
- *
- * @param shutdownStoreAndCounter Depending on this flag the checkpoint
- * state services are shut down or suspended.
- */
- private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
+ public void shutdown(JobStatus jobStatus) throws Exception {
synchronized (lock) {
if (!shutdown) {
shutdown = true;
@@ -263,13 +258,8 @@ public class CheckpointCoordinator {
}
pendingCheckpoints.clear();
- if (shutdownStoreAndCounter) {
- completedCheckpointStore.shutdown();
- checkpointIdCounter.shutdown();
- } else {
- completedCheckpointStore.suspend();
- checkpointIdCounter.suspend();
- }
+ completedCheckpointStore.shutdown(jobStatus);
+ checkpointIdCounter.shutdown(jobStatus);
}
}
}
@@ -282,29 +272,49 @@ public class CheckpointCoordinator {
// Handling checkpoints and messages
// --------------------------------------------------------------------------------------------
- public Future<String> triggerSavepoint(long timestamp) throws Exception {
- CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param timestamp The timestamp for the savepoint.
+ * @param targetDirectory Target directory for the savepoint.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
+ * specified and no default savepoint directory has been
+ * configured
+ * @throws Exception Failures during triggering are forwarded
+ */
+ public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
+ checkNotNull(targetDirectory, "Savepoint target directory");
+
+ CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+ CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
if (result.isSuccess()) {
- PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
- return savepoint.getCompletionFuture();
- }
- else {
- return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
+ return result.getPendingCheckpoint().getCompletionFuture();
+ } else {
+ Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message());
+ Future<CompletedCheckpoint> failed = FlinkCompletableFuture.completedExceptionally(cause);
+ return failed;
}
}
/**
- * Triggers a new checkpoint and uses the given timestamp as the checkpoint
+ * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
* timestamp.
*
* @param timestamp The timestamp for the checkpoint.
+ * @return <code>true</code> if triggering the checkpoint succeeded.
*/
public boolean triggerCheckpoint(long timestamp) throws Exception {
- return triggerCheckpoint(timestamp, CheckpointProperties.forStandardCheckpoint()).isSuccess();
+ return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory).isSuccess();
}
- CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props) throws Exception {
+ CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, String targetDirectory) throws Exception {
+ // Sanity check
+ if (props.externalizeCheckpoint() && targetDirectory == null) {
+ throw new IllegalStateException("No target directory specified to persist checkpoint to.");
+ }
+
// make some eager pre-checks
synchronized (lock) {
// abort if the coordinator has been shutdown in the meantime
@@ -315,7 +325,7 @@ public class CheckpointCoordinator {
// validate whether the checkpoint can be triggered, with respect to the limit of
// concurrent checkpoints, and the minimum time between checkpoints.
// these checks are not relevant for savepoints
- if (!props.isSavepoint()) {
+ if (!props.forceCheckpoint()) {
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another checkpoint while one was queued already");
@@ -402,9 +412,13 @@ public class CheckpointCoordinator {
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
- final PendingCheckpoint checkpoint = props.isSavepoint() ?
- new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) :
- new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+ final PendingCheckpoint checkpoint = new PendingCheckpoint(
+ job,
+ checkpointID,
+ timestamp,
+ ackTasks,
+ props,
+ targetDirectory);
// schedule the timer that will clean up the expired checkpoints
TimerTask canceller = new TimerTask() {
@@ -439,7 +453,7 @@ public class CheckpointCoordinator {
if (shutdown) {
return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
}
- else if (!props.isSavepoint()) {
+ else if (!props.forceCheckpoint()) {
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another checkpoint while one was queued already");
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
@@ -566,7 +580,7 @@ public class CheckpointCoordinator {
}
if (!haveMoreRecentPending && !triggerRequestQueued) {
LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
- triggerCheckpoint(System.currentTimeMillis());
+ triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory());
} else if (!haveMoreRecentPending) {
LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
triggerQueuedRequests();
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
index 76af4be..48cec7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -18,29 +18,27 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
/**
* A checkpoint ID counter.
*/
public interface CheckpointIDCounter {
/**
- * Starts the {@link CheckpointIDCounter} service.
+ * Starts the {@link CheckpointIDCounter} service.
*/
void start() throws Exception;
/**
- * Shuts the {@link CheckpointIDCounter} service down and frees all created
- * resources.
- */
- void shutdown() throws Exception;
-
- /**
- * Suspends the counter.
+ * Shuts the {@link CheckpointIDCounter} service down.
+ *
+ * <p>The job status is forwarded and used to decide whether state should
+ * actually be discarded or kept.
*
- * <p>If the implementation allows recovery, the counter state needs to be
- * kept. Otherwise, this acts as shutdown.
+ * @param jobStatus Job state on shut down
*/
- void suspend() throws Exception;
+ void shutdown(JobStatus jobStatus) throws Exception;
/**
* Atomically increments the current checkpoint ID.
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 7ea645a..e4856cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -18,44 +18,252 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.io.Serializable;
+
/**
* The configuration of a checkpoint, such as whether
* <ul>
- * <li>The checkpoint is a savepoint</li>
- * <li>The checkpoint must be full, or may be incremental</li>
- * <li>The checkpoint format must be the common (cross backend) format, or may be state-backend specific</li>
+ * <li>The checkpoint should be persisted</li>
+ * <li>The checkpoint must be full, or may be incremental (not yet implemented)</li>
+ * <li>The checkpoint format must be the common (cross backend) format,
+ * or may be state-backend specific (not yet implemented)</li>
+ * <li>when the checkpoint should be garbage collected</li>
* </ul>
*/
-public class CheckpointProperties {
+public class CheckpointProperties implements Serializable {
+
+ private static final long serialVersionUID = -8835900655844879469L;
+
+ private final boolean forced;
+
+ private final boolean externalize;
- private final boolean isSavepoint;
+ private final boolean discardSubsumed;
+ private final boolean discardFinished;
+ private final boolean discardCancelled;
+ private final boolean discardFailed;
+ private final boolean discardSuspended;
- private CheckpointProperties(boolean isSavepoint) {
- this.isSavepoint = isSavepoint;
+ CheckpointProperties(
+ boolean forced,
+ boolean externalize,
+ boolean discardSubsumed,
+ boolean discardFinished,
+ boolean discardCancelled,
+ boolean discardFailed,
+ boolean discardSuspended) {
+
+ this.forced = forced;
+ this.externalize = externalize;
+ this.discardSubsumed = discardSubsumed;
+ this.discardFinished = discardFinished;
+ this.discardCancelled = discardCancelled;
+ this.discardFailed = discardFailed;
+ this.discardSuspended = discardSuspended;
+
+ // Not persisted, but needs manual clean up
+ if (!externalize && !(discardSubsumed && discardFinished && discardCancelled
+ && discardFailed && discardSuspended)) {
+ throw new IllegalStateException("CheckpointProperties say to *not* persist the " +
+ "checkpoint, but the checkpoint requires manual cleanup.");
+ }
}
// ------------------------------------------------------------------------
- public boolean isSavepoint() {
- return isSavepoint;
+ /**
+ * Returns whether the checkpoint should be forced.
+ *
+ * <p>Forced checkpoints ignore the configured maximum number of concurrent
+ * checkpoints and minimum time between checkpoints. Furthermore, they are
+ * not subsumed by more recent checkpoints as long as they are pending.
+ *
+ * @return <code>true</code> if the checkpoint should be forced;
+ * <code>false</code> otherwise.
+ *
+ * @see CheckpointCoordinator
+ * @see PendingCheckpoint
+ */
+ public boolean forceCheckpoint() {
+ return forced;
+ }
+
+ /**
+ * Returns whether the checkpoint should be persisted externally.
+ *
+ * @return <code>true</code> if the checkpoint should be persisted
+ * externally; <code>false</code> otherwise.
+ *
+ * @see PendingCheckpoint
+ */
+ public boolean externalizeCheckpoint() {
+ return externalize;
}
// ------------------------------------------------------------------------
+ // Garbage collection behaviour
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns whether the checkpoint should be discarded when it is subsumed.
+ *
+ * <p>A checkpoint is subsumed when the maximum number of retained
+ * checkpoints is reached and a more recent checkpoint completes.
+ *
+ * @return <code>true</code> if the checkpoint should be discarded when it
+ * is subsumed; <code>false</code> otherwise.
+ *
+ * @see CompletedCheckpointStore
+ */
+ public boolean discardOnSubsumed() {
+ return discardSubsumed;
+ }
+
+ /**
+ * Returns whether the checkpoint should be discarded when the owning job
+ * reaches the {@link JobStatus#FINISHED} state.
+ *
+ * @return <code>true</code> if the checkpoint should be discarded when the
+ * owning job reaches the {@link JobStatus#FINISHED} state; <code>false</code>
+ * otherwise.
+ *
+ * @see CompletedCheckpointStore
+ */
+ public boolean discardOnJobFinished() {
+ return discardFinished;
+ }
+
+ /**
+ * Returns whether the checkpoint should be discarded when the owning job
+ * reaches the {@link JobStatus#CANCELED} state.
+ *
+ * @return <code>true</code> if the checkpoint should be discarded when the
+ * owning job reaches the {@link JobStatus#CANCELED} state; <code>false</code>
+ * otherwise.
+ *
+ * @see CompletedCheckpointStore
+ */
+ public boolean discardOnJobCancelled() {
+ return discardCancelled;
+ }
+
+ /**
+ * Returns whether the checkpoint should be discarded when the owning job
+ * reaches the {@link JobStatus#FAILED} state.
+ *
+ * @return <code>true</code> if the checkpoint should be discarded when the
+ * owning job reaches the {@link JobStatus#FAILED} state; <code>false</code>
+ * otherwise.
+ *
+ * @see CompletedCheckpointStore
+ */
+ public boolean discardOnJobFailed() {
+ return discardFailed;
+ }
+
+ /**
+ * Returns whether the checkpoint should be discarded when the owning job
+ * reaches the {@link JobStatus#SUSPENDED} state.
+ *
+ * @return <code>true</code> if the checkpoint should be discarded when the
+ * owning job reaches the {@link JobStatus#SUSPENDED} state; <code>false</code>
+ * otherwise.
+ *
+ * @see CompletedCheckpointStore
+ */
+ public boolean discardOnJobSuspended() {
+ return discardSuspended;
+ }
+
+ // ------------------------------------------------------------------------
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CheckpointProperties that = (CheckpointProperties) o;
+ return forced == that.forced &&
+ externalize == that.externalize &&
+ discardSubsumed == that.discardSubsumed &&
+ discardFinished == that.discardFinished &&
+ discardCancelled == that.discardCancelled &&
+ discardFailed == that.discardFailed &&
+ discardSuspended == that.discardSuspended;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (forced ? 1 : 0);
+ result = 31 * result + (externalize ? 1 : 0);
+ result = 31 * result + (discardSubsumed ? 1 : 0);
+ result = 31 * result + (discardFinished ? 1 : 0);
+ result = 31 * result + (discardCancelled ? 1 : 0);
+ result = 31 * result + (discardFailed ? 1 : 0);
+ result = 31 * result + (discardSuspended ? 1 : 0);
+ return result;
+ }
@Override
public String toString() {
- return "CheckpointProperties {" +
- "isSavepoint=" + isSavepoint +
+ return "CheckpointProperties{" +
+ "forced=" + forced +
+ ", externalize=" + externalizeCheckpoint() +
+ ", discardSubsumed=" + discardSubsumed +
+ ", discardFinished=" + discardFinished +
+ ", discardCancelled=" + discardCancelled +
+ ", discardFailed=" + discardFailed +
+ ", discardSuspended=" + discardSuspended +
'}';
}
// ------------------------------------------------------------------------
+ /**
+ * Creates the checkpoint properties for a (manually triggered) savepoint.
+ *
+ * <p>Savepoints are forced and persisted externally. They have to be
+ * garbage collected manually.
+ *
+ * @return Checkpoint properties for a (manually triggered) savepoint.
+ */
public static CheckpointProperties forStandardSavepoint() {
- return new CheckpointProperties(true);
+ return new CheckpointProperties(true, true, false, false, false, false, false);
}
+ /**
+ * Creates the checkpoint properties for a regular checkpoint.
+ *
+ * <p>Regular checkpoints are not forced and not persisted externally. They
+ * are garbage collected automatically.
+ *
+ * @return Checkpoint properties for a regular checkpoint.
+ */
public static CheckpointProperties forStandardCheckpoint() {
- return new CheckpointProperties(false);
+ return new CheckpointProperties(false, false, true, true, true, true, true);
+ }
+
+ /**
+ * Creates the checkpoint properties for an external checkpoint.
+ *
+ * <p>External checkpoints are not forced, but persisted externally. They
+ * are garbage collected automatically, except when the owning job
+ * terminates in state {@link JobStatus#FAILED}. The user is required to
+ * configure the clean up behaviour on job cancellation.
+ *
+ * @param deleteOnCancellation Flag indicating whether to discard on cancellation.
+ *
+ * @return Checkpoint properties for an external checkpoint.
+ */
+ public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
+ return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, true);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 0d279f1..e135272 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -19,11 +19,15 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
@@ -34,7 +38,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
* and that is considered completed.
*/
-public class CompletedCheckpoint implements StateObject {
+public class CompletedCheckpoint implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompletedCheckpoint.class);
private static final long serialVersionUID = -8360248179615702014L;
@@ -51,9 +57,11 @@ public class CompletedCheckpoint implements StateObject {
/** States of the different task groups belonging to this checkpoint */
private final Map<JobVertexID, TaskState> taskStates;
- /** Flag to indicate whether the completed checkpoint data should be deleted when this
- * handle to the checkpoint is disposed */
- private final boolean deleteStateWhenDisposed;
+ /** Properties for this checkpoint. */
+ private final CheckpointProperties props;
+
+ /** External path if persisted checkpoint; <code>null</code> otherwise. */
+ private final String externalPath;
// ------------------------------------------------------------------------
@@ -63,7 +71,8 @@ public class CompletedCheckpoint implements StateObject {
long timestamp,
long completionTimestamp,
Map<JobVertexID, TaskState> taskStates,
- boolean deleteStateWhenDisposed) {
+ CheckpointProperties props,
+ String externalPath) {
checkArgument(checkpointID >= 0);
checkArgument(timestamp >= 0);
@@ -74,7 +83,13 @@ public class CompletedCheckpoint implements StateObject {
this.timestamp = timestamp;
this.duration = completionTimestamp - timestamp;
this.taskStates = checkNotNull(taskStates);
- this.deleteStateWhenDisposed = deleteStateWhenDisposed;
+ this.props = checkNotNull(props);
+ this.externalPath = externalPath;
+
+ if (props.externalizeCheckpoint() && externalPath == null) {
+ throw new NullPointerException("Checkpoint properties say that the checkpoint " +
+ "should have been persisted, but missing external path.");
+ }
}
// ------------------------------------------------------------------------
@@ -95,19 +110,50 @@ public class CompletedCheckpoint implements StateObject {
return duration;
}
- @Override
- public void discardState() throws Exception {
- if (deleteStateWhenDisposed) {
+ public CheckpointProperties getProperties() {
+ return props;
+ }
- try {
- StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
- } finally {
- taskStates.clear();
+ public boolean subsume() throws Exception {
+ if (props.discardOnSubsumed()) {
+ discard();
+ return true;
+ }
+
+ return false;
+ }
+
+ public boolean discard(JobStatus jobStatus) throws Exception {
+ if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() ||
+ jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() ||
+ jobStatus == JobStatus.FAILED && props.discardOnJobFailed() ||
+ jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended()) {
+
+ discard();
+ return true;
+ } else {
+ if (externalPath != null) {
+ LOG.info("Persistent checkpoint with ID {} at '{}' not discarded.",
+ checkpointID,
+ externalPath);
}
+
+ return false;
+ }
+ }
+
+ private void discard() throws Exception {
+ try {
+ if (externalPath != null) {
+ SavepointStore.removeSavepoint(externalPath);
+ }
+
+ StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+ } finally {
+ taskStates.clear();
}
}
- @Override
public long getStateSize() throws IOException {
long result = 0L;
@@ -126,6 +172,10 @@ public class CompletedCheckpoint implements StateObject {
return taskStates.get(jobVertexID);
}
+ public String getExternalPath() {
+ return externalPath;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
@@ -153,4 +203,5 @@ public class CompletedCheckpoint implements StateObject {
public String toString() {
return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index c52fc25..d2c0f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
import java.util.List;
/**
@@ -38,7 +40,7 @@ public interface CompletedCheckpointStore {
*
* <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
* retained checkpoints, the oldest one will be discarded via {@link
- * CompletedCheckpoint#discard(ClassLoader)}.
+ * CompletedCheckpoint#discard()}.
*/
void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;
@@ -49,17 +51,14 @@ public interface CompletedCheckpointStore {
CompletedCheckpoint getLatestCheckpoint() throws Exception;
/**
- * Shuts down the store and discards all checkpoint instances.
- */
- void shutdown() throws Exception;
-
- /**
- * Suspends the store.
+ * Shuts down the store.
+ *
+ * <p>The job status is forwarded and used to decide whether state should
+ * actually be discarded or kept.
*
- * <p>If the implementation allows recovery, checkpoint state needs to be
- * kept around. Otherwise, this should act like shutdown.
+ * @param jobStatus Job state on shut down
*/
- void suspend() throws Exception;
+ void shutdown(JobStatus jobStatus) throws Exception;
/**
* Returns all {@link CompletedCheckpoint} instances.
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2ca9d69..983f1d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,6 +19,11 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -28,6 +33,8 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
@@ -46,6 +53,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class PendingCheckpoint {
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+
private final Object lock = new Object();
private final JobID jobId;
@@ -58,7 +67,17 @@ public class PendingCheckpoint {
private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
- private final boolean disposeWhenSubsumed;
+ /**
+ * The checkpoint properties. If the checkpoint should be persisted
+ * externally, it happens in {@link #finalizeCheckpoint()}.
+ */
+ private final CheckpointProperties props;
+
+ /** Target directory to potentially persist checkpoint to; <code>null</code> if none configured. */
+ private final String targetDirectory;
+
+ /** The promise to fulfill once the checkpoint has been completed. */
+ private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise = new FlinkCompletableFuture<>();
private int numAcknowledgedTasks;
@@ -70,23 +89,21 @@ public class PendingCheckpoint {
JobID jobId,
long checkpointId,
long checkpointTimestamp,
- Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm) {
- this(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, true);
- }
-
- PendingCheckpoint(
- JobID jobId,
- long checkpointId,
- long checkpointTimestamp,
Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
- boolean disposeWhenSubsumed)
- {
+ CheckpointProperties props,
+ String targetDirectory) {
this.jobId = checkNotNull(jobId);
this.checkpointId = checkpointId;
this.checkpointTimestamp = checkpointTimestamp;
this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
- this.disposeWhenSubsumed = disposeWhenSubsumed;
this.taskStates = new HashMap<>();
+ this.props = checkNotNull(props);
+ this.targetDirectory = targetDirectory;
+
+ // Sanity check
+ if (props.externalizeCheckpoint() && targetDirectory == null) {
+ throw new NullPointerException("No target directory specified to persist checkpoint to.");
+ }
checkArgument(verticesToConfirm.size() > 0,
"Checkpoint needs at least one vertex that commits the checkpoint");
@@ -137,33 +154,71 @@ public class PendingCheckpoint {
* @return True if the checkpoint can be subsumed, false otherwise.
*/
public boolean canBeSubsumed() {
- return true;
+ // If the checkpoint is forced, it cannot be subsumed.
+ return !props.forceCheckpoint();
+ }
+
+ CheckpointProperties getProps() {
+ return props;
+ }
+
+ String getTargetDirectory() {
+ return targetDirectory;
}
// ------------------------------------------------------------------------
// Progress and Completion
// ------------------------------------------------------------------------
+ /**
+ * Returns the completion future.
+ *
+ * @return A future to the completed checkpoint
+ */
+ public Future<CompletedCheckpoint> getCompletionFuture() {
+ return onCompletionPromise;
+ }
+
public CompletedCheckpoint finalizeCheckpoint() throws Exception {
synchronized (lock) {
- if (discarded) {
- throw new IllegalStateException("pending checkpoint is discarded");
- }
- if (notYetAcknowledgedTasks.isEmpty()) {
- CompletedCheckpoint completed = new CompletedCheckpoint(
- jobId,
- checkpointId,
- checkpointTimestamp,
- System.currentTimeMillis(),
- new HashMap<>(taskStates),
- disposeWhenSubsumed);
-
- dispose(false);
-
- return completed;
- }
- else {
- throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
+ try {
+ if (discarded) {
+ throw new IllegalStateException("pending checkpoint is discarded");
+ }
+ if (notYetAcknowledgedTasks.isEmpty()) {
+ // Persist if required
+ String externalPath = null;
+ if (props.externalizeCheckpoint()) {
+ try {
+ Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
+ externalPath = SavepointStore.storeSavepoint(
+ targetDirectory,
+ savepoint);
+ } catch (Throwable t) {
+ LOG.error("Failed to persist checkpoints " + checkpointId + ".", t);
+ }
+ }
+
+ CompletedCheckpoint completed = new CompletedCheckpoint(
+ jobId,
+ checkpointId,
+ checkpointTimestamp,
+ System.currentTimeMillis(),
+ new HashMap<>(taskStates),
+ props,
+ externalPath);
+
+ onCompletionPromise.complete(completed);
+
+ dispose(false);
+
+ return completed;
+ } else {
+ throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
+ }
+ } catch (Throwable t) {
+ onCompletionPromise.completeExceptionally(t);
+ throw t;
}
}
}
@@ -180,7 +235,6 @@ public class PendingCheckpoint {
ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
if (vertex != null) {
-
if (checkpointStateHandles != null) {
List<KeyGroupsStateHandle> keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle();
ChainedStateHandle<StreamStateHandle> nonPartitionedState =
@@ -256,18 +310,36 @@ public class PendingCheckpoint {
* Aborts a checkpoint because it expired (took too long).
*/
public void abortExpired() throws Exception {
- dispose(true);
+ try {
+ onCompletionPromise.completeExceptionally(new Exception("Checkpoint expired before completing"));
+ } finally {
+ dispose(true);
+ }
}
/**
* Aborts the pending checkpoint because a newer completed checkpoint subsumed it.
*/
public void abortSubsumed() throws Exception {
- dispose(true);
+ try {
+ if (props.forceCheckpoint()) {
+ onCompletionPromise.completeExceptionally(new Exception("Bug: forced checkpoints must never be subsumed"));
+
+ throw new IllegalStateException("Bug: forced checkpoints must never be subsumed");
+ } else {
+ onCompletionPromise.completeExceptionally(new Exception("Checkpoints has been subsumed"));
+ }
+ } finally {
+ dispose(true);
+ }
}
public void abortDeclined() throws Exception {
- dispose(true);
+ try {
+ onCompletionPromise.completeExceptionally(new Exception("Checkpoint was declined (tasks not ready)"));
+ } finally {
+ dispose(true);
+ }
}
/**
@@ -275,10 +347,14 @@ public class PendingCheckpoint {
* @param cause The error's exception.
*/
public void abortError(Throwable cause) throws Exception {
- dispose(true);
+ try {
+ onCompletionPromise.completeExceptionally(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+ } finally {
+ dispose(true);
+ }
}
- protected void dispose(boolean releaseState) throws Exception {
+ private void dispose(boolean releaseState) throws Exception {
synchronized (lock) {
try {
discarded = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
deleted file mode 100644
index 0bb6a91..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.util.ExceptionUtils;
-import org.slf4j.Logger;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A pending savepoint is like a pending checkpoint, but it additionally performs some
- * actions upon completion, like notifying the triggerer.
- */
-public class PendingSavepoint extends PendingCheckpoint {
-
- private static final Logger LOG = CheckpointCoordinator.LOG;
-
- private final SavepointStore store;
-
- /** The promise to fulfill once the savepoint is complete */
- private final Promise<String> onCompletionPromise;
-
- // --------------------------------------------------------------------------------------------
-
- public PendingSavepoint(
- JobID jobId,
- long checkpointId,
- long checkpointTimestamp,
- Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
- SavepointStore store)
- {
- super(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, false);
-
- this.store = checkNotNull(store);
- this.onCompletionPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
- }
-
- // --------------------------------------------------------------------------------------------
- // Savepoint completion
- // --------------------------------------------------------------------------------------------
-
- public Future<String> getCompletionFuture() {
- return onCompletionPromise.future();
- }
-
- @Override
- public CompletedCheckpoint finalizeCheckpoint() throws Exception {
- // finalize checkpoint (this also disposes this pending checkpoint)
- CompletedCheckpoint completedCheckpoint = super.finalizeCheckpoint();
-
- // now store the checkpoint externally as a savepoint
- try {
- Savepoint savepoint = new SavepointV1(
- completedCheckpoint.getCheckpointID(),
- completedCheckpoint.getTaskStates().values());
-
- String path = store.storeSavepoint(savepoint);
- onCompletionPromise.success(path);
- }
- catch (Throwable t) {
- LOG.warn("Failed to store savepoint.", t);
- onCompletionPromise.failure(t);
-
- ExceptionUtils.rethrow(t, "Failed to store savepoint.");
- }
-
- return completedCheckpoint;
- }
-
- // ------------------------------------------------------------------------
- // Cancellation / Disposal
- // ------------------------------------------------------------------------
-
- @Override
- public boolean canBeSubsumed() {
- return false;
- }
-
- @Override
- public void abortSubsumed() throws Exception {
- try {
- Exception e = new Exception("Bug: Savepoints must never be subsumed");
- onCompletionPromise.failure(e);
- throw e;
- }
- finally {
- dispose(true);
- }
- }
-
- @Override
- public void abortExpired() throws Exception {
- try {
- LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " expired before completing.");
- onCompletionPromise.failure(new Exception("Savepoint expired before completing"));
- }
- finally {
- dispose(true);
- }
- }
-
- @Override
- public void abortDeclined() throws Exception {
- try {
- LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " was declined (tasks not ready).");
- onCompletionPromise.failure(new Exception("Savepoint was declined (tasks not ready)"));
- }
- finally {
- dispose(true);
- }
- }
-
- @Override
- public void abortError(Throwable cause) throws Exception {
- try {
- LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " failed due to an error", cause);
- onCompletionPromise.failure(
- new Exception("Savepoint could not be completed: " + cause.getMessage(), cause));
- }
- finally {
- dispose(true);
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return String.format("Pending Savepoint %d @ %d - confirmed=%d, pending=%d",
- getCheckpointId(), getCheckpointTimestamp(),
- getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 84cbe19..e4ed996 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,10 +36,7 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
public void start() throws Exception {}
@Override
- public void shutdown() throws Exception {}
-
- @Override
- public void suspend() throws Exception {}
+ public void shutdown(JobStatus jobStatus) throws Exception {}
@Override
public long getAndIncrement() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index aecb51e..a9624fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -41,7 +41,7 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
throws Exception {
return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
- .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader);
+ .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 5e03988..082bca9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.StateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -32,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*/
public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
+ private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
+
/** The maximum number of checkpoints to retain (at least 1). */
private final int maxNumberOfCheckpointsToRetain;
@@ -44,16 +48,10 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
* least 1). Adding more checkpoints than this results
* in older checkpoints being discarded.
- * @param userClassLoader The user class loader used to discard checkpoints
*/
- public StandaloneCompletedCheckpointStore(
- int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader) {
-
+ public StandaloneCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain) {
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
-
this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
-
this.checkpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
}
@@ -64,9 +62,9 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
- checkpoints.addLast(checkpoint);
+ checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
- checkpoints.removeFirst().discardState();
+ checkpoints.remove().subsume();
}
}
@@ -86,17 +84,16 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
}
@Override
- public void shutdown() throws Exception {
+ public void shutdown(JobStatus jobStatus) throws Exception {
try {
- StateUtil.bestEffortDiscardAllStateObjects(checkpoints);
+ LOG.info("Shutting down");
+
+ for (CompletedCheckpoint checkpoint : checkpoints) {
+ checkpoint.discard(jobStatus);
+ }
} finally {
checkpoints.clear();
}
}
- @Override
- public void suspend() throws Exception {
- // Do a regular shutdown, because we can't recovery anything
- shutdown();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 0bceb8b..80e79b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,30 +92,17 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
}
@Override
- public void shutdown() throws Exception {
+ public void shutdown(JobStatus jobStatus) throws Exception {
synchronized (startStopLock) {
if (isStarted) {
LOG.info("Shutting down.");
sharedCount.close();
client.getConnectionStateListenable().removeListener(connStateListener);
- LOG.info("Removing {} from ZooKeeper", counterPath);
- client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
-
- isStarted = false;
- }
- }
- }
-
- @Override
- public void suspend() throws Exception {
- synchronized (startStopLock) {
- if (isStarted) {
- LOG.info("Suspending.");
- sharedCount.close();
- client.getConnectionStateListenable().removeListener(connStateListener);
-
- // Don't remove any state
+ if (jobStatus.isGloballyTerminalState()) {
+ LOG.info("Removing {} from ZooKeeper", counterPath);
+ client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+ }
isStarted = false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 55a0bbb..f47012d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -55,7 +55,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
throws Exception {
return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
- NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader);
+ NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index b826d9f..4f67921 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,9 +24,10 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
+import java.util.concurrent.Callable;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,9 +77,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
/** The maximum number of checkpoints to retain (at least 1). */
private final int maxNumberOfCheckpointsToRetain;
- /** User class loader for discarding {@link CompletedCheckpoint} instances. */
- private final ClassLoader userClassLoader;
-
/** Local completed checkpoints. */
private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
@@ -88,7 +87,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* least 1). Adding more checkpoints than this results
* in older checkpoints being discarded. On recovery,
* we will only start with a single checkpoint.
- * @param userClassLoader The user class loader used to discard checkpoints
* @param client The Curator ZooKeeper client
* @param checkpointsPath The ZooKeeper path for the checkpoints (needs to
* start with a '/')
@@ -98,7 +96,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
*/
public ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
CuratorFramework client,
String checkpointsPath,
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
@@ -107,7 +104,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
checkNotNull(stateStorage, "State storage");
this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
- this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
checkNotNull(client, "Curator client");
checkNotNull(checkpointsPath, "Checkpoints path");
@@ -172,7 +168,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
try {
- removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+ removeSubsumed(initialCheckpoints.get(i));
}
catch (Exception e) {
LOG.error("Failed to discard checkpoint", e);
@@ -200,7 +196,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
// Everything worked, let's remove a previous checkpoint if necessary.
if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
- removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
+ removeSubsumed(checkpointStateHandles.removeFirst());
}
LOG.debug("Added {} to {}.", checkpoint, path);
@@ -233,68 +229,90 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
}
@Override
- public void shutdown() throws Exception {
- LOG.info("Shutting down");
+ public void shutdown(JobStatus jobStatus) throws Exception {
+ if (jobStatus.isGloballyTerminalState()) {
+ LOG.info("Shutting down");
- for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
- try {
- removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
- }
- catch (Exception e) {
- LOG.error("Failed to discard checkpoint.", e);
+ for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+ try {
+ removeShutdown(checkpoint, jobStatus);
+ } catch (Exception e) {
+ LOG.error("Failed to discard checkpoint.", e);
+ }
}
+
+ checkpointStateHandles.clear();
+
+ String path = "/" + client.getNamespace();
+
+ LOG.info("Removing {} from ZooKeeper", path);
+ ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+ } else {
+ LOG.info("Suspending");
+
+ // Clear the local handles, but don't remove any state
+ checkpointStateHandles.clear();
}
+ }
- checkpointStateHandles.clear();
+ // ------------------------------------------------------------------------
- String path = "/" + client.getNamespace();
+ private void removeSubsumed(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+ Callable<Void> action = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ stateHandleAndPath.f0.retrieveState().subsume();
+ return null;
+ }
+ };
- LOG.info("Removing {} from ZooKeeper", path);
- ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+ remove(stateHandleAndPath, action);
}
- @Override
- public void suspend() throws Exception {
- LOG.info("Suspending");
+ private void removeShutdown(
+ final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+ final JobStatus jobStatus) throws Exception {
- // Clear the local handles, but don't remove any state
- checkpointStateHandles.clear();
+ Callable<Void> action = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState();
+ checkpoint.discard(jobStatus);
+ return null;
+ }
+ };
+
+ remove(stateHandleAndPath, action);
}
/**
* Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
*/
- private void removeFromZooKeeperAndDiscardCheckpoint(
- final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+ private void remove(
+ final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+ final Callable<Void> action) throws Exception {
- final BackgroundCallback callback = new BackgroundCallback() {
+ BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
try {
if (event.getType() == CuratorEventType.DELETE) {
if (event.getResultCode() == 0) {
- // The checkpoint
try {
- CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState();
- checkpoint.discardState();
- // Discard the checkpoint
- LOG.debug("Discarded " + checkpoint);
+ action.call();
} finally {
// Discard the state handle
stateHandleAndPath.f0.discardState();
}
- }
- else {
+ } else {
throw new IllegalStateException("Unexpected result code " +
event.getResultCode() + " in '" + event + "' callback.");
}
- }
- else {
+ } else {
throw new IllegalStateException("Unexpected event type " +
event.getType() + " in '" + event + "' callback.");
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Failed to discard checkpoint.", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
deleted file mode 100644
index 49f51be..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A file system based {@link SavepointStore}.
- *
- * <p>Stored savepoints have the following format:
- * <pre>
- * MagicNumber SavepointVersion Savepoint
- * - MagicNumber => int
- * - SavepointVersion => int (returned by Savepoint#getVersion())
- * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
- * </pre>
- */
-public class FsSavepointStore implements SavepointStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
-
- /** Magic number for sanity checks against stored savepoints. */
- int MAGIC_NUMBER = 0x4960672d;
-
- /** Root path for savepoints. */
- private final Path rootPath;
-
- /** Prefix for savepoint files. */
- private final String prefix;
-
- /** File system to use for file access. */
- private final FileSystem fileSystem;
-
- /**
- * Creates a new file system based {@link SavepointStore}.
- *
- * @param rootPath Root path for savepoints
- * @param prefix Prefix for savepoint files
- * @throws IOException On failure to access root path
- */
- FsSavepointStore(String rootPath, String prefix) throws IOException {
- this.rootPath = new Path(checkNotNull(rootPath, "Root path"));
- this.prefix = checkNotNull(prefix, "Prefix");
-
- this.fileSystem = FileSystem.get(this.rootPath.toUri());
- }
-
- @Override
- public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
- Preconditions.checkNotNull(savepoint, "Savepoint");
-
- Exception latestException = null;
- Path path = null;
- FSDataOutputStream fdos = null;
-
- // Try to create a FS output stream
- for (int attempt = 0; attempt < 10; attempt++) {
- path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
- try {
- fdos = fileSystem.create(path, false);
- break;
- } catch (Exception e) {
- latestException = e;
- }
- }
-
- if (fdos == null) {
- throw new IOException("Failed to create file output stream at " + path, latestException);
- }
-
- boolean success = false;
- try (DataOutputStream dos = new DataOutputStream(fdos)) {
- // Write header
- dos.writeInt(MAGIC_NUMBER);
- dos.writeInt(savepoint.getVersion());
-
- // Write savepoint
- SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
- serializer.serialize(savepoint, dos);
- success = true;
- } finally {
- if (!success && fileSystem.exists(path)) {
- if (!fileSystem.delete(path, true)) {
- LOG.warn("Failed to delete file " + path + " after failed write.");
- }
- }
- }
-
- return path.toString();
- }
-
- @Override
- public Savepoint loadSavepoint(String path) throws IOException {
- Preconditions.checkNotNull(path, "Path");
-
- try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
- int magicNumber = dis.readInt();
-
- if (magicNumber == MAGIC_NUMBER) {
- int version = dis.readInt();
-
- SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
- return serializer.deserialize(dis);
- } else {
- throw new RuntimeException("Unexpected magic number. This is most likely " +
- "caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
- "savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
- "_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
- "file is not a proper savepoint or the file has been corrupted.");
- }
- }
- }
-
- @Override
- public void disposeSavepoint(String path) throws Exception {
- Preconditions.checkNotNull(path, "Path");
-
- try {
- Savepoint savepoint = loadSavepoint(path);
- LOG.info("Disposing savepoint: " + path);
- savepoint.dispose();
-
- Path filePath = new Path(path);
-
- if (fileSystem.exists(filePath)) {
- if (!fileSystem.delete(filePath, true)) {
- throw new IOException("Failed to delete " + filePath + ".");
- }
- } else {
- throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
- }
- } catch (Throwable t) {
- throw new IOException("Failed to dispose savepoint " + path + ".", t);
- }
- }
-
- @Override
- public void shutdown() throws Exception {
- // Nothing to do, because the savepoint life-cycle is independent of
- // the cluster life-cycle.
- }
-
- private FSDataInputStream createFsInputStream(Path path) throws IOException {
- if (fileSystem.exists(path)) {
- return fileSystem.open(path);
- } else {
- throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
- }
- }
-
- /**
- * Returns the savepoint root path.
- *
- * @return Savepoint root path
- */
- Path getRootPath() {
- return rootPath;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
deleted file mode 100644
index 2cf8f31..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Heap-backed savepoint store.
- *
- * <p>The life-cycle of savepoints is bound to the life-cycle of the cluster.
- */
-public class HeapSavepointStore implements SavepointStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(HeapSavepointStore.class);
-
- private final Object shutDownLock = new Object();
-
- /** Stored savepoints. */
- private final Map<String, Savepoint> savepoints = new HashMap<>(1);
-
- /** ID counter to identify savepoints. */
- private final AtomicInteger currentId = new AtomicInteger();
-
- /** Flag indicating whether state store has been shut down. */
- private boolean shutDown;
-
- /** Shut down hook. */
- private final Thread shutdownHook;
-
- /**
- * Creates a heap-backed savepoint store.
- *
- * <p>Savepoint are discarded on {@link #shutdown()}.
- */
- public HeapSavepointStore() {
- this.shutdownHook = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- shutdown();
- } catch (Throwable t) {
- LOG.warn("Failure during shut down hook.", t);
- }
- }
- });
-
- try {
- Runtime.getRuntime().addShutdownHook(shutdownHook);
- } catch (IllegalStateException ignored) {
- // JVM is already shutting down
- } catch (Throwable t) {
- LOG.warn("Failed to register shutdown hook.");
- }
- }
-
- @Override
- public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
- Preconditions.checkNotNull(savepoint, "Savepoint");
-
- synchronized (shutDownLock) {
- if (shutDown) {
- throw new IllegalStateException("Shut down");
- } else {
- String path = "jobmanager://savepoints/" + currentId.incrementAndGet();
- savepoints.put(path, savepoint);
- return path;
- }
- }
- }
-
- @Override
- public Savepoint loadSavepoint(String path) throws IOException {
- Preconditions.checkNotNull(path, "Path");
-
- Savepoint savepoint;
- synchronized (shutDownLock) {
- savepoint = savepoints.get(path);
- }
-
- if (savepoint != null) {
- return savepoint;
- } else {
- throw new IllegalArgumentException("Invalid path '" + path + "'.");
- }
- }
-
- @Override
- public void disposeSavepoint(String path) throws Exception {
- Preconditions.checkNotNull(path, "Path");
-
- Savepoint savepoint;
- synchronized (shutDownLock) {
- savepoint = savepoints.remove(path);
- }
-
- if (savepoint != null) {
- savepoint.dispose();
- } else {
- throw new IllegalArgumentException("Invalid path '" + path + "'.");
- }
- }
-
- @Override
- public void shutdown() throws Exception {
- synchronized (shutDownLock) {
- // This is problematic as the user code class loader is not
- // available at this point.
- for (Savepoint savepoint : savepoints.values()) {
- try {
- savepoint.dispose();
- } catch (Throwable t) {
- LOG.warn("Failed to dispose savepoint " + savepoint.getCheckpointId(), t);
- }
- }
-
- savepoints.clear();
-
- // Remove shutdown hook to prevent resource leaks, unless this is
- // invoked by the shutdown hook itself.
- if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
- try {
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
- } catch (IllegalStateException ignored) {
- // Race, JVM is in shutdown already, we can safely ignore this
- } catch (Throwable t) {
- LOG.warn("Failed to unregister shut down hook.");
- }
- }
-
- shutDown = true;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 47917b4..845008d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -19,16 +19,18 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
- * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it.
+ * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it.
*/
public class SavepointLoader {
@@ -39,7 +41,6 @@ public class SavepointLoader {
*
* @param jobId The JobID of the job to load the savepoint for.
* @param tasks Tasks that will possibly be reset
- * @param savepointStore The store that holds the savepoint.
* @param savepointPath The path of the savepoint to rollback to
*
* @throws IllegalStateException If mismatch between program and savepoint state
@@ -48,13 +49,12 @@ public class SavepointLoader {
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
- SavepointStore savepointStore,
- String savepointPath) throws Exception {
+ String savepointPath) throws IOException {
// (1) load the savepoint
- Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
+ Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
-
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
@@ -66,12 +66,12 @@ public class SavepointLoader {
else {
String msg = String.format("Failed to rollback to savepoint %s. " +
"Max parallelism mismatch between savepoint state and new program. " +
- "Cannot map operator %s with parallelism %d to new program with " +
+ "Cannot map operator %s with max parallelism %d to new program with " +
"parallelism %d. This indicates that the program has been changed " +
"in a non-compatible way after the savepoint.",
savepoint,
taskState.getJobVertexID(),
- taskState.getParallelism(),
+ taskState.getMaxParallelism(),
executionJobVertex.getParallelism());
throw new IllegalStateException(msg);
@@ -87,8 +87,9 @@ public class SavepointLoader {
}
// (3) convert to checkpoint so the system can fall back to it
- return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, false);
- }
+ CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+ return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath);
+ }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 211209c..6a55b33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -25,7 +25,7 @@ import java.io.IOException;
/**
* Serializer for {@link Savepoint} instances.
*
- * <p>This serializer is used to read/write a savepoint via {@link FsSavepointStore}.
+ * <p>This serializer is used to read/write a savepoint via {@link SavepointStore}.
*
* <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 68b88d2..4b65418 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,23 +18,105 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
*
- * <p>The main implementation is the {@link FsSavepointStore}. We also have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0).
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
*/
-public interface SavepointStore {
+public class SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ private static final int MAGIC_NUMBER = 0x4960672d;
+
+ /** Prefix for savepoint files. */
+ private static final String prefix = "savepoint-";
/**
* Stores the savepoint.
*
+ * @param targetDirectory Target directory to store savepoint in
* @param savepoint Savepoint to be stored
* @param <T> Savepoint type
* @return Path of stored savepoint
* @throws Exception Failures during store are forwarded
*/
- <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception;
+ public static <T extends Savepoint> String storeSavepoint(
+ String targetDirectory,
+ T savepoint) throws IOException {
+
+ checkNotNull(targetDirectory, "Target directory");
+ checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ FileSystem fs = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+
+ if (fs == null) {
+ fs = FileSystem.get(path.toUri());
+ }
+
+ try {
+ fdos = fs.create(path, false);
+ break;
+ } catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null) {
+ throw new IOException("Failed to create file output stream at " + path, latestException);
+ }
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos)) {
+ // Write header
+ dos.writeInt(MAGIC_NUMBER);
+ dos.writeInt(savepoint.getVersion());
+
+ // Write savepoint
+ SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
+ serializer.serialize(savepoint, dos);
+ success = true;
+ } finally {
+ if (!success && fs.exists(path)) {
+ if (!fs.delete(path, true)) {
+ LOG.warn("Failed to delete file {} after failed write.", path);
+ }
+ }
+ }
+
+ return path.toString();
+ }
/**
* Loads the savepoint at the specified path.
@@ -43,24 +125,62 @@ public interface SavepointStore {
* @return The loaded savepoint
* @throws Exception Failures during load are forwared
*/
- Savepoint loadSavepoint(String path) throws Exception;
+ public static Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
- /**
- * Disposes the savepoint at the specified path.
- *
- * @param path Path of savepoint to dispose
- * @throws Exception Failures during diposal are forwarded
- */
- void disposeSavepoint(String path) throws Exception;
+ try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+ int magicNumber = dis.readInt();
+
+ if (magicNumber == MAGIC_NUMBER) {
+ int version = dis.readInt();
+
+ SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
+ return serializer.deserialize(dis);
+ } else {
+ throw new RuntimeException("Unexpected magic number. This is most likely " +
+ "caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
+ "savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
+ "_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
+ "file is not a proper savepoint or the file has been corrupted.");
+ }
+ }
+ }
/**
- * Shut downs the savepoint store.
+ * Removes the savepoint meta data without loading the savepoint or disposing its state.
*
- * <p>Only necessary for implementations where the savepoint life-cycle is
- * bound to the cluster life-cycle.
- *
- * @throws Exception Failures during shut down are forwarded
+ * @param path Path of savepoint to remove
+ * @throws IOException Failures during removal are forwarded
*/
- void shutdown() throws Exception;
+ public static void removeSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ LOG.info("Removing savepoint: {}.", path);
+
+ Path filePath = new Path(path);
+ FileSystem fs = FileSystem.get(filePath.toUri());
+
+ if (fs.exists(filePath)) {
+ if (!fs.delete(filePath, true)) {
+ throw new IOException("Failed to delete " + filePath + ".");
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
+ }
+ } catch (Throwable t) {
+ throw new IOException("Failed to dispose savepoint " + path + ".", t);
+ }
+ }
+
+ private static FSDataInputStream createFsInputStream(Path path) throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri());
+
+ if (fs.exists(path)) {
+ return fs.open(path);
+ } else {
+ throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
+ }
+ }
}