You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:19 UTC
[02/17] flink git commit: [FLINK-5823] [checkpoints] State backends
now also handle the checkpoint metadata
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 293675c..98b6647 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -25,11 +25,12 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
import java.io.File;
import java.util.Collections;
@@ -63,7 +64,7 @@ public class CompletedCheckpointTest {
new JobID(), 0, 0, 1,
taskStates,
Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
new FileStateHandle(new Path(file.toURI()), file.length()),
file.getAbsolutePath());
@@ -81,16 +82,18 @@ public class CompletedCheckpointTest {
Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
operatorStates.put(new OperatorID(), state);
+ StreamStateHandle metadataHandle = mock(StreamStateHandle.class);
+
boolean discardSubsumed = true;
- CheckpointProperties props = new CheckpointProperties(false, false, false, discardSubsumed, true, true, true, true);
+ CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
CompletedCheckpoint checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1,
operatorStates,
Collections.<MasterState>emptyList(),
props,
- null,
- null);
+ metadataHandle,
+ "some/mock/pointer");
SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -100,6 +103,7 @@ public class CompletedCheckpointTest {
checkpoint.discardOnSubsume();
verify(state, times(1)).discardState();
+ verify(metadataHandle).discardState();
}
/**
@@ -107,49 +111,48 @@ public class CompletedCheckpointTest {
*/
@Test
public void testCleanUpOnShutdown() throws Exception {
- File file = tmpFolder.newFile();
- String externalPath = file.getAbsolutePath();
-
JobStatus[] terminalStates = new JobStatus[] {
JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED, JobStatus.SUSPENDED
};
- OperatorState state = mock(OperatorState.class);
- Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
- operatorStates.put(new OperatorID(), state);
-
for (JobStatus status : terminalStates) {
- Mockito.reset(state);
+
+ OperatorState state = mock(OperatorState.class);
+ Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+ operatorStates.put(new OperatorID(), state);
+
+ StreamStateHandle metadataHandle = mock(StreamStateHandle.class);
// Keep
- CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
+ CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
CompletedCheckpoint checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1,
new HashMap<>(operatorStates),
Collections.<MasterState>emptyList(),
props,
- new FileStateHandle(new Path(file.toURI()), file.length()),
- externalPath);
+ metadataHandle,
+ "mock://some/pointer");
SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
checkpoint.discardOnShutdown(status);
verify(state, times(0)).discardState();
- assertEquals(true, file.exists());
+ verify(metadataHandle, times(0)).discardState();
// Discard
- props = new CheckpointProperties(false, false, false, true, true, true, true, true);
+ props = new CheckpointProperties(false, false, true, true, true, true, true);
checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1,
new HashMap<>(operatorStates),
Collections.<MasterState>emptyList(),
props,
- null,
- null);
+ metadataHandle,
+ "pointer");
checkpoint.discardOnShutdown(status);
verify(state, times(1)).discardState();
+ verify(metadataHandle, times(1)).discardState();
}
}
@@ -169,9 +172,9 @@ public class CompletedCheckpointTest {
1,
new HashMap<>(operatorStates),
Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ mock(StreamStateHandle.class),
+ "pointer");
CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);
completed.setDiscardCallback(callback);
@@ -192,7 +195,7 @@ public class CompletedCheckpointTest {
CompletedCheckpointStats completed = new CompletedCheckpointStats(
123123123L,
10123L,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
1337,
taskStats,
1337,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 7d6c7b5..1d44444 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -29,15 +29,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-
import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import scala.concurrent.Await;
@@ -80,7 +79,7 @@ public class CoordinatorShutdownTest extends TestLogger {
60000,
0L,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
@@ -149,7 +148,7 @@ public class CoordinatorShutdownTest extends TestLogger {
60000,
0L,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 6a84a11..a53d6d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -95,14 +94,13 @@ public class ExecutionGraphCheckpointCoordinatorTest {
100,
100,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
counter,
store,
- null,
new MemoryStateBackend(),
CheckpointStatsTrackerTest.createTestTracker());
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
index f1a56be..7668f62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+
import org.junit.Test;
import java.io.NotSerializableException;
@@ -47,7 +48,7 @@ public class FailedCheckpointStatsTest {
FailedCheckpointStats failed = new FailedCheckpointStats(
0,
triggerTimestamp,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
1,
taskStats,
0,
@@ -73,7 +74,7 @@ public class FailedCheckpointStatsTest {
FailedCheckpointStats failed = new FailedCheckpointStats(
123123123L,
triggerTimestamp,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
1337,
taskStats,
3,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 6c5e8fd..73db317 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -42,7 +42,7 @@ public class PendingCheckpointStatsTest {
public void testReportSubtaskStats() throws Exception {
long checkpointId = Integer.MAX_VALUE + 1222L;
long triggerTimestamp = Integer.MAX_VALUE - 1239L;
- CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
+ CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
int totalSubtaskCount = task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks();
@@ -138,7 +138,7 @@ public class PendingCheckpointStatsTest {
PendingCheckpointStats pending = new PendingCheckpointStats(
0,
1,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
taskStats,
callback);
@@ -199,7 +199,7 @@ public class PendingCheckpointStatsTest {
PendingCheckpointStats pending = new PendingCheckpointStats(
0,
triggerTimestamp,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
taskStats,
callback);
@@ -251,7 +251,7 @@ public class PendingCheckpointStatsTest {
PendingCheckpointStats pending = new PendingCheckpointStats(
123123123L,
10123L,
- CheckpointProperties.forStandardCheckpoint(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
1337,
taskStats,
mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index bf79457..7b6992b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -26,13 +28,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
-import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Collections;
@@ -43,7 +47,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -55,6 +58,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;
+/**
+ * Tests for the {@link PendingCheckpoint}.
+ */
public class PendingCheckpointTest {
private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
@@ -63,7 +69,7 @@ public class PendingCheckpointTest {
static {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));
-
+
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getMaxParallelism()).thenReturn(128);
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
@@ -80,8 +86,8 @@ public class PendingCheckpointTest {
@Test
public void testCanBeSubsumed() throws Exception {
// Forced checkpoints cannot be subsumed
- CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false, false);
- PendingCheckpoint pending = createPendingCheckpoint(forced, "ignored");
+ CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false);
+ PendingCheckpoint pending = createPendingCheckpoint(forced);
assertFalse(pending.canBeSubsumed());
try {
@@ -92,48 +98,21 @@ public class PendingCheckpointTest {
}
// Non-forced checkpoints can be subsumed
- CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false, false);
- pending = createPendingCheckpoint(subsumed, "ignored");
+ CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false);
+ pending = createPendingCheckpoint(subsumed);
assertTrue(pending.canBeSubsumed());
}
/**
- * Tests that the persist checkpoint property is respected by the pending
- * checkpoint when finalizing.
- */
- @Test
- public void testPersistExternally() throws Exception {
- File tmp = tmpFolder.newFolder();
-
- // Persisted checkpoint
- CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false, false);
-
- PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
- assertEquals(0, tmp.listFiles().length);
- pending.finalizeCheckpointExternalized();
- assertEquals(1, tmp.listFiles().length);
-
- // Ephemeral checkpoint
- CheckpointProperties ephemeral = new CheckpointProperties(false, false, false, true, true, true, true, true);
- pending = createPendingCheckpoint(ephemeral, null);
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
-
- assertEquals(1, tmp.listFiles().length);
- pending.finalizeCheckpointNonExternalized();
- assertEquals(1, tmp.listFiles().length);
- }
-
- /**
* Tests that the completion future is succeeded on finalize and failed on
* abort and failures during finalize.
*/
@Test
public void testCompletionFuture() throws Exception {
- CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
+ CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
// Abort declined
- PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
+ PendingCheckpoint pending = createPendingCheckpoint(props);
CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture();
assertFalse(future.isDone());
@@ -141,7 +120,7 @@ public class PendingCheckpointTest {
assertTrue(future.isDone());
// Abort expired
- pending = createPendingCheckpoint(props, "ignored");
+ pending = createPendingCheckpoint(props);
future = pending.getCompletionFuture();
assertFalse(future.isDone());
@@ -149,7 +128,7 @@ public class PendingCheckpointTest {
assertTrue(future.isDone());
// Abort subsumed
- pending = createPendingCheckpoint(props, "ignored");
+ pending = createPendingCheckpoint(props);
future = pending.getCompletionFuture();
assertFalse(future.isDone());
@@ -157,29 +136,22 @@ public class PendingCheckpointTest {
assertTrue(future.isDone());
// Finalize (all ACK'd)
- String target = tmpFolder.newFolder().getAbsolutePath();
- pending = createPendingCheckpoint(props, target);
+ pending = createPendingCheckpoint(props);
future = pending.getCompletionFuture();
assertFalse(future.isDone());
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertTrue(pending.isFullyAcknowledged());
- pending.finalizeCheckpointExternalized();
+ pending.finalizeCheckpoint();
assertTrue(future.isDone());
// Finalize (missing ACKs)
- pending = createPendingCheckpoint(props, "ignored");
+ pending = createPendingCheckpoint(props);
future = pending.getCompletionFuture();
assertFalse(future.isDone());
try {
- pending.finalizeCheckpointNonExternalized();
- fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
- // Expected
- }
- try {
- pending.finalizeCheckpointExternalized();
+ pending.finalizeCheckpoint();
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
// Expected
@@ -192,16 +164,14 @@ public class PendingCheckpointTest {
@Test
@SuppressWarnings("unchecked")
public void testAbortDiscardsState() throws Exception {
- CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
+ CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
QueueExecutor executor = new QueueExecutor();
OperatorState state = mock(OperatorState.class);
doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class));
- String targetDir = tmpFolder.newFolder().getAbsolutePath();
-
// Abort declined
- PendingCheckpoint pending = createPendingCheckpoint(props, targetDir, executor);
+ PendingCheckpoint pending = createPendingCheckpoint(props, executor);
setTaskState(pending, state);
pending.abortDeclined();
@@ -212,7 +182,7 @@ public class PendingCheckpointTest {
// Abort error
Mockito.reset(state);
- pending = createPendingCheckpoint(props, targetDir, executor);
+ pending = createPendingCheckpoint(props, executor);
setTaskState(pending, state);
pending.abortError(new Exception("Expected Test Exception"));
@@ -223,7 +193,7 @@ public class PendingCheckpointTest {
// Abort expired
Mockito.reset(state);
- pending = createPendingCheckpoint(props, targetDir, executor);
+ pending = createPendingCheckpoint(props, executor);
setTaskState(pending, state);
pending.abortExpired();
@@ -234,7 +204,7 @@ public class PendingCheckpointTest {
// Abort subsumed
Mockito.reset(state);
- pending = createPendingCheckpoint(props, targetDir, executor);
+ pending = createPendingCheckpoint(props, executor);
setTaskState(pending, state);
pending.abortSubsumed();
@@ -251,20 +221,22 @@ public class PendingCheckpointTest {
{
// Complete successfully
PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ PendingCheckpoint pending = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class));
- pending.finalizeCheckpointNonExternalized();
+ pending.finalizeCheckpoint();
verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
}
{
// Fail subsumed
PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ PendingCheckpoint pending = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
pending.abortSubsumed();
@@ -274,7 +246,8 @@ public class PendingCheckpointTest {
{
// Fail subsumed
PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ PendingCheckpoint pending = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
pending.abortDeclined();
@@ -284,7 +257,8 @@ public class PendingCheckpointTest {
{
// Fail subsumed
PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ PendingCheckpoint pending = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
pending.abortError(new Exception("Expected test error"));
@@ -294,7 +268,8 @@ public class PendingCheckpointTest {
{
// Fail subsumed
PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ PendingCheckpoint pending = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
pending.abortExpired();
@@ -303,41 +278,43 @@ public class PendingCheckpointTest {
}
/**
- * FLINK-5985
- * <p>
- * Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they
+ * FLINK-5985.
+ *
+ * <p>Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they
* should not appear in the task states map of the checkpoint.
*/
@Test
public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
- PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ PendingCheckpoint pending = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
Assert.assertTrue(pending.getOperatorStates().isEmpty());
}
/**
- * FLINK-5985
- * <p>
- * This tests checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that
+ * FLINK-5985.
+ *
+ * <p>This tests checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that
* for subtasks that acknowledge some state are given an entry in the task states of the checkpoint.
*/
@Test
public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
- PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ PendingCheckpoint pending = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.acknowledgeTask(ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
Assert.assertFalse(pending.getOperatorStates().isEmpty());
}
@Test
- public void testSetCanceller() {
- final CheckpointProperties props = new CheckpointProperties(false, false, false, true, true, true, true, true);
+ public void testSetCanceller() throws Exception {
+ final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
- PendingCheckpoint aborted = createPendingCheckpoint(props, null);
+ PendingCheckpoint aborted = createPendingCheckpoint(props);
aborted.abortDeclined();
assertTrue(aborted.isDiscarded());
assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
- PendingCheckpoint pending = createPendingCheckpoint(props, null);
+ PendingCheckpoint pending = createPendingCheckpoint(props);
ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
assertTrue(pending.setCancellerHandle(canceller));
@@ -347,23 +324,25 @@ public class PendingCheckpointTest {
// ------------------------------------------------------------------------
- private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
- return createPendingCheckpoint(props, targetDirectory, Executors.directExecutor());
+ private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) throws IOException {
+ return createPendingCheckpoint(props, Executors.directExecutor());
}
- private static PendingCheckpoint createPendingCheckpoint(
- CheckpointProperties props,
- String targetDirectory,
- Executor executor) {
+ private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Executor executor) throws IOException {
+
+ final Path checkpointDir = new Path(tmpFolder.newFolder().toURI());
+ final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
+ LocalFileSystem.getSharedInstance(), checkpointDir, checkpointDir, checkpointDir);
+
+ final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
- Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
return new PendingCheckpoint(
new JobID(),
0,
1,
ackTasks,
props,
- targetDirectory,
+ location,
executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
index d43283d..85b1516 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
@@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest {
public void testSimpleAccess() throws Exception {
long checkpointId = Integer.MAX_VALUE + 1L;
long triggerTimestamp = Integer.MAX_VALUE + 1L;
- CheckpointProperties props = new CheckpointProperties(true, true, false, false, false, true, false, true);
+ CheckpointProperties props = new CheckpointProperties(true, true, false, false, true, false, true);
long restoreTimestamp = Integer.MAX_VALUE + 1L;
String externalPath = "external-path";
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 7c19b19..a167130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -32,8 +33,10 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.ErrorListenerPathable;
import org.apache.curator.utils.EnsurePath;
+
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -92,8 +95,9 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
1L,
new HashMap<>(),
null,
- CheckpointProperties.forStandardCheckpoint(),
- null, null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "<pointer>");
final CompletedCheckpoint completedCheckpoint2 = new CompletedCheckpoint(
new JobID(),
@@ -102,8 +106,9 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
2L,
new HashMap<>(),
null,
- CheckpointProperties.forStandardCheckpoint(),
- null, null);
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new EmptyStreamStateHandle(),
+ "<pointer");
final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
expectedCheckpointIds.add(1L);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index 3498a41..ccc8afe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.checkpoint.hooks;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import javax.annotation.Nullable;
+
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.CompletableFuture;
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
deleted file mode 100644
index a461569..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class SavepointLoaderTest {
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- /**
- * Tests loading and validation of savepoints with correct setup,
- * parallelism mismatch, and a missing task.
- */
- @Test
- public void testLoadAndValidateSavepoint() throws Exception {
- File tmp = tmpFolder.newFolder();
-
- int parallelism = 128128;
- long checkpointId = Integer.MAX_VALUE + 123123L;
- JobVertexID jobVertexID = new JobVertexID();
- OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
-
- OperatorSubtaskState subtaskState = new OperatorSubtaskState(
- new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
- null,
- null,
- null);
-
- OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
- state.putState(0, subtaskState);
-
- Map<OperatorID, OperatorState> taskStates = new HashMap<>();
- taskStates.put(operatorID, state);
-
- JobID jobId = new JobID();
-
- // Store savepoint
- SavepointV2 savepoint = new SavepointV2(checkpointId, taskStates.values(), Collections.<MasterState>emptyList());
- String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
-
- ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
- when(vertex.getParallelism()).thenReturn(parallelism);
- when(vertex.getMaxParallelism()).thenReturn(parallelism);
- when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
-
- Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
- tasks.put(jobVertexID, vertex);
-
- ClassLoader ucl = Thread.currentThread().getContextClassLoader();
-
- // 1) Load and validate: everything correct
- CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
-
- assertEquals(jobId, loaded.getJobId());
- assertEquals(checkpointId, loaded.getCheckpointID());
-
- // 2) Load and validate: max parallelism mismatch
- when(vertex.getMaxParallelism()).thenReturn(222);
- when(vertex.isMaxParallelismConfigured()).thenReturn(true);
-
- try {
- SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
- fail("Did not throw expected Exception");
- } catch (IllegalStateException expected) {
- assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
- }
-
- // 3) Load and validate: missing vertex
- assertNotNull(tasks.remove(jobVertexID));
-
- try {
- SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
- fail("Did not throw expected Exception");
- } catch (IllegalStateException expected) {
- assertTrue(expected.getMessage().contains("allowNonRestoredState"));
- }
-
- // 4) Load and validate: ignore missing vertex
- SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
deleted file mode 100644
index 0444936..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import java.io.File;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Matchers;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
-public class SavepointStoreTest {
-
- @Rule
- public TemporaryFolder tmp = new TemporaryFolder();
-
- /**
- * Tests a store-load-dispose sequence.
- */
- @Test
- public void testStoreLoadDispose() throws Exception {
- String root = tmp.getRoot().getAbsolutePath();
- File rootFile = new File(root);
-
- File[] list = rootFile.listFiles();
-
- assertNotNull(list);
- assertEquals(0, list.length);
-
- // Store
- String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
- SavepointV2 stored = new SavepointV2(
- 1929292,
- CheckpointTestUtils.createOperatorStates(4, 24),
- Collections.<MasterState>emptyList());
- String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
-
- list = rootFile.listFiles();
- assertNotNull(list);
- assertEquals(1, list.length);
-
- // Load
- Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
-
- assertEquals(stored.getCheckpointId(), loaded.getCheckpointId());
- assertEquals(stored.getOperatorStates(), loaded.getOperatorStates());
- assertEquals(stored.getMasterStates(), loaded.getMasterStates());
-
- loaded.dispose();
-
- // Dispose
- SavepointStore.deleteSavepointDirectory(path);
-
- list = rootFile.listFiles();
- assertNotNull(list);
- assertEquals(0, list.length);
- }
-
- /**
- * Tests loading with unexpected magic number.
- */
- @Test
- public void testUnexpectedSavepoint() throws Exception {
- // Random file
- Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
- FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, FileSystem.WriteMode.NO_OVERWRITE);
- DataOutputStream dos = new DataOutputStream(fdos);
- for (int i = 0; i < 10; i++) {
- dos.writeLong(ThreadLocalRandom.current().nextLong());
- }
-
- try {
- SavepointStore.loadSavepoint(filePath.toString(), Thread.currentThread().getContextClassLoader());
- fail("Did not throw expected Exception");
- } catch (RuntimeException e) {
- assertTrue(e.getMessage().contains("Flink 1.0") && e.getMessage().contains("Unexpected magic number"));
- }
- }
-
- /**
- * Tests addition of a new savepoint version.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testMultipleSavepointVersions() throws Exception {
- Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
- field.setAccessible(true);
- Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
-
- assertTrue(serializers.size() >= 1);
-
- String root = tmp.getRoot().getAbsolutePath();
- File rootFile = new File(root);
-
- // New savepoint type for test
- int version = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); // make this a positive number
- long checkpointId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); // make this a positive number
-
- // Add serializer
- serializers.put(version, NewSavepointSerializer.INSTANCE);
-
- String savepointDirectory1 = SavepointStore.createSavepointDirectory(root, new JobID());
- TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
- String pathNewSavepoint = SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint);
-
- File[] list = rootFile.listFiles();
-
- assertNotNull(list);
- assertEquals(1, list.length);
-
- // Savepoint v0
- String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
- SavepointV2 savepoint = new SavepointV2(
- checkpointId,
- CheckpointTestUtils.createOperatorStates(4, 32),
- Collections.<MasterState>emptyList());
- String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
-
- list = rootFile.listFiles();
-
- assertNotNull(list);
- assertEquals(2, list.length);
-
- // Load
- Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());
- assertEquals(newSavepoint, loaded);
-
- loaded = SavepointStore.loadSavepoint(pathSavepoint, Thread.currentThread().getContextClassLoader());
- assertEquals(savepoint.getCheckpointId(), loaded.getCheckpointId());
- assertEquals(savepoint.getTaskStates(), loaded.getTaskStates());
- assertEquals(savepoint.getMasterStates(), loaded.getMasterStates());
- }
-
- /**
- * Tests that an exception during store cleans up the created savepoint file.
- */
- @Test
- public void testCleanupOnStoreFailure() throws Exception {
- Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
- field.setAccessible(true);
- Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
-
- String target = tmp.getRoot().getAbsolutePath();
-
- final int version = 123123;
- SavepointSerializer<TestSavepoint> serializer = mock(SavepointSerializer.class);
- doThrow(new RuntimeException("Test Exception")).when(serializer)
- .serialize(Matchers.any(TestSavepoint.class), any(DataOutputStream.class));
-
- serializers.put(version, serializer);
-
- Savepoint savepoint = new TestSavepoint(version, 12123123);
-
- assertEquals(0, tmp.getRoot().listFiles().length);
-
- try {
- SavepointStore.storeSavepoint(target, savepoint);
- } catch (Throwable ignored) {
- }
-
- assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length);
- }
-
- /**
- * Tests that multiple externalized checkpoints can be stored to the same
- * directory.
- */
- @Test
- public void testStoreExternalizedCheckpointsToSameDirectory() throws Exception {
- String root = tmp.newFolder().getAbsolutePath();
- FileSystem fs = FileSystem.get(new Path(root).toUri());
-
- // Store
- SavepointV2 savepoint = new SavepointV2(
- 1929292,
- CheckpointTestUtils.createOperatorStates(4, 24),
- Collections.<MasterState>emptyList());
-
- FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
- fs.exists(store1.getFilePath());
- assertTrue(store1.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
-
- FileStateHandle store2 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
- fs.exists(store2.getFilePath());
- assertTrue(store2.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
- }
-
- private static class NewSavepointSerializer implements SavepointSerializer<TestSavepoint> {
-
- private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();
-
- @Override
- public void serialize(TestSavepoint savepoint, DataOutputStream dos) throws IOException {
- dos.writeInt(savepoint.version);
- dos.writeLong(savepoint.checkpointId);
- }
-
- @Override
- public TestSavepoint deserialize(DataInputStream dis, ClassLoader userCL) throws IOException {
- int version = dis.readInt();
- long checkpointId = dis.readLong();
- return new TestSavepoint(version, checkpointId);
- }
-
- }
-
- private static class TestSavepoint implements Savepoint {
-
- private final int version;
- private final long checkpointId;
-
- public TestSavepoint(int version, long checkpointId) {
- this.version = version;
- this.checkpointId = checkpointId;
- }
-
- @Override
- public int getVersion() {
- return version;
- }
-
- @Override
- public long getCheckpointId() {
- return checkpointId;
- }
-
- @Override
- public Collection<TaskState> getTaskStates() {
- return Collections.emptyList();
- }
-
- @Override
- public Collection<MasterState> getMasterStates() {
- return Collections.emptyList();
- }
-
- @Override
- public Collection<OperatorState> getOperatorStates() {
- return null;
- }
-
- @Override
- public void dispose() {
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TestSavepoint that = (TestSavepoint) o;
- return version == that.version && checkpointId == that.checkpointId;
-
- }
-
- @Override
- public int hashCode() {
- int result = version;
- result = 31 * result + (int) (checkpointId ^ (checkpointId >>> 32));
- return result;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 3fe8613..ba93b7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -15,20 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
@@ -126,22 +125,16 @@ public class ArchivedExecutionGraphTest extends TestLogger {
100,
100,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
Collections.<MasterTriggerRestoreHook<?>>emptyList(),
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
- null,
new MemoryStateBackend(),
statsTracker);
- Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
- userAccumulators.put("userAcc", new LongCounter(64));
-
- Execution executionWithAccumulators = runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt();
-
runtimeGraph.setJsonPlan("{}");
runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new RuntimeException("This exception was thrown on purpose."));
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12e9b5d..55166d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -50,7 +51,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -690,7 +690,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
10 * 60 * 1000,
0,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
false),
null));
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 4725296..a3a26f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
@@ -41,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -77,7 +77,7 @@ import static org.mockito.Mockito.when;
/**
* These tests make sure that global failover (restart all) always takes precedence over
* local recovery strategies.
- *
+ *
* <p>This test must be in the package it resides in, because it uses package-private methods
* from the ExecutionGraph classes.
*/
@@ -312,7 +312,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
100000L,
1L,
3,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true);
final ExecutionGraph graph = createSampleGraph(
@@ -330,14 +330,13 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
checkpointCoordinatorConfiguration.getCheckpointTimeout(),
checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(),
checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(),
- checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings(),
+ checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy(),
allVertices,
allVertices,
allVertices,
Collections.emptyList(),
standaloneCheckpointIDCounter,
new StandaloneCompletedCheckpointStore(1),
- "",
new MemoryStateBackend(),
new CheckpointStatsTracker(
1,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 721b8f1..51e7fec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.SerializedValue;
@@ -47,7 +48,7 @@ public class JobCheckpointingSettingsTest {
1231,
112,
12,
- ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+ CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
false),
new SerializedValue<>(new MemoryStateBackend()));
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 79f7342..af9c8b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
@@ -56,7 +57,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -252,7 +252,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
10L * 60L * 1000L,
0L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 452afd6..f8da2a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
@@ -61,7 +62,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
@@ -893,7 +893,7 @@ public class JobManagerTest extends TestLogger {
3600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null);
@@ -921,7 +921,7 @@ public class JobManagerTest extends TestLogger {
if (cancelResp instanceof CancellationFailure) {
CancellationFailure failure = (CancellationFailure) cancelResp;
if (failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message())) {
- Thread.sleep(200); // wait and retry
+ Thread.sleep(10); // wait and retry
} else {
failure.cause().printStackTrace();
fail("Failed to cancel job: " + failure.cause().getMessage());
@@ -983,7 +983,7 @@ public class JobManagerTest extends TestLogger {
3600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true);
JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
@@ -1105,7 +1105,7 @@ public class JobManagerTest extends TestLogger {
3600000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null);
@@ -1220,7 +1220,7 @@ public class JobManagerTest extends TestLogger {
360000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null);
@@ -1336,7 +1336,7 @@ public class JobManagerTest extends TestLogger {
360000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null);
@@ -1384,7 +1384,7 @@ public class JobManagerTest extends TestLogger {
360000,
0,
Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 0ca83ae..508ccd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.NetUtils;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -235,7 +236,7 @@ public class JobSubmitTest {
5000,
0L,
10,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
return jg;
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
index 9e801a3..13a3194 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.TestLogger;
@@ -97,7 +97,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
timeout,
1L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true));
JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -131,7 +131,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
timeout,
1L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true));
JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
@@ -184,7 +184,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
1L,
1L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true));
JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -316,7 +316,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
1L,
1L,
1,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true));
JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
index c7560da..47ebb18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
@@ -19,10 +19,10 @@
package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -55,7 +55,7 @@ public class CheckpointConfigHandlerTest {
AccessExecutionGraph graph = graphAndSettings.graph;
when(graph.getJobID()).thenReturn(new JobID());
CheckpointCoordinatorConfiguration chkConfig = graphAndSettings.jobCheckpointingConfiguration;
- ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+ CheckpointRetentionPolicy retentionPolicy = graphAndSettings.retentionPolicy;
Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
Assert.assertEquals(1, archives.size());
@@ -73,8 +73,8 @@ public class CheckpointConfigHandlerTest {
JsonNode externalizedNode = rootNode.get("externalization");
Assert.assertNotNull(externalizedNode);
- Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
- Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+ Assert.assertEquals(retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, externalizedNode.get("enabled").asBoolean());
+ Assert.assertEquals(retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, externalizedNode.get("delete_on_cancellation").asBoolean());
}
@@ -139,7 +139,7 @@ public class CheckpointConfigHandlerTest {
GraphAndSettings graphAndSettings = createGraphAndSettings(true, false);
AccessExecutionGraph graph = graphAndSettings.graph;
- ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+ CheckpointRetentionPolicy retentionPolicy = graphAndSettings.retentionPolicy;
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor());
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
@@ -147,8 +147,8 @@ public class CheckpointConfigHandlerTest {
ObjectMapper mapper = new ObjectMapper();
JsonNode externalizedNode = mapper.readTree(json).get("externalization");
assertNotNull(externalizedNode);
- assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
- assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+ assertEquals(retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, externalizedNode.get("enabled").asBoolean());
+ assertEquals(retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, externalizedNode.get("delete_on_cancellation").asBoolean());
}
private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) {
@@ -156,36 +156,37 @@ public class CheckpointConfigHandlerTest {
long timeout = 996979L;
long minPause = 119191919L;
int maxConcurrent = 12929329;
- ExternalizedCheckpointSettings externalizedSetting = externalized
- ? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
- : ExternalizedCheckpointSettings.none();
+
+ CheckpointRetentionPolicy retentionPolicy = externalized
+ ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
+ : CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
interval,
timeout,
minPause,
maxConcurrent,
- externalizedSetting,
+ retentionPolicy,
exactlyOnce);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(chkConfig);
- return new GraphAndSettings(graph, chkConfig, externalizedSetting);
+ return new GraphAndSettings(graph, chkConfig, retentionPolicy);
}
private static class GraphAndSettings {
public final AccessExecutionGraph graph;
public final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
- public final ExternalizedCheckpointSettings externalizedSettings;
+ public final CheckpointRetentionPolicy retentionPolicy;
public GraphAndSettings(
AccessExecutionGraph graph,
CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
- ExternalizedCheckpointSettings externalizedSettings) {
+ CheckpointRetentionPolicy retentionPolicy) {
this.graph = graph;
this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
- this.externalizedSettings = externalizedSettings;
+ this.retentionPolicy = retentionPolicy;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 7cc8efb..ef65e43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
@@ -167,7 +168,8 @@ public class CheckpointStatsDetailsHandlerTest {
PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
when(checkpoint.getCheckpointId()).thenReturn(1992139L);
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
- when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(checkpoint.getProperties()).thenReturn(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L);
when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L);
when(checkpoint.getStateSize()).thenReturn(111939272822L);
@@ -234,7 +236,7 @@ public class CheckpointStatsDetailsHandlerTest {
CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
when(checkpoint.getCheckpointId()).thenReturn(1818213L);
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
- when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+ when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
when(checkpoint.getStateSize()).thenReturn(925281L);
@@ -275,7 +277,7 @@ public class CheckpointStatsDetailsHandlerTest {
FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
when(checkpoint.getCheckpointId()).thenReturn(1818214L);
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
- when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+ when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
when(checkpoint.getStateSize()).thenReturn(925281L);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
index cc90d7f..1d5768b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -173,7 +174,7 @@ public class CheckpointStatsHandlerTest {
RestoredCheckpointStats latestRestored = mock(RestoredCheckpointStats.class);
when(latestRestored.getCheckpointId()).thenReturn(1199L);
when(latestRestored.getRestoreTimestamp()).thenReturn(434242L);
- when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+ when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
when(latestRestored.getExternalPath()).thenReturn("restored savepoint path");
// History
@@ -183,7 +184,8 @@ public class CheckpointStatsHandlerTest {
PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
when(inProgress.getCheckpointId()).thenReturn(1992141L);
when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
- when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(inProgress.getProperties()).thenReturn(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
when(inProgress.getLatestAckTimestamp()).thenReturn(1977791901L);
when(inProgress.getStateSize()).thenReturn(111939272822L);
@@ -195,7 +197,7 @@ public class CheckpointStatsHandlerTest {
CompletedCheckpointStats completedSavepoint = mock(CompletedCheckpointStats.class);
when(completedSavepoint.getCheckpointId()).thenReturn(1322139L);
when(completedSavepoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
- when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+ when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
when(completedSavepoint.getTriggerTimestamp()).thenReturn(191900L);
when(completedSavepoint.getLatestAckTimestamp()).thenReturn(197791901L);
when(completedSavepoint.getStateSize()).thenReturn(1119822L);
@@ -209,7 +211,8 @@ public class CheckpointStatsHandlerTest {
FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
when(failed.getCheckpointId()).thenReturn(110719L);
when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
- when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(failed.getProperties()).thenReturn(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
when(failed.getTriggerTimestamp()).thenReturn(191900L);
when(failed.getLatestAckTimestamp()).thenReturn(197791901L);
when(failed.getStateSize()).thenReturn(1119822L);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java
new file mode 100644
index 0000000..4b42959
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import java.io.IOException;
+
+/**
+ * A simple dummy implementation of a stream state handle that can be passed in tests.
+ * The handle cannot open an input stream.
+ */
+public class EmptyStreamStateHandle implements StreamStateHandle {
+
+ private static final long serialVersionUID = 0L;
+
+ @Override
+ public FSDataInputStream openInputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void discardState() {}
+
+ @Override
+ public long getStateSize() {
+ return 0;
+ }
+}