You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:49 UTC
[06/11] flink git commit: [FLINK-5897] [checkpoints] Make checkpoint
externalization not depend strictly on FileSystems
[FLINK-5897] [checkpoints] Make checkpoint externalization not depend strictly on FileSystems
That is the first step towards checkpoints that can be externalized to other stores as well,
like k/v stores and databases, if supported by the state backend.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b7f21d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b7f21d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b7f21d8
Branch: refs/heads/master
Commit: 5b7f21d891b410ca0046efdaf12caf5e73deadf4
Parents: 9912de2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 22:18:50 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/util/ExceptionUtils.java | 4 +-
.../checkpoint/CheckpointCoordinator.java | 20 ++-
.../runtime/checkpoint/CompletedCheckpoint.java | 135 +++++++++++++++----
.../checkpoint/CompletedCheckpointStore.java | 9 ++
.../runtime/checkpoint/PendingCheckpoint.java | 112 ++++++++++-----
.../StandaloneCompletedCheckpointStore.java | 4 +
.../ZooKeeperCompletedCheckpointStore.java | 5 +
.../checkpoint/savepoint/SavepointLoader.java | 19 ++-
.../checkpoint/savepoint/SavepointStore.java | 93 ++++++++++---
.../apache/flink/runtime/state/StateUtil.java | 17 +--
.../flink/runtime/jobmanager/JobManager.scala | 14 +-
.../CheckpointCoordinatorFailureTest.java | 5 +
.../CompletedCheckpointStoreTest.java | 2 +-
.../checkpoint/CompletedCheckpointTest.java | 17 ++-
.../checkpoint/PendingCheckpointTest.java | 25 ++--
.../jobmanager/JobManagerHARecoveryTest.java | 4 +
.../runtime/jobmanager/JobManagerITCase.scala | 2 +-
.../JobManagerHACheckpointRecoveryITCase.java | 2 +-
18 files changed, 365 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index fea25ff..7167a0b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -257,7 +257,7 @@ public final class ExceptionUtils {
throw (Error) t;
}
else {
- throw new IOException(t);
+ throw new IOException(t.getMessage(), t);
}
}
@@ -268,7 +268,7 @@ public final class ExceptionUtils {
* @param searchType the type of exception to search for in the chain.
* @return True, if the searched type is nested in the throwable, false otherwise.
*/
- public static boolean containsThrowable(Throwable throwable, Class searchType) {
+ public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
if (throwable == null || searchType == null) {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c1c65b5..6da6f7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.TaskStateHandles;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -758,13 +759,19 @@ public class CheckpointCoordinator {
CompletedCheckpoint completedCheckpoint = null;
try {
- completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
+ // externalize the checkpoint if required
+ if (pendingCheckpoint.getProps().externalizeCheckpoint()) {
+ completedCheckpoint = pendingCheckpoint.finalizeCheckpointExternalized();
+ } else {
+ completedCheckpoint = pendingCheckpoint.finalizeCheckpointNonExternalized();
+ }
completedCheckpointStore.addCheckpoint(completedCheckpoint);
rememberRecentCheckpointId(checkpointId);
dropSubsumedCheckpoints(checkpointId);
- } catch (Exception exception) {
+ }
+ catch (Exception exception) {
// abort the current pending checkpoint if it has not been discarded yet
if (!pendingCheckpoint.isDiscarded()) {
pendingCheckpoint.abortError(exception);
@@ -779,8 +786,8 @@ public class CheckpointCoordinator {
public void run() {
try {
cc.discard();
- } catch (Exception nestedException) {
- LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
+ } catch (Throwable t) {
+ LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), t);
}
}
});
@@ -808,11 +815,12 @@ public class CheckpointCoordinator {
builder.append(", ");
}
// Remove last two chars ", "
- builder.delete(builder.length() - 2, builder.length());
+ builder.setLength(builder.length() - 2);
LOG.debug(builder.toString());
}
+ // send the "notify complete" call to all vertices
final long timestamp = completedCheckpoint.getTimestamp();
for (ExecutionVertex ev : tasksToCommitTo) {
@@ -934,7 +942,7 @@ public class CheckpointCoordinator {
latest.getCheckpointID(),
latest.getProperties(),
restoreTimestamp,
- latest.getExternalPath());
+ latest.getExternalPointer());
statsTracker.reportRestoredCheckpoint(restored);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index db86484..17ce4d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -18,11 +18,14 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats.DiscardCallback;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,8 +38,36 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
- * and that is considered completed.
+ * A CompletedCheckpoint describes a checkpoint after all required tasks acknowledged it (with their state)
+ * and that is considered successful. The CompletedCheckpoint class contains all the metadata of the
+ * checkpoint, i.e., checkpoint ID, timestamps, and the handles to all states that are part of the
+ * checkpoint.
+ *
+ * <h2>Size of the CompletedCheckpoint Instances</h2>
+ *
+ * In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint
+ * states are only pointers (such as file paths). However, some state backend implementations may
+ * choose to store some payload data directly with the metadata (for example to avoid many small files).
+ * If those thresholds are increased to large values, the memory consumption of the CompletedCheckpoint
+ * objects can be significant.
+ *
+ * <h2>Externalized Metadata</h2>
+ *
+ * The metadata of the CompletedCheckpoint is optionally also persisted in an external storage
+ * system. In that case, the checkpoint is called <i>externalized</i>.
+ *
+ * <p>Externalized checkpoints have an external pointer, which points to the metadata. For example
+ * when externalizing to a file system, that pointer is the file path to the checkpoint's folder
+ * or the metadata file. For a state backend that stores metadata in database tables, the pointer
+ * could be the table name and row key. The pointer is encoded as a String.
+ *
+ * <h2>Externalized Metadata and High-availability</h2>
+ *
+ * For high availability setups, the checkpoint metadata must be stored persistently and be available
+ * as well. The high-availability services that store the checkpoint ground-truth (meaning what are
+ * the latest completed checkpoints in what order) often rely on checkpoints being externalized. That
+ * way, those services only store pointers to the externalized metadata, rather than the complete
+ * metadata itself (for example ZooKeeper's ZNode payload should ideally be less than megabytes).
*/
public class CompletedCheckpoint implements Serializable {
@@ -44,8 +75,12 @@ public class CompletedCheckpoint implements Serializable {
private static final long serialVersionUID = -8360248179615702014L;
+ // ------------------------------------------------------------------------
+
+ /** The ID of the job that the checkpoint belongs to */
private final JobID job;
+ /** The ID (logical timestamp) of the checkpoint */
private final long checkpointID;
/** The timestamp when the checkpoint was triggered. */
@@ -60,23 +95,41 @@ public class CompletedCheckpoint implements Serializable {
/** Properties for this checkpoint. */
private final CheckpointProperties props;
- /** External path if persisted checkpoint; <code>null</code> otherwise. */
- private final String externalPath;
+ /** The state handle to the externalized meta data, if the metadata has been externalized */
+ @Nullable
+ private final StreamStateHandle externalizedMetadata;
+
+ /** External pointer to the completed checkpoint (for example file path) if externalized; null otherwise. */
+ @Nullable
+ private final String externalPointer;
/** Optional stats tracker callback for discard. */
@Nullable
- private transient CompletedCheckpointStats.DiscardCallback discardCallback;
+ private transient volatile DiscardCallback discardCallback;
// ------------------------------------------------------------------------
- public CompletedCheckpoint(
+ @VisibleForTesting
+ CompletedCheckpoint(
JobID job,
long checkpointID,
long timestamp,
long completionTimestamp,
Map<JobVertexID, TaskState> taskStates) {
- this(job, checkpointID, timestamp, completionTimestamp, taskStates, CheckpointProperties.forStandardCheckpoint(), null);
+ this(job, checkpointID, timestamp, completionTimestamp, taskStates,
+ CheckpointProperties.forStandardCheckpoint());
+ }
+
+ public CompletedCheckpoint(
+ JobID job,
+ long checkpointID,
+ long timestamp,
+ long completionTimestamp,
+ Map<JobVertexID, TaskState> taskStates,
+ CheckpointProperties props) {
+
+ this(job, checkpointID, timestamp, completionTimestamp, taskStates, props, null, null);
}
public CompletedCheckpoint(
@@ -86,24 +139,27 @@ public class CompletedCheckpoint implements Serializable {
long completionTimestamp,
Map<JobVertexID, TaskState> taskStates,
CheckpointProperties props,
- String externalPath) {
+ @Nullable StreamStateHandle externalizedMetadata,
+ @Nullable String externalPointer) {
checkArgument(checkpointID >= 0);
checkArgument(timestamp >= 0);
checkArgument(completionTimestamp >= 0);
+ checkArgument((externalPointer == null) == (externalizedMetadata == null),
+ "external pointer without externalized metadata must be both null or both non-null");
+
+ checkArgument(!props.externalizeCheckpoint() || externalPointer != null,
+ "Checkpoint properties require externalized checkpoint, but checkpoint is not externalized");
+
this.job = checkNotNull(job);
this.checkpointID = checkpointID;
this.timestamp = timestamp;
this.duration = completionTimestamp - timestamp;
this.taskStates = checkNotNull(taskStates);
this.props = checkNotNull(props);
- this.externalPath = externalPath;
-
- if (props.externalizeCheckpoint() && externalPath == null) {
- throw new NullPointerException("Checkpoint properties say that the checkpoint " +
- "should have been persisted, but missing external path.");
- }
+ this.externalizedMetadata = externalizedMetadata;
+ this.externalPointer = externalPointer;
}
// ------------------------------------------------------------------------
@@ -146,10 +202,9 @@ public class CompletedCheckpoint implements Serializable {
discard();
return true;
} else {
- if (externalPath != null) {
+ if (externalPointer != null) {
LOG.info("Persistent checkpoint with ID {} at '{}' not discarded.",
- checkpointID,
- externalPath);
+ checkpointID, externalPointer);
}
return false;
@@ -158,14 +213,36 @@ public class CompletedCheckpoint implements Serializable {
void discard() throws Exception {
try {
- if (externalPath != null) {
- SavepointStore.removeSavepointFile(externalPath);
+ // collect exceptions and continue cleanup
+ Exception exception = null;
+
+ // drop the metadata, if we have some
+ if (externalizedMetadata != null) {
+ try {
+ externalizedMetadata.discardState();
+ }
+ catch (Exception e) {
+ exception = e;
+ }
}
- StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
- } finally {
+ // drop the actual state
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+ }
+ catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ }
+ finally {
taskStates.clear();
+ // to be null-pointer safe, copy reference to stack
+ DiscardCallback discardCallback = this.discardCallback;
if (discardCallback != null) {
discardCallback.notifyDiscardedCheckpoint();
}
@@ -190,8 +267,18 @@ public class CompletedCheckpoint implements Serializable {
return taskStates.get(jobVertexID);
}
- public String getExternalPath() {
- return externalPath;
+ public boolean isExternalized() {
+ return externalizedMetadata != null;
+ }
+
+ @Nullable
+ public StreamStateHandle getExternalizedMetadata() {
+ return externalizedMetadata;
+ }
+
+ @Nullable
+ public String getExternalPointer() {
+ return externalPointer;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index d2c0f6c..e91e038 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -72,4 +72,13 @@ public interface CompletedCheckpointStore {
*/
int getNumberOfRetainedCheckpoints();
+ /**
+ * This method returns whether the completed checkpoint store requires checkpoints to be
+ * externalized. Externalized checkpoints have their meta data persisted, which the checkpoint
+ * store can exploit (for example by simply pointing to the persisted metadata).
+ *
+ * @return True, if the store requires that checkpoints are externalized before being added, false
+ * if the store stores the metadata itself.
+ */
+ boolean requiresExternalizedCheckpoints();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 908ff7f..2c392b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
import java.io.IOException;
import java.util.HashMap;
@@ -28,6 +29,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -41,7 +44,10 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +80,7 @@ public class PendingCheckpoint {
/**
* The checkpoint properties. If the checkpoint should be persisted
- * externally, it happens in {@link #finalizeCheckpoint()}.
+ * externally, it happens in {@link #finalizeCheckpointExternalized()}.
*/
private final CheckpointProperties props;
@@ -203,46 +209,80 @@ public class PendingCheckpoint {
return onCompletionPromise;
}
- public CompletedCheckpoint finalizeCheckpoint() {
+ public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
synchronized (lock) {
- Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
-
- // Persist if required
- String externalPath = null;
- if (props.externalizeCheckpoint()) {
- try {
- Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
- externalPath = SavepointStore.storeSavepoint(
- targetDirectory,
- savepoint
- );
- } catch (IOException e) {
- LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
- }
- }
+ checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
- CompletedCheckpoint completed = new CompletedCheckpoint(
- jobId,
- checkpointId,
- checkpointTimestamp,
- System.currentTimeMillis(),
- new HashMap<>(taskStates),
- props,
- externalPath);
+ // make sure we fulfill the promise with an exception if something fails
+ try {
+ // externalize the metadata
+ final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
- onCompletionPromise.complete(completed);
+ // TEMP FIX - The savepoint store is strictly typed to file systems currently
+ // but the checkpoints are meant to be more generic. We need to work with file handles
+ // here until the savepoint serializer accepts a generic stream factory
- if (statsCallback != null) {
- // Finalize the statsCallback and give the completed checkpoint a
- // callback for discards.
- CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
- completed.setDiscardCallback(discardCallback);
+ final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
+ final String externalPointer = metadataHandle.getFilePath().getParent().toString();
+
+ return finalizeInternal(metadataHandle, externalPointer);
+ }
+ catch (Throwable t) {
+ onCompletionPromise.completeExceptionally(t);
+ ExceptionUtils.rethrowIOException(t);
+ return null; // silence the compiler
+ }
+ }
+ }
+
+ public CompletedCheckpoint finalizeCheckpointNonExternalized() {
+ synchronized (lock) {
+ checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+
+ // make sure we fulfill the promise with an exception if something fails
+ try {
+ // finalize without external metadata
+ return finalizeInternal(null, null);
}
+ catch (Throwable t) {
+ onCompletionPromise.completeExceptionally(t);
+ ExceptionUtils.rethrow(t);
+ return null; // silence the compiler
+ }
+ }
+ }
- dispose(false);
+ @GuardedBy("lock")
+ private CompletedCheckpoint finalizeInternal(
+ @Nullable StreamStateHandle externalMetadata,
+ @Nullable String externalPointer) {
- return completed;
+ assert(Thread.holdsLock(lock));
+
+ CompletedCheckpoint completed = new CompletedCheckpoint(
+ jobId,
+ checkpointId,
+ checkpointTimestamp,
+ System.currentTimeMillis(),
+ new HashMap<>(taskStates),
+ props,
+ externalMetadata,
+ externalPointer);
+
+ onCompletionPromise.complete(completed);
+
+ if (statsCallback != null) {
+ // Finalize the statsCallback and give the completed checkpoint a
+ // callback for discards.
+ CompletedCheckpointStats.DiscardCallback discardCallback =
+ statsCallback.reportCompletedCheckpoint(externalPointer);
+ completed.setDiscardCallback(discardCallback);
}
+
+ // mark this pending checkpoint as disposed, but do NOT drop the state
+ dispose(false);
+
+ return completed;
}
/**
@@ -411,9 +451,9 @@ public class PendingCheckpoint {
public void run() {
try {
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
- } catch (Exception e) {
- LOG.warn("Could not properly dispose the pending checkpoint " +
- "{} of job {}.", checkpointId, jobId, e);
+ } catch (Throwable t) {
+ LOG.warn("Could not properly dispose the pending checkpoint {} of job {}.",
+ checkpointId, jobId, t);
} finally {
taskStates.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 082bca9..a0248b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -96,4 +96,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
}
}
+ @Override
+ public boolean requiresExternalizedCheckpoints() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index fdd0d40..4b03cea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -125,6 +125,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
LOG.info("Initialized in '{}'.", checkpointsPath);
}
+ @Override
+ public boolean requiresExternalizedCheckpoints() {
+ return true;
+ }
+
/**
* Gets the latest checkpoint from ZooKeeper and removes all others.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 950a9a0..60f0287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -19,11 +19,13 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,22 +48,27 @@ public class SavepointLoader {
* @param jobId The JobID of the job to load the savepoint for.
* @param tasks Tasks that will possibly be reset
* @param savepointPath The path of the savepoint to rollback to
- * @param userClassLoader The user code classloader
+ * @param classLoader The class loader to resolve serialized classes in legacy savepoint versions.
* @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
* to any job vertex in tasks.
*
* @throws IllegalStateException If mismatch between program and savepoint state
- * @throws Exception If savepoint store failure
+ * @throws IOException If savepoint store failure
*/
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
String savepointPath,
- ClassLoader userClassLoader,
+ ClassLoader classLoader,
boolean allowNonRestoredState) throws IOException {
// (1) load the savepoint
- Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader);
+ final Tuple2<Savepoint, StreamStateHandle> savepointAndHandle =
+ SavepointStore.loadSavepointWithHandle(savepointPath, classLoader);
+
+ final Savepoint savepoint = savepointAndHandle.f0;
+ final StreamStateHandle metadataHandle = savepointAndHandle.f1;
+
final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
boolean expandedToLegacyIds = false;
@@ -114,10 +121,12 @@ public class SavepointLoader {
// (3) convert to checkpoint so the system can fall back to it
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
- return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath);
+ return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L,
+ taskStates, props, metadataHandle, savepointPath);
}
// ------------------------------------------------------------------------
+ /** This class is not meant to be instantiated */
private SavepointLoader() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 95370a5..5c8ac6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -19,12 +19,15 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
@@ -118,6 +121,28 @@ public class SavepointStore {
* @throws IOException Failures during store are forwarded
*/
public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
+ // write and create the file handle
+ FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint);
+
+ // we return the savepoint directory path here!
+ // The directory path also works to resume from and is more elegant than the direct
+ // metadata file pointer
+ return metadataFileHandle.getFilePath().getParent().toString();
+ }
+
+ /**
+ * Stores the savepoint metadata file to a state handle.
+ *
+ * @param directory Target directory to store savepoint in
+ * @param savepoint Savepoint to be stored
+ *
+ * @return State handle to the checkpoint metadata
+ * @throws IOException Failures during store are forwarded
+ */
+ public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
+ String directory,
+ T savepoint) throws IOException {
+
checkNotNull(directory, "Target directory");
checkNotNull(savepoint, "Savepoint");
@@ -127,10 +152,9 @@ public class SavepointStore {
final FileSystem fs = FileSystem.get(basePath.toUri());
boolean success = false;
- try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE);
+ try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE);
DataOutputStream dos = new DataOutputStream(fdos))
{
-
// Write header
dos.writeInt(MAGIC_NUMBER);
dos.writeInt(savepoint.getVersion());
@@ -138,7 +162,13 @@ public class SavepointStore {
// Write savepoint
SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
serializer.serialize(savepoint, dos);
+
+ // construct result handle
+ FileStateHandle handle = new FileStateHandle(metadataFilePath, dos.size());
+
+ // all good!
success = true;
+ return handle;
}
finally {
if (!success && fs.exists(metadataFilePath)) {
@@ -147,22 +177,37 @@ public class SavepointStore {
}
}
}
-
- // we return the savepoint directory path here!
- // The directory path also works to resume from and is more elegant than the direct
- // metadata file pointer
- return basePath.toString();
}
/**
* Loads the savepoint at the specified path.
*
* @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
+ * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
* @return The loaded savepoint
+ *
* @throws IOException Failures during load are forwarded
*/
- public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
- Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+ public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader classLoader) throws IOException {
+ return loadSavepointWithHandle(savepointFileOrDirectory, classLoader).f0;
+ }
+
+ /**
+ * Loads the savepoint at the specified path. This method returns the savepoint, as well as the
+ * handle to the metadata.
+ *
+ * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
+ * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
+ * @return The loaded savepoint together with the state handle to its metadata file
+ *
+ * @throws IOException Failures during load are forwarded
+ */
+ public static Tuple2<Savepoint, StreamStateHandle> loadSavepointWithHandle(
+ String savepointFileOrDirectory,
+ ClassLoader classLoader) throws IOException {
+
+ checkNotNull(savepointFileOrDirectory, "savepointFileOrDirectory");
+ checkNotNull(classLoader, "classLoader");
Path path = new Path(savepointFileOrDirectory);
@@ -180,11 +225,13 @@ public class SavepointStore {
LOG.info("Using savepoint file in {}", path);
} else {
throw new IOException("Cannot find meta data file in directory " + path
- + ". Please try to load the savepoint directly from the meta data file "
- + "instead of the directory.");
+ + ". Please try to load the savepoint directly from the meta data file "
+ + "instead of the directory.");
}
}
+ // load the savepoint
+ final Savepoint savepoint;
try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
int magicNumber = dis.readInt();
@@ -192,15 +239,27 @@ public class SavepointStore {
int version = dis.readInt();
SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
- return serializer.deserialize(dis, userClassLoader);
+ savepoint = serializer.deserialize(dis, classLoader);
} else {
- throw new RuntimeException("Unexpected magic number. This is most likely " +
- "caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
- "savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
- "_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
- "file is not a proper savepoint or the file has been corrupted.");
+ throw new RuntimeException("Unexpected magic number. This can have multiple reasons: " +
+ "(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " +
+ "version of Flink. (2) The file you were pointing to is not a savepoint at all. " +
+ "(3) The savepoint file has been corrupted.");
}
}
+
+ // construct the stream handle to the metadata file
+ // we get the size best-effort
+ long size = 0;
+ try {
+ size = fs.getFileStatus(path).getLen();
+ }
+ catch (Exception ignored) {
+ // we don't know the size, but we don't want to fail the savepoint loading for that
+ }
+ StreamStateHandle metadataHandle = new FileStateHandle(path, size);
+
+ return new Tuple2<>(savepoint, metadataHandle);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index c6f5c86..b250831 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import java.util.concurrent.RunnableFuture;
@@ -42,26 +43,22 @@ public class StateUtil {
Iterable<? extends StateObject> handlesToDiscard) throws Exception {
if (handlesToDiscard != null) {
-
- Exception suppressedExceptions = null;
+ Exception exception = null;
for (StateObject state : handlesToDiscard) {
if (state != null) {
try {
state.discardState();
- } catch (Exception ex) {
- //best effort to still cleanup other states and deliver exceptions in the end
- if (suppressedExceptions == null) {
- suppressedExceptions = new Exception(ex);
- }
- suppressedExceptions.addSuppressed(ex);
+ }
+ catch (Exception ex) {
+ exception = ExceptionUtils.firstOrSuppressed(ex, exception);
}
}
}
- if (suppressedExceptions != null) {
- throw suppressedExceptions;
+ if (exception != null) {
+ throw exception;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 21749cb..87cd4ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.jobmanager
-import java.io.{File, IOException}
+import java.io.IOException
import java.net._
import java.util.UUID
import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
@@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.executiongraph._
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
@@ -77,7 +77,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.runtime.{FlinkActor, JobException, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
import org.jboss.netty.channel.ChannelException
@@ -611,7 +611,7 @@ class JobManager(
new BiFunction[CompletedCheckpoint, Throwable, Void] {
override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
if (success != null) {
- val path = success.getExternalPath()
+ val path = success.getExternalPointer()
log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
executionGraph.cancel()
senderRef ! decorateMessage(CancellationSuccess(jobId, path))
@@ -787,11 +787,11 @@ class JobManager(
new BiFunction[CompletedCheckpoint, Throwable, Void] {
override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
if (success != null) {
- if (success.getExternalPath != null) {
+ if (success.getExternalPointer != null) {
senderRef ! TriggerSavepointSuccess(
jobId,
success.getCheckpointID,
- success.getExternalPath,
+ success.getExternalPointer,
success.getTimestamp
)
} else {
@@ -1784,7 +1784,7 @@ class JobManager(
case t: Throwable =>
log.error(s"Could not properly unregister job $jobID form the library cache.", t)
}
- jobManagerMetricGroup.map(_.removeJob(jobID))
+ jobManagerMetricGroup.foreach(_.removeJob(jobID))
futureOption
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d4c3a2d..9517257 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -134,5 +134,10 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
public int getNumberOfRetainedCheckpoints() {
return -1;
}
+
+ @Override
+ public boolean requiresExternalizedCheckpoints() {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 725b85f..f77c755 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -237,7 +237,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
Map<JobVertexID, TaskState> taskGroupStates,
CheckpointProperties props) {
- super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props, null);
+ super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 0d933ff..b34e9a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,9 +19,11 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -55,7 +57,9 @@ public class CompletedCheckpointTest {
// Verify discard call is forwarded to state
CompletedCheckpoint checkpoint = new CompletedCheckpoint(
- new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(), file.getAbsolutePath());
+ new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(),
+ new FileStateHandle(new Path(file.toURI()), file.length()),
+ file.getAbsolutePath());
checkpoint.discard(JobStatus.FAILED);
@@ -74,7 +78,7 @@ public class CompletedCheckpointTest {
boolean discardSubsumed = true;
CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
CompletedCheckpoint checkpoint = new CompletedCheckpoint(
- new JobID(), 0, 0, 1, taskStates, props, null);
+ new JobID(), 0, 0, 1, taskStates, props);
// Subsume
checkpoint.subsume();
@@ -104,7 +108,9 @@ public class CompletedCheckpointTest {
// Keep
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
CompletedCheckpoint checkpoint = new CompletedCheckpoint(
- new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, externalPath);
+ new JobID(), 0, 0, 1, new HashMap<>(taskStates), props,
+ new FileStateHandle(new Path(file.toURI()), file.length()),
+ externalPath);
checkpoint.discard(status);
verify(state, times(0)).discardState();
@@ -113,7 +119,7 @@ public class CompletedCheckpointTest {
// Discard
props = new CheckpointProperties(false, false, true, true, true, true, true);
checkpoint = new CompletedCheckpoint(
- new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, null);
+ new JobID(), 0, 0, 1, new HashMap<>(taskStates), props);
checkpoint.discard(status);
verify(state, times(1)).discardState();
@@ -135,8 +141,7 @@ public class CompletedCheckpointTest {
0,
1,
new HashMap<>(taskStates),
- CheckpointProperties.forStandardCheckpoint(),
- null);
+ CheckpointProperties.forStandardCheckpoint());
CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);
completed.setDiscardCallback(callback);
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 3a85c4c..6f04f39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
import org.mockito.Mockito;
import java.io.File;
@@ -49,9 +51,6 @@ import static org.mockito.Mockito.verify;
public class PendingCheckpointTest {
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
@@ -59,6 +58,9 @@ public class PendingCheckpointTest {
ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
}
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
/**
* Tests that pending checkpoints can be subsumed iff they are forced.
*/
@@ -96,7 +98,7 @@ public class PendingCheckpointTest {
PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertEquals(0, tmp.listFiles().length);
- pending.finalizeCheckpoint();
+ pending.finalizeCheckpointExternalized();
assertEquals(1, tmp.listFiles().length);
// Ephemeral checkpoint
@@ -105,7 +107,7 @@ public class PendingCheckpointTest {
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertEquals(1, tmp.listFiles().length);
- pending.finalizeCheckpoint();
+ pending.finalizeCheckpointNonExternalized();
assertEquals(1, tmp.listFiles().length);
}
@@ -148,7 +150,8 @@ public class PendingCheckpointTest {
assertFalse(future.isDone());
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
- pending.finalizeCheckpoint();
+ assertTrue(pending.isFullyAcknowledged());
+ pending.finalizeCheckpointExternalized();
assertTrue(future.isDone());
// Finalize (missing ACKs)
@@ -157,7 +160,13 @@ public class PendingCheckpointTest {
assertFalse(future.isDone());
try {
- pending.finalizeCheckpoint();
+ pending.finalizeCheckpointNonExternalized();
+ fail("Did not throw expected Exception");
+ } catch (IllegalStateException ignored) {
+ // Expected
+ }
+ try {
+ pending.finalizeCheckpointExternalized();
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
// Expected
@@ -233,7 +242,7 @@ public class PendingCheckpointTest {
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class));
- pending.finalizeCheckpoint();
+ pending.finalizeCheckpointNonExternalized();
verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 5a38be2..cbb077c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -483,6 +483,10 @@ public class JobManagerHARecoveryTest {
return checkpoints.size();
}
+ @Override
+ public boolean requiresExternalizedCheckpoints() {
+ return false;
+ }
}
static class MyCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 60b12d2..75f1fd4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -979,7 +979,7 @@ class JobManagerITCase(_system: ActorSystem)
jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor)
val checkpoint = Mockito.mock(classOf[CompletedCheckpoint])
- when(checkpoint.getExternalPath).thenReturn("Expected test savepoint path")
+ when(checkpoint.getExternalPointer).thenReturn("Expected test savepoint path")
// Succeed the promise
savepointPromise.complete(checkpoint)
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 60a3a62..f910e49 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -167,7 +167,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
ActorSystem testSystem = null;
- JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
+ final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
LeaderRetrievalService leaderRetrievalService = null;
ActorSystem taskManagerSystem = null;