You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/11/20 12:19:16 UTC

flink git commit: [FLINK-4809] [checkpoints] Operators should tolerate checkpoint failures.

Repository: flink
Updated Branches:
  refs/heads/master 0a22acef4 -> 7c63526ad


[FLINK-4809] [checkpoints] Operators should tolerate checkpoint failures.

This closes #4883.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c63526a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c63526a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c63526a

Branch: refs/heads/master
Commit: 7c63526ad6f27c6f15625b8b6c48359d9532890b
Parents: 0a22ace
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Oct 20 16:59:45 2017 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Nov 20 13:18:55 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state/checkpointing.md          |   5 +
 .../flink/api/common/ExecutionConfig.java       |  23 +
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../api/environment/CheckpointConfig.java       |  20 +
 .../api/graph/StreamingJobGraphGenerator.java   |  10 +-
 .../tasks/CheckpointExceptionHandler.java       |  36 ++
 .../CheckpointExceptionHandlerFactory.java      |  79 +++
 .../streaming/runtime/tasks/StreamTask.java     | 135 ++++--
 .../runtime/tasks/BlockingCheckpointsTest.java  | 314 ------------
 ...kpointExceptionHandlerConfigurationTest.java | 146 ++++++
 .../tasks/CheckpointExceptionHandlerTest.java   |  96 ++++
 .../streaming/runtime/tasks/StreamTaskTest.java |  19 +
 .../tasks/TaskCheckpointingBehaviourTest.java   | 481 +++++++++++++++++++
 13 files changed, 1003 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/docs/dev/stream/state/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/checkpointing.md b/docs/dev/stream/state/checkpointing.md
index 2a2edc9..4994ac7 100644
--- a/docs/dev/stream/state/checkpointing.md
+++ b/docs/dev/stream/state/checkpointing.md
@@ -74,6 +74,8 @@ Other parameters for checkpointing include:
 
   - *externalized checkpoints*: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are *not* automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the [deployment notes on externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints).
 
+  - *fail/continue task on checkpoint errors*: This determines whether a task fails when an error occurs during the execution of its checkpoint procedure. Failing the task is the default behaviour. Alternatively, when this is disabled, the task will simply decline the checkpoint to the checkpoint coordinator and continue running.
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -118,6 +120,9 @@ env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig.setCheckpointTimeout(60000)
 
+// prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
+env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
+
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 88d524e..9f39c46 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -150,6 +150,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	/** This flag defines if we use compression for the state snapshot data or not. Default: false */
 	private boolean useSnapshotCompression = false;
 
+	/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
+	private boolean failTaskOnCheckpointError = true;
+
 	// ------------------------------- User code values --------------------------------------------
 
 	private GlobalJobParameters globalJobParameters;
@@ -860,6 +863,26 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 		this.useSnapshotCompression = useSnapshotCompression;
 	}
 
+	/**
+	 * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
+	 * the task. This should not be called by the user, please use CheckpointConfig.isFailOnCheckpointingErrors()
+	 * instead.
+	 */
+	@Internal
+	public boolean isFailTaskOnCheckpointError() {
+		return failTaskOnCheckpointError;
+	}
+
+	/**
+	 * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
+	 * the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...)
+	 * instead.
+	 */
+	@Internal
+	public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
+		this.failTaskOnCheckpointError = failTaskOnCheckpointError;
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 8ed06b2..0125a5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -53,6 +53,7 @@ public class DummyEnvironment implements Environment {
 	private final ExecutionConfig executionConfig = new ExecutionConfig();
 	private final TaskInfo taskInfo;
 	private KvStateRegistry kvStateRegistry = new KvStateRegistry();
+	private final AccumulatorRegistry accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
 
 	public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
 		this.taskInfo = new TaskInfo(taskName, numSubTasks, subTaskIndex, numSubTasks, 0);
@@ -143,7 +144,7 @@ public class DummyEnvironment implements Environment {
 
 	@Override
 	public AccumulatorRegistry getAccumulatorRegistry() {
-		return null;
+		return accumulatorRegistry;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index e1b566e..342d4a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -70,6 +70,9 @@ public class CheckpointConfig implements java.io.Serializable {
 	/** Cleanup behaviour for persistent checkpoints. */
 	private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
 
+	/** Determines if tasks are failed or not if there is an error in their checkpointing. Default: true */
+	private boolean failOnCheckpointingErrors = true;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -231,6 +234,23 @@ public class CheckpointConfig implements java.io.Serializable {
 	}
 
 	/**
+	 * This determines the behaviour of tasks if there is an error in their local checkpointing. If this returns true,
+	 * tasks will fail as a reaction. If this returns false, tasks will only decline the failed checkpoint.
+	 */
+	public boolean isFailOnCheckpointingErrors() {
+		return failOnCheckpointingErrors;
+	}
+
+	/**
+	 * Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure.
+	 * If this is set to true, the task will fail on checkpointing error. If this is set to false, the task will only
+	 * decline the checkpoint and continue running. The default is true.
+	 */
+	public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) {
+		this.failOnCheckpointingErrors = failOnCheckpointingErrors;
+	}
+
+	/**
 	 * Enables checkpoints to be persisted externally.
 	 *
 	 * <p>Externalized checkpoints write their meta data out to persistent

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0364223..fce9dc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -574,10 +575,15 @@ public class StreamingJobGraphGenerator {
 
 		long interval = cfg.getCheckpointInterval();
 		if (interval > 0) {
+
+			ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
+			// propagate the expected behaviour for checkpoint errors to task.
+			executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
+
 			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
-			if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
+			if (executionConfig.getRestartStrategy() == null) {
 				// if the user enabled checkpointing, the default number of exec retries is infinite.
-				streamGraph.getExecutionConfig().setRestartStrategy(
+				executionConfig.setRestartStrategy(
 					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
 			}
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
new file mode 100644
index 0000000..0140795
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+
+/**
+ * Handler for exceptions that happen on checkpointing. The handler can reject and rethrow the offered exceptions.
+ */
+public interface CheckpointExceptionHandler {
+
+	/**
+	 * Offers the exception for handling. If the exception cannot be handled by this instance, it is rethrown.
+	 *
+	 * @param checkpointMetaData metadata for the checkpoint for which the exception occurred.
+	 * @param exception  the exception to handle.
+	 * @throws Exception rethrows the exception if it cannot be handled.
+	 */
+	void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, Exception exception) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
new file mode 100644
index 0000000..430f43e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This factory produces {@link CheckpointExceptionHandler} instances that handle exceptions during checkpointing in a
+ * {@link StreamTask}.
+ */
+public class CheckpointExceptionHandlerFactory {
+
+	/**
+	 * Returns a {@link CheckpointExceptionHandler} that either causes a task to fail completely or just declines
+	 * the checkpoint on exception, depending on the parameter flag.
+	 */
+	public CheckpointExceptionHandler createCheckpointExceptionHandler(
+		boolean failTaskOnCheckpointException,
+		Environment environment) {
+
+		if (failTaskOnCheckpointException) {
+			return new FailingCheckpointExceptionHandler();
+		} else {
+			return new DecliningCheckpointExceptionHandler(environment);
+		}
+	}
+
+	/**
+	 * This handler makes the task fail by rethrowing a reported exception.
+	 */
+	static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {
+
+		@Override
+		public void tryHandleCheckpointException(
+			CheckpointMetaData checkpointMetaData,
+			Exception exception) throws Exception {
+
+			throw exception;
+		}
+	}
+
+	/**
+	 * This handler makes the task decline the checkpoint as reaction to the reported exception. The task is not failed.
+	 */
+	static final class DecliningCheckpointExceptionHandler implements CheckpointExceptionHandler {
+
+		final Environment environment;
+
+		DecliningCheckpointExceptionHandler(Environment environment) {
+			this.environment = Preconditions.checkNotNull(environment);
+		}
+
+		@Override
+		public void tryHandleCheckpointException(
+			CheckpointMetaData checkpointMetaData,
+			Exception exception) throws Exception {
+
+			environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 68f590e..36e6748 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -172,6 +172,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** Thread pool for async snapshot workers. */
 	private ExecutorService asyncOperationsThreadPool;
 
+	/** Handler for exceptions during checkpointing in the stream task. Used in synchronous part of the checkpoint. */
+	private CheckpointExceptionHandler synchronousCheckpointExceptionHandler;
+
+	/** Wrapper for synchronousCheckpointExceptionHandler to deal with rethrown exceptions. Used in the async part. */
+	private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler;
+
 	// ------------------------------------------------------------------------
 	//  Life cycle methods for specific implementations
 	// ------------------------------------------------------------------------
@@ -215,6 +221,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 			configuration = new StreamConfig(getTaskConfiguration());
 
+			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
+
+			synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
+				getExecutionConfig().isFailTaskOnCheckpointError(),
+				getEnvironment());
+
+			asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
+
 			stateBackend = createStateBackend();
 
 			accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
@@ -784,6 +798,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			targetLocation);
 	}
 
+	protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
+		return new CheckpointExceptionHandlerFactory();
+	}
+
 	private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
 
 		TaskInfo taskInfo = getEnvironment().getTaskInfo();
@@ -850,11 +868,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			CheckpointingOperation.AsynCheckpointState.RUNNING);
 
 		AsyncCheckpointRunnable(
-				StreamTask<?, ?> owner,
-				Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress,
-				CheckpointMetaData checkpointMetaData,
-				CheckpointMetrics checkpointMetrics,
-				long asyncStartNanos) {
+			StreamTask<?, ?> owner,
+			Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress,
+			CheckpointMetaData checkpointMetaData,
+			CheckpointMetrics checkpointMetrics,
+			long asyncStartNanos) {
 
 			this.owner = Preconditions.checkNotNull(owner);
 			this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress);
@@ -929,14 +947,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					e.addSuppressed(cleanupException);
 				}
 
-				// registers the exception and tries to fail the whole task
-				AsynchronousException asyncException = new AsynchronousException(
-					new Exception(
-						"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() +
-							" for operator " + owner.getName() + '.',
-						e));
+				Exception checkpointException = new Exception(
+					"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
+						owner.getName() + '.',
+					e);
 
-				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
+				owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
+					checkpointMetaData,
+					checkpointException);
 			} finally {
 				owner.cancelables.unregisterCloseable(this);
 				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
@@ -1020,7 +1038,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		public void executeCheckpointing() throws Exception {
 			startSyncPartNano = System.nanoTime();
 
-			boolean failed = true;
 			try {
 				for (StreamOperator<?> op : allOperators) {
 					checkpointStreamOperator(op);
@@ -1028,16 +1045,23 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
-							checkpointMetaData.getCheckpointId(), owner.getName());
+						checkpointMetaData.getCheckpointId(), owner.getName());
 				}
 
 				startAsyncPartNano = System.nanoTime();
 
 				checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
 
-				// at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
-				runAsyncCheckpointingAndAcknowledge();
-				failed = false;
+				// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
+				AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
+					owner,
+					operatorSnapshotsInProgress,
+					checkpointMetaData,
+					checkpointMetrics,
+					startAsyncPartNano);
+
+				owner.cancelables.registerCloseable(asyncCheckpointRunnable);
+				owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished synchronous part of checkpoint {}." +
@@ -1046,27 +1070,27 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
 						checkpointMetrics.getSyncDurationMillis());
 				}
-			} finally {
-				if (failed) {
-					// Cleanup to release resources
-					for (OperatorSnapshotResult operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
-						if (null != operatorSnapshotResult) {
-							try {
-								operatorSnapshotResult.cancel();
-							} catch (Exception e) {
-								LOG.warn("Could not properly cancel an operator snapshot result.", e);
-							}
+			} catch (Exception ex) {
+				// Cleanup to release resources
+				for (OperatorSnapshotResult operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
+					if (null != operatorSnapshotResult) {
+						try {
+							operatorSnapshotResult.cancel();
+						} catch (Exception e) {
+							LOG.warn("Could not properly cancel an operator snapshot result.", e);
 						}
 					}
+				}
 
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
-								"Alignment duration: {} ms, snapshot duration {} ms",
-							owner.getName(), checkpointMetaData.getCheckpointId(),
-							checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
-							checkpointMetrics.getSyncDurationMillis());
-					}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
+							"Alignment duration: {} ms, snapshot duration {} ms",
+						owner.getName(), checkpointMetaData.getCheckpointId(),
+						checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+						checkpointMetrics.getSyncDurationMillis());
 				}
+
+				owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
 			}
 		}
 
@@ -1082,23 +1106,40 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			}
 		}
 
-		public void runAsyncCheckpointingAndAcknowledge() throws IOException {
-
-			AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
-					owner,
-					operatorSnapshotsInProgress,
-					checkpointMetaData,
-					checkpointMetrics,
-					startAsyncPartNano);
-
-			owner.cancelables.registerCloseable(asyncCheckpointRunnable);
-			owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
-		}
-
 		private enum AsynCheckpointState {
 			RUNNING,
 			DISCARDED,
 			COMPLETED
 		}
 	}
+
+	/**
+	 * Wrapper for synchronous {@link CheckpointExceptionHandler}. This implementation catches unhandled, rethrown
+	 * exceptions and reports them through {@link #handleAsyncException(String, Throwable)}. As this implementation
+	 * always handles the exception in some way, it never rethrows.
+	 */
+	static final class AsyncCheckpointExceptionHandler implements CheckpointExceptionHandler {
+
+		/** Owning stream task to which we report async exceptions. */
+		final StreamTask<?, ?> owner;
+
+		/** Synchronous exception handler to which we delegate. */
+		final CheckpointExceptionHandler synchronousCheckpointExceptionHandler;
+
+		AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) {
+			this.owner = Preconditions.checkNotNull(owner);
+			this.synchronousCheckpointExceptionHandler =
+				Preconditions.checkNotNull(owner.synchronousCheckpointExceptionHandler);
+		}
+
+		@Override
+		public void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, Exception exception) {
+			try {
+				synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, exception);
+			} catch (Exception unhandled) {
+				AsynchronousException asyncException = new AsynchronousException(unhandled);
+				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
deleted file mode 100644
index 81b3130..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCacheService;
-import org.apache.flink.runtime.blob.PermanentBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.taskmanager.CheckpointResponder;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamFilter;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * This test checks that task checkpoints that block and do not react to thread interrupts.
- */
-public class BlockingCheckpointsTest {
-
-	private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
-
-	@Test
-	public void testBlockingNonInterruptibleCheckpoint() throws Exception {
-
-		Configuration taskConfig = new Configuration();
-		StreamConfig cfg = new StreamConfig(taskConfig);
-		cfg.setStreamOperator(new TestOperator());
-		cfg.setOperatorID(new OperatorID());
-		cfg.setStateBackend(new LockingStreamStateBackend());
-
-		Task task = createTask(taskConfig);
-
-		// start the task and wait until it is in "restore"
-		task.startTaskThread();
-		IN_CHECKPOINT_LATCH.await();
-
-		// cancel the task and wait. unless cancellation properly closes
-		// the streams, this will never terminate
-		task.cancelExecution();
-		task.getExecutingThread().join();
-
-		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
-		assertNull(task.getFailureCause());
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static Task createTask(Configuration taskConfig) throws IOException {
-
-		JobInformation jobInformation = new JobInformation(
-				new JobID(),
-				"test job name",
-				new SerializedValue<>(new ExecutionConfig()),
-				new Configuration(),
-				Collections.emptyList(),
-				Collections.emptyList());
-
-		TaskInformation taskInformation = new TaskInformation(
-				new JobVertexID(),
-				"test task name",
-				1,
-				11,
-				TestStreamTask.class.getName(),
-				taskConfig);
-
-		TaskKvStateRegistry mockKvRegistry = mock(TaskKvStateRegistry.class);
-		NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mockKvRegistry);
-
-		BlobCacheService blobService =
-			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
-
-		return new Task(
-				jobInformation,
-				taskInformation,
-				new ExecutionAttemptID(),
-				new AllocationID(),
-				0,
-				0,
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				0,
-				null,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				network,
-				mock(BroadcastVariableManager.class),
-				mock(TaskManagerActions.class),
-				mock(InputSplitProvider.class),
-				mock(CheckpointResponder.class),
-				blobService,
-				new BlobLibraryCacheManager(
-					blobService.getPermanentBlobService(),
-					FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
-					new String[0]),
-				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
-				new TestingTaskManagerRuntimeInfo(),
-				new UnregisteredTaskMetricsGroup(),
-				mock(ResultPartitionConsumableNotifier.class),
-				mock(PartitionProducerStateChecker.class),
-				Executors.directExecutor());
-	}
-
-	// ------------------------------------------------------------------------
-	//  state backend with locking output stream
-	// ------------------------------------------------------------------------
-
-	private static class LockingStreamStateBackend extends AbstractStateBackend {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
-			return new LockingOutputStreamFactory();
-		}
-
-		@Override
-		public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-				Environment env, JobID jobID, String operatorIdentifier,
-				TypeSerializer<K> keySerializer, int numberOfKeyGroups,
-				KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) {
-
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
-			return new DefaultOperatorStateBackend(
-				getClass().getClassLoader(),
-				new ExecutionConfig(),
-				true);
-		}
-	}
-
-	private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
-
-		@Override
-		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) {
-			return new LockingOutputStream();
-		}
-
-		@Override
-		public void close() {}
-	}
-
-	private static final class LockingOutputStream extends CheckpointStateOutputStream {
-
-		private final Object lock = new Object();
-		private volatile boolean closed;
-
-		@Override
-		public StreamStateHandle closeAndGetHandle() throws IOException {
-			return null;
-		}
-
-		@Override
-		public void write(int b) throws IOException {
-			// this needs to not react to interrupts until the handle is closed
-			synchronized (lock) {
-				while (!closed) {
-					try {
-						lock.wait();
-					}
-					catch (InterruptedException ignored) {}
-				}
-			}
-		}
-
-		@Override
-		public void close() throws IOException {
-			synchronized (lock) {
-				closed = true;
-				lock.notifyAll();
-			}
-		}
-
-		@Override
-		public long getPos() {
-			return 0;
-		}
-
-		@Override
-		public void flush() {}
-
-		@Override
-		public void sync() {}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test source operator that calls into the locking checkpoint output stream
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class TestOperator extends StreamFilter<Object> {
-		private static final long serialVersionUID = 1L;
-
-		public TestOperator() {
-			super(new FilterFunction<Object>() {
-				@Override
-				public boolean filter(Object value) {
-					return false;
-				}
-			});
-		}
-
-		@Override
-		public void snapshotState(StateSnapshotContext context) throws Exception {
-			OperatorStateCheckpointOutputStream outStream = context.getRawOperatorStateOutput();
-
-			IN_CHECKPOINT_LATCH.trigger();
-
-			// this should lock
-			outStream.write(1);
-		}
-	}
-
-	/**
-	 * Stream task that simply triggers a checkpoint.
-	 */
-	public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
-
-		@Override
-		public void init() {}
-
-		@Override
-		protected void run() throws Exception {
-			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forCheckpoint(), new CheckpointMetrics());
-		}
-
-		@Override
-		protected void cleanup() {}
-
-		@Override
-		protected void cancelTask() {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
new file mode 100644
index 0000000..69f6935
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that the setting for how tasks react to checkpoint errors is forwarded through the configuration classes.
+ */
+public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
+
+	@Test
+	public void testConfigurationFailOnException() throws Exception {
+		testConfigForwarding(true);
+	}
+
+	@Test
+	public void testConfigurationDeclineOnException() throws Exception {
+		testConfigForwarding(false);
+	}
+
+	@Test
+	public void testFailIsDefaultConfig() {
+		ExecutionConfig newExecutionConfig = new ExecutionConfig();
+		Assert.assertTrue(newExecutionConfig.isFailTaskOnCheckpointError());
+	}
+
+	private void testConfigForwarding(boolean failOnException) throws Exception {
+		// NOTE(review): the only assertion lives inside the factory below; nothing here checks that it was called.
+		final boolean expectedHandlerFlag = failOnException;
+		DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
+		environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag);
+		final CheckpointExceptionHandlerFactory inspectingFactory = new CheckpointExceptionHandlerFactory() {
+
+			@Override
+			public CheckpointExceptionHandler createCheckpointExceptionHandler(
+				boolean failTaskOnCheckpointException,
+				Environment environment) {
+				// The flag handed to the factory must equal the one set on the ExecutionConfig above.
+				Assert.assertEquals(expectedHandlerFlag, failTaskOnCheckpointException);
+				return super.createCheckpointExceptionHandler(failTaskOnCheckpointException, environment);
+			}
+		};
+
+		StreamTask streamTask = new StreamTask() {
+			@Override
+			protected void init() throws Exception {
+				// no-op
+			}
+
+			@Override
+			protected void run() throws Exception {
+				// no-op
+			}
+
+			@Override
+			protected void cleanup() throws Exception {
+				// no-op
+			}
+
+			@Override
+			protected void cancelTask() throws Exception {
+				// no-op
+			}
+
+			@Override
+			protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
+				return inspectingFactory;
+			}
+		};
+
+		streamTask.setEnvironment(environment);
+		streamTask.invoke();
+	}
+
+	@Test
+	public void testCheckpointConfigDefault() throws Exception {
+		StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+		Assert.assertTrue(streamExecutionEnvironment.getCheckpointConfig().isFailOnCheckpointingErrors());
+	}
+
+	@Test
+	public void testPropagationFailFromCheckpointConfig() throws Exception {
+		doTestPropagationFromCheckpointConfig(true);
+	}
+
+	@Test
+	public void testPropagationDeclineFromCheckpointConfig() throws Exception {
+		doTestPropagationFromCheckpointConfig(false);
+	}
+
+	public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) throws Exception {
+		StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+		streamExecutionEnvironment.setParallelism(1);
+		streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000);
+		streamExecutionEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(failTaskOnCheckpointErrors);
+		streamExecutionEnvironment.addSource(new SourceFunction<Integer>() {
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+
+		}).addSink(new DiscardingSink<>());
+		// Build the JobGraph and deserialize the ExecutionConfig stored in it.
+		StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+		SerializedValue<ExecutionConfig> serializedExecutionConfig = jobGraph.getSerializedExecutionConfig();
+		ExecutionConfig executionConfig =
+			serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader());
+
+		Assert.assertEquals(failTaskOnCheckpointErrors, executionConfig.isFailTaskOnCheckpointError());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
new file mode 100644
index 0000000..2f58162
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the handlers that {@link CheckpointExceptionHandlerFactory} creates for both values of its boolean flag.
+ */
+public class CheckpointExceptionHandlerTest extends TestLogger {
+
+	@Test
+	public void testRethrowingHandler() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); // true: expected to rethrow
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			Assert.fail("Exception not rethrown.");
+		} catch (Exception e) {
+			Assert.assertEquals(testException, e);
+		}
+
+		Assert.assertNull(environment.getLastDeclinedCheckpointCause()); // no decline was recorded
+	}
+
+	@Test
+	public void testDecliningHandler() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(false, environment); // false: expected to decline
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+		} catch (Exception e) {
+			Assert.fail("Exception not handled, but rethrown.");
+		}
+
+		Assert.assertEquals(failedCheckpointMetaData.getCheckpointId(), environment.getLastDeclinedCheckpointId());
+		Assert.assertEquals(testException, environment.getLastDeclinedCheckpointCause());
+	}
+	/** Environment that records the arguments of the most recent {@code declineCheckpoint} call. */
+	static final class DeclineDummyEnvironment extends DummyEnvironment {
+
+		private long lastDeclinedCheckpointId;
+		private Throwable lastDeclinedCheckpointCause;
+
+		DeclineDummyEnvironment() {
+			super("test", 1, 0);
+			this.lastDeclinedCheckpointId = Long.MIN_VALUE; // placeholder until declineCheckpoint is called
+			this.lastDeclinedCheckpointCause = null;
+		}
+
+		@Override
+		public void declineCheckpoint(long checkpointId, Throwable cause) {
+			this.lastDeclinedCheckpointId = checkpointId;
+			this.lastDeclinedCheckpointCause = cause;
+		}
+
+		long getLastDeclinedCheckpointId() {
+			return lastDeclinedCheckpointId;
+		}
+
+		Throwable getLastDeclinedCheckpointCause() {
+			return lastDeclinedCheckpointCause;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 4b1eb3f..b31fb41 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -344,10 +344,20 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler checkpointExceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, mockEnvironment);
+		Whitebox.setInternalState(streamTask, "synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
+
+		StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler =
+			new StreamTask.AsyncCheckpointExceptionHandler(streamTask);
+		Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
+
 		try {
 			streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 			fail("Expected test exception here.");
 		} catch (Exception e) {
+			e.printStackTrace();
 			assertEquals(testException, e.getCause());
 		}
 
@@ -412,6 +422,15 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler checkpointExceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, mockEnvironment);
+		Whitebox.setInternalState(streamTask, "synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
+
+		StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler =
+			new StreamTask.AsyncCheckpointExceptionHandler(streamTask);
+		Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
+
 		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 
 		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/7c63526a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
new file mode 100644
index 0000000..d755c56
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.TransientBlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test checks that a task can be canceled while its checkpoint blocks and does not react to thread interrupts.
+ * It also checks the different policies for how tasks deal with checkpoint failures (fail the task, or decline the checkpoint and continue).
+ */
+public class TaskCheckpointingBehaviourTest extends TestLogger {
+
+	private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+	@Test
+	public void testDeclineOnCheckpointErrorInSyncPart() throws Exception {
+		runTestDeclineOnCheckpointError(new SyncFailureInducingStateBackend());
+	}
+
+	@Test
+	public void testDeclineOnCheckpointErrorInAsyncPart() throws Exception {
+		runTestDeclineOnCheckpointError(new AsyncFailureInducingStateBackend());
+	}
+
+	@Test
+	public void testTaskFailingOnCheckpointErrorInSyncPart() throws Exception {
+		Throwable failureCause = runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend());
+		assertNotNull(failureCause);
+
+		String expectedMessageStart = "Could not perform checkpoint";
+		assertEquals(expectedMessageStart, failureCause.getMessage().substring(0, expectedMessageStart.length()));
+	}
+
+	@Test
+	public void testTaskFailingOnCheckpointErrorInAsyncPart() throws Exception {
+		Throwable failureCause = runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend());
+		assertEquals(AsynchronousException.class, failureCause.getClass());
+	}
+
+	@Test
+	public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+		Task task =
+			createTask(new TestOperator(), new LockingStreamStateBackend(), mock(CheckpointResponder.class), true);
+
+		// start the task and wait until it is inside the checkpoint (the operator's snapshotState call)
+		task.startTaskThread();
+		IN_CHECKPOINT_LATCH.await();
+
+		// cancel the task and wait. unless cancellation properly closes
+		// the streams, this will never terminate
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		assertNull(task.getFailureCause());
+	}
+
+	private void runTestDeclineOnCheckpointError(AbstractStateBackend backend) throws Exception{
+
+		TestDeclinedCheckpointResponder checkpointResponder = new TestDeclinedCheckpointResponder();
+
+		Task task =
+			createTask(new FilterOperator(), backend, checkpointResponder, false);
+
+		// start the task and wait until the checkpoint has been declined
+		task.startTaskThread();
+
+		checkpointResponder.declinedLatch.await();
+
+		Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
+
+		task.cancelExecution();
+		task.getExecutingThread().join();
+	}
+
+	private Throwable runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws Exception {
+
+		Task task =
+			createTask(new FilterOperator(), backend, mock(CheckpointResponder.class), true);
+
+		// start the task and wait until its thread has terminated (the task is expected to fail)
+		task.startTaskThread();
+
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.FAILED, task.getExecutionState());
+		return task.getFailureCause();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static Task createTask(
+		StreamOperator<?> op,
+		AbstractStateBackend backend,
+		CheckpointResponder checkpointResponder,
+		boolean failOnCheckpointErrors) throws IOException {
+
+		Configuration taskConfig = new Configuration();
+		StreamConfig cfg = new StreamConfig(taskConfig);
+		cfg.setStreamOperator(op);
+		cfg.setOperatorID(new OperatorID());
+		cfg.setStateBackend(backend);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors);
+
+		JobInformation jobInformation = new JobInformation(
+				new JobID(),
+				"test job name",
+				new SerializedValue<>(executionConfig),
+				new Configuration(),
+				Collections.emptyList(),
+				Collections.emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+				new JobVertexID(),
+				"test task name",
+				1,
+				11,
+				TestStreamTask.class.getName(),
+				taskConfig);
+
+		TaskKvStateRegistry mockKvRegistry = mock(TaskKvStateRegistry.class);
+		NetworkEnvironment network = mock(NetworkEnvironment.class);
+		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mockKvRegistry);
+
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
+
+		return new Task(
+				jobInformation,
+				taskInformation,
+				new ExecutionAttemptID(),
+				new AllocationID(),
+				0,
+				0,
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				0,
+				null,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				network,
+				mock(BroadcastVariableManager.class),
+				mock(TaskManagerActions.class),
+				mock(InputSplitProvider.class),
+				checkpointResponder,
+				blobService,
+				new BlobLibraryCacheManager(
+					blobService.getPermanentBlobService(),
+					FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+					new String[0]),
+				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
+				new TestingTaskManagerRuntimeInfo(),
+				new UnregisteredTaskMetricsGroup(),
+				mock(ResultPartitionConsumableNotifier.class),
+				mock(PartitionProducerStateChecker.class),
+				Executors.directExecutor());
+	}
+
+	// ------------------------------------------------------------------------
+	//  checkpoint responder that records a call to decline.
+	// ------------------------------------------------------------------------
+	private static class TestDeclinedCheckpointResponder implements CheckpointResponder {
+
+		OneShotLatch declinedLatch = new OneShotLatch();
+
+		@Override
+		public void acknowledgeCheckpoint(
+			JobID jobID,
+			ExecutionAttemptID executionAttemptID,
+			long checkpointId,
+			CheckpointMetrics checkpointMetrics,
+			TaskStateSnapshot subtaskState) {
+
+			throw new RuntimeException("Unexpected call.");
+		}
+
+		@Override
+		public void declineCheckpoint(
+			JobID jobID,
+			ExecutionAttemptID executionAttemptID,
+			long checkpointId,
+			Throwable cause) {
+
+			declinedLatch.trigger();
+		}
+
+		public OneShotLatch getDeclinedLatch() {
+			return declinedLatch;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  state backends that trigger errors in checkpointing.
+	// ------------------------------------------------------------------------
+
+	private static class SyncFailureInducingStateBackend extends MemoryStateBackend {
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
+			return new DefaultOperatorStateBackend(
+				env.getUserClassLoader(),
+				env.getExecutionConfig(),
+				true) {
+				@Override
+				public RunnableFuture<OperatorStateHandle> snapshot(
+					long checkpointId,
+					long timestamp,
+					CheckpointStreamFactory streamFactory,
+					CheckpointOptions checkpointOptions) throws Exception {
+
+					throw new Exception("Sync part snapshot exception.");
+				}
+			};
+		}
+	}
+
+	private static class AsyncFailureInducingStateBackend extends MemoryStateBackend {
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
+			return new DefaultOperatorStateBackend(
+				env.getUserClassLoader(),
+				env.getExecutionConfig(),
+				true) {
+				@Override
+				public RunnableFuture<OperatorStateHandle> snapshot(
+					long checkpointId,
+					long timestamp,
+					CheckpointStreamFactory streamFactory,
+					CheckpointOptions checkpointOptions) throws Exception {
+
+					return new FutureTask<>(new Callable<OperatorStateHandle>() {
+						@Override
+						public OperatorStateHandle call() throws Exception {
+							throw new Exception("Async part snapshot exception.");
+						}
+					});
+				}
+			};
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  state backend with locking output stream.
+	// ------------------------------------------------------------------------
+
+	private static class LockingStreamStateBackend extends MemoryStateBackend {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+			return new LockingOutputStreamFactory();
+		}
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
+			return new DefaultOperatorStateBackend(
+				getClass().getClassLoader(),
+				new ExecutionConfig(),
+				true);
+		}
+	}
+
+	private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
+
+		@Override
+		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) {
+			return new LockingOutputStream();
+		}
+
+		@Override
+		public void close() {}
+	}
+
+	private static final class LockingOutputStream extends CheckpointStateOutputStream {
+
+		private final Object lock = new Object();
+		private volatile boolean closed;
+
+		@Override
+		public StreamStateHandle closeAndGetHandle() throws IOException {
+			return null;
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			// this must not react to interrupts until the stream is closed
+			synchronized (lock) {
+				while (!closed) {
+					try {
+						lock.wait();
+					}
+					catch (InterruptedException ignored) {}
+				}
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			synchronized (lock) {
+				closed = true;
+				lock.notifyAll();
+			}
+		}
+
+		@Override
+		public long getPos() {
+			return 0;
+		}
+
+		@Override
+		public void flush() {}
+
+		@Override
+		public void sync() {}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test filter operators; TestOperator writes to the checkpoint output stream, which is expected to block.
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class FilterOperator extends StreamFilter<Object> {
+		private static final long serialVersionUID = 1L;
+
+		public FilterOperator() {
+			super(new FilterFunction<Object>() {
+				@Override
+				public boolean filter(Object value) {
+					return false;
+				}
+			});
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class TestOperator extends StreamFilter<Object> {
+		private static final long serialVersionUID = 1L;
+
+		public TestOperator() {
+			super(new FilterFunction<Object>() {
+				@Override
+				public boolean filter(Object value) {
+					return false;
+				}
+			});
+		}
+
+		@Override
+		public void snapshotState(StateSnapshotContext context) throws Exception {
+			OperatorStateCheckpointOutputStream outStream = context.getRawOperatorStateOutput();
+
+			IN_CHECKPOINT_LATCH.trigger();
+
+			// this should lock
+			outStream.write(1);
+		}
+	}
+
+	/**
+	 * Stream task that triggers a single checkpoint and then idles until the task is no longer running.
+	 */
+	public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+		@Override
+		public void init() {}
+
+		@Override
+		protected void run() throws Exception {
+
+			triggerCheckpointOnBarrier(
+				new CheckpointMetaData(
+					11L,
+					System.currentTimeMillis()),
+				CheckpointOptions.forCheckpoint(),
+				new CheckpointMetrics());
+
+			while (isRunning()) {
+				Thread.sleep(1L);
+			}
+		}
+
+		@Override
+		protected void cleanup() {}
+
+		@Override
+		protected void cancelTask() {}
+	}
+}