You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:43:01 UTC

[flink] 07/07: [FLINK-13440] Add test for FLINK-12858

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04966bbcf3ef1be59ebab76d0d27e43b77e856f7
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 14:53:31 2019 +0200

    [FLINK-13440] Add test for FLINK-12858
---
 .../checkpoint/CheckpointFailureManager.java       |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 197 ++++++++++-----------
 2 files changed, 93 insertions(+), 106 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index f65a30a..8d12bef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -127,7 +127,7 @@ public class CheckpointFailureManager {
 	/**
 	 * Fails the whole job graph in case an in-progress synchronous savepoint is discarded.
 	 *
-	 * <p>If the checkpoint failure was cancelled at the checkpoint coordinator, i.e. before
+	 * <p>If the checkpoint was cancelled at the checkpoint coordinator, i.e. before
 	 * the synchronous savepoint barrier was sent to the tasks, then we do not cancel the job
 	 * as we do not risk having a deadlock.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a98102e..b11bf62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializableObject;
@@ -89,6 +90,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -330,27 +332,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		});
 
 		// set up the coordinator
-		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-			600000,
-			600000,
-			0,
-			Integer.MAX_VALUE,
-			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-			true,
-			false,
-			0);
-		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			chkConfig,
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1),
-			new MemoryStateBackend(),
-			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY,
-			checkpointFailureManager);
+		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, checkpointFailureManager);
 
 		try {
 			// trigger the checkpoint. this should succeed
@@ -401,27 +383,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				true,
-				false,
-				0);
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				chkConfig,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				new MemoryStateBackend(),
-				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY,
-				failureManager);
+			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -510,27 +472,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				true,
-				false,
-				0);
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				chkConfig,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				new MemoryStateBackend(),
-				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY,
-				failureManager);
+			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -636,27 +578,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
-			CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-				600000,
-				600000,
-				0,
-				Integer.MAX_VALUE,
-				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				true,
-				false,
-				0);
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				chkConfig,
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new ExecutionVertex[] { vertex1, vertex2 },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1),
-				new MemoryStateBackend(),
-				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY,
-				failureManager);
+			CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1585,27 +1507,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 		// set up the coordinator and validate the initial state
-		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
-			600000,
-			600000,
-			0,
-			Integer.MAX_VALUE,
-			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-			true,
-			false,
-			0);
-		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			chkConfig,
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1),
-			new MemoryStateBackend(),
-			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY,
-			failureManager);
+		CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2, failureManager);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -3994,6 +3896,91 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 	}
 
+	/** Declining an in-flight synchronous savepoint must fail both the savepoint future and the job. */
+	@Test
+	public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
+		final Tuple2<Integer, Throwable> invocationCounterAndException = Tuple2.of(0, null);
+		final Throwable expectedRootCause = new IOException("Custom-Exception");
+		final JobID jobId = new JobID();
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		final ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		final ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+		// set up the coordinator; the failure callback records how often the job was failed and with which cause
+		final CheckpointCoordinator coordinator = getCheckpointCoordinator(jobId, vertex1, vertex2,
+				new CheckpointFailureManager(0, throwable -> {
+					invocationCounterAndException.f0 += 1;
+					invocationCounterAndException.f1 = throwable;
+				}));
+		try {
+			final CompletableFuture<CompletedCheckpoint> savepointFuture = coordinator
+					.triggerSynchronousSavepoint(10L, false, "test-dir");
+			// a task declines the pending synchronous savepoint, which makes the coordinator discard it
+			final PendingCheckpoint syncSavepoint = declineSynchronousSavepoint(jobId, coordinator, attemptID1, expectedRootCause);
+			assertTrue(syncSavepoint.isDiscarded());
+
+			try {
+				savepointFuture.get();
+				fail("Expected Exception not found.");
+			} catch (ExecutionException e) {
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertTrue(cause instanceof CheckpointException);
+				assertEquals(expectedRootCause.getMessage(), cause.getCause().getMessage());
+			}
+
+			// the job-failure callback must have fired exactly once, carrying the decline reason as cause
+			assertEquals(1, invocationCounterAndException.f0.intValue());
+			assertTrue(invocationCounterAndException.f1 instanceof CheckpointException);
+			assertEquals(expectedRootCause.getMessage(), invocationCounterAndException.f1.getCause().getMessage());
+		} finally {
+			// release the coordinator even when an assertion above fails
+			coordinator.shutdown(JobStatus.FAILING);
+		}
+	}
+
+	private CheckpointCoordinator getCheckpointCoordinator(
+			final JobID jobId,
+			final ExecutionVertex vertex1,
+			final ExecutionVertex vertex2,
+			final CheckpointFailureManager failureManager) {
+		// Builds a coordinator for jobId over vertex1 and vertex2, using failureManager as its CheckpointFailureManager.
+		final CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				false,
+				0);
+		// each ExecutionVertex[] argument below is a separate, freshly allocated array holding both vertices
+		return new CheckpointCoordinator(
+				jobId,
+				chkConfig,
+				new ExecutionVertex[]{vertex1, vertex2},
+				new ExecutionVertex[]{vertex1, vertex2},
+				new ExecutionVertex[]{vertex1, vertex2},
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				new MemoryStateBackend(),
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY,
+				failureManager);
+	}
+
+	/** Declines the single pending checkpoint on behalf of {@code attemptID} with {@code reason} and returns it. */
+	private PendingCheckpoint declineSynchronousSavepoint(
+			final JobID jobId,
+			final CheckpointCoordinator coordinator,
+			final ExecutionAttemptID attemptID,
+			final Throwable reason) {
+		assertEquals(1, coordinator.getNumberOfPendingCheckpoints()); // only the savepoint just triggered may be pending
+		final PendingCheckpoint checkpoint = coordinator.getPendingCheckpoints().values().iterator().next();
+		coordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID, checkpoint.getCheckpointId(), reason), TASK_MANAGER_LOCATION_INFO);
+		return checkpoint;
+	}
+
 	private void performIncrementalCheckpoint(
 		JobID jid,
 		CheckpointCoordinator coord,