You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/30 11:00:25 UTC

[GitHub] yanghua closed pull request #6567: [FLINK-10074] Allowable number of checkpoint failures

yanghua closed pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567
 
 
   

This pull request was opened from a forked repository.
Because GitHub hides the original diff of such a pull request once it is
merged or closed, the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not display it otherwise:

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7d88f0d94ea..b091192f0a5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1354,6 +1354,8 @@ public void runBrokerFailureTest() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.enableCheckpointing(500);
+		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+		env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 8c0d7665fed..23507bfbdf8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -57,6 +57,8 @@ public static void generateRandomizedIntegerSequence(
 		env.setParallelism(numPartitions);
 		env.getConfig().disableSysoutLogging();
 		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+		env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 
 		DataStream<Integer> stream = env.addSource(
 				new RichParallelSourceFunction<Integer>() {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 6b7caaac6ec..719560ef860 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -883,16 +883,6 @@ public boolean isFailTaskOnCheckpointError() {
 		return failTaskOnCheckpointError;
 	}
 
-	/**
-	 * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
-	 * the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...)
-	 * instead.
-	 */
-	@Internal
-	public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
-		this.failTaskOnCheckpointError = failTaskOnCheckpointError;
-	}
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 179cf9c6de8..f4714c2cce6 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -100,6 +100,8 @@ public void testJobManagerJMXMetricAccess() throws Exception {
 					50,
 					5,
 					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+					true,
+					0,
 					true),
 				null));
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6286f..a541b6ec444 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -180,6 +180,8 @@
 	/** Registry that tracks state which is shared across (incremental) checkpoints */
 	private SharedStateRegistry sharedStateRegistry;
 
+	private final CheckpointFailureManager failureManager;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -196,10 +198,12 @@ public CheckpointCoordinator(
 			CompletedCheckpointStore completedCheckpointStore,
 			StateBackend checkpointStateBackend,
 			Executor executor,
-			SharedStateRegistryFactory sharedStateRegistryFactory) {
+			SharedStateRegistryFactory sharedStateRegistryFactory,
+			CheckpointFailureManager failureManager) {
 
 		// sanity checks
 		checkNotNull(checkpointStateBackend);
+		checkNotNull(failureManager);
 		checkArgument(baseInterval > 0, "Checkpoint base interval must be larger than zero");
 		checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
@@ -230,6 +234,7 @@ public CheckpointCoordinator(
 		this.executor = checkNotNull(executor);
 		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
 		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+		this.failureManager = failureManager;
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.masterHooks = new HashMap<>();
@@ -546,6 +551,8 @@ public CheckpointTriggerResult triggerCheckpoint(
 						LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
 
 						checkpoint.abortExpired();
+						failureManager.tryHandleFailure(new Throwable("Checkpoint " + checkpointID + " of job " +
+							job + " expired before completing"), checkpointID);
 						pendingCheckpoints.remove(checkpointID);
 						rememberRecentCheckpointId(checkpointID);
 
@@ -642,6 +649,8 @@ else if (!props.forceCheckpoint()) {
 
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
+					failureManager.tryHandleFailure(
+						new Exception("Failed to trigger checkpoint : " + checkpointID, t), checkpointID);
 				}
 
 				try {
@@ -831,6 +840,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro
 				// abort the current pending checkpoint if we fails to finalize the pending checkpoint.
 				if (!pendingCheckpoint.isDiscarded()) {
 					pendingCheckpoint.abortError(e1);
+					failureManager.tryHandleFailure(
+						new Exception("Checkpoint : " + checkpointId, e1), checkpointId);
 				}
 
 				throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
@@ -874,6 +885,8 @@ public void run() {
 		LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
 			completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
 
+		failureManager.resetCounter();
+
 		if (LOG.isDebugEnabled()) {
 			StringBuilder builder = new StringBuilder();
 			builder.append("Checkpoint state: ");
@@ -1147,10 +1160,6 @@ public CompletedCheckpointStore getCheckpointStore() {
 		return completedCheckpointStore;
 	}
 
-	public CheckpointIDCounter getCheckpointIdCounter() {
-		return checkpointIdCounter;
-	}
-
 	public long getCheckpointTimeout() {
 		return checkpointTimeout;
 	}
@@ -1254,6 +1263,8 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
 		LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
 
 		pendingCheckpoint.abortDeclined();
+		failureManager.tryHandleFailure("Discarding checkpoint " + checkpointId +" of job " +
+			job + " because: " + reason, checkpointId);
 		rememberRecentCheckpointId(checkpointId);
 
 		// we don't have to schedule another "dissolving" checkpoint any more because the
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
new file mode 100644
index 00000000000..19ec7368d5a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Decides how checkpoint failures reported by the {@link CheckpointCoordinator} affect the job.
+ *
+ * <p>If {@code failOnCheckpointingErrors} is set, every reported failure fails the job through
+ * {@link ExecutionGraph#failGlobal(Throwable)}. Otherwise consecutive failures are counted and the
+ * job is only failed once that count exceeds {@code tolerableCpFailureNumber}. The count starts
+ * over when {@link #resetCounter()} is called and whenever this manager fails the job, so that a
+ * restarted job gets its full tolerance again.
+ */
+public class CheckpointFailureManager {
+
+	/** Whether a single checkpoint failure is enough to fail the job. */
+	private final boolean failOnCheckpointingErrors;
+
+	/** Number of consecutive checkpoint failures that are tolerated without failing the job. */
+	private final int tolerableCpFailureNumber;
+
+	/** Consecutive failures since the last reset. */
+	private final AtomicInteger continuousFailureCounter;
+
+	/** The graph that is failed globally once failures are no longer tolerated. */
+	private final ExecutionGraph executionGraph;
+
+	/** Serializes the count-and-decide step so that concurrent reports are evaluated one at a time. */
+	private final Object lock = new Object();
+
+	/**
+	 * Creates a new failure manager.
+	 *
+	 * @param failOnCheckpointingErrors whether every checkpoint failure should fail the job
+	 * @param tolerableCpFailureNumber number of consecutive failures to tolerate; must be >= 0
+	 * @param executionGraph the graph to fail once failures are no longer tolerated
+	 */
+	public CheckpointFailureManager(
+		boolean failOnCheckpointingErrors,
+		int tolerableCpFailureNumber,
+		ExecutionGraph executionGraph) {
+		checkArgument(tolerableCpFailureNumber >= 0,
+			"tolerableCpFailureNumber must be >= 0, but was %s", tolerableCpFailureNumber);
+		this.failOnCheckpointingErrors = failOnCheckpointingErrors;
+		this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+		this.continuousFailureCounter = new AtomicInteger(0);
+		this.executionGraph = checkNotNull(executionGraph);
+	}
+
+	/** Returns the live counter of consecutive checkpoint failures. */
+	@VisibleForTesting
+	public AtomicInteger getContinuousFailureCounter() {
+		return continuousFailureCounter;
+	}
+
+	/** Resets the count of consecutive failures, e.g. after a checkpoint completed successfully. */
+	public void resetCounter() {
+		continuousFailureCounter.set(0);
+	}
+
+	/**
+	 * Reports a checkpoint failure that is described only by a message.
+	 *
+	 * @param reason description of the failure, used as the message of the job failure cause
+	 * @param checkpointId id of the failed checkpoint; not used by this overload
+	 */
+	public void tryHandleFailure(String reason, long checkpointId) {
+		handleFailure(() -> new Exception(reason));
+	}
+
+	/**
+	 * Reports a checkpoint failure caused by the given throwable.
+	 *
+	 * @param cause the failure, passed on unchanged as the job failure cause
+	 * @param checkpointId id of the failed checkpoint; not used by this overload
+	 */
+	public void tryHandleFailure(Throwable cause, long checkpointId) {
+		handleFailure(() -> cause);
+	}
+
+	/**
+	 * Reports a checkpoint failure described by a {@link CheckpointDeclineReason}.
+	 *
+	 * @param reason why the checkpoint failed
+	 * @param checkpointId id of the failed checkpoint, included in the job failure message
+	 */
+	public void tryHandleFailure(CheckpointDeclineReason reason, long checkpointId) {
+		handleFailure(() -> new Exception(
+			"Checkpoint " + checkpointId + " failed: " + reason.message()));
+	}
+
+	/**
+	 * Counts one failure and fails the job if failures are not tolerated at all or the tolerance
+	 * is exceeded. The failure cause is only created when the job is actually failed.
+	 */
+	private void handleFailure(Supplier<? extends Throwable> cause) {
+		final boolean failJob;
+		synchronized (lock) {
+			failJob = failOnCheckpointingErrors
+				|| continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber;
+			if (failJob) {
+				// start counting from scratch; otherwise, if the job keeps running with this
+				// manager after a restart, its very first checkpoint failure would fail it again
+				continuousFailureCounter.set(0);
+			}
+		}
+
+		// fail outside of the lock: failing the graph runs code this class does not control
+		if (failJob) {
+			executionGraph.failGlobal(cause.get());
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..8f11748de2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -475,7 +476,8 @@ public void enableCheckpointing(
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			StateBackend checkpointStateBackend,
-			CheckpointStatsTracker statsTracker) {
+			CheckpointStatsTracker statsTracker,
+			CheckpointFailureManager failureManager) {
 
 		// simple sanity checks
 		checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
@@ -505,7 +507,8 @@ public void enableCheckpointing(
 			checkpointStore,
 			checkpointStateBackend,
 			ioExecutor,
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			failureManager);
 
 		// register the master hooks on the checkpoint coordinator
 		for (MasterTriggerRestoreHook<?> hook : masterHooks) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..0d2286ea431 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -28,6 +28,7 @@
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -342,6 +343,11 @@ public static ExecutionGraph buildGraph(
 
 			final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
 
+			CheckpointFailureManager failureManager = new CheckpointFailureManager(
+				chkConfig.isFailOnCheckpointingErrors(),
+				chkConfig.getTolerableCpFailureNumber(),
+				executionGraph);
+
 			executionGraph.enableCheckpointing(
 				chkConfig.getCheckpointInterval(),
 				chkConfig.getCheckpointTimeout(),
@@ -355,7 +361,8 @@ public static ExecutionGraph buildGraph(
 				checkpointIdCounter,
 				completedCheckpoints,
 				rootBackend,
-				checkpointStatsTracker);
+				checkpointStatsTracker,
+				failureManager);
 		}
 
 		// create all the metrics for the Execution Graph
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index 4ecbda57b28..348c42e33e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -42,6 +42,10 @@
 
 	private final int maxConcurrentCheckpoints;
 
+	private final int tolerableCpFailureNumber;
+
+	private final boolean failOnCheckpointingErrors;
+
 	/** Settings for what to do with checkpoints when a job finishes. */
 	private final CheckpointRetentionPolicy checkpointRetentionPolicy;
 
@@ -60,11 +64,14 @@ public CheckpointCoordinatorConfiguration(
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
 			CheckpointRetentionPolicy checkpointRetentionPolicy,
-			boolean isExactlyOnce) {
+			boolean isExactlyOnce,
+			int tolerableCpFailureNumber,
+			boolean failOnCheckpointingErrors) {
 
 		// sanity checks
 		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-			minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+			minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1 ||
+			tolerableCpFailureNumber < 0) {
 			throw new IllegalArgumentException();
 		}
 
@@ -74,6 +81,8 @@ public CheckpointCoordinatorConfiguration(
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
 		this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
 		this.isExactlyOnce = isExactlyOnce;
+		this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+		this.failOnCheckpointingErrors = failOnCheckpointingErrors;
 	}
 
 	public long getCheckpointInterval() {
@@ -100,6 +109,14 @@ public boolean isExactlyOnce() {
 		return isExactlyOnce;
 	}
 
+	public int getTolerableCpFailureNumber() {
+		return tolerableCpFailureNumber;
+	}
+
+	public boolean isFailOnCheckpointingErrors() {
+		return failOnCheckpointingErrors;
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -114,7 +131,9 @@ public boolean equals(Object o) {
 			minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
 			maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
 			isExactlyOnce == that.isExactlyOnce &&
-			checkpointRetentionPolicy == that.checkpointRetentionPolicy;
+			checkpointRetentionPolicy == that.checkpointRetentionPolicy &&
+			tolerableCpFailureNumber == that.tolerableCpFailureNumber &&
+			failOnCheckpointingErrors == that.failOnCheckpointingErrors;
 	}

 	@Override
@@ -125,7 +144,9 @@ public int hashCode() {
 				minPauseBetweenCheckpoints,
 				maxConcurrentCheckpoints,
 				checkpointRetentionPolicy,
-				isExactlyOnce);
+				isExactlyOnce,
+				tolerableCpFailureNumber,
+				failOnCheckpointingErrors);
 	}

 	@Override
@@ -136,6 +157,8 @@ public String toString() {
 			", minPauseBetweenCheckpoints=" + minPauseBetweenCheckpoints +
 			", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
 			", checkpointRetentionPolicy=" + checkpointRetentionPolicy +
+			", tolerableCpFailureNumber=" + tolerableCpFailureNumber +
+			", failOnCheckpointingErrors=" + failOnCheckpointingErrors +
 			'}';
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 32b32cfb3d6..3c0639c152f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -80,7 +80,8 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
 			new FailingCompletedCheckpointStore(),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		coord.triggerCheckpoint(triggerTimestamp, false);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index f644c01caf5..06b49201bbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -435,7 +435,8 @@ private static CheckpointCoordinator instantiateCheckpointCoordinator(JobID jid,
 				new StandaloneCompletedCheckpointStore(10),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3650f43066d..142ea2bcc0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -23,15 +23,23 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -51,6 +59,7 @@
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.InstantiationUtil;
@@ -143,7 +152,8 @@ public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -204,7 +214,8 @@ public void testCheckpointAbortsIfTriggerTasksAreFinished() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -256,7 +267,8 @@ public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -309,7 +321,8 @@ public void testTriggerAndDeclineCheckpointSimple() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -412,7 +425,8 @@ public void testTriggerAndDeclineCheckpointComplex() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -532,7 +546,8 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -700,7 +715,8 @@ public void testMultipleConcurrentCheckpoints() {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -831,7 +847,8 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
 				new StandaloneCompletedCheckpointStore(10),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -996,7 +1013,8 @@ public void testCheckpointTimeoutIsolated() {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1074,7 +1092,8 @@ public void testHandleMessagesForNonExistingCheckpoints() {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1138,7 +1157,8 @@ public void testStateCleanupForLateOrUnknownMessages() throws Exception {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1271,7 +1291,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 
 			coord.startCheckpointScheduler();
@@ -1361,7 +1382,8 @@ public void testMinTimeBetweenCheckpointsInterval() throws Exception {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 		try {
 			coord.startCheckpointScheduler();
@@ -1435,7 +1457,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1587,7 +1610,8 @@ public void testSavepointsAreNotSubsumed() throws Exception {
 			new StandaloneCompletedCheckpointStore(10),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1681,7 +1705,8 @@ private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			coord.startCheckpointScheduler();
 
@@ -1755,7 +1780,8 @@ public void testMaxConcurrentAttempsWithSubsumption() {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			coord.startCheckpointScheduler();
 
@@ -1832,7 +1858,8 @@ public void testPeriodicSchedulingWithInactiveTasks() {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			coord.startCheckpointScheduler();
 
@@ -1885,7 +1912,8 @@ public void testConcurrentSavepoints() throws Exception {
 			new StandaloneCompletedCheckpointStore(2),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
@@ -1939,7 +1967,8 @@ public void testMinDelayBetweenSavepoints() throws Exception {
 			new StandaloneCompletedCheckpointStore(2),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2002,7 +2031,8 @@ public void testRestoreLatestCheckpointedState() throws Exception {
 			store,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2117,7 +2147,8 @@ public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2264,7 +2295,8 @@ private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean s
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2549,7 +2581,8 @@ public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception
 			standaloneCompletedCheckpointStore,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2701,7 +2734,8 @@ public void testExternalizedCheckpoints() throws Exception {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -2777,6 +2811,434 @@ public void testReplicateModeStateHandle() {
 		Assert.assertEquals(3, checkCounts.get("t-6").intValue());
 	}
 
+	/** Verifies that failing to complete acknowledged checkpoints increments the continuous failure counter. */
+	@Test
+	public void testTolerableFailureForCompletingCheckpointFailed() throws Exception {
+		final JobID jid = new JobID();
+
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
+
+		final long triggerTimestamp = 1L;
+		final boolean failOnCheckpointingErrors = false;
+		final int tolerableFailureNumber = 1;
+		final CheckpointFailureManager failureManager = new CheckpointFailureManager(
+			failOnCheckpointingErrors, tolerableFailureNumber, mock(ExecutionGraph.class));
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2),
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY,
+			failureManager);
+
+		// subtask state backed by mocked state handles; reused for both acknowledgements below
+		TaskStateSnapshot subtaskState = spy(new TaskStateSnapshot());
+		KeyedStateHandle managedKeyedHandle = mock(KeyedStateHandle.class);
+		KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
+		OperatorStateHandle managedOpHandle = mock(OperatorStreamStateHandle.class);
+		OperatorStateHandle rawOpHandle = mock(OperatorStreamStateHandle.class);
+		final OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState(
+			managedOpHandle,
+			rawOpHandle,
+			managedKeyedHandle,
+			rawKeyedHandle));
+
+		subtaskState.putSubtaskStateByOperatorID(new OperatorID(), operatorSubtaskState);
+		when(subtaskState.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(vertex.getJobvertexId()))).thenReturn(operatorSubtaskState);
+
+		// first failure: the counter reaches, but does not exceed, the tolerable failure number
+		coord.triggerCheckpoint(triggerTimestamp, false);
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+		assertFalse(pendingCheckpoint.isDiscarded());
+
+		long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
+
+		try {
+			coord.receiveAcknowledgeMessage(acknowledgeMessage);
+			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
+				"store the completed checkpoint.");
+		} catch (CheckpointException e) {
+			// ignore because we expected this exception
+		}
+
+		assertEquals(1, failureManager.getContinuousFailureCounter().get());
+
+		// second failure: the counter now exceeds the tolerable failure number
+		coord.triggerCheckpoint(triggerTimestamp, false);
+		checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+		acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
+
+		try {
+			coord.receiveAcknowledgeMessage(acknowledgeMessage);
+			fail("Expected a checkpoint exception because the second checkpoint could not be completed either.");
+		} catch (CheckpointException e) {
+			// ignore because we expected this exception
+		}
+
+		assertTrue(failureManager.getContinuousFailureCounter().get() > tolerableFailureNumber);
+		// make sure that the pending checkpoint has been discarded after we could not complete it
+		assertTrue(pendingCheckpoint.isDiscarded());
+		coord.shutdown(JobStatus.FINISHED);
+	}
+
+	/**
+	 * Verifies that a checkpoint which expires before all tasks acknowledged it is counted
+	 * as a continuous failure by the {@link CheckpointFailureManager}.
+	 */
+	@Test
+	public void testTolerableFailureForTriggeringCheckpointTimeout() throws Exception {
+		final JobID jid = new JobID();
+		final long timestamp = System.currentTimeMillis();
+
+		// create some mock execution vertices
+		final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+
+		final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+
+		final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+		// failure manager with failOnCheckpointErrors disabled and a tolerable failure number of 1
+		final int tolerableFailureNumber = 1;
+		final boolean failOnCheckpointErrors = false;
+		final CheckpointFailureManager failureManager = new CheckpointFailureManager(
+			failOnCheckpointErrors,
+			tolerableFailureNumber,
+			mock(ExecutionGraph.class)
+		);
+
+		ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+
+		ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+		ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+
+		ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+		// set up the coordinator
+		// the timeout for the checkpoint is 200 milliseconds
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			200,
+			0,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			new ExecutionVertex[] { triggerVertex },
+			new ExecutionVertex[] { ackVertex1, ackVertex2 },
+			new ExecutionVertex[] { commitVertex },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2),
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY,
+			failureManager);
+
+		// trigger a checkpoint, partially acknowledged
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
+		assertFalse(checkpoint.isDiscarded());
+
+		OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
+
+		TaskStateSnapshot taskOperatorSubtaskStates1 = spy(new TaskStateSnapshot());
+		OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class);
+		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
+
+		// only ackVertex1 acknowledges; ackVertex2 never does, so the checkpoint cannot complete
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1));
+
+		// wait until the checkpoint must have expired.
+		// we check every 250 msecs conservatively for 5 seconds
+		// to give even slow build servers a very good chance of completing this
+		long deadline = System.currentTimeMillis() + 5000;
+		do {
+			Thread.sleep(250);
+		}
+		while (!checkpoint.isDiscarded() &&
+			coord.getNumberOfPendingCheckpoints() > 0 &&
+			System.currentTimeMillis() < deadline);
+
+		assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// the expired checkpoint is counted as one continuous failure
+		assertEquals(1, failureManager.getContinuousFailureCounter().get());
+
+		// validate that the received states have been discarded
+		verify(subtaskState1, times(1)).discardState();
+
+		// no confirm message must have been sent
+		verify(commitVertex.getCurrentExecutionAttempt(), times(0)).notifyCheckpointComplete(anyLong(), anyLong());
+
+		coord.shutdown(JobStatus.FINISHED);
+	}
+
+	/**
+	 * Verifies that declined checkpoints are counted as continuous failures by the
+	 * {@link CheckpointFailureManager} and that repeated declines of a discarded checkpoint are not.
+	 */
+	@Test
+	public void testTolerableFailureForDecliningCheckpointFailed() throws Exception {
+		final JobID jid = new JobID();
+		final long timestamp = System.currentTimeMillis();
+		// failure manager with failOnCheckpointingError disabled and a tolerable failure number of 1
+		final int tolerableFailureNumber = 1;
+		final boolean failOnCheckpointingError = false;
+		final CheckpointFailureManager failureManager = new CheckpointFailureManager(
+			failOnCheckpointingError,
+			tolerableFailureNumber,
+			mock(ExecutionGraph.class)
+		);
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY,
+			failureManager);
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// trigger the first checkpoint. this should succeed
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+		// validate that we have a pending checkpoint
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// we have one task scheduled that will cancel after timeout
+		assertEquals(1, coord.getNumScheduledTasks());
+
+		long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+		assertNotNull(checkpoint);
+		assertEquals(checkpointId, checkpoint.getCheckpointId());
+		assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+		assertEquals(jid, checkpoint.getJobId());
+		assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
+		assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+		assertEquals(0, checkpoint.getOperatorStates().size());
+		assertFalse(checkpoint.isDiscarded());
+		assertFalse(checkpoint.isFullyAcknowledged());
+
+		// check that the vertices received the trigger checkpoint message
+		verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+		verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+		// acknowledge from one of the tasks
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
+		assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+		assertFalse(checkpoint.isDiscarded());
+		assertFalse(checkpoint.isFullyAcknowledged());
+
+		// acknowledge the same task again (should not matter)
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		assertFalse(checkpoint.isDiscarded());
+		assertFalse(checkpoint.isFullyAcknowledged());
+
+		// decline checkpoint from the other task, this should cancel the checkpoint
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+		assertTrue(checkpoint.isDiscarded());
+
+		// the declined checkpoint is counted as one continuous failure
+		assertEquals(1, failureManager.getContinuousFailureCounter().get());
+
+		// the canceler is also removed
+		assertEquals(0, coord.getNumScheduledTasks());
+
+		// validate that we have no new pending checkpoint
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// decline again, nothing should happen
+		// decline from the other task, nothing should happen
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
+		assertTrue(checkpoint.isDiscarded());
+
+		// declines for the already discarded checkpoint are not counted again
+		assertEquals(1, failureManager.getContinuousFailureCounter().get());
+
+		// trigger a new checkpoint
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+		checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+		// decline the new checkpoint as well
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+
+		// the counter now exceeds the tolerable failure number
+		assertTrue(failureManager.getContinuousFailureCounter().get() > tolerableFailureNumber);
+
+		coord.shutdown(JobStatus.FINISHED);
+	}
+
+	/**
+	 * Verifies that a successfully completed checkpoint resets the continuous failure counter
+	 * of the {@link CheckpointFailureManager} after it exceeded the tolerable failure number.
+	 */
+	@Test
+	public void testTolerableFailureForResettingCounter() throws Exception {
+		final JobID jid = new JobID();
+		final long timestamp = System.currentTimeMillis();
+		// failure manager with failOnCheckpointErrors disabled and a tolerable failure number of 1
+		final int tolerableFailureNumber = 1;
+		final boolean failOnCheckpointErrors = false;
+		final CheckpointFailureManager failureManager = new CheckpointFailureManager(
+			failOnCheckpointErrors,
+			tolerableFailureNumber,
+			mock(ExecutionGraph.class));
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY,
+			failureManager);
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// trigger the first checkpoint. this should succeed
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+		// validate that we have a pending checkpoint
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// we have one task scheduled that will cancel after timeout
+		assertEquals(1, coord.getNumScheduledTasks());
+
+		long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+		PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+		assertNotNull(checkpoint);
+		assertEquals(checkpointId, checkpoint.getCheckpointId());
+		assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+		assertEquals(jid, checkpoint.getJobId());
+		assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
+		assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+		assertEquals(0, checkpoint.getOperatorStates().size());
+		assertFalse(checkpoint.isDiscarded());
+		assertFalse(checkpoint.isFullyAcknowledged());
+
+		// check that the vertices received the trigger checkpoint message
+		verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+		verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+		// acknowledge from one of the tasks
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
+		assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+		assertFalse(checkpoint.isDiscarded());
+		assertFalse(checkpoint.isFullyAcknowledged());
+
+		// acknowledge the same task again (should not matter)
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+		assertFalse(checkpoint.isDiscarded());
+		assertFalse(checkpoint.isFullyAcknowledged());
+
+		// decline checkpoint from the other task, this should cancel the checkpoint
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+		assertTrue(checkpoint.isDiscarded());
+
+		// the declined checkpoint is counted as one continuous failure
+		assertEquals(1, failureManager.getContinuousFailureCounter().get());
+
+		// the canceler is also removed
+		assertEquals(0, coord.getNumScheduledTasks());
+
+		// validate that we have no new pending checkpoint
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// decline again, nothing should happen
+		// decline from the other task, nothing should happen
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
+		assertTrue(checkpoint.isDiscarded());
+
+		// declines for the already discarded checkpoint are not counted again
+		assertEquals(1, failureManager.getContinuousFailureCounter().get());
+
+		// trigger a new checkpoint
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+		checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+		// decline the new checkpoint as well
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+
+		// the counter now exceeds the tolerable failure number
+		assertTrue(failureManager.getContinuousFailureCounter().get() > tolerableFailureNumber);
+
+		// trigger the checkpoint. this should succeed
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+		checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+
+		// counter has been reset
+		assertEquals(0, failureManager.getContinuousFailureCounter().get());
+
+		coord.shutdown(JobStatus.FINISHED);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -3192,7 +3654,8 @@ public void testStopPeriodicScheduler() throws Exception {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		// Periodic
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
@@ -3400,7 +3863,8 @@ public void testCheckpointStatsTrackerPendingCheckpointCallback() {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
@@ -3439,7 +3903,8 @@ public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
 			store,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		store.addCheckpoint(new CompletedCheckpoint(
 			new JobID(),
@@ -3506,7 +3971,8 @@ public void testSharedStateRegistrationOnRestore() throws Exception {
 					SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor);
 					createdSharedStateRegistries.add(instance);
 					return instance;
-				});
+				},
+			mock(CheckpointFailureManager.class));
 
 		final int numCheckpoints = 3;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index ffebc52f8a0..b9dfa231f1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -84,6 +84,8 @@ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception
 					0L,
 					1,
 					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+					true,
+					0,
 					true),
 				new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
 				serHooks);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 236717ff434..e45625a4d7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -109,7 +109,8 @@ public void testSetState() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -187,7 +188,8 @@ public void testNoCheckpointAvailable() {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				mock(CheckpointFailureManager.class));
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -245,7 +247,8 @@ public void testNonRestoredState() throws Exception {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
 
 		// --- (2) Checkpoint misses state for a jobVertex (should work) ---
 		Map<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 82dcd023f9b..23bb576e420 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -64,7 +64,9 @@ public void testGetSnapshottingSettings() throws Exception {
 				191929L,
 				123,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				false
+				false,
+				0,
+				true
 			),
 			null);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index f6b7730a72e..fae57f26268 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -86,6 +86,8 @@ public void testCoordinatorShutsDownOnFailure() {
 						0L,
 						Integer.MAX_VALUE,
 						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+						true,
+						0,
 						true),
 					null));
 			testGraph.setExecutionConfig(executionConfig);
@@ -157,6 +159,8 @@ public void testCoordinatorShutsDownOnSuccess() {
 						0L,
 						Integer.MAX_VALUE,
 						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+						true,
+						0,
 						true),
 					null));
 			
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 4b7449b46ef..4434561b60a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -104,7 +104,8 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
 				counter,
 				store,
 				new MemoryStateBackend(),
-				CheckpointStatsTrackerTest.createTestTracker());
+				CheckpointStatsTrackerTest.createTestTracker(),
+				mock(CheckpointFailureManager.class));
 
 		JobVertex jobVertex = new JobVertex("MockVertex");
 		jobVertex.setInvokableClass(AbstractInvokable.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 7c2a02c3314..66c71f5f379 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -28,6 +28,7 @@
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -135,7 +136,8 @@ public static void setupExecutionGraph() throws Exception {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
-			statsTracker);
+			statsTracker,
+			mock(CheckpointFailureManager.class));
 
 		runtimeGraph.setJsonPlan("{}");
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12b4277941f..e1054dc5094 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -694,7 +694,9 @@ private ExecutionGraph createExecutionGraph(Configuration configuration) throws
 					0,
 					1,
 					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-					false),
+					false,
+					0,
+					true),
 				null));
 
 		final Time timeout = Time.seconds(10L);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index a3a26f336e4..99270d8b03f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -313,6 +314,8 @@ public void testLocalFailureFailsPendingCheckpoints() throws Exception {
 			1L,
 			3,
 			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			true,
+			0,
 			true);
 
 		final ExecutionGraph graph = createSampleGraph(
@@ -342,7 +345,8 @@ public void testLocalFailureFailsPendingCheckpoints() throws Exception {
 				1,
 				allVertices,
 				checkpointCoordinatorConfiguration,
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()),
+				mock(CheckpointFailureManager.class));
 
 		final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 51e7fec1223..832b113aa4c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -49,7 +49,9 @@ public void testIsJavaSerializable() throws Exception {
 				112,
 				12,
 				CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
-				false),
+				false,
+				0,
+				true),
 			new SerializedValue<>(new MemoryStateBackend()));
 
 		JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d991983e6f2..b7efe048d27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -258,6 +258,8 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
 						0L,
 						1,
 						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+						true,
+						0,
 						true),
 					null));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f3c72545e69..3cdaac6e6bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1429,6 +1429,8 @@ private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestore
 			1000L,
 			1,
 			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			true,
+			0,
 			true);
 		final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
 			Collections.emptyList(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0c9dd49c1d1..89d01a7e1ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -312,7 +312,7 @@ public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpoin
 
 	@Override
 	public void declineCheckpoint(long checkpointId, Throwable cause) {
-		throw new UnsupportedOperationException();
+		// Intentionally a no-op: tasks now report failed checkpoints via declineCheckpoint, so the mock accepts and ignores the call.
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
index 13a319490d6..6890b151521 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -98,6 +98,8 @@ public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
 				1L,
 				1,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				0,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -132,6 +134,8 @@ public void testSavepointDirectoryConfiguration() throws Exception {
 				1L,
 				1,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				0,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
@@ -185,6 +189,8 @@ public void testTriggerNewRequest() throws Exception {
 				1L,
 				1,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				0,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -317,6 +323,8 @@ public void testFailedCancellation() throws Exception {
 				1L,
 				1,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				0,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
index 47ebb18a115..99fb8ff29de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
@@ -156,6 +156,8 @@ private static GraphAndSettings createGraphAndSettings(boolean externalized, boo
 		long timeout = 996979L;
 		long minPause = 119191919L;
 		int maxConcurrent = 12929329;
+		int tolerableCpFailureNumber = 0;
+		boolean failOnCheckpointingErrors = false;
 
 		CheckpointRetentionPolicy retentionPolicy = externalized
 			? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
@@ -167,7 +169,9 @@ private static GraphAndSettings createGraphAndSettings(boolean externalized, boo
 			minPause,
 			maxConcurrent,
 			retentionPolicy,
-			exactlyOnce);
+			exactlyOnce,
+			tolerableCpFailureNumber,
+			failOnCheckpointingErrors);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(chkConfig);
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 3ffe7701682..5ace2f18527 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -843,6 +843,8 @@ class JobManagerITCase(_system: ActorSystem)
               60000,
               1,
               CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+              true,
+              0,
               true),
             null))
 
@@ -904,6 +906,8 @@ class JobManagerITCase(_system: ActorSystem)
               60000,
               1,
               CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+              true,
+              0,
               true),
             null))
 
@@ -973,6 +977,8 @@ class JobManagerITCase(_system: ActorSystem)
               60000,
               1,
               CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+              true,
+              0,
               true),
             null))
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 87c800d2130..aec83d65a07 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,6 +47,9 @@
 	/** The default limit of concurrently happening checkpoints: one. */
 	public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
 
+	/** The default tolerable failure number: none. */
+	public static final int DEFAULT_TOLERABLE_FAILURE_NUMBER = 0;
+
 	// ------------------------------------------------------------------------
 
 	/** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -72,6 +76,9 @@
 	/** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */
 	private boolean failOnCheckpointingErrors = true;
 
+	/** The number of consecutive checkpoint failures that are tolerated before the job is failed. */
+	private int tolerableFailureNumber = DEFAULT_TOLERABLE_FAILURE_NUMBER;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -206,6 +213,24 @@ public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
 	}
 
+	/**
+	 * Gets the number of consecutive checkpoint failures that are tolerated before
+	 * the job is failed. Default: 0.
+	 */
+	public int getTolerableFailureNumber() {
+		return tolerableFailureNumber;
+	}
+
+	/**
+	 * Sets the number of consecutive checkpoint failures that are tolerated before
+	 * the job is failed. Must be non-negative. Default: 0.
+	 */
+	public void setTolerableFailureNumber(int tolerableFailureNumber) {
+		Preconditions.checkArgument(tolerableFailureNumber >= 0,
+			"The tolerable failure number must be greater than or equal to zero, but was %s.", tolerableFailureNumber);
+		this.tolerableFailureNumber = tolerableFailureNumber;
+	}
+
 	/**
 	 * Checks whether checkpointing is forced, despite currently non-checkpointable iteration feedback.
 	 *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 69213024975..2d6c036336c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -593,14 +592,7 @@ private void configureCheckpointing() {
 		CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
 		long interval = cfg.getCheckpointInterval();
-		if (interval > 0) {
-			ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
-			// propagate the expected behaviour for checkpoint errors to task.
-			executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-		} else {
-			// interval of max value means disable periodic checkpoint
-			interval = Long.MAX_VALUE;
-		}
+		interval = interval > 0 ? interval : Long.MAX_VALUE;
 
 		//  --- configure the participating vertices ---
 
@@ -711,7 +703,9 @@ private void configureCheckpointing() {
 				cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
 				retentionAfterTermination,
-				isExactlyOnce),
+				isExactlyOnce,
+				cfg.getTolerableFailureNumber(),
+				cfg.isFailOnCheckpointingErrors()),
 			serializedStateBackend,
 			serializedHooks);
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
deleted file mode 100644
index 01407952550..00000000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-
-/**
- * Handler for exceptions that happen on checkpointing. The handler can reject and rethrow the offered exceptions.
- */
-public interface CheckpointExceptionHandler {
-
-	/**
-	 * Offers the exception for handling. If the exception cannot be handled from this instance, it is rethrown.
-	 *
-	 * @param checkpointMetaData metadata for the checkpoint for which the exception occurred.
-	 * @param exception  the exception to handle.
-	 * @throws Exception rethrows the exception if it cannot be handled.
-	 */
-	void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, Exception exception) throws Exception;
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
deleted file mode 100644
index 430f43e3db8..00000000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.util.Preconditions;
-
-/**
- * This factory produces {@link CheckpointExceptionHandler} instances that handle exceptions during checkpointing in a
- * {@link StreamTask}.
- */
-public class CheckpointExceptionHandlerFactory {
-
-	/**
-	 * Returns a {@link CheckpointExceptionHandler} that either causes a task to fail completely or to just declines
-	 * checkpoint on exception, depending on the parameter flag.
-	 */
-	public CheckpointExceptionHandler createCheckpointExceptionHandler(
-		boolean failTaskOnCheckpointException,
-		Environment environment) {
-
-		if (failTaskOnCheckpointException) {
-			return new FailingCheckpointExceptionHandler();
-		} else {
-			return new DecliningCheckpointExceptionHandler(environment);
-		}
-	}
-
-	/**
-	 * This handler makes the task fail by rethrowing a reported exception.
-	 */
-	static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {
-
-		@Override
-		public void tryHandleCheckpointException(
-			CheckpointMetaData checkpointMetaData,
-			Exception exception) throws Exception {
-
-			throw exception;
-		}
-	}
-
-	/**
-	 * This handler makes the task decline the checkpoint as reaction to the reported exception. The task is not failed.
-	 */
-	static final class DecliningCheckpointExceptionHandler implements CheckpointExceptionHandler {
-
-		final Environment environment;
-
-		DecliningCheckpointExceptionHandler(Environment environment) {
-			this.environment = Preconditions.checkNotNull(environment);
-		}
-
-		@Override
-		public void tryHandleCheckpointException(
-			CheckpointMetaData checkpointMetaData,
-			Exception exception) throws Exception {
-
-			environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
-		}
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ee8892cf95..16faeee88b5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -171,12 +171,6 @@
 	/** Thread pool for async snapshot workers. */
 	private ExecutorService asyncOperationsThreadPool;
 
-	/** Handler for exceptions during checkpointing in the stream task. Used in synchronous part of the checkpoint. */
-	private CheckpointExceptionHandler synchronousCheckpointExceptionHandler;
-
-	/** Wrapper for synchronousCheckpointExceptionHandler to deal with rethrown exceptions. Used in the async part. */
-	private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler;
-
 	private final List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters;
 
 	// ------------------------------------------------------------------------
@@ -245,14 +239,6 @@ public final void invoke() throws Exception {
 
 			asyncOperationsThreadPool = Executors.newCachedThreadPool();
 
-			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
-
-			synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
-				getExecutionConfig().isFailTaskOnCheckpointError(),
-				getEnvironment());
-
-			asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
-
 			stateBackend = createStateBackend();
 			checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
 
@@ -724,7 +710,8 @@ private void checkpointState(
 			checkpointMetaData,
 			checkpointOptions,
 			storage,
-			checkpointMetrics);
+			checkpointMetrics,
+			getEnvironment());
 
 		checkpointingOperation.executeCheckpointing();
 	}
@@ -754,10 +741,6 @@ private StateBackend createStateBackend() throws Exception {
 				LOG);
 	}
 
-	protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
-		return new CheckpointExceptionHandlerFactory();
-	}
-
 	/**
 	 * Returns the {@link ProcessingTimeService} responsible for telling the current
 	 * processing time and registering timers.
@@ -819,18 +802,22 @@ public String toString() {
 		private final AtomicReference<CheckpointingOperation.AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>(
 			CheckpointingOperation.AsyncCheckpointState.RUNNING);
 
+		private final Environment environment;
+
 		AsyncCheckpointRunnable(
 			StreamTask<?, ?> owner,
 			Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
 			CheckpointMetaData checkpointMetaData,
 			CheckpointMetrics checkpointMetrics,
-			long asyncStartNanos) {
+			long asyncStartNanos,
+			Environment environment) {
 
 			this.owner = Preconditions.checkNotNull(owner);
 			this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress);
 			this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
 			this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
 			this.asyncStartNanos = asyncStartNanos;
+			this.environment = Preconditions.checkNotNull(environment);
 		}
 
 		@Override
@@ -944,9 +931,7 @@ private void handleExecutionException(Exception e) {
 
 					// We only report the exception for the original cause of fail and cleanup.
 					// Otherwise this followup exception could race the original exception in failing the task.
-					owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-						checkpointMetaData,
-						checkpointException);
+					environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), checkpointException);
 
 					currentState = CheckpointingOperation.AsyncCheckpointState.DISCARDED;
 				} else {
@@ -1027,6 +1012,8 @@ public CloseableRegistry getCancelables() {
 		private long startSyncPartNano;
 		private long startAsyncPartNano;
 
+		private final Environment environment;
+
 		// ------------------------
 
 		private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
@@ -1036,7 +1023,8 @@ public CheckpointingOperation(
 				CheckpointMetaData checkpointMetaData,
 				CheckpointOptions checkpointOptions,
 				CheckpointStreamFactory checkpointStorageLocation,
-				CheckpointMetrics checkpointMetrics) {
+				CheckpointMetrics checkpointMetrics,
+				Environment environment) {
 
 			this.owner = Preconditions.checkNotNull(owner);
 			this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
@@ -1045,6 +1033,7 @@ public CheckpointingOperation(
 			this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);
 			this.allOperators = owner.operatorChain.getAllOperators();
 			this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
+			this.environment = Preconditions.checkNotNull(environment);
 		}
 
 		public void executeCheckpointing() throws Exception {
@@ -1070,7 +1059,8 @@ public void executeCheckpointing() throws Exception {
 					operatorSnapshotsInProgress,
 					checkpointMetaData,
 					checkpointMetrics,
-					startAsyncPartNano);
+					startAsyncPartNano,
+					environment);
 
 				owner.cancelables.registerCloseable(asyncCheckpointRunnable);
 				owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
@@ -1102,7 +1092,7 @@ public void executeCheckpointing() throws Exception {
 						checkpointMetrics.getSyncDurationMillis());
 				}
 
-				owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
+				environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
 			}
 		}
 
@@ -1126,36 +1116,6 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 		}
 	}
 
-	/**
-	 * Wrapper for synchronous {@link CheckpointExceptionHandler}. This implementation catches unhandled, rethrown
-	 * exceptions and reports them through {@link #handleAsyncException(String, Throwable)}. As this implementation
-	 * always handles the exception in some way, it never rethrows.
-	 */
-	static final class AsyncCheckpointExceptionHandler implements CheckpointExceptionHandler {
-
-		/** Owning stream task to which we report async exceptions. */
-		final StreamTask<?, ?> owner;
-
-		/** Synchronous exception handler to which we delegate. */
-		final CheckpointExceptionHandler synchronousCheckpointExceptionHandler;
-
-		AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) {
-			this.owner = Preconditions.checkNotNull(owner);
-			this.synchronousCheckpointExceptionHandler =
-				Preconditions.checkNotNull(owner.synchronousCheckpointExceptionHandler);
-		}
-
-		@Override
-		public void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, Exception exception) {
-			try {
-				synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, exception);
-			} catch (Exception unhandled) {
-				AsynchronousException asyncException = new AsynchronousException(unhandled);
-				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
-			}
-		}
-	}
-
 	@VisibleForTesting
 	public static <OUT> List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createStreamRecordWriters(
 			StreamConfig configuration,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
deleted file mode 100644
index 08cee559621..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test that the configuration mechanism for how tasks react on checkpoint errors works correctly.
- */
-public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
-
-	@Test
-	public void testConfigurationFailOnException() throws Exception {
-		testConfigForwarding(true);
-	}
-
-	@Test
-	public void testConfigurationDeclineOnException() throws Exception {
-		testConfigForwarding(false);
-	}
-
-	@Test
-	public void testFailIsDefaultConfig() {
-		ExecutionConfig newExecutionConfig = new ExecutionConfig();
-		Assert.assertTrue(newExecutionConfig.isFailTaskOnCheckpointError());
-	}
-
-	private void testConfigForwarding(boolean failOnException) throws Exception {
-
-		final boolean expectedHandlerFlag = failOnException;
-
-		final DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
-		environment.setTaskStateManager(new TestTaskStateManager());
-		environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag);
-
-		final CheckpointExceptionHandlerFactory inspectingFactory = new CheckpointExceptionHandlerFactory() {
-
-			@Override
-			public CheckpointExceptionHandler createCheckpointExceptionHandler(
-				boolean failTaskOnCheckpointException,
-				Environment environment) {
-
-				Assert.assertEquals(expectedHandlerFlag, failTaskOnCheckpointException);
-				return super.createCheckpointExceptionHandler(failTaskOnCheckpointException, environment);
-			}
-		};
-
-		StreamTask streamTask = new StreamTask(environment, null) {
-			@Override
-			protected void init() throws Exception {}
-
-			@Override
-			protected void run() throws Exception {}
-
-			@Override
-			protected void cleanup() throws Exception {}
-
-			@Override
-			protected void cancelTask() throws Exception {}
-
-			@Override
-			protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
-				return inspectingFactory;
-			}
-		};
-
-		streamTask.invoke();
-	}
-
-	@Test
-	public void testCheckpointConfigDefault() throws Exception {
-		StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-		Assert.assertTrue(streamExecutionEnvironment.getCheckpointConfig().isFailOnCheckpointingErrors());
-	}
-
-	@Test
-	public void testPropagationFailFromCheckpointConfig() throws Exception {
-		doTestPropagationFromCheckpointConfig(true);
-	}
-
-	@Test
-	public void testPropagationDeclineFromCheckpointConfig() throws Exception {
-		doTestPropagationFromCheckpointConfig(false);
-	}
-
-	public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) throws Exception {
-		StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
-		streamExecutionEnvironment.setParallelism(1);
-		streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000);
-		streamExecutionEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(failTaskOnCheckpointErrors);
-		streamExecutionEnvironment.addSource(new SourceFunction<Integer>() {
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-
-		}).addSink(new DiscardingSink<>());
-
-		StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
-		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
-		SerializedValue<ExecutionConfig> serializedExecutionConfig = jobGraph.getSerializedExecutionConfig();
-		ExecutionConfig executionConfig =
-			serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader());
-
-		Assert.assertEquals(failTaskOnCheckpointErrors, executionConfig.isFailTaskOnCheckpointError());
-	}
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
deleted file mode 100644
index 2f581622220..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for the current implementations of {@link CheckpointExceptionHandler} and their factory.
- */
-public class CheckpointExceptionHandlerTest extends TestLogger {
-
-	@Test
-	public void testRethrowingHandler() {
-		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
-		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-		CheckpointExceptionHandler exceptionHandler =
-			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
-
-		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
-		Exception testException = new Exception("test");
-		try {
-			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
-			Assert.fail("Exception not rethrown.");
-		} catch (Exception e) {
-			Assert.assertEquals(testException, e);
-		}
-
-		Assert.assertNull(environment.getLastDeclinedCheckpointCause());
-	}
-
-	@Test
-	public void testDecliningHandler() {
-		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
-		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-		CheckpointExceptionHandler exceptionHandler =
-			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(false, environment);
-
-		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
-		Exception testException = new Exception("test");
-		try {
-			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
-		} catch (Exception e) {
-			Assert.fail("Exception not handled, but rethrown.");
-		}
-
-		Assert.assertEquals(failedCheckpointMetaData.getCheckpointId(), environment.getLastDeclinedCheckpointId());
-		Assert.assertEquals(testException, environment.getLastDeclinedCheckpointCause());
-	}
-
-	static final class DeclineDummyEnvironment extends DummyEnvironment {
-
-		private long lastDeclinedCheckpointId;
-		private Throwable lastDeclinedCheckpointCause;
-
-		DeclineDummyEnvironment() {
-			super("test", 1, 0);
-			this.lastDeclinedCheckpointId = Long.MIN_VALUE;
-			this.lastDeclinedCheckpointCause = null;
-		}
-
-		@Override
-		public void declineCheckpoint(long checkpointId, Throwable cause) {
-			this.lastDeclinedCheckpointId = checkpointId;
-			this.lastDeclinedCheckpointCause = cause;
-		}
-
-		long getLastDeclinedCheckpointId() {
-			return lastDeclinedCheckpointId;
-		}
-
-		Throwable getLastDeclinedCheckpointCause() {
-			return lastDeclinedCheckpointCause;
-		}
-	}
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index bc864a2e5d1..4b860a4a768 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -111,7 +111,8 @@ public void testReportingFromSnapshotToTaskStateManager() {
 				snapshots,
 				checkpointMetaData,
 				checkpointMetrics,
-				0L);
+				0L,
+				streamMockEnvironment);
 
 		checkpointRunnable.run();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index a94f8ac879d..67c2052cae6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -155,7 +155,6 @@
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -377,18 +376,8 @@ public void testFailingCheckpointStreamOperator() throws Exception {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
 
-		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-		CheckpointExceptionHandler checkpointExceptionHandler =
-			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, mockEnvironment);
-		Whitebox.setInternalState(streamTask, "synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
-		StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler =
-			new StreamTask.AsyncCheckpointExceptionHandler(streamTask);
-		Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
-
 		try {
 			streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
-			fail("Expected test exception here.");
 		} catch (Exception e) {
 			assertEquals(testException, e.getCause());
 		}
@@ -449,20 +438,9 @@ public void testFailingAsyncCheckpointRunnable() throws Exception {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
 
-		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-		CheckpointExceptionHandler checkpointExceptionHandler =
-			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, mockEnvironment);
-		Whitebox.setInternalState(streamTask, "synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
-		StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler =
-			new StreamTask.AsyncCheckpointExceptionHandler(streamTask);
-		Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
-
 		mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
-
 		verify(operatorSnapshotResult1).cancel();
 		verify(operatorSnapshotResult2).cancel();
 		verify(operatorSnapshotResult3).cancel();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index cd8a4fafd9a..79426aaff02 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -91,7 +91,6 @@
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -115,28 +114,13 @@ public void testDeclineOnCheckpointErrorInAsyncPart() throws Exception {
 		runTestDeclineOnCheckpointError(new AsyncFailureInducingStateBackend());
 	}
 
-	@Test
-	public void testTaskFailingOnCheckpointErrorInSyncPart() throws Exception {
-		Throwable failureCause = runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend());
-		assertNotNull(failureCause);
-
-		String expectedMessageStart = "Could not perform checkpoint";
-		assertEquals(expectedMessageStart, failureCause.getMessage().substring(0, expectedMessageStart.length()));
-	}
-
-	@Test
-	public void testTaskFailingOnCheckpointErrorInAsyncPart() throws Exception {
-		Throwable failureCause = runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend());
-		assertEquals(AsynchronousException.class, failureCause.getClass());
-	}
-
 	@Test
 	public void testBlockingNonInterruptibleCheckpoint() throws Exception {
 
 		StateBackend lockingStateBackend = new BackendForTestStream(LockingOutputStream::new);
 
 		Task task =
-			createTask(new TestOperator(), lockingStateBackend, mock(CheckpointResponder.class), true);
+			createTask(new TestOperator(), lockingStateBackend, mock(CheckpointResponder.class));
 
 		// start the task and wait until it is in "restore"
 		task.startTaskThread();
@@ -156,7 +140,7 @@ private void runTestDeclineOnCheckpointError(AbstractStateBackend backend) throw
 		TestDeclinedCheckpointResponder checkpointResponder = new TestDeclinedCheckpointResponder();
 
 		Task task =
-			createTask(new FilterOperator(), backend, checkpointResponder, false);
+			createTask(new FilterOperator(), backend, checkpointResponder);
 
 		// start the task and wait until it is in "restore"
 		task.startTaskThread();
@@ -169,20 +153,6 @@ private void runTestDeclineOnCheckpointError(AbstractStateBackend backend) throw
 		task.getExecutingThread().join();
 	}
 
-	private Throwable runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws Exception {
-
-		Task task =
-			createTask(new FilterOperator(), backend, mock(CheckpointResponder.class), true);
-
-		// start the task and wait until it is in "restore"
-		task.startTaskThread();
-
-		task.getExecutingThread().join();
-
-		assertEquals(ExecutionState.FAILED, task.getExecutionState());
-		return task.getFailureCause();
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -190,8 +160,7 @@ private Throwable runTestTaskFailingOnCheckpointError(AbstractStateBackend backe
 	private static Task createTask(
 		StreamOperator<?> op,
 		StateBackend backend,
-		CheckpointResponder checkpointResponder,
-		boolean failOnCheckpointErrors) throws IOException {
+		CheckpointResponder checkpointResponder) throws IOException {
 
 		Configuration taskConfig = new Configuration();
 		StreamConfig cfg = new StreamConfig(taskConfig);
@@ -200,7 +169,6 @@ private static Task createTask(
 		cfg.setStateBackend(backend);
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors);
 
 		JobInformation jobInformation = new JobInformation(
 				new JobID(),
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 95c357dc4b6..07b2fd5fb23 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -105,6 +105,8 @@ private void setUpWithCheckpointInterval(long checkpointInterval) throws Excepti
 				10,
 				1,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				0,
 				true),
 			null));
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 7eebde86028..bb40188f690 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -188,6 +188,8 @@ private JobGraph createJobGraph(ExecutionMode mode) {
 		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.setStateBackend((StateBackend) new MemoryStateBackend());
+		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+		env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 
 		switch (mode) {
 			case MIGRATE:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services