You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/12 05:51:53 UTC

[GitHub] ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334
 
 
   

This is a pull request from a forked repository.
As GitHub may hide the original diff once such a pull request is merged or closed,
it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d9aea..9f453d0f2c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -132,6 +132,8 @@
 
 	/** The maximum number of checkpoints that may be in progress at the same time */
 	private final int maxConcurrentCheckpointAttempts;
+	/** Max consecutive failed checkpoints (or trigger attempts) tolerated; once exceeded the job is failed, so 0 fails it on the first failure. */
+	private final int maxFailedCheckpoints;
 
 	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
 	private final Timer timer;
@@ -142,6 +144,9 @@
 	/** The number of consecutive failed trigger attempts */
 	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
 
+	/** The number of consecutive failed checkpoints */
+	private final AtomicInteger numFailedCheckpoints = new AtomicInteger(0);
+
 	private ScheduledTrigger currentPeriodicTrigger;
 
 	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
@@ -163,6 +168,23 @@
 	private CheckpointStatsTracker statsTracker;
 
 	// --------------------------------------------------------------------------------------------
+	public CheckpointCoordinator(
+		JobID job,
+		long baseInterval,
+		long checkpointTimeout,
+		long minPauseBetweenCheckpoints,
+		int maxConcurrentCheckpointAttempts,
+		ExternalizedCheckpointSettings externalizeSettings,
+		ExecutionVertex[] tasksToTrigger,
+		ExecutionVertex[] tasksToWaitFor,
+		ExecutionVertex[] tasksToCommitTo,
+		CheckpointIDCounter checkpointIDCounter,
+		CompletedCheckpointStore completedCheckpointStore,
+		String checkpointDirectory,
+		Executor executor) {
+		this(job, baseInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpointAttempts, 0, externalizeSettings, tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+			checkpointIDCounter, completedCheckpointStore, checkpointDirectory, executor);
+	}
 
 	public CheckpointCoordinator(
 			JobID job,
@@ -170,6 +192,7 @@ public CheckpointCoordinator(
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpointAttempts,
+			int maxFailedCheckpoints,
 			ExternalizedCheckpointSettings externalizeSettings,
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
@@ -184,6 +207,7 @@ public CheckpointCoordinator(
 		checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
+		checkArgument(maxFailedCheckpoints >= 0, "maxFailedCheckpoints must be >= 0");
 
 		if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
 			throw new IllegalStateException("CheckpointConfig says to persist periodic " +
@@ -207,6 +231,7 @@ public CheckpointCoordinator(
 		this.checkpointTimeout = checkpointTimeout;
 		this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
 		this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
+		this.maxFailedCheckpoints = maxFailedCheckpoints;
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
 		this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -461,6 +486,9 @@ CheckpointTriggerResult triggerCheckpoint(
 			catch (Throwable t) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+				if(numUnsuccessful > maxFailedCheckpoints) {
+					return failExecution(tasksToCommitTo);
+				}
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
@@ -577,18 +605,35 @@ else if (!props.forceCheckpoint()) {
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
 				}
+				if(numUnsuccessful > maxFailedCheckpoints) {
+					return failExecution(tasksToCommitTo);
+				}
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
 		} // end trigger lock
 	}
 
+	private CheckpointTriggerResult failExecution(ExecutionVertex[] executions) {
+		if (currentPeriodicTrigger != null) {
+			currentPeriodicTrigger.cancel();
+			currentPeriodicTrigger = null;
+		}
+		for (ExecutionVertex executionVertex : executions) {
+			// fail each vertex's graph. NOTE(review): vertices of one job presumably share a graph, so it is failed once per vertex (with a raw Throwable cause) - confirm intended
+			if(executionVertex != null) {
+				executionVertex.getExecutionGraph().fail(new Throwable("The number of max unsuccessful checkpoints attempts exhausted"));
+			}
+		}
+		return new CheckpointTriggerResult(CheckpointDeclineReason.MAX_FAILED_ATTEMPTS_EXHAUSTED);
+	}
+
 	/**
 	 * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
 	 *
 	 * @param message Checkpoint decline from the task manager
 	 */
-	public void receiveDeclineMessage(DeclineCheckpoint message) {
+	public void receiveDeclineMessage(DeclineCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {
 			return;
 		}
@@ -618,7 +663,6 @@ public void receiveDeclineMessage(DeclineCheckpoint message) {
 				pendingCheckpoints.remove(checkpointId);
 				checkpoint.abortDeclined();
 				rememberRecentCheckpointId(checkpointId);
-
 				// we don't have to schedule another "dissolving" checkpoint any more because the
 				// cancellation barriers take care of breaking downstream alignments
 				// we only need to make sure that suspended queued requests are resumed
@@ -632,11 +676,24 @@ public void receiveDeclineMessage(DeclineCheckpoint message) {
 				}
 
 				if (!haveMoreRecentPending) {
-					triggerQueuedRequests();
+					int numFailed = this.numFailedCheckpoints.incrementAndGet();
+					if(numFailed > maxFailedCheckpoints) {
+						failExecution(tasksToCommitTo);
+						throw new CheckpointException(
+							"Max failed checkpoints attempts exhausted");
+					} else {
+						triggerQueuedRequests();
+					}
 				}
 			}
 			else if (checkpoint != null) {
 				// this should not happen
+				int numFailed = this.numFailedCheckpoints.incrementAndGet();
+				if(numFailed > maxFailedCheckpoints) {
+					failExecution(tasksToCommitTo);
+					throw new CheckpointException(
+						"Max failed checkpoints attempts exhausted");
+				}
 				throw new IllegalStateException(
 						"Received message for discarded but non-removed checkpoint " + checkpointId);
 			}
@@ -795,7 +852,11 @@ public void run() {
 					}
 				});
 			}
-
+			int numFailed = this.numFailedCheckpoints.incrementAndGet();
+			if(numFailed > maxFailedCheckpoints) {
+				failExecution(tasksToCommitTo);
+				throw new CheckpointException("Max failed checkpoints attempts exhausted "+ '.', exception);
+			}
 			throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
 		} finally {
 			pendingCheckpoints.remove(checkpointId);
@@ -832,6 +893,8 @@ public void run() {
 				ee.notifyCheckpointComplete(checkpointId, timestamp);
 			}
 		}
+		// reset it - we are sure that we got a successful checkpoint
+		this.numFailedCheckpoints.set(0);
 	}
 
 	private void rememberRecentCheckpointId(long id) {
@@ -1027,6 +1090,7 @@ public void stopCheckpointScheduler() {
 
 			pendingCheckpoints.clear();
 			numUnsuccessfulCheckpointsTriggers.set(0);
+			numFailedCheckpoints.set(0);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
index 60fe657c851..34f12d1e72e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -36,7 +36,9 @@
 
 	NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running."),
 
-	EXCEPTION("An Exception occurred while triggering the checkpoint.");
+	EXCEPTION("An Exception occurred while triggering the checkpoint."),
+
+	MAX_FAILED_ATTEMPTS_EXHAUSTED("The number of max failed checkpoints attempts exhausted.");
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6c9dbaf79c7..b0eb2aa9549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -61,7 +61,7 @@
  */
 public class PendingCheckpoint {
 
-	private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+	private static final Logger LOG = LoggerFactory.getLogger(PendingCheckpoint.class);
 
 	private final Object lock = new Object();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5fa40fced37..fa29398a48a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -358,6 +358,7 @@ public void enableCheckpointing(
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
+			int maxFailedCheckpoints,
 			ExternalizedCheckpointSettings externalizeSettings,
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
@@ -396,6 +397,7 @@ public void enableCheckpointing(
 			checkpointTimeout,
 			minPauseBetweenCheckpoints,
 			maxConcurrentCheckpoints,
+			maxFailedCheckpoints,
 			externalizeSettings,
 			tasksToTrigger,
 			tasksToWaitFor,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ec7103c3c2c..a8954b45f86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -224,6 +224,7 @@ public static ExecutionGraph buildGraph(
 					snapshotSettings.getCheckpointTimeout(),
 					snapshotSettings.getMinPauseBetweenCheckpoints(),
 					snapshotSettings.getMaxConcurrentCheckpoints(),
+					snapshotSettings.getMaxFailedCheckpoints(),
 					snapshotSettings.getExternalizedCheckpointSettings(),
 					triggerVertices,
 					ackVertices,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 233aa8887de..214acd78e30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -65,6 +65,23 @@
 	 */
 	private final boolean isExactlyOnce;
 
+	private final int maxFailedCheckpoints;
+
+	public JobSnapshottingSettings(
+		List<JobVertexID> verticesToTrigger,
+		List<JobVertexID> verticesToAcknowledge,
+		List<JobVertexID> verticesToConfirm,
+		long checkpointInterval,
+		long checkpointTimeout,
+		long minPauseBetweenCheckpoints,
+		int maxConcurrentCheckpoints,
+		ExternalizedCheckpointSettings externalizedCheckpointSettings,
+		@Nullable StateBackend defaultStateBackend,
+		boolean isExactlyOnce) {
+		this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, checkpointTimeout,
+			minPauseBetweenCheckpoints, maxConcurrentCheckpoints, 0, externalizedCheckpointSettings, defaultStateBackend, isExactlyOnce);
+	}
+
 	public JobSnapshottingSettings(
 			List<JobVertexID> verticesToTrigger,
 			List<JobVertexID> verticesToAcknowledge,
@@ -73,6 +90,7 @@ public JobSnapshottingSettings(
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
+			int maxFailedCheckpoints,
 			ExternalizedCheckpointSettings externalizedCheckpointSettings,
 			@Nullable StateBackend defaultStateBackend,
 			boolean isExactlyOnce) {
@@ -93,6 +111,7 @@ public JobSnapshottingSettings(
 		this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
 		this.defaultStateBackend = defaultStateBackend;
 		this.isExactlyOnce = isExactlyOnce;
+		this.maxFailedCheckpoints = maxFailedCheckpoints;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -138,14 +157,16 @@ public boolean isExactlyOnce() {
 		return isExactlyOnce;
 	}
 
+	public int getMaxFailedCheckpoints() { return maxFailedCheckpoints; }
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {
 		return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
-						"maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
+						"maxConcurrent=%d, trigger=%s, ack=%s, commit=%s, maxFailedCheckpoints=%s",
 						checkpointInterval, checkpointTimeout,
 						minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
-						verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+						verticesToTrigger, verticesToAcknowledge, verticesToConfirm, maxFailedCheckpoints);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 16913705efe..0f2268fb632 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -27,10 +27,7 @@
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.*;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -274,12 +271,14 @@ public void testTriggerAndDeclineCheckpointSimple() {
 			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
 			// set up the coordinator and validate the initial state
+			// set max fail attempts as 3
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
 				600000,
 				600000,
 				0,
 				Integer.MAX_VALUE,
+				2,
 				ExternalizedCheckpointSettings.none(),
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
@@ -354,6 +353,132 @@ public void testTriggerAndDeclineCheckpointSimple() {
 		}
 	}
 
+	/**
+	 * Repeatedly triggers a checkpoint and declines it from one task. With maxFailedCheckpoints = 2,
+	 * the first two consecutive declines are tolerated and the third fails the ExecutionGraph,
+	 * which the coordinator signals by throwing a CheckpointException.
+	 */
+	@Test
+	public void testTriggerAndMulitpleDeclineCheckpointFailure() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+			// set up the coordinator and validate the initial state
+			// tolerate at most 2 consecutive failed checkpoints (maxFailedCheckpoints = 2)
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				2,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new ExecutionVertex[] { vertex1, vertex2 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				Executors.directExecutor());
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+			// validate that we have a pending checkpoint
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+			assertNotNull(checkpoint);
+			assertEquals(checkpointId, checkpoint.getCheckpointId());
+			assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint.getTaskStates().size());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// check that the vertices received the trigger checkpoint message
+			verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+
+			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
+			// acknowledge from one of the tasks
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// acknowledge the same task again (should not matter)
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+
+			// decline checkpoint from the other task, this should cancel the checkpoint
+			// and count as the first consecutive failure (no newer checkpoint is pending)
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+			assertTrue(checkpoint.isDiscarded());
+
+			// validate that we have no new pending checkpoint
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger a second checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+			// validate that we have a pending checkpoint
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+			// decline it as well: the second consecutive failure is still tolerated
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+			assertTrue(checkpoint.isDiscarded());
+
+			// trigger a third checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+			// validate that we have a pending checkpoint
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+			// decline it too: this third consecutive failure exceeds the limit of 2, so the
+			// coordinator fails the ExecutionGraph and throws a CheckpointException
+			try {
+				coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+				fail("Should have thrown checkpointException");
+			} catch(CheckpointException e) {
+				// expected: the failure limit (maxFailedCheckpoints = 2) was exceeded
+			}
+			coord.shutdown(JobStatus.FINISHED);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	/**
 	 * This test triggers two checkpoints and then sends a decline message from one of the tasks
 	 * for the first checkpoint. This should discard the first checkpoint while not triggering
@@ -476,6 +601,164 @@ public void testTriggerAndDeclineCheckpointComplex() {
 		}
 	}
 
+	/**
+	 * Declines overlapping pending checkpoints one by one and expects a CheckpointException at the end.
+	 * NOTE(review): a decline is only counted as a failure when no more recent checkpoint is pending,
+	 * so presumably every decline below except the last is not counted and the failure count only
+	 * reaches 1, below the limit of 2 - verify that the final expected exception can actually occur.
+	 */
+	@Test
+	public void testTriggerAndDeclineCheckpointComplexWithMultipleDeclineMessage() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				2,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[]{vertex1, vertex2},
+				new ExecutionVertex[]{vertex1, vertex2},
+				new ExecutionVertex[]{vertex1, vertex2},
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				Executors.directExecutor());
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+			// trigger second checkpoint, should also succeed
+			assertTrue(coord.triggerCheckpoint(timestamp + 2, false));
+
+			// validate that we have two pending checkpoints
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
+			long checkpoint1Id = it.next().getKey();
+			long checkpoint2Id = it.next().getKey();
+			PendingCheckpoint checkpoint1 = coord.getPendingCheckpoints().get(checkpoint1Id);
+			PendingCheckpoint checkpoint2 = coord.getPendingCheckpoints().get(checkpoint2Id);
+
+			assertNotNull(checkpoint1);
+			assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
+			assertEquals(timestamp, checkpoint1.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint1.getJobId());
+			assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint1.getTaskStates().size());
+			assertFalse(checkpoint1.isDiscarded());
+			assertFalse(checkpoint1.isFullyAcknowledged());
+
+			assertNotNull(checkpoint2);
+			assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
+			assertEquals(timestamp + 2, checkpoint2.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint2.getJobId());
+			assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint2.getTaskStates().size());
+			assertFalse(checkpoint2.isDiscarded());
+			assertFalse(checkpoint2.isFullyAcknowledged());
+
+			// check that the vertices received the trigger checkpoint message
+			{
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
+			}
+
+			// check that the vertices received the trigger checkpoint message for the second checkpoint
+			{
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
+			}
+
+			// decline the first checkpoint from one of the tasks, this should cancel it
+			// (no new checkpoint is triggered, the second one is still pending)
+			// this decline should not count as a failure because a newer checkpoint is pending
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
+			assertTrue(checkpoint1.isDiscarded());
+
+			// validate that we have only one pending checkpoint left
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// validate that it is the same second checkpoint from earlier
+			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
+			assertEquals(checkpoint2Id, checkpointIdNew);
+			// trigger a third checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp + 4, false));
+
+			// validate that we have two pending checkpoints
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			checkpoint2Id = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpoint2Id);
+
+			// decline the oldest pending checkpoint; a newer one is still pending
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint2Id));
+			assertTrue(checkpoint.isDiscarded());
+
+			// trigger another checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp + 6, false));
+
+			// validate that we have two pending checkpoints
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			checkpoint2Id = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			checkpoint = coord.getPendingCheckpoints().get(checkpoint2Id);
+
+			// decline the oldest pending checkpoint again; a newer one is still pending,
+			// so (presumably) this decline is not counted as a failure either
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint2Id));
+			assertTrue(checkpoint.isDiscarded());
+
+			// trigger another checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp + 8, false));
+
+			// validate that we have two pending checkpoints
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			checkpoint2Id = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			checkpoint = coord.getPendingCheckpoints().get(checkpoint2Id);
+
+			// decline the oldest pending checkpoint; afterwards only the newest one remains,
+			// and declining that one below is expected to exceed the failure limit
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint2Id));
+			assertTrue(checkpoint.isDiscarded());
+			checkpoint2Id = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			try {
+				coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint2Id));
+				fail("Should have thrown checkpointException");
+			} catch (CheckpointException e) {
+				// expected (see the NOTE(review) in the Javadoc about whether this is reachable)
+			}
+
+			coord.shutdown(JobStatus.FINISHED);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	@Test
 	public void testTriggerAndConfirmSimpleCheckpoint() {
 		try {
@@ -1026,6 +1309,7 @@ public void testStateCleanupForLateOrUnknownMessages() throws Exception {
 			20000L,
 			0L,
 			1,
+			2,
 			ExternalizedCheckpointSettings.none(),
 			new ExecutionVertex[] { triggerVertex },
 			new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2},
@@ -2632,7 +2916,7 @@ private static ExecutionVertex mockExecutionVertex(
 		ExecutionState ... successiveStates) {
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
-
+		ExecutionGraph graph = mock(ExecutionGraph.class);
 		final Execution exec = spy(new Execution(
 			mock(Executor.class),
 			vertex,
@@ -2642,11 +2926,11 @@ private static ExecutionVertex mockExecutionVertex(
 		));
 		when(exec.getAttemptId()).thenReturn(attemptID);
 		when(exec.getState()).thenReturn(state, successiveStates);
-
 		when(vertex.getJobvertexId()).thenReturn(jobVertexID);
 		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
 		when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
+		when(vertex.getExecutionGraph()).thenReturn(graph);
 
 		return vertex;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 98b4c4deeeb..f3b2de6138d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -113,6 +113,7 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
 				100,
 				100,
 				1,
+				0,
 				ExternalizedCheckpointSettings.none(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 077ab53ac16..5087b3d45e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -124,6 +124,7 @@ public static void setupExecutionGraph() throws Exception {
 			100,
 			100,
 			1,
+			0,
 			ExternalizedCheckpointSettings.none(),
 			Collections.<ExecutionJobVertex>emptyList(),
 			Collections.<ExecutionJobVertex>emptyList(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index eb7833a636e..9b54138b4f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -47,6 +47,8 @@
 	/** The default limit of concurrently happening checkpoints: one */
 	public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
 
+	public static final int DEFAULT_MAX_UNSUCCESSFUL_CHECKPOINTS = 0;
+
 	// ------------------------------------------------------------------------
 
 	/** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -70,6 +72,8 @@
 	/** Cleanup behaviour for persistent checkpoints. */
 	private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
 
+	/** Max consecutive failed checkpoints tolerated before the job fails. NOTE(review): the default of 0 fails the job on its first failed checkpoint. */
+	private int maxFailedCheckpoints = DEFAULT_MAX_UNSUCCESSFUL_CHECKPOINTS;
 	// ------------------------------------------------------------------------
 
 	/**
@@ -204,6 +208,17 @@ public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
 	}
 
+	public void setMaxFailedCheckpoints(int maxFailedCheckpoints) {
+		if(maxFailedCheckpoints < 0 ) {
+			throw new IllegalArgumentException("The maximum number of unsuccessful checkpoint attempts must be at least 0.");
+		}
+		this.maxFailedCheckpoints = maxFailedCheckpoints;
+	}
+
+	public int getMaxFailedCheckpoints() {
+		return this.maxFailedCheckpoints;
+	}
+
 	/**
 	 * Checks whether checkpointing is forced, despite currently non-checkpointable iteration feedback.
 	 * 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 003eff90914..f84cebe6bb3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -537,7 +537,7 @@ private void configureCheckpointing() {
 		JobSnapshottingSettings settings = new JobSnapshottingSettings(
 				triggerVertices, ackVertices, commitVertices, interval,
 				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
-				cfg.getMaxConcurrentCheckpoints(),
+				cfg.getMaxConcurrentCheckpoints(), cfg.getMaxFailedCheckpoints(),
 				externalizedCheckpointSettings,
 				streamGraph.getStateBackend(),
 				isExactlyOnce);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the pull request.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services