You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:20 UTC
[03/17] flink git commit: [FLINK-5823] [checkpoints] State backends
now also handle the checkpoint metadata
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
new file mode 100644
index 0000000..5f4b954
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+
+/**
+ * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
+ * persists the metadata in a file system.
+ *
+ * <p>This class inherits its behavior for metadata from the {@link FsCheckpointStorageLocation},
+ * which makes checkpoint metadata cross compatible between the two classes and hence between
+ * the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} and the
+ * {@link MemoryStateBackend}.
+ */
+public class PersistentMetadataCheckpointStorageLocation extends FsCheckpointStorageLocation {
+
+ /** The internal pointer for the {@link MemoryStateBackend}'s storage location (data inline with
+ * state handles) that gets sent to the TaskManagers to describe this storage. */
+ static final String LOCATION_POINTER = "(embedded)";
+
+ /**
+ * Creates a checkpoint storage persists metadata to a file system and stores state
+ * in line in state handles with the metadata.
+ *
+ * @param fileSystem The file system to which the metadata will be written.
+ * @param checkpointDir The directory where the checkpoint metadata will be written.
+ */
+ public PersistentMetadataCheckpointStorageLocation(FileSystem fileSystem, Path checkpointDir) {
+ super(fileSystem, checkpointDir, checkpointDir, checkpointDir);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String getLocationAsPointer() {
+ return LOCATION_POINTER;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return getClass().getName() + " @ " + getCheckpointDirectory();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0f8033d..325e955 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager}
import org.apache.flink.runtime.clusterframework.messages._
@@ -552,31 +551,31 @@ class JobManager(
case CancelJobWithSavepoint(jobId, savepointDirectory) =>
try {
- val targetDirectory = if (savepointDirectory != null) {
- savepointDirectory
- } else {
- defaultSavepointDir
- }
-
- if (targetDirectory == null) {
- log.info(s"Trying to cancel job $jobId with savepoint, but no " +
- "savepoint directory configured.")
+ log.info(s"Trying to cancel job $jobId with savepoint to $savepointDirectory")
- sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
- "No savepoint directory configured. You can either specify a directory " +
- "while cancelling via -s :targetDirectory or configure a cluster-wide " +
- "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
- } else {
- log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
+ currentJobs.get(jobId) match {
+ case Some((executionGraph, _)) =>
+ val coord = executionGraph.getCheckpointCoordinator
- currentJobs.get(jobId) match {
- case Some((executionGraph, _)) =>
+ if (coord == null) {
+ sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
+ s"Job $jobId is not a streaming job.")))
+ }
+ else if (savepointDirectory == null &&
+ !coord.getCheckpointStorage.hasDefaultSavepointLocation) {
+ log.info(s"Trying to cancel job $jobId with savepoint, but no " +
+ "savepoint directory configured.")
+
+ sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
+ "No savepoint directory configured. You can either specify a directory " +
+ "while cancelling via -s :targetDirectory or configure a cluster-wide " +
+ "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
+ } else {
// We don't want any checkpoint between the savepoint and cancellation
- val coord = executionGraph.getCheckpointCoordinator
coord.stopCheckpointScheduler()
// Trigger the savepoint
- val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+ val future = coord.triggerSavepoint(System.currentTimeMillis(), savepointDirectory)
val senderRef = sender()
future.handleAsync[Void](
@@ -608,15 +607,15 @@ class JobManager(
}
},
context.dispatcher)
+ }
- case None =>
- log.info(s"No job found with ID $jobId.")
- sender ! decorateMessage(
- CancellationFailure(
- jobId,
- new IllegalArgumentException(s"No job found with ID $jobId."))
- )
- }
+ case None =>
+ log.info(s"No job found with ID $jobId.")
+ sender ! decorateMessage(
+ CancellationFailure(
+ jobId,
+ new IllegalArgumentException(s"No job found with ID $jobId."))
+ )
}
} catch {
case t: Throwable =>
@@ -746,25 +745,28 @@ class JobManager(
case Some((graph, _)) =>
val checkpointCoordinator = graph.getCheckpointCoordinator()
- if (checkpointCoordinator != null) {
+ if (checkpointCoordinator == null) {
+ sender ! decorateMessage(TriggerSavepointFailure(jobId, new IllegalStateException(
+ s"Job $jobId is not a streaming job.")))
+ }
+ else if (savepointDirectory.isEmpty &&
+ !checkpointCoordinator.getCheckpointStorage.hasDefaultSavepointLocation) {
+ log.info(s"Trying to trigger a savepoint, but no savepoint directory configured.")
+
+ sender ! decorateMessage(TriggerSavepointFailure(jobId, new IllegalStateException(
+ "No savepoint directory configured. You can either specify a directory " +
+ "when triggering the savepoint via -s :targetDirectory or configure a " +
+ "cluster-/application-wide default via key '" +
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
+ } else {
// Immutable copy for the future
val senderRef = sender()
try {
- val targetDirectory : String = savepointDirectory.getOrElse(
- flinkConfiguration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))
-
- if (targetDirectory == null) {
- throw new IllegalStateException("No savepoint directory configured. " +
- "You can either specify a directory when triggering this savepoint or " +
- "configure a cluster-wide default via key '" +
- CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")
- }
-
// Do this async, because checkpoint coordinator operations can
// contain blocking calls to the state backend or ZooKeeper.
val savepointFuture = checkpointCoordinator.triggerSavepoint(
System.currentTimeMillis(),
- targetDirectory)
+ savepointDirectory.orNull)
savepointFuture.handleAsync[Void](
new BiFunction[CompletedCheckpoint, Throwable, Void] {
@@ -794,10 +796,6 @@ class JobManager(
senderRef ! TriggerSavepointFailure(jobId, new Exception(
"Failed to trigger savepoint", e))
}
- } else {
- sender() ! TriggerSavepointFailure(jobId, new IllegalStateException(
- "Checkpointing disabled. You can enable it via the execution environment of " +
- "your job."))
}
case None =>
@@ -809,19 +807,13 @@ class JobManager(
future {
try {
log.info(s"Disposing savepoint at '$savepointPath'.")
- //TODO user code class loader ?
- // (has not been used so far and new savepoints can simply be deleted by file)
- val savepoint = SavepointStore.loadSavepoint(
- savepointPath,
- Thread.currentThread().getContextClassLoader)
-
- log.debug(s"$savepoint")
- // Dispose checkpoint state
- savepoint.dispose()
+ // there is a corner case issue with Flink 1.1 savepoints, which may contain
+ // user-defined state handles. however, it should work for all the standard cases,
+ // where the mem/fs/rocks state backends were used
+ val classLoader = Thread.currentThread().getContextClassLoader
- // Remove the header file
- SavepointStore.removeSavepointFile(savepointPath)
+ Checkpoints.disposeSavepoint(savepointPath, flinkConfiguration, classLoader, log.logger)
senderRef ! DisposeSavepointSuccess
} catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
deleted file mode 100644
index f55e0d3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * CheckpointCoordinator tests for externalized checkpoints.
- *
- * <p>This is separate from {@link CheckpointCoordinatorTest}, because that
- * test is already huge and covers many different configurations.
- */
-public class CheckpointCoordinatorExternalizedCheckpointsTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- /**
- * Triggers multiple externalized checkpoints and verifies that the metadata
- * files have been created.
- */
- @Test
- public void testTriggerAndConfirmSimpleExternalizedCheckpoint()
- throws Exception {
- final JobID jid = new JobID();
-
- final ExternalizedCheckpointSettings externalizedCheckpointSettings =
- ExternalizedCheckpointSettings.externalizeCheckpoints(false);
-
- final File checkpointDir = tmp.newFolder();
- final FsStateBackend stateBackend = new FsStateBackend(checkpointDir.toURI());
-
- // create some mock Execution vertices that receive the checkpoint trigger messages
- final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
- final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
- ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
- ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
-
- Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();
- jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
- jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex());
-
- // set up the coordinator and validate the initial state
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- 600000,
- 600000,
- 0,
- Integer.MAX_VALUE,
- externalizedCheckpointSettings,
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(1),
- checkpointDir.getAbsolutePath(),
- stateBackend,
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY);
-
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-
- // ---------------
- // trigger checkpoint 1
- // ---------------
-
- {
- final long timestamp1 = System.currentTimeMillis();
-
- coord.triggerCheckpoint(timestamp1, false);
-
- long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));
-
- CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
-
- verifyExternalizedCheckpoint(latest, jid, checkpointId1, timestamp1);
- verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
- }
-
- // ---------------
- // trigger checkpoint 2
- // ---------------
-
- {
- final long timestamp2 = System.currentTimeMillis() + 7;
- coord.triggerCheckpoint(timestamp2, false);
-
- long checkpointId2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
-
- CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
- verifyExternalizedCheckpoint(latest, jid, checkpointId2, timestamp2);
- verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
- }
-
- // ---------------
- // trigger checkpoint 3
- // ---------------
-
- {
- final long timestamp3 = System.currentTimeMillis() + 146;
- coord.triggerCheckpoint(timestamp3, false);
-
- long checkpointId3 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
-
- CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
- verifyExternalizedCheckpoint(latest, jid, checkpointId3, timestamp3);
- verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
- }
-
- coord.shutdown(JobStatus.FINISHED);
- }
-
- /**
- * Verifies an externalized completed checkpoint instance.
- *
- * <p>The provided JobID, checkpoint ID, timestamp need to match. Also, the
- * external pointer and external metadata need to be notNull and exist (currently
- * assuming that they are file system based).
- *
- * @param checkpoint Completed checkpoint to check.
- * @param jid JobID of the job the checkpoint belongs to.
- * @param checkpointId Checkpoint ID of the checkpoint to check.
- * @param timestamp Timestamp of the checkpoint to check.
- */
- private static void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) {
- assertEquals(jid, checkpoint.getJobId());
- assertEquals(checkpointId, checkpoint.getCheckpointID());
- assertEquals(timestamp, checkpoint.getTimestamp());
- assertNotNull(checkpoint.getExternalPointer());
- assertNotNull(checkpoint.getExternalizedMetadata());
- FileStateHandle fsHandle = (FileStateHandle) checkpoint.getExternalizedMetadata();
- assertTrue(new File(fsHandle.getFilePath().getPath()).exists());
- }
-
- private static void verifyExternalizedCheckpointRestore(
- CompletedCheckpoint checkpoint,
- Map<JobVertexID, ExecutionJobVertex> jobVertices,
- ExecutionVertex... vertices) throws IOException {
-
- CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(
- checkpoint.getJobId(),
- jobVertices,
- checkpoint.getExternalPointer(),
- Thread.currentThread().getContextClassLoader(),
- false);
-
- for (ExecutionVertex vertex : vertices) {
- for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
- assertEquals(checkpoint.getOperatorStates().get(operatorID), loaded.getOperatorStates().get(operatorID));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index add7447..cbfe0ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -72,13 +71,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[]{vertex},
new ExecutionVertex[]{vertex},
new ExecutionVertex[]{vertex},
new StandaloneCheckpointIDCounter(),
new FailingCompletedCheckpointStore(),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e53bf3a..d583a0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -26,12 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
-
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
import org.junit.Test;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -229,9 +230,9 @@ public class CheckpointCoordinatorMasterHooksTest {
jid, checkpointId, 123L, 125L,
Collections.<OperatorID, OperatorState>emptyMap(),
masterHookStates,
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "pointer");
final ExecutionAttemptID execId = new ExecutionAttemptID();
final ExecutionVertex ackVertex = mockExecutionVertex(execId);
@@ -283,9 +284,9 @@ public class CheckpointCoordinatorMasterHooksTest {
jid, checkpointId, 123L, 125L,
Collections.<OperatorID, OperatorState>emptyMap(),
masterHookStates,
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "pointer");
final ExecutionAttemptID execId = new ExecutionAttemptID();
final ExecutionVertex ackVertex = mockExecutionVertex(execId);
@@ -396,13 +397,12 @@ public class CheckpointCoordinatorMasterHooksTest {
600000L,
0L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[0],
ackVertices,
new ExecutionVertex[0],
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 609e91c..9f9659a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -32,10 +32,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -135,13 +135,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -197,13 +196,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -250,13 +248,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -304,13 +301,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -408,13 +404,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -529,13 +524,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -698,13 +692,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -830,13 +823,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -996,13 +988,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1075,13 +1066,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1140,13 +1130,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
20000L,
0L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2},
new ExecutionVertex[0],
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1274,13 +1263,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200000, // timeout is very long (200 s)
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1365,13 +1353,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200_000, // timeout is very long (200 s)
delay, // 50 ms delay between checkpoints
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex },
new ExecutionVertex[] { vertex },
new ExecutionVertex[] { vertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- "dummy-path",
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1440,13 +1427,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1593,13 +1579,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
counter,
new StandaloneCompletedCheckpointStore(10),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1695,13 +1680,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200000, // timeout is very long (200 s)
0L, // no extra delay
maxConcurrentAttempts,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1770,13 +1754,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200000, // timeout is very long (200 s)
0L, // no extra delay
maxConcurrentAttempts, // max two concurrent checkpoints
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1848,13 +1831,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200000, // timeout is very long (200 s)
0L, // no extra delay
2, // max two concurrent checkpoints
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1902,13 +1884,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200000,
0L,
1, // max one checkpoint at a time => should not affect savepoints
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
checkpointIDCounter,
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1957,13 +1938,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
200000,
100000000L, // very long min delay => should not affect savepoints
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2021,13 +2001,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
new StandaloneCheckpointIDCounter(),
store,
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2137,13 +2116,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2285,13 +2263,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2436,7 +2413,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
*/
public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception {
- /**
+ /*
* Old topology
* CHAIN(op1 -> op2) * parallelism1 -> CHAIN(op3 -> op4) * parallelism2
*/
@@ -2510,7 +2487,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
}
- /**
+ /*
* New topology
* CHAIN(op5 -> op1 -> op2) * newParallelism1 -> CHAIN(op3 -> op6) * newParallelism2
*/
@@ -2557,9 +2534,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
System.currentTimeMillis() + 3000,
operatorStates,
Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "some/mock/pointer");
when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
@@ -2570,13 +2547,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
newJobVertex1.getTaskVertices(),
newJobVertex1.getTaskVertices(),
newJobVertex1.getTaskVertices(),
new StandaloneCheckpointIDCounter(),
standaloneCompletedCheckpointStore,
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2719,13 +2695,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+ CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- "fake-directory",
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2734,7 +2709,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
CheckpointProperties props = checkpoint.getProps();
- CheckpointProperties expected = CheckpointProperties.forExternalizedCheckpoint(true);
+ CheckpointProperties expected = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
assertEquals(expected, props);
}
@@ -3196,13 +3171,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3210,7 +3184,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// Periodic
CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
System.currentTimeMillis(),
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null,
true);
@@ -3220,7 +3194,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// Not periodic
triggerResult = coord.triggerCheckpoint(
System.currentTimeMillis(),
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
null,
false);
@@ -3376,13 +3350,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3397,7 +3370,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertTrue(coord.triggerCheckpoint(timestamp, false));
verify(tracker, times(1))
- .reportPendingCheckpoint(eq(1L), eq(timestamp), eq(CheckpointProperties.forStandardCheckpoint()));
+ .reportPendingCheckpoint(eq(1L), eq(timestamp), eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));
}
/**
@@ -3416,13 +3389,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
new StandaloneCheckpointIDCounter(),
store,
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3434,9 +3406,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
0,
Collections.<OperatorID, OperatorState>emptyMap(),
Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null));
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "some/dummy/pointer"));
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
coord.setCheckpointStatsTracker(tracker);
@@ -3474,13 +3446,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000L,
0L,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
new ExecutionVertex[]{vertex1},
checkpointIDCounter,
completedCheckpointStore,
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3568,13 +3539,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
new StandaloneCheckpointIDCounter(),
store,
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
deleteExecutor -> {
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java
new file mode 100644
index 0000000..86eec77
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * CheckpointCoordinator tests for externalized checkpoints.
+ *
+ * <p>This is separate from {@link CheckpointCoordinatorTest}, because that
+ * test is already huge and covers many different configurations.
+ */
+public class CheckpointExternalResumeTest {
+
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Triggers multiple externalized checkpoints and verifies that the metadata
+ * files have been created.
+ */
+ @Test
+ public void testTriggerAndConfirmSimpleExternalizedCheckpoint() throws Exception {
+ final JobID jid = new JobID();
+
+ final File checkpointDir = tmp.newFolder();
+ final FsStateBackend stateBackend = new FsStateBackend(checkpointDir.toURI());
+
+ // create some mock Execution vertices that receive the checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
+ ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
+
+ Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();
+ jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
+ jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex());
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+ CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
+ new ExecutionVertex[] { vertex1, vertex2 },
+ new ExecutionVertex[] { vertex1, vertex2 },
+ new ExecutionVertex[] { vertex1, vertex2 },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ stateBackend,
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
+
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ // ---------------
+ // trigger checkpoint 1
+ // ---------------
+
+ {
+ final long timestamp1 = System.currentTimeMillis();
+
+ coord.triggerCheckpoint(timestamp1, false);
+
+ long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));
+
+ CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+
+ verifyExternalizedCheckpoint(latest, jid, checkpointId1, timestamp1);
+ verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+ }
+
+ // ---------------
+ // trigger checkpoint 2
+ // ---------------
+
+ {
+ final long timestamp2 = System.currentTimeMillis() + 7;
+ coord.triggerCheckpoint(timestamp2, false);
+
+ long checkpointId2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+
+ CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+ verifyExternalizedCheckpoint(latest, jid, checkpointId2, timestamp2);
+ verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+ }
+
+ // ---------------
+ // trigger checkpoint 3
+ // ---------------
+
+ {
+ final long timestamp3 = System.currentTimeMillis() + 146;
+ coord.triggerCheckpoint(timestamp3, false);
+
+ long checkpointId3 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
+
+ CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+ verifyExternalizedCheckpoint(latest, jid, checkpointId3, timestamp3);
+ verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+ }
+
+ coord.shutdown(JobStatus.FINISHED);
+ }
+
+ /**
+ * Verifies an externalized completed checkpoint instance.
+ *
+ * <p>The provided JobID, checkpoint ID, timestamp need to match. Also, the
+ * external pointer and external metadata need to be notNull and exist (currently
+ * assuming that they are file system based).
+ *
+ * @param checkpoint Completed checkpoint to check.
+ * @param jid JobID of the job the checkpoint belongs to.
+ * @param checkpointId Checkpoint ID of the checkpoint to check.
+ * @param timestamp Timestamp of the checkpoint to check.
+ */
+ private void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) {
+ assertEquals(jid, checkpoint.getJobId());
+ assertEquals(checkpointId, checkpoint.getCheckpointID());
+ assertEquals(timestamp, checkpoint.getTimestamp());
+ assertNotNull(checkpoint.getExternalPointer());
+ FileStateHandle fsHandle = (FileStateHandle) checkpoint.getMetadataHandle();
+ assertTrue(new File(fsHandle.getFilePath().getPath()).exists());
+ }
+
+ private void verifyExternalizedCheckpointRestore(
+ CompletedCheckpoint checkpoint,
+ Map<JobVertexID, ExecutionJobVertex> jobVertices,
+ ExecutionVertex... vertices) throws IOException {
+
+ String pointer = checkpoint.getExternalPointer();
+ StreamStateHandle metadataHandle = new FsStateBackend(tmp.getRoot().toURI()).resolveCheckpoint(pointer);
+
+ CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(
+ checkpoint.getJobId(),
+ jobVertices,
+ checkpoint.getExternalPointer(),
+ metadataHandle,
+ Thread.currentThread().getContextClassLoader(),
+ false);
+
+ for (ExecutionVertex vertex : vertices) {
+ for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
+ assertEquals(checkpoint.getOperatorStates().get(operatorID), loaded.getOperatorStates().get(operatorID));
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
new file mode 100644
index 0000000..ee60fc7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A test that checks that checkpoint metadata loading works properly, including validation
+ * of resumed state and dropped state.
+ */
+public class CheckpointMetadataLoadingTest {
+
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ /**
+ * Tests loading and validation of savepoints with correct setup,
+ * parallelism mismatch, and a missing task.
+ */
+ @Test
+ public void testLoadAndValidateSavepoint() throws Exception {
+ File tmp = tmpFolder.newFolder();
+
+ int parallelism = 128128;
+ long checkpointId = Integer.MAX_VALUE + 123123L;
+ JobVertexID jobVertexID = new JobVertexID();
+ OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
+
+ OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+ new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
+ null,
+ null,
+ null);
+
+ OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
+ state.putState(0, subtaskState);
+
+ Map<OperatorID, OperatorState> taskStates = new HashMap<>();
+ taskStates.put(operatorID, state);
+
+ JobID jobId = new JobID();
+
+ // Store savepoint
+ final SavepointV2 savepoint = new SavepointV2(checkpointId, taskStates.values(), Collections.emptyList());
+ final StreamStateHandle serializedMetadata;
+
+ try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+ Checkpoints.storeCheckpointMetadata(savepoint, os);
+ serializedMetadata = new ByteStreamStateHandle("checkpoint", os.toByteArray());
+ }
+
+ ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+ when(vertex.getParallelism()).thenReturn(parallelism);
+ when(vertex.getMaxParallelism()).thenReturn(parallelism);
+ when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
+
+ Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+ tasks.put(jobVertexID, vertex);
+
+ ClassLoader ucl = Thread.currentThread().getContextClassLoader();
+
+ // 1) Load and validate: everything correct
+ CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, false);
+
+ assertEquals(jobId, loaded.getJobId());
+ assertEquals(checkpointId, loaded.getCheckpointID());
+
+ // 2) Load and validate: max parallelism mismatch
+ when(vertex.getMaxParallelism()).thenReturn(222);
+ when(vertex.isMaxParallelismConfigured()).thenReturn(true);
+
+ try {
+ Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, false);
+ fail("Did not throw expected Exception");
+ } catch (IllegalStateException expected) {
+ assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
+ }
+
+ // 3) Load and validate: missing vertex
+ assertNotNull(tasks.remove(jobVertexID));
+
+ try {
+ Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, false);
+ fail("Did not throw expected Exception");
+ } catch (IllegalStateException expected) {
+ assertTrue(expected.getMessage().contains("allowNonRestoredState"));
+ }
+
+ // 4) Load and validate: ignore missing vertex
+ Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index a0509c4..c17172b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -31,40 +31,22 @@ import static org.junit.Assert.assertTrue;
public class CheckpointPropertiesTest {
/**
- * Tests the default checkpoint properties.
- */
- @Test
- public void testCheckpointProperties() {
- CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
-
- assertFalse(props.forceCheckpoint());
- assertFalse(props.externalizeCheckpoint());
- assertTrue(props.discardOnSubsumed());
- assertTrue(props.discardOnJobFinished());
- assertTrue(props.discardOnJobCancelled());
- assertTrue(props.discardOnJobFailed());
- assertTrue(props.discardOnJobSuspended());
- }
-
- /**
* Tests the external checkpoints properties.
*/
@Test
- public void testExternalizedCheckpointProperties() {
- CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
+ public void testCheckpointProperties() {
+ CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
assertFalse(props.forceCheckpoint());
- assertTrue(props.externalizeCheckpoint());
assertTrue(props.discardOnSubsumed());
assertTrue(props.discardOnJobFinished());
assertTrue(props.discardOnJobCancelled());
assertFalse(props.discardOnJobFailed());
assertTrue(props.discardOnJobSuspended());
- props = CheckpointProperties.forExternalizedCheckpoint(false);
+ props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
assertFalse(props.forceCheckpoint());
- assertTrue(props.externalizeCheckpoint());
assertTrue(props.discardOnSubsumed());
assertTrue(props.discardOnJobFinished());
assertFalse(props.discardOnJobCancelled());
@@ -77,10 +59,9 @@ public class CheckpointPropertiesTest {
*/
@Test
public void testSavepointProperties() {
- CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+ CheckpointProperties props = CheckpointProperties.forSavepoint();
assertTrue(props.forceCheckpoint());
- assertTrue(props.externalizeCheckpoint());
assertFalse(props.discardOnSubsumed());
assertFalse(props.discardOnJobFinished());
assertFalse(props.discardOnJobCancelled());
@@ -94,22 +75,17 @@ public class CheckpointPropertiesTest {
@Test
public void testIsSavepoint() throws Exception {
{
- CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
- assertFalse(props.isSavepoint());
- }
-
- {
- CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
+ CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
assertFalse(props.isSavepoint());
}
{
- CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(false);
+ CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
assertFalse(props.isSavepoint());
}
{
- CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+ CheckpointProperties props = CheckpointProperties.forSavepoint();
assertTrue(props.isSavepoint());
CheckpointProperties deserializedCheckpointProperties =
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index e98efc2..b8a2a54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import java.io.IOException;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -34,21 +32,24 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
@@ -83,7 +84,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
10000L,
0L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
serHooks);
@@ -142,6 +143,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
/**
* Simulate a custom option that is not in the normal classpath.
*/
+ @SuppressWarnings("unused")
private Serializable customOption;
public CustomStateBackend(Serializable customOption) {
@@ -149,9 +151,19 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
}
@Override
+ public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+ return mock(CheckpointStorage.class);
+ }
+
+ @Override
public CheckpointStreamFactory createStreamFactory(
JobID jobId, String operatorIdentifier) throws IOException {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
@@ -159,7 +171,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
JobID jobId,
String operatorIdentifier,
@Nullable String targetLocation) throws IOException {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
@@ -171,13 +183,13 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public OperatorStateBackend createOperatorStateBackend(
Environment env, String operatorIdentifier) throws Exception {
- return null;
+ throw new UnsupportedOperationException();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index df2d37a..ae354bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -101,13 +101,12 @@ public class CheckpointStateRestoreTest {
200000L,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0],
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -179,13 +178,12 @@ public class CheckpointStateRestoreTest {
200000L,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[0],
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -207,7 +205,7 @@ public class CheckpointStateRestoreTest {
/**
* Tests that the allow non restored state flag is correctly handled.
*
- * The flag only applies for state that is part of the checkpoint.
+ * <p>The flag only applies for state that is part of the checkpoint.
*/
@Test
public void testNonRestoredState() throws Exception {
@@ -238,13 +236,12 @@ public class CheckpointStateRestoreTest {
Integer.MAX_VALUE,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
new ExecutionVertex[] {},
new ExecutionVertex[] {},
new ExecutionVertex[] {},
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -266,9 +263,9 @@ public class CheckpointStateRestoreTest {
2,
new HashMap<>(checkpointTaskStates),
Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "<pointer>");
coord.getCheckpointStore().addCheckpoint(checkpoint);
@@ -294,9 +291,9 @@ public class CheckpointStateRestoreTest {
3,
new HashMap<>(checkpointTaskStates),
Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "pointer");
coord.getCheckpointStore().addCheckpoint(checkpoint);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 95a31d4..a410d26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -175,23 +175,23 @@ public class CheckpointStatsHistoryTest {
PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
when(pending.getCheckpointId()).thenReturn(checkpointId);
- when(pending.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(pending.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
return pending;
}
private CompletedCheckpointStats createCompletedCheckpointStats(long checkpointId) {
CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class);
when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
- when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(completed.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
when(completed.getCheckpointId()).thenReturn(checkpointId);
- when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(completed.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
return completed;
}
private FailedCheckpointStats createFailedCheckpointStats(long checkpointId) {
FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
- when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(failed.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
when(failed.getCheckpointId()).thenReturn(checkpointId);
return failed;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
index 6500369..b4bb88a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
@@ -45,7 +45,11 @@ public class CheckpointStatsSnapshotTest {
summary.updateSummary(createCompletedCheckpointsStats(2221, 3333, 9122));
CheckpointStatsHistory history = new CheckpointStatsHistory(1);
- RestoredCheckpointStats restored = new RestoredCheckpointStats(1, CheckpointProperties.forStandardCheckpoint(), 99119, null);
+ RestoredCheckpointStats restored = new RestoredCheckpointStats(
+ 1,
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ 99119,
+ null);
CheckpointStatsSnapshot snapshot = new CheckpointStatsSnapshot(
counts,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 0d19cd5..82dcd02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -38,9 +38,9 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+
import org.junit.Test;
public class CheckpointStatsTrackerTest {
@@ -63,7 +63,7 @@ public class CheckpointStatsTrackerTest {
19191992L,
191929L,
123,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
false
),
null);
@@ -98,7 +98,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats pending = tracker.reportPendingCheckpoint(
0,
1,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
pending.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(1));
@@ -147,7 +147,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats completed1 = tracker.reportPendingCheckpoint(
0,
1,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
completed1.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
completed1.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(1));
@@ -159,7 +159,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats failed = tracker.reportPendingCheckpoint(
1,
1,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
failed.reportFailedCheckpoint(12, null);
@@ -167,7 +167,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats savepoint = tracker.reportPendingCheckpoint(
2,
1,
- CheckpointProperties.forStandardSavepoint());
+ CheckpointProperties.forSavepoint());
savepoint.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
savepoint.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(1));
@@ -179,9 +179,9 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats inProgress = tracker.reportPendingCheckpoint(
3,
1,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- RestoredCheckpointStats restored = new RestoredCheckpointStats(81, CheckpointProperties.forStandardCheckpoint(), 123, null);
+ RestoredCheckpointStats restored = new RestoredCheckpointStats(81, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), 123, null);
tracker.reportRestoredCheckpoint(restored);
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
@@ -254,7 +254,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats pending = tracker.reportPendingCheckpoint(
0,
1,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
@@ -270,7 +270,7 @@ public class CheckpointStatsTrackerTest {
assertNotEquals(snapshot2, snapshot3);
// Restore operation => new snapshot
- tracker.reportRestoredCheckpoint(new RestoredCheckpointStats(12, CheckpointProperties.forStandardCheckpoint(), 12, null));
+ tracker.reportRestoredCheckpoint(new RestoredCheckpointStats(12, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), 12, null));
CheckpointStatsSnapshot snapshot4 = tracker.createSnapshot();
assertNotEquals(snapshot3, snapshot4);
@@ -373,7 +373,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats pending = stats.reportPendingCheckpoint(
0,
0,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
// Check counts
assertEquals(Long.valueOf(1), numCheckpoints.getValue());
@@ -415,7 +415,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats nextPending = stats.reportPendingCheckpoint(
1,
11,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
long failureTimestamp = 1230123L;
nextPending.reportFailedCheckpoint(failureTimestamp, null);
@@ -430,7 +430,7 @@ public class CheckpointStatsTrackerTest {
long restoreTimestamp = 183419283L;
RestoredCheckpointStats restored = new RestoredCheckpointStats(
1,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
restoreTimestamp,
null);
stats.reportRestoredCheckpoint(restored);
@@ -446,7 +446,7 @@ public class CheckpointStatsTrackerTest {
PendingCheckpointStats thirdPending = stats.reportPendingCheckpoint(
2,
5000,
- CheckpointProperties.forStandardCheckpoint());
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
thirdPending.reportSubtaskStats(jobVertex.getJobVertexId(), subtaskStats);
thirdPending.reportCompletedCheckpoint(null);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
index dd9c2c8..2910765 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+
import org.junit.Test;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -93,7 +93,7 @@ public class CompletedCheckpointStatsSummaryTest {
return new CompletedCheckpointStats(
checkpointId,
triggerTimestamp,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
1,
taskStats,
1,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 320dc2d..d113aa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -196,7 +197,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
SharedStateRegistry sharedStateRegistry) throws IOException {
int numberOfStates = 4;
- CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
+ CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
OperatorID operatorID = new OperatorID();
@@ -256,7 +257,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
long timestamp,
Map<OperatorID, OperatorState> operatorGroupState,
CheckpointProperties props) {
- super(jobId, checkpointId, timestamp, Long.MAX_VALUE, operatorGroupState, null, props, null, null);
+ super(jobId, checkpointId, timestamp, Long.MAX_VALUE, operatorGroupState, null, props,
+ new EmptyStreamStateHandle(), "<pointer");
}
@Override