You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:43:01 UTC
[flink] 07/07: [FLINK-13440] Add test for FLINK-12858
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 04966bbcf3ef1be59ebab76d0d27e43b77e856f7
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 14:53:31 2019 +0200
[FLINK-13440] Add test for FLINK-12858
---
.../checkpoint/CheckpointFailureManager.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 197 ++++++++++-----------
2 files changed, 93 insertions(+), 106 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index f65a30a..8d12bef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -127,7 +127,7 @@ public class CheckpointFailureManager {
/**
* Fails the whole job graph in case an in-progress synchronous savepoint is discarded.
*
- * <p>If the checkpoint failure was cancelled at the checkpoint coordinator, i.e. before
+ * <p>If the checkpoint was cancelled at the checkpoint coordinator, i.e. before
* the synchronous savepoint barrier was sent to the tasks, then we do not cancel the job
* as we do not risk having a deadlock.
*
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a98102e..b11bf62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
@@ -89,6 +90,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -330,27 +332,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
});
// set up the coordinator
- CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
- 600000,
- 600000,
- 0,
- Integer.MAX_VALUE,
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true,
- false,
- 0);
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- chkConfig,
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(1),
- new MemoryStateBackend(),
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY,
- checkpointFailureManager);
+ CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
try {
// trigger the checkpoint. this should succeed
@@ -401,27 +383,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
- 600000,
- 600000,
- 0,
- Integer.MAX_VALUE,
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true,
- false,
- 0);
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- chkConfig,
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(1),
- new MemoryStateBackend(),
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY,
- failureManager);
+ CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -510,27 +472,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
- 600000,
- 600000,
- 0,
- Integer.MAX_VALUE,
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true,
- false,
- 0);
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- chkConfig,
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(1),
- new MemoryStateBackend(),
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY,
- failureManager);
+ CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -636,27 +578,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
- 600000,
- 600000,
- 0,
- Integer.MAX_VALUE,
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true,
- false,
- 0);
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- chkConfig,
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(1),
- new MemoryStateBackend(),
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY,
- failureManager);
+ CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1585,27 +1507,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
// set up the coordinator and validate the initial state
- CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
- 600000,
- 600000,
- 0,
- Integer.MAX_VALUE,
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true,
- false,
- 0);
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- chkConfig,
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(1),
- new MemoryStateBackend(),
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY,
- failureManager);
+ CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -3994,6 +3896,91 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
}
+	/**
+	 * Verifies that declining an in-flight synchronous savepoint discards it, completes the
+	 * savepoint future exceptionally with the decline reason, and fails the job exactly once
+	 * through the {@link CheckpointFailureManager} callback.
+	 */
+	@Test
+	public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
+		// f0 counts invocations of the failure callback, f1 keeps the last throwable it received.
+		// A mutable Tuple2 is used because the lambda can only capture effectively-final locals.
+		final Tuple2<Integer, Throwable> invocationCounterAndException = Tuple2.of(0, null);
+		final Throwable expectedRootCause = new IOException("Custom-Exception");
+
+		final JobID jobId = new JobID();
+
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		final ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		final ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+		// set up the coordinator; the failure callback records how often and with what the job is failed
+		final CheckpointCoordinator coordinator = getCheckpointCoordinator(jobId, vertex1, vertex2,
+			new CheckpointFailureManager(0, throwable -> {
+				invocationCounterAndException.f0 += 1;
+				invocationCounterAndException.f1 = throwable;
+			}));
+
+		// shut the coordinator down even if an assertion fails, so no coordinator resources leak
+		try {
+			final CompletableFuture<CompletedCheckpoint> savepointFuture = coordinator
+				.triggerSynchronousSavepoint(10L, false, "test-dir");
+
+			// decline from one task: the pending synchronous savepoint must be discarded
+			final PendingCheckpoint syncSavepoint = declineSynchronousSavepoint(jobId, coordinator, attemptID1, expectedRootCause);
+
+			assertTrue(syncSavepoint.isDiscarded());
+
+			// the savepoint future must complete exceptionally, carrying the decline reason
+			try {
+				savepointFuture.get();
+				fail("Expected Exception not found.");
+			} catch (ExecutionException e) {
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertTrue("Unexpected exception type: " + cause, cause instanceof CheckpointException);
+				assertTrue("CheckpointException has no cause", cause.getCause() != null);
+				assertEquals(expectedRootCause.getMessage(), cause.getCause().getMessage());
+			}
+
+			// the failure manager must have failed the job exactly once, with the same root cause
+			assertEquals(1, invocationCounterAndException.f0.intValue());
+			final Throwable failureCause = invocationCounterAndException.f1;
+			assertTrue("Unexpected failure cause: " + failureCause, failureCause instanceof CheckpointException);
+			assertTrue("Failure cause has no cause", failureCause.getCause() != null);
+			assertEquals(expectedRootCause.getMessage(), failureCause.getCause().getMessage());
+		} finally {
+			coordinator.shutdown(JobStatus.FAILING);
+		}
+	}
+
+	/**
+	 * Builds a {@link CheckpointCoordinator} for {@code jobId} that uses the given two vertices for
+	 * each of its three vertex-array arguments. It is backed by a standalone checkpoint ID counter,
+	 * a standalone completed-checkpoint store retaining one checkpoint, a {@link MemoryStateBackend}
+	 * and a direct executor.
+	 *
+	 * @param jobId the job the coordinator belongs to
+	 * @param vertex1 first execution vertex
+	 * @param vertex2 second execution vertex
+	 * @param failureManager failure manager handed to the coordinator
+	 * @return the newly created coordinator
+	 */
+	private CheckpointCoordinator getCheckpointCoordinator(
+		final JobID jobId,
+		final ExecutionVertex vertex1,
+		final ExecutionVertex vertex2,
+		final CheckpointFailureManager failureManager) {
+
+		// NOTE(review): argument meanings below are inferred from the Flink constructor; confirm
+		final CheckpointCoordinatorConfiguration configuration = new CheckpointCoordinatorConfiguration(
+			600000,            // presumably the checkpoint interval (ms)
+			600000,            // presumably the checkpoint timeout (ms)
+			0,                 // presumably the min pause between checkpoints
+			Integer.MAX_VALUE, // presumably the max number of concurrent checkpoints
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			true,              // presumably exactly-once mode
+			false,             // presumably "prefer checkpoint for recovery"
+			0);                // presumably the tolerable checkpoint failure number
+
+		// a fresh array per argument, so the coordinator never sees shared array instances
+		final CheckpointCoordinator coordinator = new CheckpointCoordinator(
+			jobId,
+			configuration,
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY,
+			failureManager);
+		return coordinator;
+	}
+
+	/**
+	 * Declines the single pending checkpoint of {@code coordinator} on behalf of the given task
+	 * attempt, using {@code reason} as the decline cause.
+	 *
+	 * @param jobId job id placed in the {@link DeclineCheckpoint} message
+	 * @param coordinator coordinator that must have exactly one pending checkpoint
+	 * @param attemptID execution attempt that declines the checkpoint
+	 * @param reason cause attached to the decline message
+	 * @return the pending checkpoint, looked up before the decline message was delivered
+	 * @throws IllegalStateException if the coordinator does not have exactly one pending checkpoint
+	 */
+	private PendingCheckpoint declineSynchronousSavepoint(
+		final JobID jobId,
+		final CheckpointCoordinator coordinator,
+		final ExecutionAttemptID attemptID,
+		final Throwable reason) {
+
+		// With several pending checkpoints, "the first one" would depend on map iteration order,
+		// and with none, iterator().next() would fail without context. Fail fast with a clear message.
+		final int numPending = coordinator.getPendingCheckpoints().size();
+		if (numPending != 1) {
+			throw new IllegalStateException(
+				"Expected exactly one pending checkpoint, but found " + numPending + '.');
+		}
+
+		final long checkpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();
+		// look the checkpoint up before declining, since the decline removes it from the pending set
+		final PendingCheckpoint checkpoint = coordinator.getPendingCheckpoints().get(checkpointId);
+		coordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID, checkpointId, reason), TASK_MANAGER_LOCATION_INFO);
+		return checkpoint;
+	}
+
private void performIncrementalCheckpoint(
JobID jid,
CheckpointCoordinator coord,