You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:09 UTC

[3/4] flink git commit: [FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator

[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a4fdfff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a4fdfff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a4fdfff

Branch: refs/heads/release-1.1
Commit: 1a4fdfff5d364a35e935604c0a5058a1a9f242f7
Parents: a1f028d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Nov 8 17:13:19 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  18 +-
 .../AlignmentLimitExceededException.java        |  33 +++
 .../decline/CheckpointDeclineException.java     |  35 +++
 ...ntDeclineOnCancellationBarrierException.java |  32 +++
 .../CheckpointDeclineSubsumedException.java     |  32 +++
 ...intDeclineTaskNotCheckpointingException.java |  32 +++
 .../CheckpointDeclineTaskNotReadyException.java |  32 +++
 .../decline/InputEndOfStreamException.java      |  32 +++
 .../flink/runtime/execution/Environment.java    |   9 +
 .../runtime/jobgraph/tasks/StatefulTask.java    |   3 +-
 .../messages/checkpoint/DeclineCheckpoint.java  |  65 +++---
 .../runtime/taskmanager/RuntimeEnvironment.java |   7 +
 .../apache/flink/runtime/taskmanager/Task.java  |  22 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  41 +---
 .../savepoint/SavepointCoordinatorTest.java     |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   2 +-
 .../operators/testutils/DummyEnvironment.java   |   5 +
 .../operators/testutils/MockEnvironment.java    |   5 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  28 ++-
 .../streaming/runtime/io/BarrierTracker.java    |   4 +-
 .../streaming/runtime/tasks/StreamTask.java     |  25 ++-
 .../streaming/runtime/io/BarrierBufferTest.java |  31 +--
 .../runtime/io/BarrierTrackerTest.java          |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   2 +
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +
 .../StreamTaskCancellationBarrierTest.java      | 221 +++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |  31 ++-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   2 +
 29 files changed, 632 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 409f05b..8661ddc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -575,6 +575,7 @@ public class CheckpointCoordinator {
 		}
 
 		final long checkpointId = message.getCheckpointId();
+		final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
 
 		PendingCheckpoint checkpoint;
 
@@ -594,8 +595,8 @@ public class CheckpointCoordinator {
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				isPendingCheckpoint = true;
 
-				LOG.info("Discarding checkpoint " + checkpointId
-					+ " because of checkpoint decline from task " + message.getTaskExecutionId());
+				LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
+						checkpointId, message.getTaskExecutionId(), reason);
 
 				pendingCheckpoints.remove(checkpointId);
 				checkpoint.discard(userClassLoader);
@@ -604,19 +605,14 @@ public class CheckpointCoordinator {
 				onCancelCheckpoint(checkpointId);
 
 				boolean haveMoreRecentPending = false;
-				Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
-				while (entries.hasNext()) {
-					PendingCheckpoint p = entries.next().getValue();
-					if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
+				for (PendingCheckpoint p : pendingCheckpoints.values()) {
+					if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
 						haveMoreRecentPending = true;
 						break;
 					}
 				}
-				if (!haveMoreRecentPending && !triggerRequestQueued) {
-					LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
-					triggerCheckpoint(System.currentTimeMillis());
-				} else if (!haveMoreRecentPending) {
-					LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
+
+				if (!haveMoreRecentPending) {
 					triggerQueuedRequests();
 				}
 			} else if (checkpoint != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
new file mode 100644
index 0000000..64d57bc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because too many bytes were
+ * buffered in the alignment phase.
+ */
+public final class AlignmentLimitExceededException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public AlignmentLimitExceededException(long numBytes) {
+		super("The checkpoint alignment phase needed to buffer more than the configured maximum ("
+				+ numBytes + " bytes).");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
new file mode 100644
index 0000000..8a2802c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Base class of all exceptions that indicate a declined checkpoint.
+ */
+public abstract class CheckpointDeclineException extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineException(String message) {
+		super(message);
+	}
+
+	public CheckpointDeclineException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
new file mode 100644
index 0000000..9ae4096
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a cancellation
+ * barrier was received.
+ */
+public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineOnCancellationBarrierException() {
+		super("Task received cancellation from one of its inputs");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
new file mode 100644
index 0000000..5380469
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a newer checkpoint
+ * barrier was received on an input before the pending checkpoint's barrier. 
+ */
+public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineSubsumedException(long newCheckpointId) {
+		super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
new file mode 100644
index 0000000..e5773d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task does not support
+ * checkpointing.
+ */
+public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
+		super("Task '" + taskName + "'does not support checkpointing");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
new file mode 100644
index 0000000..a1214fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task was not
+ * ready to perform a checkpoint.
+ */
+public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineTaskNotReadyException(String taskName) {
+		super("Task " + taskName + " was not running");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
new file mode 100644
index 0000000..86b29dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because one of the input
+ * streams reached its end before the alignment was complete.
+ */
+public final class InputEndOfStreamException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public InputEndOfStreamException() {
+		super("Checkpoint was declined because one input stream is finished");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 5ad5fe2..e84a5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -167,6 +167,15 @@ public interface Environment {
 	void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
 
 	/**
+	 * Declines a checkpoint. This tells the checkpoint coordinator that this task will
+	 * not be able to successfully complete a certain checkpoint.
+	 * 
+	 * @param checkpointId The ID of the declined checkpoint.
+	 * @param cause An optional reason why the checkpoint was declined.
+	 */
+	void declineCheckpoint(long checkpointId, Throwable cause);
+	
+	/**
 	 * Marks task execution failed for an external reason (a reason other than the task code itself
 	 * throwing an exception). If the task is already in a terminal state
 	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 7c581df..874ca4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -66,8 +66,9 @@ public interface StatefulTask<T extends StateHandle<?>> {
 	 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
 	 * 
 	 * @param checkpointId The ID of the checkpoint to be aborted.
+	 * @param cause The reason why the checkpoint was aborted during alignment.
 	 */
-	void abortCheckpointOnBarrier(long checkpointId) throws Exception;
+	void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index f26d2fb..dca212c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -19,7 +19,14 @@
 package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -31,44 +38,48 @@ public class DeclineCheckpoint extends AbstractCheckpointMessage implements java
 
 	private static final long serialVersionUID = 2094094662279578953L;
 
-	/** The timestamp associated with the checkpoint */
-	private final long timestamp;
+	/** The reason why the checkpoint was declined */
+	private final Throwable reason;
 
-	public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
-		super(job, taskExecutionId, checkpointId);
-		this.timestamp = timestamp;
+	public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		this(job, taskExecutionId, checkpointId, null);
 	}
 
-	// --------------------------------------------------------------------------------------------
+	public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) {
+		super(job, taskExecutionId, checkpointId);
 
-	public long getTimestamp() {
-		return timestamp;
+		if (reason == null ||
+				reason.getClass() == AlignmentLimitExceededException.class ||
+				reason.getClass() == CheckpointDeclineOnCancellationBarrierException.class ||
+				reason.getClass() == CheckpointDeclineSubsumedException.class ||
+				reason.getClass() == CheckpointDeclineTaskNotCheckpointingException.class ||
+				reason.getClass() == CheckpointDeclineTaskNotReadyException.class ||
+				reason.getClass() == InputEndOfStreamException.class)
+		{
+			// null or known common exceptions that cannot reference any dynamically loaded code
+			this.reason = reason;
+		} else {
+			// some other exception. replace with a serialized throwable, to be on the safe side
+			this.reason = new SerializedThrowable(reason);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 
-	@Override
-	public int hashCode() {
-		return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+	/**
+	 * Gets the reason why the checkpoint was declined.
+	 *
+	 * @return The reason why the checkpoint was declined
+	 */
+	public Throwable getReason() {
+		return reason;
 	}
 
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		else if (o instanceof DeclineCheckpoint) {
-			DeclineCheckpoint that = (DeclineCheckpoint) o;
-			return this.timestamp == that.timestamp && super.equals(o);
-		}
-		else {
-			return false;
-		}
-	}
+	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public String toString() {
-		return String.format("Declined Checkpoint %d@%d for (%s/%s)",
-				getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
+		return String.format("Declined Checkpoint %d for (%s/%s): %s",
+				getCheckpointId(), getJob(), getTaskExecutionId(), reason);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6fdf6f9..47149a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -268,6 +269,12 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {
+		DeclineCheckpoint message = new DeclineCheckpoint(jobId, executionId, checkpointId, cause);
+		jobManager.tell(message);
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		this.containingTask.failExternally(cause);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ed15dbf..3eab7e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -969,13 +971,16 @@ public class Task implements Runnable {
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
 							if (!success) {
-								DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
+								// task was not ready to trigger this checkpoint
+								DeclineCheckpoint decline = new DeclineCheckpoint(
+										jobId, getExecutionId(), checkpointID,
+										new CheckpointDeclineTaskNotReadyException(taskName));
 								jobManager.tell(decline);
 							}
 						}
 						catch (Throwable t) {
 							if (getExecutionState() == ExecutionState.RUNNING) {
-								failExternally(new RuntimeException(
+								failExternally(new Exception(
 									"Error while triggering checkpoint for " + taskName,
 									t));
 							}
@@ -987,10 +992,21 @@ public class Task implements Runnable {
 			else {
 				LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
 						+ taskNameWithSubtask);
+
+				DeclineCheckpoint decline = new DeclineCheckpoint(
+						jobId, executionId, checkpointID,
+						new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+				jobManager.tell(decline);
 			}
 		}
 		else {
-			LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
+			LOG.debug("Declining checkpoint request for non-running task");
+
+			// send back a message that we did not do the checkpoint
+			DeclineCheckpoint decline = new DeclineCheckpoint(
+					jobId, executionId, checkpointID,
+					new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
+			jobManager.tell(decline);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 62af42b..f3f988a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -288,44 +288,19 @@ public class CheckpointCoordinatorTest {
 
 			// decline checkpoint from the other task, this should cancel the checkpoint
 			// and trigger a new one
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
 			assertTrue(checkpoint.isDiscarded());
 
-			// validate that we have a new pending checkpoint
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			// validate that we have no new pending checkpoint
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
-
-			assertNotNull(checkpointNew);
-			assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
-			assertEquals(jid, checkpointNew.getJobId());
-			assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
-			assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
-			assertEquals(0, checkpointNew.getTaskStates().size());
-			assertFalse(checkpointNew.isDiscarded());
-			assertFalse(checkpointNew.isFullyAcknowledged());
-			assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId());
-
-			// check that the vertices received the new trigger checkpoint message
-			{
-				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
-				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
-				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
-				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
-			}
-
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
 			assertTrue(checkpoint.isDiscarded());
 
-			// should still have the same second checkpoint pending
-			long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			assertEquals(checkpointIdNew2, checkpointIdNew);
-
 			coord.shutdown();
 		}
 		catch (Exception e) {
@@ -422,7 +397,7 @@ public class CheckpointCoordinatorTest {
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
 			// and trigger a new one
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
@@ -446,8 +421,8 @@ public class CheckpointCoordinatorTest {
 
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id));
 			assertTrue(checkpoint1.isDiscarded());
 
 			coord.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index b1b384d..dc2b23f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -202,7 +202,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 
 		coordinator.receiveDeclineMessage(new DeclineCheckpoint(
 				jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointId, 0));
+				checkpointId));
 
 
 		// The pending checkpoint is completed

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 4dfaf95..2e3bace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -455,7 +455,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
 			throw new UnsupportedOperationException("should not be called!");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 5af34fb..ca68683 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
 	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {
+		throw new UnsupportedOperationException();
+	}
+	
+	@Override
 	public void failExternally(Throwable cause) {
 		throw new UnsupportedOperationException("DummyEnvironment does not support external task failure.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9dea324..22dee63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -291,6 +291,11 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5b344eb..3ace0f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -226,7 +226,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 36de717..7a8e7d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,10 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -142,7 +146,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				}
 				else {
 					if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
-						processEndOfPartition(next.getChannelIndex());
+						processEndOfPartition();
 					}
 					return next;
 				}
@@ -196,7 +200,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 						"Skipping current checkpoint.", barrierId, currentCheckpointId);
 
 				// let the task know we are not completing this
-				notifyAbort(currentCheckpointId);
+				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
 
 				// abort the current checkpoint
 				releaseBlocksAndResetBarriers();
@@ -241,7 +245,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (barrierId > currentCheckpointId) {
 				// new checkpoint
 				currentCheckpointId = barrierId;
-				notifyAbort(barrierId);
+				notifyAbortOnCancellationBarrier(barrierId);
 			}
 			return;
 		}
@@ -258,7 +262,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				}
 
 				releaseBlocksAndResetBarriers();
-				notifyAbort(barrierId);
+				notifyAbortOnCancellationBarrier(barrierId);
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we canceled the next which also cancels the current
@@ -272,7 +276,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentCheckpointId = barrierId;
 				startOfAlignmentTimestamp = 0L;
 				latestAlignmentDurationNanos = 0L;
-				notifyAbort(barrierId);
+				notifyAbortOnCancellationBarrier(barrierId);
 			}
 
 			// else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
@@ -292,7 +296,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
 			}
 
-			notifyAbort(barrierId);
+			notifyAbortOnCancellationBarrier(barrierId);
 		}
 
 		// else: trailing barrier from either
@@ -300,12 +304,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		//   - the current checkpoint if it was already canceled
 	}
 
-	private void processEndOfPartition(int channel) throws Exception {
+	private void processEndOfPartition() throws Exception {
 		numClosedChannels++;
 
 		if (numBarriersReceived > 0) {
 			// let the task know we skip a checkpoint
-			notifyAbort(currentCheckpointId);
+			notifyAbort(currentCheckpointId, new InputEndOfStreamException());
 
 			// no chance to complete this checkpoint
 			releaseBlocksAndResetBarriers();
@@ -319,9 +323,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 
-	private void notifyAbort(long checkpointId) throws Exception {
+	private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
+		notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException());
+	}
+
+	private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
-			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 5157336..8b4cc48 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -227,7 +228,8 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 
 	private void notifyAbort(long checkpointId) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
-			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(
+					checkpointId, new CheckpointDeclineOnCancellationBarrierException());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d55a9c5..4f0839f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -592,13 +594,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+	public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
 		LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
 
+		// notify the coordinator that we decline this checkpoint
+		getEnvironment().declineCheckpoint(checkpointId, cause);
+
+		// notify all downstream operators that they should not wait for a barrier from us
 		synchronized (lock) {
-			if (isRunning) {
-				operatorChain.broadcastCheckpointCancelMarker(checkpointId);
-			}
+			operatorChain.broadcastCheckpointCancelMarker(checkpointId);
 		}
 	}
 
@@ -669,7 +673,18 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					checkpointThread.start();
 				}
 				return true;
-			} else {
+			}
+			else {
+				// we cannot perform our checkpoint - let the downstream operators know that they
+				// should not wait for any input from this operator
+
+				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
+				// yet be created
+				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointId);
+				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
+					output.writeEventToAllChannels(message);
+				}
+
 				return false;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cf1f98e..20cb8f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -45,6 +47,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -566,7 +569,7 @@ public class BarrierBufferTest {
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(3L, buffer.getCurrentCheckpointId());
 			validateAlignmentTime(startTs, buffer);
-			verify(toNotify).abortCheckpointOnBarrier(2L);
+			verify(toNotify).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineSubsumedException.class));
 			check(sequence[16], buffer.getNextNonBlocked());
 
 			// checkpoint 3 alignment in progress
@@ -574,7 +577,7 @@ public class BarrierBufferTest {
 
 			// checkpoint 3 aborted (end of partition)
 			check(sequence[20], buffer.getNextNonBlocked());
-			verify(toNotify).abortCheckpointOnBarrier(3L);
+			verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
 
 			// replay buffered data from checkpoint 3
 			check(sequence[18], buffer.getNextNonBlocked());
@@ -999,13 +1002,13 @@ public class BarrierBufferTest {
 		check(sequence[6], buffer.getNextNonBlocked());
 		assertEquals(5L, buffer.getCurrentCheckpointId());
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[8], buffer.getNextNonBlocked());
 		assertEquals(6L, buffer.getCurrentCheckpointId());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 		
 		buffer.cleanup();
@@ -1073,7 +1076,7 @@ public class BarrierBufferTest {
 		// canceled checkpoint on last barrier
 		startTs = System.nanoTime();
 		check(sequence[12], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[13], buffer.getNextNonBlocked());
 
@@ -1088,7 +1091,7 @@ public class BarrierBufferTest {
 
 		// this checkpoint gets immediately canceled
 		check(sequence[24], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		// some buffers
@@ -1104,7 +1107,7 @@ public class BarrierBufferTest {
 		check(sequence[33], buffer.getNextNonBlocked());
 
 		check(sequence[37], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		// all done
@@ -1167,7 +1170,7 @@ public class BarrierBufferTest {
 
 		// re-read the queued cancellation barriers
 		check(sequence[9], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[10], buffer.getNextNonBlocked());
@@ -1184,7 +1187,7 @@ public class BarrierBufferTest {
 
 		// no further checkpoint (abort) notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
 
 		// all done
 		assertNull(buffer.getNextNonBlocked());
@@ -1253,7 +1256,7 @@ public class BarrierBufferTest {
 		// cancelled by cancellation barrier
 		check(sequence[4], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class));
 
 		// the next checkpoint alignment starts now
 		startTs = System.nanoTime();
@@ -1285,7 +1288,7 @@ public class BarrierBufferTest {
 
 		// check overall notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
 	/**
@@ -1337,7 +1340,7 @@ public class BarrierBufferTest {
 		// future barrier aborts checkpoint
 		startTs = System.nanoTime();
 		check(sequence[3], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
 		check(sequence[4], buffer.getNextNonBlocked());
 
 		// alignment of next checkpoint
@@ -1366,7 +1369,7 @@ public class BarrierBufferTest {
 
 		// check overall notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
 	// ------------------------------------------------------------------------
@@ -1471,7 +1474,7 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {}
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {}
 
 		@Override
 		public void notifyCheckpointComplete(long checkpointId) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 903f585..978c212 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -481,7 +481,7 @@ public class BarrierTrackerTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
 
 			final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 5fcc59e..9003f0e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -270,6 +271,7 @@ public class OneInputStreamTaskTest {
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
 
+		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 		expectedOutput.add(new CheckpointBarrier(1, 1));

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 7084208..36ad8ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,9 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
new file mode 100644
index 0000000..8b8b659
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
+public class StreamTaskCancellationBarrierTest {
+
+	/**
+	 * This test checks that tasks emit a proper cancel checkpoint barrier, if a "trigger checkpoint" message
+	 * comes before they are ready.
+	 */
+	@Test
+	public void testEmitCancellationBarrierWhenNotReady() throws Exception {
+		StreamTask<String, ?> task = new InitBlockingTask();
+		StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+
+		// start the task - it blocks inside the 'init()' method and therefore never becomes ready
+		testHarness.invoke();
+
+		// tell the task to commence a checkpoint
+		boolean result = task.triggerCheckpoint(41L, System.currentTimeMillis());
+		assertFalse("task triggered checkpoint though not ready", result);
+
+		// a cancellation barrier should be downstream
+		Object emitted = testHarness.getOutput().poll();
+		assertNotNull("nothing emitted", emitted);
+		assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker);
+		assertEquals("wrong checkpoint id", 41L, ((CancelCheckpointMarker) emitted).getCheckpointId());
+	}
+
+	/**
+	 * This test verifies (for one-input tasks) that the stream tasks react in the following way to
+	 * receiving a checkpoint cancellation barrier:
+	 *
+	 *   - send a "decline checkpoint" notification out (to the JobManager)
+	 *   - emit a cancellation barrier downstream
+	 */
+	@Test
+	public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
+
+		OneInputStreamTask<String, String> task = new OneInputStreamTask<String, String>();
+		OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+				task,
+				1, 2,
+				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
+		streamConfig.setStreamOperator(mapOperator);
+
+		StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+		// start the task
+		testHarness.invoke(environment);
+		testHarness.waitForTaskRunning();
+
+		// emit cancellation barriers
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1);
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+		testHarness.waitForInputProcessing();
+
+		// the decline call should go to the coordinator
+		verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+		// a cancellation barrier should be downstream
+		Object result = testHarness.getOutput().poll();
+		assertNotNull("nothing emitted", result);
+		assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+		assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+		// cancel and shutdown
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+	}
+
+	/**
+	 * This test verifies (for two-input tasks) that the stream tasks react in the following way to
+	 * receiving a checkpoint cancellation barrier:
+	 *
+	 *   - send a "decline checkpoint" notification out (to the JobManager)
+	 *   - emit a cancellation barrier downstream
+	 */
+	@Test
+	public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
+
+		TwoInputStreamTask<String, String, String> task = new TwoInputStreamTask<String, String, String>();
+		TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
+				task,
+				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap());
+		streamConfig.setStreamOperator(op);
+
+		StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+		// start the task
+		testHarness.invoke(environment);
+		testHarness.waitForTaskRunning();
+
+		// emit cancellation barriers
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0);
+		testHarness.waitForInputProcessing();
+
+		// the decline call should go to the coordinator
+		verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+		// a cancellation barrier should be downstream
+		Object result = testHarness.getOutput().poll();
+		assertNotNull("nothing emitted", result);
+		assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+		assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+		// cancel and shutdown
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+	}
+
+	// ------------------------------------------------------------------------
+	//  test tasks / functions
+	// ------------------------------------------------------------------------
+
+	private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+		private final Object lock = new Object();
+		private volatile boolean running = true;
+
+		@Override
+		protected void init() throws Exception {
+			synchronized (lock) {
+				while (running) {
+					lock.wait();
+				}
+			}
+		}
+
+		@Override
+		protected void run() throws Exception {}
+
+		@Override
+		protected void cleanup() throws Exception {}
+
+		@Override
+		protected void cancelTask() throws Exception {
+			running = false;
+			synchronized (lock) {
+				lock.notifyAll();
+			}
+		}
+	}
+
+	private static class IdentityMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class UnionCoMap implements CoMapFunction<String, String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(String value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public String map2(String value) throws Exception {
+			return value;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 00e95b9..0bd8d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -150,21 +150,18 @@ public class StreamTaskTestHarness<OUT> {
 
 	}
 
+	public StreamMockEnvironment createEnvironment() {
+		return new StreamMockEnvironment(
+				jobConfig, taskConfig, executionConfig, memorySize, new MockInputSplitProvider(), bufferSize);
+	}
+	
 	/**
 	 * Invoke the Task. This resets the output of any previous invocation. This will start a new
 	 * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
 	 * Task thread to finish running.
 	 */
 	public void invoke() throws Exception {
-		mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig,
-			memorySize, new MockInputSplitProvider(), bufferSize);
-		task.setEnvironment(mockEnv);
-
-		initializeInputs();
-		initializeOutput();
-
-		taskThread = new TaskThread(task);
-		taskThread.start();
+		invoke(createEnvironment());
 	}
 
 	/**
@@ -205,7 +202,7 @@ public class StreamTaskTestHarness<OUT> {
 			if (taskThread.task instanceof StreamTask) {
 				StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task;
 				while (!streamTask.isRunning()) {
-					Thread.sleep(100);
+					Thread.sleep(10);
 					if (!taskThread.isAlive()) {
 						if (taskThread.getError() != null) {
 							throw new Exception("Task Thread failed due to an error.", taskThread.getError());
@@ -282,15 +279,15 @@ public class StreamTaskTestHarness<OUT> {
 	/**
 	 * This only returns after all input queues are empty.
 	 */
-	public void waitForInputProcessing() {
-
-
+	public void waitForInputProcessing() throws Exception {
 		// first wait for all input queues to be empty
-		try {
-			Thread.sleep(1);
-		} catch (InterruptedException ignored) {}
-		
+
 		while (true) {
+			Throwable error = taskThread.getError();
+			if (error != null) {
+				throw new Exception("Exception in the task thread", error);
+			}
+
 			boolean allEmpty = true;
 			for (int i = 0; i < numInputGates; i++) {
 				if (!inputGates[i].allQueuesEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index b9211b1..92f8553 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -290,6 +291,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
 
+		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 		expectedOutput.add(new CheckpointBarrier(1, 1));