You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:07 UTC

[1/4] flink git commit: [FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints

Repository: flink
Updated Branches:
  refs/heads/release-1.1 4dd3efea4 -> 0962cb6f4


http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d4fdc59..cf1f98e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,16 +18,18 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateHandle;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -42,15 +45,23 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 /**
  * Tests for the behavior of the {@link BarrierBuffer}.
  */
 public class BarrierBufferTest {
 
+	private static final Random RND = new Random();
+
 	private static final int PAGE_SIZE = 512;
-	
+
 	private static int SIZE_COUNTER = 0;
-	
+
 	private static IOManager IO_MANAGER;
 
 	@BeforeClass
@@ -86,7 +97,9 @@ public class BarrierBufferTest {
 			for (BufferOrEvent boe : sequence) {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
-			
+
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 			
@@ -120,6 +133,8 @@ public class BarrierBufferTest {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
 
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 
@@ -222,13 +237,15 @@ public class BarrierBufferTest {
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
 			handler.setNextExpectedCheckpointId(1L);
-			
+
 			// pre checkpoint 1
 			check(sequence[0], buffer.getNextNonBlocked());
 			check(sequence[1], buffer.getNextNonBlocked());
 			check(sequence[2], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
 
+			long startTs = System.nanoTime();
+
 			// blocking while aligning for checkpoint 1
 			check(sequence[7], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
@@ -236,6 +253,8 @@ public class BarrierBufferTest {
 			// checkpoint 1 done, returning buffered data
 			check(sequence[5], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[6], buffer.getNextNonBlocked());
 
 			// pre checkpoint 2
@@ -245,10 +264,13 @@ public class BarrierBufferTest {
 			check(sequence[12], buffer.getNextNonBlocked());
 			check(sequence[13], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			
+
 			// checkpoint 2 barriers come together
+			startTs = System.nanoTime();
 			check(sequence[17], buffer.getNextNonBlocked());
 			assertEquals(3L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[18], buffer.getNextNonBlocked());
 
 			// checkpoint 3 starts, data buffered
@@ -257,7 +279,7 @@ public class BarrierBufferTest {
 			check(sequence[21], buffer.getNextNonBlocked());
 
 			// checkpoint 4 happens without extra data
-			
+
 			// pre checkpoint 5
 			check(sequence[27], buffer.getNextNonBlocked());
 			assertEquals(5L, handler.getNextExpectedCheckpointId());
@@ -301,7 +323,7 @@ public class BarrierBufferTest {
 			BufferOrEvent[] sequence = {
 					createBuffer(0), createBuffer(1), createBuffer(2),
 					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-					
+
 					createBuffer(2), createBuffer(1), createBuffer(0),
 					createBarrier(2, 1),
 					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
@@ -327,12 +349,14 @@ public class BarrierBufferTest {
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
 			check(sequence[7], buffer.getNextNonBlocked());
 			check(sequence[8], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 2 alignment
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[14], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[19], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
 
 			// end of stream: remaining buffered contents
 			check(sequence[10], buffer.getNextNonBlocked());
@@ -343,7 +367,7 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
-			
+
 			buffer.cleanup();
 
 			checkNoTempFilesRemain();
@@ -389,7 +413,7 @@ public class BarrierBufferTest {
 					createBarrier(3, 2),
 					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
 					createBarrier(6, 1),
-					
+
 					// complete checkpoint 4, checkpoint 5 remains not fully triggered
 					createBarrier(4, 2),
 					createBuffer(2),
@@ -419,12 +443,14 @@ public class BarrierBufferTest {
 
 			// alignment of checkpoint 2 - buffering also some barriers for
 			// checkpoints 3 and 4
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[20], buffer.getNextNonBlocked());
 			check(sequence[23], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 2 completed
 			check(sequence[12], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
 			check(sequence[25], buffer.getNextNonBlocked());
 			check(sequence[27], buffer.getNextNonBlocked());
 			check(sequence[30], buffer.getNextNonBlocked());
@@ -507,36 +533,53 @@ public class BarrierBufferTest {
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
+			StatefulTask<?> toNotify = mock(StatefulTask.class);
+			buffer.registerCheckpointEventHandler(toNotify);
 
-			// checkpoint 1
+			long startTs;
+
+			// initial data
 			check(sequence[0], buffer.getNextNonBlocked());
 			check(sequence[1], buffer.getNextNonBlocked());
 			check(sequence[2], buffer.getNextNonBlocked());
+
+			// align checkpoint 1
+			startTs = System.nanoTime();
 			check(sequence[7], buffer.getNextNonBlocked());
 			assertEquals(1L, buffer.getCurrentCheckpointId());
-			
+
+			// checkpoint done - replay buffered
 			check(sequence[5], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
+			verify(toNotify).triggerCheckpointOnBarrier(eq(1L), anyLong());
 			check(sequence[6], buffer.getNextNonBlocked());
+
 			check(sequence[9], buffer.getNextNonBlocked());
 			check(sequence[10], buffer.getNextNonBlocked());
 
 			// alignment of checkpoint 2
+			startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
-			assertEquals(2L, buffer.getCurrentCheckpointId());
 			check(sequence[15], buffer.getNextNonBlocked());
 
 			// checkpoint 2 aborted, checkpoint 3 started
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(3L, buffer.getCurrentCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+			verify(toNotify).abortCheckpointOnBarrier(2L);
 			check(sequence[16], buffer.getNextNonBlocked());
+
+			// checkpoint 3 alignment in progress
 			check(sequence[19], buffer.getNextNonBlocked());
-			check(sequence[20], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 3 aborted (end of partition)
+			check(sequence[20], buffer.getNextNonBlocked());
+			verify(toNotify).abortCheckpointOnBarrier(3L);
+
+			// replay buffered data from checkpoint 3
 			check(sequence[18], buffer.getNextNonBlocked());
+
+			// all the remaining messages
 			check(sequence[21], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
 			check(sequence[23], buffer.getNextNonBlocked());
@@ -613,17 +656,21 @@ public class BarrierBufferTest {
 			check(sequence[19], buffer.getNextNonBlocked());
 			check(sequence[21], buffer.getNextNonBlocked());
 
+			long startTs = System.nanoTime();
+
 			// checkpoint 2 aborted, checkpoint 4 started. replay buffered
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(4L, buffer.getCurrentCheckpointId());
 			check(sequence[16], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
-			
+
 			// align checkpoint 4 remainder
 			check(sequence[25], buffer.getNextNonBlocked());
 			check(sequence[26], buffer.getNextNonBlocked());
-			
+
+			validateAlignmentTime(startTs, buffer);
+
 			// checkpoint 4 aborted (due to end of partition)
 			check(sequence[24], buffer.getNextNonBlocked());
 			check(sequence[27], buffer.getNextNonBlocked());
@@ -862,9 +909,9 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
-			
+
 			buffer.cleanup();
-			
+
 			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
@@ -874,26 +921,480 @@ public class BarrierBufferTest {
 	}
 
 	@Test
-	public void testEndOfStreamWhileCheckpoint() {
+	public void testEndOfStreamWhileCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				// one checkpoint
+				createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
+
+				// some buffers
+				createBuffer(0), createBuffer(0), createBuffer(2),
+
+				// start the checkpoint that will be incomplete
+				createBarrier(2, 2), createBarrier(2, 0),
+				createBuffer(0), createBuffer(2), createBuffer(1),
+
+				// close one channel after its barrier and one channel before its barrier
+				createEndOfPartition(2), createEndOfPartition(1),
+				createBuffer(0),
+
+				// final end of stream
+				createEndOfPartition(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		// data after first checkpoint
+		check(sequence[3], buffer.getNextNonBlocked());
+		check(sequence[4], buffer.getNextNonBlocked());
+		check(sequence[5], buffer.getNextNonBlocked());
+		assertEquals(1L, buffer.getCurrentCheckpointId());
+
+		// alignment of second checkpoint
+		check(sequence[10], buffer.getNextNonBlocked());
+		assertEquals(2L, buffer.getCurrentCheckpointId());
+
+		// first end-of-partition encountered: checkpoint will not be completed
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[9], buffer.getNextNonBlocked());
+		check(sequence[11], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+
+		checkNoTempFilesRemain();
+	}
+
+	@Test
+	public void testSingleChannelAbortCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				createBuffer(0),
+				createBarrier(1, 0),
+				createBuffer(0),
+				createBarrier(2, 0),
+				createCancellationBarrier(4, 0),
+				createBarrier(5, 0),
+				createBuffer(0),
+				createCancellationBarrier(6, 0),
+				createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[2], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		check(sequence[6], buffer.getNextNonBlocked());
+		assertEquals(5L, buffer.getCurrentCheckpointId());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		check(sequence[8], buffer.getNextNonBlocked());
+		assertEquals(6L, buffer.getCurrentCheckpointId());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
 		
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	@Test
+	public void testMultiChannelAbortCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some buffers and a successful checkpoint
+				/* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
+				/* 3 */ createBarrier(1, 1), createBarrier(1, 2),
+				/* 5 */ createBuffer(2), createBuffer(1),
+				/* 7 */ createBarrier(1, 0),
+				/* 8 */ createBuffer(0), createBuffer(2),
+
+				// aborted on last barrier
+				/* 10 */ createBarrier(2, 0), createBarrier(2, 2),
+				/* 12 */ createBuffer(0), createBuffer(2),
+				/* 14 */ createCancellationBarrier(2, 1),
+
+				// successful checkpoint
+				/* 15 */ createBuffer(2), createBuffer(1),
+				/* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
+
+				// abort on first barrier
+				/* 20 */ createBuffer(0), createBuffer(1),
+				/* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
+				/* 24 */ createBuffer(0),
+				/* 25 */ createBarrier(4, 0),
+
+				// another successful checkpoint
+				/* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
+				/* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
+				/* 32 */ createBuffer(0), createBuffer(1),
+
+				// abort via multiple cancellation barriers, followed by a regular barrier
+				/* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
+				/* 36 */ createBarrier(6, 0),
+
+				/* 37 */ createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		// successful first checkpoint, with some aligned buffers
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[1], buffer.getNextNonBlocked());
+		check(sequence[2], buffer.getNextNonBlocked());
+		startTs = System.nanoTime();
+		check(sequence[5], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+
+		check(sequence[6], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[9], buffer.getNextNonBlocked());
+
+		// canceled checkpoint on last barrier
+		startTs = System.nanoTime();
+		check(sequence[12], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		validateAlignmentTime(startTs, buffer);
+		check(sequence[13], buffer.getNextNonBlocked());
+
+		// one more successful checkpoint
+		check(sequence[15], buffer.getNextNonBlocked());
+		check(sequence[16], buffer.getNextNonBlocked());
+		startTs = System.nanoTime();
+		check(sequence[20], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(3L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+		check(sequence[21], buffer.getNextNonBlocked());
+
+		// this checkpoint gets immediately canceled
+		check(sequence[24], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// some buffers
+		check(sequence[26], buffer.getNextNonBlocked());
+		check(sequence[27], buffer.getNextNonBlocked());
+		check(sequence[28], buffer.getNextNonBlocked());
+
+		// a simple successful checkpoint
+		startTs = System.nanoTime();
+		check(sequence[32], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+		check(sequence[33], buffer.getNextNonBlocked());
+
+		check(sequence[37], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	@Test
+	public void testAbortViaQueuedBarriers() throws Exception {
+		BufferOrEvent[] sequence = {
+				// starting a checkpoint
+				/* 0 */ createBuffer(1),
+				/* 1 */ createBarrier(1, 1), createBarrier(1, 2),
+				/* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+				// queued barrier and cancellation barrier
+				/* 6 */ createCancellationBarrier(2, 2),
+				/* 7 */ createBarrier(2, 1),
+
+				// some intermediate buffers (some queued)
+				/* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+				// complete initial checkpoint
+				/* 11 */ createBarrier(1, 0),
+
+				// some buffers (none queued, since checkpoint is aborted)
+				/* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+				// final barrier of aborted checkpoint
+				/* 15 */ createBarrier(2, 0),
+
+				// some more buffers
+				/* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		check(sequence[0], buffer.getNextNonBlocked());
+
+		// starting first checkpoint
+		startTs = System.nanoTime();
+		check(sequence[4], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+
+		// finished first checkpoint
+		check(sequence[3], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+
+		check(sequence[5], buffer.getNextNonBlocked());
+
+		// re-read the queued cancellation barriers
+		check(sequence[9], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		check(sequence[10], buffer.getNextNonBlocked());
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		check(sequence[16], buffer.getNextNonBlocked());
+		check(sequence[17], buffer.getNextNonBlocked());
+		check(sequence[18], buffer.getNextNonBlocked());
+
+		// no further alignment should have happened
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// no further checkpoint (abort) notifications
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	/**
+	 * This tests the case where a replay of queued checkpoint barriers meets
+	 * a canceled checkpoint.
+	 *
+	 * The replayed newer checkpoint barrier must not try to cancel the
+	 * already canceled checkpoint.
+	 */
+	@Test
+	public void testAbortWhileHavingQueuedBarriers() throws Exception {
+		BufferOrEvent[] sequence = {
+				// starting a checkpoint
+				/*  0 */ createBuffer(1),
+				/*  1 */ createBarrier(1, 1),
+				/*  2 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+				// queued barrier of the next checkpoint
+				/*  5 */ createBarrier(2, 1),
+
+				// some queued buffers
+				/*  6 */ createBuffer(2), createBuffer(1),
+
+				// cancel the initial checkpoint
+				/*  8 */ createCancellationBarrier(1, 0),
+
+				// some more buffers
+				/*  9 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+				// ignored barrier - already canceled and moved to next checkpoint
+				/* 12 */ createBarrier(1, 2),
+
+				// some more buffers
+				/* 13 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+				// complete next checkpoint regularly
+				/* 16 */ createBarrier(2, 0), createBarrier(2, 2),
+
+				// some more buffers
+				/* 18 */ createBuffer(0), createBuffer(1), createBuffer(2)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		check(sequence[0], buffer.getNextNonBlocked());
+
+		// starting first checkpoint
+		startTs = System.nanoTime();
+		check(sequence[2], buffer.getNextNonBlocked());
+		check(sequence[3], buffer.getNextNonBlocked());
+		check(sequence[6], buffer.getNextNonBlocked());
+
+		// cancelled by cancellation barrier
+		check(sequence[4], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+
+		// the next checkpoint alignment starts now
+		startTs = System.nanoTime();
+		check(sequence[9], buffer.getNextNonBlocked());
+		check(sequence[11], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[15], buffer.getNextNonBlocked());
+
+		// checkpoint done
+		check(sequence[7], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
+
+		// queued data
+		check(sequence[10], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		// trailing data
+		check(sequence[18], buffer.getNextNonBlocked());
+		check(sequence[19], buffer.getNextNonBlocked());
+		check(sequence[20], buffer.getNextNonBlocked());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+
+		// check overall notifications
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+	}
+
+	/**
+	 * This tests the case where a cancellation barrier is received for a checkpoint already
+	 * canceled due to receiving a newer checkpoint barrier.
+	 */
+	@Test
+	public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
+		BufferOrEvent[] sequence = {
+				// starting a checkpoint
+				/*  0 */ createBuffer(2),
+				/*  1 */ createBarrier(3, 1), createBarrier(3, 0),
+				/*  3 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+				// newer checkpoint barrier cancels/subsumes pending checkpoint
+				/*  6 */ createBarrier(5, 2),
+
+				// some queued buffers
+				/*  7 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+				// cancel barrier for the initial checkpoint (it is already canceled)
+				/* 10 */ createCancellationBarrier(3, 2),
+
+				// some more buffers
+				/* 11 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+				// complete next checkpoint regularly
+				/* 14 */ createBarrier(5, 0), createBarrier(5, 1),
+
+				// some more buffers
+				/* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		// validate the sequence
+
+		check(sequence[0], buffer.getNextNonBlocked());
+
+		// beginning of first checkpoint
+		check(sequence[5], buffer.getNextNonBlocked());
+
+		// future barrier aborts checkpoint
+		startTs = System.nanoTime();
+		check(sequence[3], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+		check(sequence[4], buffer.getNextNonBlocked());
+
+		// alignment of next checkpoint
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[9], buffer.getNextNonBlocked());
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+
+		// checkpoint finished
+		check(sequence[7], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+		check(sequence[11], buffer.getNextNonBlocked());
+
+		// remaining data
+		check(sequence[16], buffer.getNextNonBlocked());
+		check(sequence[17], buffer.getNextNonBlocked());
+		check(sequence[18], buffer.getNextNonBlocked());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+
+		// check overall notifications
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
 	}
 
 	// ------------------------------------------------------------------------
 	//  Utils
 	// ------------------------------------------------------------------------
 
-	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+	private static BufferOrEvent createBarrier(long checkpointId, int channel) {
+		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel);
+	}
+
+	private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
+		return new BufferOrEvent(new CancelCheckpointMarker(checkpointId), channel);
 	}
 
 	private static BufferOrEvent createBuffer(int channel) {
-		// since we have no access to the contents, we need to use the size as an
-		// identifier to validate correctness here
-		Buffer buf = new Buffer(
-				MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
-				FreeingBufferRecycler.INSTANCE);
-		
-		buf.setSize(SIZE_COUNTER++);
+		final int size = SIZE_COUNTER++;
+		byte[] bytes = new byte[size];
+		RND.nextBytes(bytes);
+
+		MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+		memory.put(0, bytes);
+
+		Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+		buf.setSize(size);
+
+		// retain an additional time so it does not get disposed after being read by the input gate
+		buf.retain();
+
 		return new BufferOrEvent(buf, channel);
 	}
 
@@ -907,15 +1408,16 @@ public class BarrierBufferTest {
 		assertEquals(expected.isBuffer(), present.isBuffer());
 		
 		if (expected.isBuffer()) {
-			// since we have no access to the contents, we need to use the size as an
-			// identifier to validate correctness here
 			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+			MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
+			MemorySegment presentMem = present.getBuffer().getMemorySegment();
+			assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
 		}
 		else {
 			assertEquals(expected.getEvent(), present.getEvent());
 		}
 	}
-	
+
 	private static void checkNoTempFilesRemain() {
 		// validate that all temp files have been removed
 		for (File dir : IO_MANAGER.getSpillingDirectories()) {
@@ -926,12 +1428,17 @@ public class BarrierBufferTest {
 			}
 		}
 	}
-	
+
+	private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+		final long elapsed = System.nanoTime() - startTimestamp;
+		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 
-	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+	private static class ValidatingCheckpointHandler implements StatefulTask<StateHandle<Object>> {
 		
 		private long nextExpectedCheckpointId = -1L;
 
@@ -944,11 +1451,31 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
-			assertNotNull(barrier);
-			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+		public void setInitialState(StateHandle<Object> stateHandle) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+			assertTrue("wrong checkpoint id",
+					nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
+
+			assertTrue(timestamp > 0);
+
 			nextExpectedCheckpointId++;
 		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b9b6e5f..903f585 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,25 +19,30 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
+import org.apache.flink.runtime.state.StateHandle;
 import org.junit.Test;
 
 import java.util.Arrays;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the behavior of the barrier tracker.
  */
 public class BarrierTrackerTest {
-	
+
 	private static final int PAGE_SIZE = 512;
-	
+
 	@Test
 	public void testSingleChannelNoBarriers() {
 		try {
@@ -329,6 +334,98 @@ public class BarrierTrackerTest {
 		}
 	}
 
+	@Test
+	public void testSingleChannelAbortCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				createBuffer(0),
+				createBarrier(1, 0),
+				createBuffer(0),
+				createBarrier(2, 0),
+				createCancellationBarrier(4, 0),
+				createBarrier(5, 0),
+				createBuffer(0),
+				createCancellationBarrier(6, 0),
+				createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
+		BarrierTracker tracker = new BarrierTracker(gate);
+
+		// negative values mean an expected cancellation call!
+		CheckpointSequenceValidator validator =
+				new CheckpointSequenceValidator(1, 2, -4, 5, -6);
+		tracker.registerCheckpointEventHandler(validator);
+
+		for (BufferOrEvent boe : sequence) {
+			if (boe.isBuffer()) {
+				assertEquals(boe, tracker.getNextNonBlocked());
+			}
+			assertTrue(tracker.isEmpty());
+		}
+
+		assertNull(tracker.getNextNonBlocked());
+		assertNull(tracker.getNextNonBlocked());
+	}
+
+	@Test
+	public void testMultiChannelAbortCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some buffers and a successful checkpoint
+				createBuffer(0), createBuffer(2), createBuffer(0),
+				createBarrier(1, 1), createBarrier(1, 2),
+				createBuffer(2), createBuffer(1),
+				createBarrier(1, 0),
+
+				// aborted on last barrier
+				createBuffer(0), createBuffer(2),
+				createBarrier(2, 0), createBarrier(2, 2),
+				createBuffer(0), createBuffer(2),
+				createCancellationBarrier(2, 1),
+
+				// successful checkpoint
+				createBuffer(2), createBuffer(1),
+				createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
+
+				// abort on first barrier
+				createBuffer(0), createBuffer(1),
+				createCancellationBarrier(4, 1), createBarrier(4, 2),
+				createBuffer(0),
+				createBarrier(4, 0),
+
+				// another successful checkpoint
+				createBuffer(0), createBuffer(1), createBuffer(2),
+				createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
+
+				// abort multiple cancellations and a barrier after the cancellations 
+				createBuffer(0), createBuffer(1),
+				createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
+				createBarrier(6, 0),
+
+				createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierTracker tracker = new BarrierTracker(gate);
+
+		// negative values mean an expected cancellation call!
+		CheckpointSequenceValidator validator =
+				new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
+		tracker.registerCheckpointEventHandler(validator);
+
+		for (BufferOrEvent boe : sequence) {
+			if (boe.isBuffer()) {
+				assertEquals(boe, tracker.getNextNonBlocked());
+			}
+		}
+
+		assertTrue(tracker.isEmpty());
+
+		assertNull(tracker.getNextNonBlocked());
+		assertNull(tracker.getNextNonBlocked());
+
+		assertTrue(tracker.isEmpty());
+	}
+	
 	// ------------------------------------------------------------------------
 	//  Utils
 	// ------------------------------------------------------------------------
@@ -337,6 +434,10 @@ public class BarrierTrackerTest {
 		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
 	}
 
+	private static BufferOrEvent createCancellationBarrier(long id, int channel) {
+		return new BufferOrEvent(new CancelCheckpointMarker(id), channel);
+	}
+
 	private static BufferOrEvent createBuffer(int channel) {
 		return new BufferOrEvent(
 				new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
@@ -346,22 +447,54 @@ public class BarrierTrackerTest {
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 	
-	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+	private static class CheckpointSequenceValidator implements StatefulTask<StateHandle<Object>> {
 
 		private final long[] checkpointIDs;
-		
+
 		private int i = 0;
 
 		private CheckpointSequenceValidator(long... checkpointIDs) {
 			this.checkpointIDs = checkpointIDs;
 		}
-		
+
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
+		public void setInitialState(StateHandle<Object> state) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-			assertNotNull(barrier);
-			assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+
+			final long expectedId = checkpointIDs[i++];
+			if (expectedId >= 0) {
+				assertEquals("wrong checkpoint id", expectedId, checkpointId);
+				assertTrue(timestamp > 0);
+			} else {
+				fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'");
+			}
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {
+			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
+
+			final long expectedId = checkpointIDs[i++];
+			if (expectedId < 0) { // a negative entry encodes an expected abort of checkpoint -expectedId
+				assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId);
+			} else {
+				fail("got 'abortCheckpointOnBarrier()' when expecting a 'triggerCheckpointOnBarrier()'");
+			}
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
 		}
 	}
 }


[3/4] flink git commit: [FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator

Posted by se...@apache.org.
[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a4fdfff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a4fdfff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a4fdfff

Branch: refs/heads/release-1.1
Commit: 1a4fdfff5d364a35e935604c0a5058a1a9f242f7
Parents: a1f028d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Nov 8 17:13:19 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  18 +-
 .../AlignmentLimitExceededException.java        |  33 +++
 .../decline/CheckpointDeclineException.java     |  35 +++
 ...ntDeclineOnCancellationBarrierException.java |  32 +++
 .../CheckpointDeclineSubsumedException.java     |  32 +++
 ...intDeclineTaskNotCheckpointingException.java |  32 +++
 .../CheckpointDeclineTaskNotReadyException.java |  32 +++
 .../decline/InputEndOfStreamException.java      |  32 +++
 .../flink/runtime/execution/Environment.java    |   9 +
 .../runtime/jobgraph/tasks/StatefulTask.java    |   3 +-
 .../messages/checkpoint/DeclineCheckpoint.java  |  65 +++---
 .../runtime/taskmanager/RuntimeEnvironment.java |   7 +
 .../apache/flink/runtime/taskmanager/Task.java  |  22 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  41 +---
 .../savepoint/SavepointCoordinatorTest.java     |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   2 +-
 .../operators/testutils/DummyEnvironment.java   |   5 +
 .../operators/testutils/MockEnvironment.java    |   5 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  28 ++-
 .../streaming/runtime/io/BarrierTracker.java    |   4 +-
 .../streaming/runtime/tasks/StreamTask.java     |  25 ++-
 .../streaming/runtime/io/BarrierBufferTest.java |  31 +--
 .../runtime/io/BarrierTrackerTest.java          |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   2 +
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +
 .../StreamTaskCancellationBarrierTest.java      | 221 +++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |  31 ++-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   2 +
 29 files changed, 632 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 409f05b..8661ddc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -575,6 +575,7 @@ public class CheckpointCoordinator {
 		}
 
 		final long checkpointId = message.getCheckpointId();
+		final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
 
 		PendingCheckpoint checkpoint;
 
@@ -594,8 +595,8 @@ public class CheckpointCoordinator {
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				isPendingCheckpoint = true;
 
-				LOG.info("Discarding checkpoint " + checkpointId
-					+ " because of checkpoint decline from task " + message.getTaskExecutionId());
+				LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
+						checkpointId, message.getTaskExecutionId(), reason);
 
 				pendingCheckpoints.remove(checkpointId);
 				checkpoint.discard(userClassLoader);
@@ -604,19 +605,14 @@ public class CheckpointCoordinator {
 				onCancelCheckpoint(checkpointId);
 
 				boolean haveMoreRecentPending = false;
-				Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
-				while (entries.hasNext()) {
-					PendingCheckpoint p = entries.next().getValue();
-					if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
+				for (PendingCheckpoint p : pendingCheckpoints.values()) {
+					if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
 						haveMoreRecentPending = true;
 						break;
 					}
 				}
-				if (!haveMoreRecentPending && !triggerRequestQueued) {
-					LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
-					triggerCheckpoint(System.currentTimeMillis());
-				} else if (!haveMoreRecentPending) {
-					LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
+
+				if (!haveMoreRecentPending) {
 					triggerQueuedRequests();
 				}
 			} else if (checkpoint != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
new file mode 100644
index 0000000..64d57bc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because too many bytes were
+ * buffered in the alignment phase.
+ */
+public final class AlignmentLimitExceededException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public AlignmentLimitExceededException(long numBytes) {
+		super("The checkpoint alignment phase needed to buffer more than the configured maximum ("
+				+ numBytes + " bytes).");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
new file mode 100644
index 0000000..8a2802c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Base class of all exceptions that indicate a declined checkpoint.
+ */
+public abstract class CheckpointDeclineException extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineException(String message) {
+		super(message);
+	}
+
+	public CheckpointDeclineException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
new file mode 100644
index 0000000..9ae4096
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a cancellation
+ * barrier was received.
+ */
+public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineOnCancellationBarrierException() {
+		super("Task received cancellation from one of its inputs");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
new file mode 100644
index 0000000..5380469
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a newer checkpoint
+ * barrier was received on an input before the pending checkpoint's barrier. 
+ */
+public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineSubsumedException(long newCheckpointId) {
+		super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
new file mode 100644
index 0000000..e5773d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task does not support
+ * checkpointing.
+ */
+public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
+		super("Task '" + taskName + "' does not support checkpointing");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
new file mode 100644
index 0000000..a1214fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task was not
+ * ready to perform a checkpoint.
+ */
+public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CheckpointDeclineTaskNotReadyException(String taskName) {
+		super("Task " + taskName + " was not running");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
new file mode 100644
index 0000000..86b29dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because one of the input
+ * streams reached its end before the alignment was complete.
+ */
+public final class InputEndOfStreamException extends CheckpointDeclineException {
+
+	private static final long serialVersionUID = 1L;
+
+	public InputEndOfStreamException() {
+		super("Checkpoint was declined because one input stream is finished");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 5ad5fe2..e84a5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -167,6 +167,15 @@ public interface Environment {
 	void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
 
 	/**
+	 * Declines a checkpoint. This tells the checkpoint coordinator that this task will
+	 * not be able to successfully complete a certain checkpoint.
+	 * 
+	 * @param checkpointId The ID of the declined checkpoint.
+	 * @param cause An optional reason why the checkpoint was declined.
+	 */
+	void declineCheckpoint(long checkpointId, Throwable cause);
+	
+	/**
 	 * Marks task execution failed for an external reason (a reason other than the task code itself
 	 * throwing an exception). If the task is already in a terminal state
 	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 7c581df..874ca4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -66,8 +66,9 @@ public interface StatefulTask<T extends StateHandle<?>> {
 	 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
 	 * 
 	 * @param checkpointId The ID of the checkpoint to be aborted.
+	 * @param cause The reason why the checkpoint was aborted during alignment   
 	 */
-	void abortCheckpointOnBarrier(long checkpointId) throws Exception;
+	void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index f26d2fb..dca212c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -19,7 +19,14 @@
 package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -31,44 +38,48 @@ public class DeclineCheckpoint extends AbstractCheckpointMessage implements java
 
 	private static final long serialVersionUID = 2094094662279578953L;
 
-	/** The timestamp associated with the checkpoint */
-	private final long timestamp;
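+	/** The reason why the checkpoint was declined (null if none was given; exceptions other than the known decline types are stored as a SerializedThrowable) */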
+	private final Throwable reason;
 
-	public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
-		super(job, taskExecutionId, checkpointId);
-		this.timestamp = timestamp;
+	public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		this(job, taskExecutionId, checkpointId, null);
 	}
 
-	// --------------------------------------------------------------------------------------------
+	public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) {
+		super(job, taskExecutionId, checkpointId);
 
-	public long getTimestamp() {
-		return timestamp;
+		if (reason == null ||
+				reason.getClass() == AlignmentLimitExceededException.class ||
+				reason.getClass() == CheckpointDeclineOnCancellationBarrierException.class ||
+				reason.getClass() == CheckpointDeclineSubsumedException.class ||
+				reason.getClass() == CheckpointDeclineTaskNotCheckpointingException.class ||
+				reason.getClass() == CheckpointDeclineTaskNotReadyException.class ||
+				reason.getClass() == InputEndOfStreamException.class)
+		{
+			// null or known common exceptions that cannot reference any dynamically loaded code
+			this.reason = reason;
+		} else {
+			// some other exception. replace with a serialized throwable, to be on the safe side
+			this.reason = new SerializedThrowable(reason);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 
-	@Override
-	public int hashCode() {
-		return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+	/**
+	 * Gets the reason why the checkpoint was declined.
+	 *
+	 * @return The reason why the checkpoint was declined, or null if no reason was given
+	 */
+	public Throwable getReason() {
+		return reason;
 	}
 
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		else if (o instanceof DeclineCheckpoint) {
-			DeclineCheckpoint that = (DeclineCheckpoint) o;
-			return this.timestamp == that.timestamp && super.equals(o);
-		}
-		else {
-			return false;
-		}
-	}
+	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public String toString() {
-		return String.format("Declined Checkpoint %d@%d for (%s/%s)",
-				getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
+		return String.format("Declined Checkpoint %d for (%s/%s): %s",
+				getCheckpointId(), getJob(), getTaskExecutionId(), reason);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6fdf6f9..47149a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -268,6 +269,12 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {
+		DeclineCheckpoint message = new DeclineCheckpoint(jobId, executionId, checkpointId, cause);
+		jobManager.tell(message);
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		this.containingTask.failExternally(cause);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ed15dbf..3eab7e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -969,13 +971,16 @@ public class Task implements Runnable {
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
 							if (!success) {
-								DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
+								// task was not ready to trigger this checkpoint
+								DeclineCheckpoint decline = new DeclineCheckpoint(
+										jobId, getExecutionId(), checkpointID,
+										new CheckpointDeclineTaskNotReadyException(taskName));
 								jobManager.tell(decline);
 							}
 						}
 						catch (Throwable t) {
 							if (getExecutionState() == ExecutionState.RUNNING) {
-								failExternally(new RuntimeException(
+								failExternally(new Exception(
 									"Error while triggering checkpoint for " + taskName,
 									t));
 							}
@@ -987,10 +992,21 @@ public class Task implements Runnable {
 			else {
 				LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
 						+ taskNameWithSubtask);
+
+				DeclineCheckpoint decline = new DeclineCheckpoint(
+						jobId, executionId, checkpointID,
+						new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+				jobManager.tell(decline);
 			}
 		}
 		else {
-			LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
+			LOG.debug("Declining checkpoint request for non-running task");
+
+			// send back a message that we did not do the checkpoint
+			DeclineCheckpoint decline = new DeclineCheckpoint(
+					jobId, executionId, checkpointID,
+					new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
+			jobManager.tell(decline);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 62af42b..f3f988a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -288,44 +288,19 @@ public class CheckpointCoordinatorTest {
 
 			// decline checkpoint from the other task, this should cancel the checkpoint
 			// and trigger a new one
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
 			assertTrue(checkpoint.isDiscarded());
 
-			// validate that we have a new pending checkpoint
-			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			// validate that we have no new pending checkpoint
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
-
-			assertNotNull(checkpointNew);
-			assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
-			assertEquals(jid, checkpointNew.getJobId());
-			assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
-			assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
-			assertEquals(0, checkpointNew.getTaskStates().size());
-			assertFalse(checkpointNew.isDiscarded());
-			assertFalse(checkpointNew.isFullyAcknowledged());
-			assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId());
-
-			// check that the vertices received the new trigger checkpoint message
-			{
-				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
-				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
-				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
-				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
-			}
-
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
 			assertTrue(checkpoint.isDiscarded());
 
-			// should still have the same second checkpoint pending
-			long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-			assertEquals(checkpointIdNew2, checkpointIdNew);
-
 			coord.shutdown();
 		}
 		catch (Exception e) {
@@ -422,7 +397,7 @@ public class CheckpointCoordinatorTest {
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
 			// and trigger a new one
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
@@ -446,8 +421,8 @@ public class CheckpointCoordinatorTest {
 
 			// decline again, nothing should happen
 			// decline from the other task, nothing should happen
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
-			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
+			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id));
 			assertTrue(checkpoint1.isDiscarded());
 
 			coord.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index b1b384d..dc2b23f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -202,7 +202,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 
 		coordinator.receiveDeclineMessage(new DeclineCheckpoint(
 				jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(),
-				checkpointId, 0));
+				checkpointId));
 
 
 		// The pending checkpoint is completed

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 4dfaf95..2e3bace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -455,7 +455,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
 			throw new UnsupportedOperationException("should not be called!");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 5af34fb..ca68683 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
 	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {
+		throw new UnsupportedOperationException();
+	}
+	
+	@Override
 	public void failExternally(Throwable cause) {
 		throw new UnsupportedOperationException("DummyEnvironment does not support external task failure.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9dea324..22dee63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -291,6 +291,11 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5b344eb..3ace0f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -226,7 +226,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 36de717..7a8e7d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,10 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -142,7 +146,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				}
 				else {
 					if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
-						processEndOfPartition(next.getChannelIndex());
+						processEndOfPartition();
 					}
 					return next;
 				}
@@ -196,7 +200,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 						"Skipping current checkpoint.", barrierId, currentCheckpointId);
 
 				// let the task know we are not completing this
-				notifyAbort(currentCheckpointId);
+				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
 
 				// abort the current checkpoint
 				releaseBlocksAndResetBarriers();
@@ -241,7 +245,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (barrierId > currentCheckpointId) {
 				// new checkpoint
 				currentCheckpointId = barrierId;
-				notifyAbort(barrierId);
+				notifyAbortOnCancellationBarrier(barrierId);
 			}
 			return;
 		}
@@ -258,7 +262,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				}
 
 				releaseBlocksAndResetBarriers();
-				notifyAbort(barrierId);
+				notifyAbortOnCancellationBarrier(barrierId);
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we canceled the next which also cancels the current
@@ -272,7 +276,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentCheckpointId = barrierId;
 				startOfAlignmentTimestamp = 0L;
 				latestAlignmentDurationNanos = 0L;
-				notifyAbort(barrierId);
+				notifyAbortOnCancellationBarrier(barrierId);
 			}
 
 			// else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
@@ -292,7 +296,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
 			}
 
-			notifyAbort(barrierId);
+			notifyAbortOnCancellationBarrier(barrierId);
 		}
 
 		// else: trailing barrier from either
@@ -300,12 +304,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		//   - the current checkpoint if it was already canceled
 	}
 
-	private void processEndOfPartition(int channel) throws Exception {
+	private void processEndOfPartition() throws Exception {
 		numClosedChannels++;
 
 		if (numBarriersReceived > 0) {
 			// let the task know we skip a checkpoint
-			notifyAbort(currentCheckpointId);
+			notifyAbort(currentCheckpointId, new InputEndOfStreamException());
 
 			// no chance to complete this checkpoint
 			releaseBlocksAndResetBarriers();
@@ -319,9 +323,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 
-	private void notifyAbort(long checkpointId) throws Exception {
+	private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
+		notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException());
+	}
+
+	private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
-			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 5157336..8b4cc48 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -227,7 +228,8 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 
 	private void notifyAbort(long checkpointId) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
-			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(
+					checkpointId, new CheckpointDeclineOnCancellationBarrierException());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d55a9c5..4f0839f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -592,13 +594,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+	public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
 		LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
 
+		// notify the coordinator that we decline this checkpoint
+		getEnvironment().declineCheckpoint(checkpointId, cause);
+
+		// notify all downstream operators that they should not wait for a barrier from us
 		synchronized (lock) {
-			if (isRunning) {
-				operatorChain.broadcastCheckpointCancelMarker(checkpointId);
-			}
+			operatorChain.broadcastCheckpointCancelMarker(checkpointId);
 		}
 	}
 
@@ -669,7 +673,18 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					checkpointThread.start();
 				}
 				return true;
-			} else {
+			}
+			else {
+				// we cannot perform our checkpoint - let the downstream operators know that they
+				// should not wait for any input from this operator
+
+				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
+				// yet be created
+				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointId);
+				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
+					output.writeEventToAllChannels(message);
+				}
+
 				return false;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cf1f98e..20cb8f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -45,6 +47,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -566,7 +569,7 @@ public class BarrierBufferTest {
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(3L, buffer.getCurrentCheckpointId());
 			validateAlignmentTime(startTs, buffer);
-			verify(toNotify).abortCheckpointOnBarrier(2L);
+			verify(toNotify).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineSubsumedException.class));
 			check(sequence[16], buffer.getNextNonBlocked());
 
 			// checkpoint 3 alignment in progress
@@ -574,7 +577,7 @@ public class BarrierBufferTest {
 
 			// checkpoint 3 aborted (end of partition)
 			check(sequence[20], buffer.getNextNonBlocked());
-			verify(toNotify).abortCheckpointOnBarrier(3L);
+			verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
 
 			// replay buffered data from checkpoint 3
 			check(sequence[18], buffer.getNextNonBlocked());
@@ -999,13 +1002,13 @@ public class BarrierBufferTest {
 		check(sequence[6], buffer.getNextNonBlocked());
 		assertEquals(5L, buffer.getCurrentCheckpointId());
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[8], buffer.getNextNonBlocked());
 		assertEquals(6L, buffer.getCurrentCheckpointId());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 		
 		buffer.cleanup();
@@ -1073,7 +1076,7 @@ public class BarrierBufferTest {
 		// canceled checkpoint on last barrier
 		startTs = System.nanoTime();
 		check(sequence[12], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[13], buffer.getNextNonBlocked());
 
@@ -1088,7 +1091,7 @@ public class BarrierBufferTest {
 
 		// this checkpoint gets immediately canceled
 		check(sequence[24], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		// some buffers
@@ -1104,7 +1107,7 @@ public class BarrierBufferTest {
 		check(sequence[33], buffer.getNextNonBlocked());
 
 		check(sequence[37], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		// all done
@@ -1167,7 +1170,7 @@ public class BarrierBufferTest {
 
 		// re-read the queued cancellation barriers
 		check(sequence[9], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[10], buffer.getNextNonBlocked());
@@ -1184,7 +1187,7 @@ public class BarrierBufferTest {
 
 		// no further checkpoint (abort) notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
 
 		// all done
 		assertNull(buffer.getNextNonBlocked());
@@ -1253,7 +1256,7 @@ public class BarrierBufferTest {
 		// cancelled by cancellation barrier
 		check(sequence[4], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class));
 
 		// the next checkpoint alignment starts now
 		startTs = System.nanoTime();
@@ -1285,7 +1288,7 @@ public class BarrierBufferTest {
 
 		// check overall notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
 	/**
@@ -1337,7 +1340,7 @@ public class BarrierBufferTest {
 		// future barrier aborts checkpoint
 		startTs = System.nanoTime();
 		check(sequence[3], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
 		check(sequence[4], buffer.getNextNonBlocked());
 
 		// alignment of next checkpoint
@@ -1366,7 +1369,7 @@ public class BarrierBufferTest {
 
 		// check overall notifications
 		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
 	// ------------------------------------------------------------------------
@@ -1471,7 +1474,7 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {}
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {}
 
 		@Override
 		public void notifyCheckpointComplete(long checkpointId) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 903f585..978c212 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -481,7 +481,7 @@ public class BarrierTrackerTest {
 		}
 
 		@Override
-		public void abortCheckpointOnBarrier(long checkpointId) {
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
 
 			final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 5fcc59e..9003f0e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -270,6 +271,7 @@ public class OneInputStreamTaskTest {
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
 
+		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 		expectedOutput.add(new CheckpointBarrier(1, 1));

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 7084208..36ad8ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,9 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public void declineCheckpoint(long checkpointId, Throwable cause) {}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
new file mode 100644
index 0000000..8b8b659
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
+public class StreamTaskCancellationBarrierTest {
+
+	/**
+	 * This test checks that tasks emit a proper cancel checkpoint barrier, if a "trigger checkpoint" message
+	 * comes before they are ready.
+	 */
+	@Test
+	public void testEmitCancellationBarrierWhenNotReady() throws Exception {
+		StreamTask<String, ?> task = new InitBlockingTask();
+		StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+
+		// start the task - it cannot get past the blocking 'init()' method
+		testHarness.invoke();
+
+		// tell the task to commence a checkpoint
+		boolean result = task.triggerCheckpoint(41L, System.currentTimeMillis());
+		assertFalse("task triggered checkpoint though not ready", result);
+
+		// a cancellation barrier should be downstream
+		Object emitted = testHarness.getOutput().poll();
+		assertNotNull("nothing emitted", emitted);
+		assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker);
+		assertEquals("wrong checkpoint id", 41L, ((CancelCheckpointMarker) emitted).getCheckpointId());
+	}
+
+	/**
+	 * This test verifies (for one-input tasks) that the Stream tasks react the following way to
+	 * receiving a checkpoint cancellation barrier:
+	 *
+	 *   - send a "decline checkpoint" notification out (to the JobManager)
+	 *   - emit a cancellation barrier downstream
+	 */
+	@Test
+	public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
+
+		OneInputStreamTask<String, String> task = new OneInputStreamTask<String, String>();
+		OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+				task,
+				1, 2,
+				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
+		streamConfig.setStreamOperator(mapOperator);
+
+		StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+		// start the task
+		testHarness.invoke(environment);
+		testHarness.waitForTaskRunning();
+
+		// emit cancellation barriers
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1);
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+		testHarness.waitForInputProcessing();
+
+		// the decline call should go to the coordinator
+		verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+		// a cancellation barrier should be downstream
+		Object result = testHarness.getOutput().poll();
+		assertNotNull("nothing emitted", result);
+		assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+		assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+		// cancel and shutdown
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+	}
+
+	/**
+	 * This test verifies (for two-input tasks) that the Stream tasks react the following way to
+	 * receiving a checkpoint cancellation barrier:
+	 *
+	 *   - send a "decline checkpoint" notification out (to the JobManager)
+	 *   - emit a cancellation barrier downstream
+	 */
+	@Test
+	public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
+
+		TwoInputStreamTask<String, String, String> task = new TwoInputStreamTask<String, String, String>();
+		TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
+				task,
+				BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap());
+		streamConfig.setStreamOperator(op);
+
+		StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+		// start the task
+		testHarness.invoke(environment);
+		testHarness.waitForTaskRunning();
+
+		// emit cancellation barriers
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+		testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0);
+		testHarness.waitForInputProcessing();
+
+		// the decline call should go to the coordinator
+		verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+		// a cancellation barrier should be downstream
+		Object result = testHarness.getOutput().poll();
+		assertNotNull("nothing emitted", result);
+		assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+		assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+		// cancel and shutdown
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+	}
+
+	// ------------------------------------------------------------------------
+	//  test tasks / functions
+	// ------------------------------------------------------------------------
+
+	private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+		private final Object lock = new Object();
+		private volatile boolean running = true;
+
+		@Override
+		protected void init() throws Exception {
+			synchronized (lock) {
+				while (running) {
+					lock.wait();
+				}
+			}
+		}
+
+		@Override
+		protected void run() throws Exception {}
+
+		@Override
+		protected void cleanup() throws Exception {}
+
+		@Override
+		protected void cancelTask() throws Exception {
+			running = false;
+			synchronized (lock) {
+				lock.notifyAll();
+			}
+		}
+	}
+
+	private static class IdentityMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class UnionCoMap implements CoMapFunction<String, String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(String value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public String map2(String value) throws Exception {
+			return value;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 00e95b9..0bd8d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -150,21 +150,18 @@ public class StreamTaskTestHarness<OUT> {
 
 	}
 
+	public StreamMockEnvironment createEnvironment() {
+		return new StreamMockEnvironment(
+				jobConfig, taskConfig, executionConfig, memorySize, new MockInputSplitProvider(), bufferSize);
+	}
+	
 	/**
 	 * Invoke the Task. This resets the output of any previous invocation. This will start a new
 	 * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
 	 * Task thread to finish running.
 	 */
 	public void invoke() throws Exception {
-		mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig,
-			memorySize, new MockInputSplitProvider(), bufferSize);
-		task.setEnvironment(mockEnv);
-
-		initializeInputs();
-		initializeOutput();
-
-		taskThread = new TaskThread(task);
-		taskThread.start();
+		invoke(createEnvironment());
 	}
 
 	/**
@@ -205,7 +202,7 @@ public class StreamTaskTestHarness<OUT> {
 			if (taskThread.task instanceof StreamTask) {
 				StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task;
 				while (!streamTask.isRunning()) {
-					Thread.sleep(100);
+					Thread.sleep(10);
 					if (!taskThread.isAlive()) {
 						if (taskThread.getError() != null) {
 							throw new Exception("Task Thread failed due to an error.", taskThread.getError());
@@ -282,15 +279,15 @@ public class StreamTaskTestHarness<OUT> {
 	/**
 	 * This only returns after all input queues are empty.
 	 */
-	public void waitForInputProcessing() {
-
-
+	public void waitForInputProcessing() throws Exception {
 		// first wait for all input queues to be empty
-		try {
-			Thread.sleep(1);
-		} catch (InterruptedException ignored) {}
-		
+
 		while (true) {
+			Throwable error = taskThread.getError();
+			if (error != null) {
+				throw new Exception("Exception in the task thread", error);
+			}
+
 			boolean allEmpty = true;
 			for (int i = 0; i < numInputGates; i++) {
 				if (!inputGates[i].allQueuesEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index b9211b1..92f8553 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -290,6 +291,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
 		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
 
+		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 		expectedOutput.add(new CheckpointBarrier(1, 1));


[4/4] flink git commit: [FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.

Posted by se...@apache.org.
[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.

If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0962cb6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0962cb6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0962cb6f

Branch: refs/heads/release-1.1
Commit: 0962cb6f45607fb21d50030e325e99fc2c37164a
Parents: 1a4fdff
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 3 15:28:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  15 +
 .../streaming/runtime/io/BarrierBuffer.java     |  48 ++-
 .../streaming/runtime/io/BufferSpiller.java     |  39 ++-
 .../runtime/io/StreamInputProcessor.java        |  17 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  17 +-
 .../runtime/tasks/OneInputStreamTask.java       |   3 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   3 +-
 .../io/BarrierBufferAlignmentLimitTest.java     | 315 +++++++++++++++++++
 8 files changed, 441 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d1ad1c4..d9ccb35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -277,6 +277,14 @@ public final class ConfigConstants {
 	@PublicEvolving
 	public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = "task.cancellation.timeout";
 
+	/**
+	 * The maximum number of bytes that a checkpoint alignment may buffer.
+	 * If the checkpoint alignment buffers more than the configured amount of
+	 * data, the checkpoint is aborted (skipped).
+	 */
+	@PublicEvolving
+	public static final String TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = "task.checkpoint.alignment.max-size";
+
 	// --------------------------- Runtime Algorithms -------------------------------
 	
 	/**
@@ -873,6 +881,13 @@ public final class ConfigConstants {
 	 */
 	public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; // deactivated
 
+	/**
+	 * The default for the maximum number of bytes that a checkpoint alignment may buffer.
+	 * {@code -1} = infinite.
+	 */
+	@PublicEvolving
+	public static final long DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = -1L;
+
 	// ------------------------ Runtime Algorithms ------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 7a8e7d3..c4cf98e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
  * all inputs have received the barrier for a given checkpoint.
@@ -65,6 +68,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * further data from the input gate. */
 	private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
 
+	/** The maximum number of bytes that may be buffered before an alignment is broken. -1 means unlimited */
+	private final long maxBufferedBytes;
+
 	/** The sequence of buffers/events that has been unblocked and must now be consumed
 	 * before requesting further data from the input gate */
 	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
@@ -82,6 +88,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	/** The number of already closed channels */
 	private int numClosedChannels;
 
+	/** The number of bytes in the queued spilled sequences */
+	private long numQueuedBytes;
+
 	/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
 	private long startOfAlignmentTimestamp;
 
@@ -92,14 +101,37 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private boolean endOfStream;
 
 	/**
+	 * Creates a new checkpoint stream aligner.
+	 * 
+	 * <p>There is no limit to how much data may be buffered during an alignment.
 	 * 
 	 * @param inputGate The input gate to draw the buffers and events from.
 	 * @param ioManager The I/O manager that gives access to the temp directories.
-	 * 
+	 *
 	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
 	 */
 	public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+		this (inputGate, ioManager, -1);
+	}
+
+	/**
+	 * Creates a new checkpoint stream aligner.
+	 * 
+	 * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
+	 * When that number is exceeded, it will stop the alignment and notify the task that the
+	 * checkpoint has been cancelled.
+	 * 
+	 * @param inputGate The input gate to draw the buffers and events from.
+	 * @param ioManager The I/O manager that gives access to the temp directories.
+	 * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
+	 * 
+	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
+	 */
+	public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException {
+		checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+
 		this.inputGate = inputGate;
+		this.maxBufferedBytes = maxBufferedBytes;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
 
@@ -131,6 +163,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
 					bufferSpiller.add(next);
+					checkSizeLimit();
 				}
 				else if (next.isBuffer()) {
 					return next;
@@ -169,6 +202,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		currentBuffered = queuedBuffered.pollFirst();
 		if (currentBuffered != null) {
 			currentBuffered.open();
+			numQueuedBytes -= currentBuffered.size();
 		}
 	}
 
@@ -333,6 +367,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 
+	private void checkSizeLimit() throws Exception {
+		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferSpiller.getBytesWritten()) > maxBufferedBytes) {
+			// exceeded our limit - abort this checkpoint
+			LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
+					currentCheckpointId, maxBufferedBytes);
+
+			releaseBlocksAndResetBarriers();
+			notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
+		}
+	}
 
 	@Override
 	public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
@@ -359,6 +403,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			seq.cleanup();
 		}
 		queuedBuffered.clear();
+		numQueuedBytes = 0L;
 	}
 
 	private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
@@ -429,6 +474,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (bufferedNow != null) {
 				bufferedNow.open();
 				queuedBuffered.addFirst(currentBuffered);
+				numQueuedBytes += currentBuffered.size();
 				currentBuffered = bufferedNow;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index dc8d245..8060d02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -84,9 +84,9 @@ public class BufferSpiller {
 	
 	/** A counter, to created numbered spill files */
 	private int fileCounter;
-	
-	/** A flag to check whether the spiller has written since the last roll over */
-	private boolean hasWritten;
+
+	/** The number of bytes written since the last roll over */
+	private long bytesWritten;
 	
 	/**
 	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
 	 * @throws IOException Thrown, if the buffer of event could not be spilled.
 	 */
 	public void add(BufferOrEvent boe) throws IOException {
-		hasWritten = true;
 		try {
 			ByteBuffer contents;
 			if (boe.isBuffer()) {
@@ -140,7 +139,9 @@ public class BufferSpiller {
 			headBuffer.putInt(contents.remaining());
 			headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
 			headBuffer.flip();
-			
+
+			bytesWritten += (headBuffer.remaining() + contents.remaining());
+
 			sources[1] = contents;
 			currentChannel.write(sources);
 		}
@@ -186,7 +187,7 @@ public class BufferSpiller {
 	}
 	
 	private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
-		if (!hasWritten) {
+		if (bytesWritten == 0) {
 			return null;
 		}
 		
@@ -205,8 +206,8 @@ public class BufferSpiller {
 		
 		// create ourselves a new spill file
 		createSpillingChannel();
-		
-		hasWritten = false;
+
+		bytesWritten = 0L;
 		return seq;
 	}
 
@@ -225,6 +226,14 @@ public class BufferSpiller {
 		}
 	}
 
+	/**
+	 * Gets the number of bytes written in the current spill file.
+	 * @return the number of bytes written in the current spill file
+	 */
+	public long getBytesWritten() {
+		return bytesWritten;
+	}
+
 	// ------------------------------------------------------------------------
 	//  For testing
 	// ------------------------------------------------------------------------
@@ -268,6 +277,9 @@ public class BufferSpiller {
 		/** The byte buffer for bulk reading */
 		private final ByteBuffer buffer;
 
+		/** We store this size as a constant because it is crucial it never changes */
+		private final long size;
+
 		/** The page size to instantiate properly sized memory segments */
 		private final int pageSize;
 
@@ -282,11 +294,13 @@ public class BufferSpiller {
 		 * @param buffer The buffer used for bulk reading.
 		 * @param pageSize The page size to use for the created memory segments.
 		 */
-		SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
+		SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize)
+				throws IOException {
 			this.file = file;
 			this.fileChannel = fileChannel;
 			this.buffer = buffer;
 			this.pageSize = pageSize;
+			this.size = fileChannel.size();
 		}
 
 		/**
@@ -408,5 +422,12 @@ public class BufferSpiller {
 				throw new IOException("Cannot remove temp file for stream alignment writer");
 			}
 		}
+
+		/**
+		 * Gets the size of this spilled sequence.
+		 */
+		public long size() throws IOException {
+			return size;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 7d9e4d2..bcca2bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -87,12 +90,22 @@ public class StreamInputProcessor<IN> {
 								StatefulTask<?> checkpointListener,
 								CheckpointingMode checkpointMode,
 								IOManager ioManager,
-								boolean enableWatermarkMultiplexing) throws IOException {
+								boolean enableWatermarkMultiplexing,
+								Configuration taskManagerConfig) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+			long maxAlign = taskManagerConfig.getLong(
+					ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+					ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+			if (!(maxAlign == -1 || maxAlign > 0)) {
+				throw new IllegalConfigurationException(
+						ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+						+ " must be positive or -1 (infinite)");
+			}
+			this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
 		}
 		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
 			this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a3ae077..f116aff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -97,12 +100,22 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			StatefulTask<?> checkpointListener,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
-			boolean enableWatermarkMultiplexing) throws IOException {
+			boolean enableWatermarkMultiplexing,
+			Configuration taskManagerConfig) throws IOException {
 		
 		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+			long maxAlign = taskManagerConfig.getLong(
+					ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+					ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+			if (!(maxAlign == -1 || maxAlign > 0)) {
+				throw new IllegalConfigurationException(
+						ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+								+ " must be positive or -1 (infinite)");
+			}
+			this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
 		}
 		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
 			this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d18ca16..8470c7c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 					this,
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
-					isSerializingTimestamps());
+					isSerializingTimestamps(),
+					getEnvironment().getTaskManagerInfo().getConfiguration());
 
 			// make sure that stream tasks report their I/O statistics
 			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..8718b88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 				this,
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
-				isSerializingTimestamps());
+				isSerializingTimestamps(),
+				getEnvironment().getTaskManagerInfo().getConfiguration());
 
 		// make sure that stream tasks report their I/O statistics
 		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
new file mode 100644
index 0000000..529f809
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
+ */
+public class BarrierBufferAlignmentLimitTest {
+
+	private static final int PAGE_SIZE = 512;
+
+	private static final Random RND = new Random();
+
+	private static IOManager IO_MANAGER;
+
+	// ------------------------------------------------------------------------
+	//  Setup
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() {
+		IO_MANAGER = new IOManagerAsync();
+	}
+
+	@AfterClass
+	public static void shutdownIOManager() {
+		IO_MANAGER.shutdown();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This tests that a single alignment that buffers too much data cancels the checkpoint.
+	 */
+	@Test
+	public void testBreakCheckpointAtAlignmentLimit() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some initial buffers
+				/*  0 */ createBuffer(1, 100), createBuffer(2, 70),
+				/*  2 */ createBuffer(0, 42), createBuffer(2, 111),
+
+				// starting a checkpoint
+				/*  4 */ createBarrier(7, 1), 
+				/*  5 */ createBuffer(1, 100), createBuffer(2, 200), createBuffer(1, 300), createBuffer(0, 50),
+				/*  9 */ createBarrier(7, 0),
+				/* 10 */ createBuffer(2, 100), createBuffer(0, 100), createBuffer(1, 200), createBuffer(0, 200),
+
+				// this buffer makes the alignment spill too large
+				/* 14 */ createBuffer(0, 101),
+
+				// additional data
+				/* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+				
+				// checkpoint completes - this should not result in a "completion notification"
+				/* 18 */ createBarrier(7, 2),
+
+				// trailing buffers
+				/* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+		};
+
+		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		// validating the sequence of buffers
+
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[1], buffer.getNextNonBlocked());
+		check(sequence[2], buffer.getNextNonBlocked());
+		check(sequence[3], buffer.getNextNonBlocked());
+
+		// start of checkpoint
+		long startTs = System.nanoTime();
+		check(sequence[6], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[10], buffer.getNextNonBlocked());
+
+		// trying to pull the next makes the alignment overflow - so buffered buffers are replayed
+		check(sequence[5], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class));
+
+		// playing back buffered events
+		check(sequence[7], buffer.getNextNonBlocked());
+		check(sequence[11], buffer.getNextNonBlocked());
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		// the additional data
+		check(sequence[15], buffer.getNextNonBlocked());
+		check(sequence[16], buffer.getNextNonBlocked());
+		check(sequence[17], buffer.getNextNonBlocked());
+
+		check(sequence[19], buffer.getNextNonBlocked());
+		check(sequence[20], buffer.getNextNonBlocked());
+		check(sequence[21], buffer.getNextNonBlocked());
+
+		// no call for a completed checkpoint must have happened
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	/**
+	 * This tests the following case:
+	 *   - an alignment starts
+	 *   - barriers from a second checkpoint queue before the first completes
+	 *   - together they are larger than the threshold
+	 *   - after the first checkpoint (with second checkpoint data queued) aborts, the second completes 
+	 */
+	@Test
+	public void testAlignmentLimitWithQueuedAlignments() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some initial buffers
+				/*  0 */ createBuffer(1, 100), createBuffer(2, 70),
+
+				// starting a checkpoint
+				/*  2 */ createBarrier(3, 2), 
+				/*  3 */ createBuffer(1, 100), createBuffer(2, 100), 
+				/*  5 */ createBarrier(3, 0),
+				/*  6 */ createBuffer(0, 100), createBuffer(1, 100),
+
+				// queue some data from the next checkpoint
+				/*  8 */ createBarrier(4, 0),
+				/*  9 */ createBuffer(0, 100), createBuffer(0, 120), createBuffer(1, 100),
+
+				// this one makes the alignment overflow
+				/* 12 */ createBuffer(2, 100),
+
+				// last barrier of checkpoint 3 - must not result in a "completion notification"
+				/* 13 */ createBarrier(3, 1),
+
+				// more for the next checkpoint
+				/* 14 */ createBarrier(4, 1),
+				/* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+
+				// next checkpoint completes
+				/* 18 */ createBarrier(4, 2),
+
+				// trailing data
+				/* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+		};
+
+		// the barrier buffer has a limit that only 500 bytes may be spilled in alignment
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		// validating the sequence of buffers
+		long startTs;
+
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[1], buffer.getNextNonBlocked());
+
+		// start of checkpoint
+		startTs = System.nanoTime();
+		check(sequence[3], buffer.getNextNonBlocked());
+		check(sequence[7], buffer.getNextNonBlocked());
+
+		// next checkpoint also in progress
+		check(sequence[11], buffer.getNextNonBlocked());
+
+		// checkpoint alignment aborted due to too much data
+		check(sequence[4], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class));
+
+		// replay buffered data - in the middle, the alignment for checkpoint 4 starts
+		check(sequence[6], buffer.getNextNonBlocked());
+		startTs = System.nanoTime();
+		check(sequence[12], buffer.getNextNonBlocked());
+
+		// only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success 
+		check(sequence[17], buffer.getNextNonBlocked());
+
+		// checkpoint 4 completed - check and validate buffered replay
+		check(sequence[9], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(4L), anyLong());
+
+		check(sequence[10], buffer.getNextNonBlocked());
+		check(sequence[15], buffer.getNextNonBlocked());
+		check(sequence[16], buffer.getNextNonBlocked());
+
+		// trailing data
+		check(sequence[19], buffer.getNextNonBlocked());
+		check(sequence[20], buffer.getNextNonBlocked());
+		check(sequence[21], buffer.getNextNonBlocked());
+
+		// only checkpoint 4 was successfully completed, not checkpoint 3
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(eq(3L), anyLong());
+
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static BufferOrEvent createBuffer(int channel, int size) {
+		byte[] bytes = new byte[size];
+		RND.nextBytes(bytes);
+
+		MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+		memory.put(0, bytes);
+
+		Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+		buf.setSize(size);
+
+		// retain an additional time so it does not get disposed after being read by the input gate
+		buf.retain();
+
+		return new BufferOrEvent(buf, channel);
+	}
+
+	private static BufferOrEvent createBarrier(long id, int channel) {
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+	}
+
+	private static void check(BufferOrEvent expected, BufferOrEvent present) {
+		assertNotNull(expected);
+		assertNotNull(present);
+		assertEquals(expected.isBuffer(), present.isBuffer());
+
+		if (expected.isBuffer()) {
+			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+			MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
+			MemorySegment presentMem = present.getBuffer().getMemorySegment();
+			assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
+		}
+		else {
+			assertEquals(expected.getEvent(), present.getEvent());
+		}
+	}
+
+	private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+		final long elapsed = System.nanoTime() - startTimestamp;
+		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+	}
+
+	private static void checkNoTempFilesRemain() {
+		// validate that all temp files have been removed
+		for (File dir : IO_MANAGER.getSpillingDirectories()) {
+			for (String file : dir.list()) {
+				if (file != null && !(file.equals(".") || file.equals(".."))) {
+					fail("barrier buffer did not clean up temp files. remaining file: " + file);
+				}
+			}
+		}
+	}
+}


[2/4] flink git commit: [FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints

Posted by se...@apache.org.
[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1f028de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1f028de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1f028de

Branch: refs/heads/release-1.1
Commit: a1f028dee49928ada014632bb27216b36e30250e
Parents: 4dd3efe
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 23 18:41:32 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 18:27:47 2016 +0100

----------------------------------------------------------------------
 .../io/network/api/CancelCheckpointMarker.java  |  77 +++
 .../api/serialization/EventSerializer.java      |  57 +-
 .../runtime/io/network/netty/NettyMessage.java  |   2 +-
 .../partition/PipelinedSubpartition.java        |   3 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  21 +
 .../api/serialization/EventSerializerTest.java  |  45 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  10 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  10 +
 .../streaming/runtime/io/BarrierBuffer.java     | 265 ++++++--
 .../streaming/runtime/io/BarrierTracker.java    | 164 +++--
 .../streaming/runtime/io/BufferSpiller.java     |   2 +-
 .../runtime/io/CheckpointBarrierHandler.java    |  22 +-
 .../runtime/io/StreamInputProcessor.java        |   5 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   5 +-
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  32 +-
 .../streaming/runtime/tasks/StreamTask.java     |  48 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 617 +++++++++++++++++--
 .../runtime/io/BarrierTrackerTest.java          | 157 ++++-
 20 files changed, 1291 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
new file mode 100644
index 0000000..52a2517
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * The CancelCheckpointMarker travels through the data streams, similar to the {@link CheckpointBarrier},
+ * but signals that a certain checkpoint should be canceled. Any in-progress alignment for that
+ * checkpoint needs to be canceled and regular processing should be resumed.
+ */
+public class CancelCheckpointMarker extends RuntimeEvent {
+
+	/** The id of the checkpoint to be canceled */
+	private final long checkpointId;
+
+	public CancelCheckpointMarker(long checkpointId) {
+		this.checkpointId = checkpointId;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	// ------------------------------------------------------------------------
+	// These known and common events go through special code paths, rather than
+	// through generic serialization 
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		throw new UnsupportedOperationException("this method should never be called");
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		throw new UnsupportedOperationException("this method should never be called");
+	}
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return (int) (checkpointId ^ (checkpointId >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		return other != null && 
+				other.getClass() == CancelCheckpointMarker.class &&
+				this.checkpointId == ((CancelCheckpointMarker) other).checkpointId;
+	}
+
+	@Override
+	public String toString() {
+		return "CancelCheckpointMarker " + checkpointId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index a34f8cf..0bc3b28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -38,7 +39,7 @@ import java.nio.ByteOrder;
  * Utility class to serialize and deserialize task events.
  */
 public class EventSerializer {
-	
+
 	private static final int END_OF_PARTITION_EVENT = 0;
 
 	private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -46,17 +47,19 @@ public class EventSerializer {
 	private static final int END_OF_SUPERSTEP_EVENT = 2;
 
 	private static final int OTHER_EVENT = 3;
-	
+
+	private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
+
 	// ------------------------------------------------------------------------
-	
-	public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+
+	public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException {
 		final Class<?> eventClass = event.getClass();
 		if (eventClass == EndOfPartitionEvent.class) {
 			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
 		}
 		else if (eventClass == CheckpointBarrier.class) {
 			CheckpointBarrier barrier = (CheckpointBarrier) event;
-			
+
 			ByteBuffer buf = ByteBuffer.allocate(20);
 			buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
 			buf.putLong(4, barrier.getId());
@@ -66,32 +69,39 @@ public class EventSerializer {
 		else if (eventClass == EndOfSuperstepEvent.class) {
 			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
 		}
+		else if (eventClass == CancelCheckpointMarker.class) {
+			CancelCheckpointMarker marker = (CancelCheckpointMarker) event;
+
+			ByteBuffer buf = ByteBuffer.allocate(12);
+			buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT);
+			buf.putLong(4, marker.getCheckpointId());
+			return buf;
+		}
 		else {
 			try {
 				final DataOutputSerializer serializer = new DataOutputSerializer(128);
 				serializer.writeInt(OTHER_EVENT);
 				serializer.writeUTF(event.getClass().getName());
 				event.write(serializer);
-	
 				return serializer.wrapAsByteBuffer();
 			}
 			catch (IOException e) {
-				throw new RuntimeException("Error while serializing event.", e);
+				throw new IOException("Error while serializing event.", e);
 			}
 		}
 	}
 
-	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
 		if (buffer.remaining() < 4) {
-			throw new RuntimeException("Incomplete event");
+			throw new IOException("Incomplete event");
 		}
-		
+
 		final ByteOrder bufferOrder = buffer.order();
 		buffer.order(ByteOrder.BIG_ENDIAN);
-		
+
 		try {
 			int type = buffer.getInt();
-				
+
 			if (type == END_OF_PARTITION_EVENT) {
 				return EndOfPartitionEvent.INSTANCE;
 			}
@@ -103,35 +113,38 @@ public class EventSerializer {
 			else if (type == END_OF_SUPERSTEP_EVENT) {
 				return EndOfSuperstepEvent.INSTANCE;
 			}
+			else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
+				long id = buffer.getLong();
+				return new CancelCheckpointMarker(id);
+			}
 			else if (type == OTHER_EVENT) {
 				try {
 					final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-		
 					final String className = deserializer.readUTF();
-		
+
 					final Class<? extends AbstractEvent> clazz;
 					try {
 						clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
 					}
 					catch (ClassNotFoundException e) {
-						throw new RuntimeException("Could not load event class '" + className + "'.", e);
+						throw new IOException("Could not load event class '" + className + "'.", e);
 					}
 					catch (ClassCastException e) {
-						throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+						throw new IOException("The class '" + className + "' is not a valid subclass of '"
 								+ AbstractEvent.class.getName() + "'.", e);
 					}
-		
+
 					final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
 					event.read(deserializer);
-		
+
 					return event;
 				}
 				catch (Exception e) {
-					throw new RuntimeException("Error while deserializing or instantiating event.", e);
+					throw new IOException("Error while deserializing or instantiating event.", e);
 				}
 			} 
 			else {
-				throw new RuntimeException("Corrupt byte stream for event");
+				throw new IOException("Corrupt byte stream for event");
 			}
 		}
 		finally {
@@ -143,7 +156,7 @@ public class EventSerializer {
 	// Buffer helpers
 	// ------------------------------------------------------------------------
 
-	public static Buffer toBuffer(AbstractEvent event) {
+	public static Buffer toBuffer(AbstractEvent event) throws IOException {
 		final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
 
 		MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
@@ -154,7 +167,7 @@ public class EventSerializer {
 		return buffer;
 	}
 
-	public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) {
+	public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
 		return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 3a24181..6f6001b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -463,7 +463,7 @@ abstract class NettyMessage {
 		}
 
 		@Override
-		public void readFrom(ByteBuf buffer) {
+		public void readFrom(ByteBuf buffer) throws IOException {
 			// TODO Directly deserialize fromNetty's buffer
 			int length = buffer.readInt();
 			ByteBuffer serializedEvent = ByteBuffer.allocate(length);

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..b703acb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public void finish() {
+	public void finish() throws IOException {
 		final NotificationListener listener;
 
 		synchronized (buffers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index f8bba1a..7c581df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -47,6 +47,27 @@ public interface StatefulTask<T extends StateHandle<?>> {
 	 */
 	boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
 
+	/**
+	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+	 * barriers on all input streams.
+	 *
+	 * @param checkpointId The ID of the checkpoint, incrementing.
+	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 *
+	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+	 */
+	void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception;
+
+	/**
+	 * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
+	 * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+	 * 
+	 * <p>This requires implementing tasks to forward a
+	 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
+	 * 
+	 * @param checkpointId The ID of the checkpoint to be aborted.
+	 */
+	void abortCheckpointOnBarrier(long checkpointId) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ddfd758..d47a0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -28,34 +29,30 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class EventSerializerTest {
 
 	@Test
-	public void testSerializeDeserializeEvent() {
-		try {
-			AbstractEvent[] events = {
-					EndOfPartitionEvent.INSTANCE,
-					EndOfSuperstepEvent.INSTANCE,
-					new CheckpointBarrier(1678L, 4623784L),
-					new TestTaskEvent(Math.random(), 12361231273L)
-			};
-			
-			for (AbstractEvent evt : events) {
-				ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
-				assertTrue(serializedEvent.hasRemaining());
-
-				AbstractEvent deserialized = 
-						EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
-				assertNotNull(deserialized);
-				assertEquals(evt, deserialized);
-			}
-			
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+	public void testSerializeDeserializeEvent() throws Exception {
+		AbstractEvent[] events = {
+				EndOfPartitionEvent.INSTANCE,
+				EndOfSuperstepEvent.INSTANCE,
+				new CheckpointBarrier(1678L, 4623784L),
+				new TestTaskEvent(Math.random(), 12361231273L),
+				new CancelCheckpointMarker(287087987329842L)
+		};
+		
+		for (AbstractEvent evt : events) {
+			ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+			assertTrue(serializedEvent.hasRemaining());
+
+			AbstractEvent deserialized = 
+					EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+			assertNotNull(deserialized);
+			assertEquals(evt, deserialized);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..4dfaf95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -450,6 +450,16 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should not be called!");
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {
+			throw new UnsupportedOperationException("should not be called!");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
 				completedCheckpointsLatch.countDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 0c0d064..5b344eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -221,6 +221,16 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("Should not be called");
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {
+			throw new UnsupportedOperationException("Should not be called");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (checkpointId != lastCheckpointId && this.error == null) {
 				this.error = new Exception("calls out of order");

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..36de717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,42 +17,43 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayDeque;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
  * all inputs have received the barrier for a given checkpoint.
  * 
  * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
  * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
- * the blocks are released.</p>
+ * the blocks are released.
  */
 @Internal
 public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-	
+
 	/** The gate that the buffer draws its input from */
 	private final InputGate inputGate;
 
 	/** Flags that indicate whether a channel is currently blocked/buffered */
 	private final boolean[] blockedChannels;
-	
+
 	/** The total number of channels that this buffer handles data from */
 	private final int totalNumberOfInputChannels;
-	
+
 	/** To utility to write blocked data to a file channel */
 	private final BufferSpiller bufferSpiller;
 
@@ -65,17 +66,24 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
 
 	/** Handler that receives the checkpoint notifications */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask<?> toNotifyOnCheckpoint;
 
 	/** The ID of the checkpoint for which we expect barriers */
 	private long currentCheckpointId = -1L;
 
-	/** The number of received barriers (= number of blocked/buffered channels) */
+	/** The number of received barriers (= number of blocked/buffered channels)
+	 * IMPORTANT: A canceled checkpoint must always have 0 barriers */
 	private int numBarriersReceived;
-	
+
 	/** The number of already closed channels */
 	private int numClosedChannels;
-	
+
+	/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
+	private long startOfAlignmentTimestamp;
+
+	/** The time (in nanoseconds) that the latest alignment took */
+	private long latestAlignmentDurationNanos;
+
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
 
@@ -90,7 +98,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		this.inputGate = inputGate;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-		
+
 		this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
 		this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
 	}
@@ -100,7 +108,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
 			BufferOrEvent next;
@@ -114,7 +122,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					return getNextNonBlocked();
 				}
 			}
-			
+
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
@@ -129,27 +137,29 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 						processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
 					}
 				}
+				else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+					processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
+				}
 				else {
 					if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
-						numClosedChannels++;
-						// no chance to complete this checkpoint
-						releaseBlocks();
+						processEndOfPartition(next.getChannelIndex());
 					}
 					return next;
 				}
 			}
 			else if (!endOfStream) {
-				// end of stream. we feed the data that is still buffered
+				// end of input stream. stream continues with the buffered data
 				endOfStream = true;
-				releaseBlocks();
+				releaseBlocksAndResetBarriers();
 				return getNextNonBlocked();
 			}
 			else {
+				// final end of both input and buffered data
 				return null;
 			}
 		}
 	}
-	
+
 	private void completeBufferedSequence() throws IOException {
 		currentBuffered.cleanup();
 		currentBuffered = queuedBuffered.pollFirst();
@@ -157,66 +167,175 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			currentBuffered.open();
 		}
 	}
-	
-	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+
+	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
+		// fast path for single channel cases
+		if (totalNumberOfInputChannels == 1) {
+			if (barrierId > currentCheckpointId) {
+				// new checkpoint
+				currentCheckpointId = barrierId;
+				notifyCheckpoint(receivedBarrier);
+			}
+			return;
+		}
+
+		// -- general code path for multiple input channels --
+
 		if (numBarriersReceived > 0) {
-			// subsequent barrier of a checkpoint.
+			// this is only true if some alignment is already in progress and was not canceled
+
 			if (barrierId == currentCheckpointId) {
 				// regular case
 				onBarrier(channelIndex);
 			}
 			else if (barrierId > currentCheckpointId) {
-				// we did not complete the current checkpoint
+				// we did not complete the current checkpoint, another started before
 				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
 						"Skipping current checkpoint.", barrierId, currentCheckpointId);
 
-				releaseBlocks();
-				currentCheckpointId = barrierId;
-				onBarrier(channelIndex);
+				// let the task know we are not completing this
+				notifyAbort(currentCheckpointId);
+
+				// abort the current checkpoint
+				releaseBlocksAndResetBarriers();
+
+				// begin the new checkpoint
+				beginNewAlignment(barrierId, channelIndex);
 			}
 			else {
-				// ignore trailing barrier from aborted checkpoint
+				// ignore trailing barrier from an earlier checkpoint (obsolete now)
 				return;
 			}
-			
 		}
 		else if (barrierId > currentCheckpointId) {
 			// first barrier of a new checkpoint
-			currentCheckpointId = barrierId;
-			onBarrier(channelIndex);
+			beginNewAlignment(barrierId, channelIndex);
 		}
 		else {
-			// trailing barrier from previous (skipped) checkpoint
+			// either the current checkpoint was canceled (numBarriers == 0) or
+			// this barrier is from an old subsumed checkpoint
 			return;
 		}
 
-		// check if we have all barriers
+		// check if we have all barriers - since canceled checkpoints always have zero barriers
+		// this can only happen on a non canceled checkpoint
 		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
+			// actually trigger checkpoint
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received all barrier, triggering checkpoint {} at {}",
+				LOG.debug("Received all barriers, triggering checkpoint {} at {}",
 						receivedBarrier.getId(), receivedBarrier.getTimestamp());
 			}
 
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
+			releaseBlocksAndResetBarriers();
+			notifyCheckpoint(receivedBarrier);
+		}
+	}
+
+	private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+		final long barrierId = cancelBarrier.getCheckpointId();
+
+		// fast path for single channel cases
+		if (totalNumberOfInputChannels == 1) {
+			if (barrierId > currentCheckpointId) {
+				// new checkpoint
+				currentCheckpointId = barrierId;
+				notifyAbort(barrierId);
 			}
-			
-			releaseBlocks();
+			return;
+		}
+
+		// -- general code path for multiple input channels --
+
+		if (numBarriersReceived > 0) {
+			// this is only true if some alignment is in progress and nothing was canceled
+
+			if (barrierId == currentCheckpointId) {
+				// cancel this alignment
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+				}
+
+				releaseBlocksAndResetBarriers();
+				notifyAbort(barrierId);
+			}
+			else if (barrierId > currentCheckpointId) {
+				// the next checkpoint was canceled, which also cancels the current one
+				LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+				// this stops the current alignment
+				releaseBlocksAndResetBarriers();
+
+				// the next checkpoint starts as canceled
+				currentCheckpointId = barrierId;
+				startOfAlignmentTimestamp = 0L;
+				latestAlignmentDurationNanos = 0L;
+				notifyAbort(barrierId);
+			}
+
+			// else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
+
+		}
+		else if (barrierId > currentCheckpointId) {
+			// first barrier of a new checkpoint is directly a cancellation
+
+			// setting the currentCheckpointId to this checkpoint while keeping numBarriersReceived
+			// at zero means that no checkpoint barrier can start a new alignment
+			currentCheckpointId = barrierId;
+
+			startOfAlignmentTimestamp = 0L;
+			latestAlignmentDurationNanos = 0L;
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+			}
+
+			notifyAbort(barrierId);
 		}
+
+		// else: trailing barrier from either
+		//   - a previous (subsumed) checkpoint
+		//   - the current checkpoint if it was already canceled
 	}
-	
+
+	private void processEndOfPartition(int channel) throws Exception {
+		numClosedChannels++;
+
+		if (numBarriersReceived > 0) {
+			// let the task know we skip a checkpoint
+			notifyAbort(currentCheckpointId);
+
+			// no chance to complete this checkpoint
+			releaseBlocksAndResetBarriers();
+		}
+	}
+
+	private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+					checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+		}
+	}
+
+	private void notifyAbort(long checkpointId) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+		}
+	}
+
+
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
 		}
 	}
-	
+
 	@Override
 	public boolean isEmpty() {
 		return currentBuffered == null;
@@ -231,8 +350,20 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
 			seq.cleanup();
 		}
+		queuedBuffered.clear();
 	}
-	
+
+	private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
+		currentCheckpointId = checkpointId;
+		onBarrier(channelIndex);
+
+		startOfAlignmentTimestamp = System.nanoTime();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
+		}
+	}
+
 	/**
 	 * Checks whether the channel with the given index is blocked.
 	 * 
@@ -242,7 +373,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private boolean isBlocked(int channelIndex) {
 		return blockedChannels[channelIndex];
 	}
-	
+
 	/**
 	 * Blocks the given channel index, from which a barrier has been received.
 	 * 
@@ -251,30 +382,28 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private void onBarrier(int channelIndex) throws IOException {
 		if (!blockedChannels[channelIndex]) {
 			blockedChannels[channelIndex] = true;
+
 			numBarriersReceived++;
-			
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received barrier from channel " + channelIndex);
 			}
 		}
 		else {
-			throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
+			throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
 		}
 	}
 
 	/**
-	 * Releases the blocks on all channels. Makes sure the just written data
-	 * is the next to be consumed.
+	 * Releases the blocks on all channels and resets the barrier count.
+	 * Makes sure the just written data is the next to be consumed.
 	 */
-	private void releaseBlocks() throws IOException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Releasing blocks");
-		}
+	private void releaseBlocksAndResetBarriers() throws IOException {
+		LOG.debug("End of stream alignment, feeding buffered data back");
 
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
 		}
-		numBarriersReceived = 0;
 
 		if (currentBuffered == null) {
 			// common case: no more buffered data
@@ -295,10 +424,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentBuffered = bufferedNow;
 			}
 		}
+
+		// the next barrier that comes must assume it is the first
+		numBarriersReceived = 0;
+
+		if (startOfAlignmentTimestamp > 0) {
+			latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
+			startOfAlignmentTimestamp = 0;
+		}
 	}
 
 	// ------------------------------------------------------------------------
-	// For Testing
+	//  Properties
 	// ------------------------------------------------------------------------
 
 	/**
@@ -309,7 +446,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	public long getCurrentCheckpointId() {
 		return this.currentCheckpointId;
 	}
-	
+
+	@Override
+	public long getAlignmentDurationNanos() {
+		long start = this.startOfAlignmentTimestamp;
+		if (start <= 0) {
+			return latestAlignmentDurationNanos;
+		} else {
+			return System.nanoTime() - start;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Utilities 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..5157336 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,12 +19,12 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
-import java.io.IOException;
 import java.util.ArrayDeque;
 
 /**
@@ -34,9 +34,9 @@ import java.util.ArrayDeque;
  * 
  * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
  * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ * guarantees. It can, however, be used to gain "at least once" processing guarantees.
  * 
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
  */
 @Internal
 public class BarrierTracker implements CheckpointBarrierHandler {
@@ -57,11 +57,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 	
 	/** The listener to be notified on complete checkpoints */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask<?> toNotifyOnCheckpoint;
 	
 	/** The highest checkpoint ID encountered so far */
 	private long latestPendingCheckpointID = -1;
-	
+
+	// ------------------------------------------------------------------------
 	
 	public BarrierTracker(InputGate inputGate) {
 		this.inputGate = inputGate;
@@ -70,28 +71,33 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			BufferOrEvent next = inputGate.getNextBufferOrEvent();
-			if (next == null) {
-				return null;
-			}
-			else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+			if (next == null || next.isBuffer()) {
+				// buffer or input exhausted
 				return next;
 			}
-			else {
+			else if (next.getEvent().getClass() == CheckpointBarrier.class) {
 				processBarrier((CheckpointBarrier) next.getEvent());
 			}
+			else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+				processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+			}
+			else {
+				// some other event
+				return next;
+			}
 		}
 	}
 
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
 		}
 	}
 
@@ -105,22 +111,27 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		return pendingCheckpoints.isEmpty();
 	}
 
-	private void processBarrier(CheckpointBarrier receivedBarrier) {
+	@Override
+	public long getAlignmentDurationNanos() {
+		// this one does not do alignment at all
+		return 0L;
+	}
+
+	private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
+		final long barrierId = receivedBarrier.getId();
+
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
-			}
+			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
 			return;
 		}
-		
+
 		// general path for multiple input channels
-		final long barrierId = receivedBarrier.getId();
 
 		// find the checkpoint barrier in the queue of bending barriers
 		CheckpointBarrierCount cbc = null;
 		int pos = 0;
-		
+
 		for (CheckpointBarrierCount next : pendingCheckpoints) {
 			if (next.checkpointId == barrierId) {
 				cbc = next;
@@ -128,21 +139,21 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 			}
 			pos++;
 		}
-		
+
 		if (cbc != null) {
 			// add one to the count to that barrier and check for completion
 			int numBarriersNew = cbc.incrementBarrierCount();
 			if (numBarriersNew == totalNumberOfInputChannels) {
-				// checkpoint can be triggered
+				// checkpoint can be triggered (or is aborted and all barriers have been seen)
 				// first, remove this checkpoint and all all prior pending
 				// checkpoints (which are now subsumed)
 				for (int i = 0; i <= pos; i++) {
 					pendingCheckpoints.pollFirst();
 				}
-				
+
 				// notify the listener
-				if (checkpointHandler != null) {
-					checkpointHandler.onEvent(receivedBarrier);
+				if (!cbc.isAborted()) {
+					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
 				}
 			}
 		}
@@ -163,45 +174,104 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		}
 	}
 
+	private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception {
+		final long checkpointId = barrier.getCheckpointId();
+
+		// fast path: with a single input channel nothing needs to be counted - forward the abort right away
+		if (totalNumberOfInputChannels == 1) {
+			notifyAbort(checkpointId);
+			return;
+		}
+
+		// -- general path for multiple input channels --
+
+		// drop all pending entries with a checkpoint ID lower than the cancelled one; they are removed
+		// without an abort notification. Afterwards 'cbc' is the first remaining entry, or null if none is left
+		CheckpointBarrierCount cbc;
+		while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
+			pendingCheckpoints.removeFirst();
+		}
+
+		if (cbc != null && cbc.checkpointId() == checkpointId) {
+			// an entry for this checkpoint exists - make sure it is remembered as aborted
+			if (cbc.markAborted()) {
+				// markAborted() returns true only on the first abort of this entry - notify exactly once
+				notifyAbort(checkpointId);
+			}
+
+			// we still count the barriers to be able to remove the entry once all barriers have been seen
+			if (cbc.incrementBarrierCount() == totalNumberOfInputChannels) {
+				// every channel has reported - the entry (which is the head of the queue) can be removed
+				pendingCheckpoints.removeFirst();
+			}
+		}
+		else {
+			notifyAbort(checkpointId);
+
+			// no entry for this checkpoint at the head of the queue - remember it as aborted, but only
+			// while fewer than MAX_CHECKPOINTS_TO_TRACK entries are queued. All entries with lower
+			// checkpoint IDs were removed above, so this entry becomes the new first entry
+			if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
+				CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
+				abortedMarker.markAborted();
+				pendingCheckpoints.addFirst(abortedMarker);
+			}
+		}
+	}
+
+	private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointId, timestamp);
+		}
+	}
+
+	private void notifyAbort(long checkpointId) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Simple class for a checkpoint ID with a barrier counter.
 	 */
 	private static final class CheckpointBarrierCount {
-		
+
 		private final long checkpointId;
-		
+
 		private int barrierCount;
-		
-		private CheckpointBarrierCount(long checkpointId) {
+
+		private boolean aborted;
+
+		CheckpointBarrierCount(long checkpointId) {
 			this.checkpointId = checkpointId;
 			this.barrierCount = 1;
 		}
 
+		public long checkpointId() {
+			return checkpointId;
+		}
+
 		public int incrementBarrierCount() {
 			return ++barrierCount;
 		}
-		
-		@Override
-		public int hashCode() {
-			return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; 
+
+		public boolean isAborted() {
+			return aborted;
 		}
 
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof  CheckpointBarrierCount) {
-				CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
-				return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
-			}
-			else {
-				return false;
-			}
+		public boolean markAborted() {
+			boolean firstAbort = !this.aborted;
+			this.aborted = true;
+			return firstAbort;
 		}
 
 		@Override
 		public String toString() {
-			return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
+			return isAborted() ?
+				String.format("checkpointID=%d - ABORTED", checkpointId) :
+				String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..dc8d245 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -134,7 +134,7 @@ public class BufferSpiller {
 			else {
 				contents = EventSerializer.toSerializedEvent(boe.getEvent());
 			}
-			
+
 			headBuffer.clear();
 			headBuffer.putInt(boe.getChannelIndex());
 			headBuffer.putInt(contents.remaining());

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..ca23491 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import java.io.IOException;
 
@@ -43,14 +42,14 @@ public interface CheckpointBarrierHandler {
 	 * @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during
 	 *                                        waiting for the next BufferOrEvent to become available.
 	 */
-	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+	BufferOrEvent getNextNonBlocked() throws Exception;
 
 	/**
-	 * Registers the given event handler to be notified on successful checkpoints.
-	 * 
-	 * @param checkpointHandler The handler to register.
+	 * Registers the task to be notified once all checkpoint barriers have been received for a checkpoint.
+	 *
+	 * @param task The task to notify
 	 */
-	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+	void registerCheckpointEventHandler(StatefulTask<?> task);
 
 	/**
 	 * Cleans up all internally held resources.
@@ -64,4 +63,13 @@ public interface CheckpointBarrierHandler {
 	 * @return {@code True}, if no data is buffered internally, {@code false} otherwise.
 	 */
 	boolean isEmpty();
+
+	/**
+	 * Gets the time that the latest alignment took, in nanoseconds.
+	 * If there is currently an alignment in progress, it will return the time spent in the
+	 * current alignment so far.
+	 *
+	 * @return The duration in nanoseconds
+	 */
+	long getAlignmentDurationNanos();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..7d9e4d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,7 +84,7 @@ public class StreamInputProcessor<IN> {
 
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
-								EventListener<CheckpointBarrier> checkpointListener,
+								StatefulTask<?> checkpointListener,
 								CheckpointingMode checkpointMode,
 								IOManager ioManager,
 								boolean enableWatermarkMultiplexing) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..a3ae077 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
-			EventListener<CheckpointBarrier> checkpointListener,
+			StatefulTask<?> checkpointListener,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
 			boolean enableWatermarkMultiplexing) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d18ca16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
 			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
-					getCheckpointBarrierListener(), 
+					this,
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
 					isSerializingTimestamps());

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0e24516..351acaa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -126,15 +127,32 @@ public class OperatorChain<OUT> {
 		}
 		
 	}
-	
-	
-	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
-		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
-		for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-			streamOutput.broadcastEvent(barrier);
+
+	/** Broadcasts a {@link CheckpointBarrier} with the given ID and timestamp to every stream output. */
+	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+		try {
+			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+				streamOutput.broadcastEvent(barrier);
+			}
+		}
+		catch (InterruptedException e) {
+			throw new IOException("Interrupted while broadcasting checkpoint barrier", e); // chain the interrupt as cause
 		}
 	}
-	
+
+	public void broadcastCheckpointCancelMarker(long id) throws IOException {
+		try {
+			CancelCheckpointMarker barrier = new CancelCheckpointMarker(id); // one marker instance is sent to all outputs
+			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+				streamOutput.broadcastEvent(barrier);
+			}
+		}
+		catch (InterruptedException e) {
+			throw new IOException("Interrupted while broadcasting checkpoint cancellation", e); // chain the interrupt as cause
+		}
+	}
+
 	public RecordWriterOutput<?>[] getStreamOutputs() {
 		return streamOutputs;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8f28cef..d55a9c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -580,9 +578,34 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
-	protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+	@Override
+	public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+		try {
+			performCheckpoint(checkpointId, timestamp);
+		}
+		catch (CancelTaskException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new Exception("Error while performing a checkpoint", e);
+		}
+	}
+
+	@Override
+	public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+		LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
+
+		synchronized (lock) {
+			if (isRunning) {
+				operatorChain.broadcastCheckpointCancelMarker(checkpointId);
+			}
+		}
+	}
+
+	private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-		
+
 		synchronized (lock) {
 			if (isRunning) {
 
@@ -759,23 +782,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		return getName();
 	}
 
-	protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
-		return new EventListener<CheckpointBarrier>() {
-			@Override
-			public void onEvent(CheckpointBarrier barrier) {
-				try {
-					performCheckpoint(barrier.getId(), barrier.getTimestamp());
-				}
-				catch (CancelTaskException e) {
-					throw e;
-				}
-				catch (Exception e) {
-					throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
-				}
-			}
-		};
-	}
-
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 	
 		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
 				inputDeserializer1, inputDeserializer2,
-				getCheckpointBarrierListener(),
+				this,
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
 				isSerializingTimestamps());