You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:08 UTC

[2/4] flink git commit: [FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints

[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1f028de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1f028de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1f028de

Branch: refs/heads/release-1.1
Commit: a1f028dee49928ada014632bb27216b36e30250e
Parents: 4dd3efe
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 23 18:41:32 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 18:27:47 2016 +0100

----------------------------------------------------------------------
 .../io/network/api/CancelCheckpointMarker.java  |  77 +++
 .../api/serialization/EventSerializer.java      |  57 +-
 .../runtime/io/network/netty/NettyMessage.java  |   2 +-
 .../partition/PipelinedSubpartition.java        |   3 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  21 +
 .../api/serialization/EventSerializerTest.java  |  45 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  10 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  10 +
 .../streaming/runtime/io/BarrierBuffer.java     | 265 ++++++--
 .../streaming/runtime/io/BarrierTracker.java    | 164 +++--
 .../streaming/runtime/io/BufferSpiller.java     |   2 +-
 .../runtime/io/CheckpointBarrierHandler.java    |  22 +-
 .../runtime/io/StreamInputProcessor.java        |   5 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   5 +-
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  32 +-
 .../streaming/runtime/tasks/StreamTask.java     |  48 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 617 +++++++++++++++++--
 .../runtime/io/BarrierTrackerTest.java          | 157 ++++-
 20 files changed, 1291 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
new file mode 100644
index 0000000..52a2517
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * The CancelCheckpointMarker travels through the data streams, similar to the {@link CheckpointBarrier},
+ * but signals that a certain checkpoint should be canceled. Any in-progress alignment for that
+ * checkpoint needs to be canceled and regular processing should be resumed.
+ */
+public class CancelCheckpointMarker extends RuntimeEvent {
+
+	/** The id of the checkpoint to be canceled */
+	private final long checkpointId;
+
+	public CancelCheckpointMarker(long checkpointId) {
+		this.checkpointId = checkpointId;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	// ------------------------------------------------------------------------
+	// These known and common events go through special code paths, rather than
+	// through generic serialization 
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		throw new UnsupportedOperationException("this method should never be called");
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		throw new UnsupportedOperationException("this method should never be called");
+	}
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return (int) (checkpointId ^ (checkpointId >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		return other != null && 
+				other.getClass() == CancelCheckpointMarker.class &&
+				this.checkpointId == ((CancelCheckpointMarker) other).checkpointId;
+	}
+
+	@Override
+	public String toString() {
+		return "CancelCheckpointMarker " + checkpointId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index a34f8cf..0bc3b28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -38,7 +39,7 @@ import java.nio.ByteOrder;
  * Utility class to serialize and deserialize task events.
  */
 public class EventSerializer {
-	
+
 	private static final int END_OF_PARTITION_EVENT = 0;
 
 	private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -46,17 +47,19 @@ public class EventSerializer {
 	private static final int END_OF_SUPERSTEP_EVENT = 2;
 
 	private static final int OTHER_EVENT = 3;
-	
+
+	private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
+
 	// ------------------------------------------------------------------------
-	
-	public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+
+	public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException {
 		final Class<?> eventClass = event.getClass();
 		if (eventClass == EndOfPartitionEvent.class) {
 			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
 		}
 		else if (eventClass == CheckpointBarrier.class) {
 			CheckpointBarrier barrier = (CheckpointBarrier) event;
-			
+
 			ByteBuffer buf = ByteBuffer.allocate(20);
 			buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
 			buf.putLong(4, barrier.getId());
@@ -66,32 +69,39 @@ public class EventSerializer {
 		else if (eventClass == EndOfSuperstepEvent.class) {
 			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
 		}
+		else if (eventClass == CancelCheckpointMarker.class) {
+			CancelCheckpointMarker marker = (CancelCheckpointMarker) event;
+
+			ByteBuffer buf = ByteBuffer.allocate(12);
+			buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT);
+			buf.putLong(4, marker.getCheckpointId());
+			return buf;
+		}
 		else {
 			try {
 				final DataOutputSerializer serializer = new DataOutputSerializer(128);
 				serializer.writeInt(OTHER_EVENT);
 				serializer.writeUTF(event.getClass().getName());
 				event.write(serializer);
-	
 				return serializer.wrapAsByteBuffer();
 			}
 			catch (IOException e) {
-				throw new RuntimeException("Error while serializing event.", e);
+				throw new IOException("Error while serializing event.", e);
 			}
 		}
 	}
 
-	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
 		if (buffer.remaining() < 4) {
-			throw new RuntimeException("Incomplete event");
+			throw new IOException("Incomplete event");
 		}
-		
+
 		final ByteOrder bufferOrder = buffer.order();
 		buffer.order(ByteOrder.BIG_ENDIAN);
-		
+
 		try {
 			int type = buffer.getInt();
-				
+
 			if (type == END_OF_PARTITION_EVENT) {
 				return EndOfPartitionEvent.INSTANCE;
 			}
@@ -103,35 +113,38 @@ public class EventSerializer {
 			else if (type == END_OF_SUPERSTEP_EVENT) {
 				return EndOfSuperstepEvent.INSTANCE;
 			}
+			else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
+				long id = buffer.getLong();
+				return new CancelCheckpointMarker(id);
+			}
 			else if (type == OTHER_EVENT) {
 				try {
 					final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-		
 					final String className = deserializer.readUTF();
-		
+
 					final Class<? extends AbstractEvent> clazz;
 					try {
 						clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
 					}
 					catch (ClassNotFoundException e) {
-						throw new RuntimeException("Could not load event class '" + className + "'.", e);
+						throw new IOException("Could not load event class '" + className + "'.", e);
 					}
 					catch (ClassCastException e) {
-						throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+						throw new IOException("The class '" + className + "' is not a valid subclass of '"
 								+ AbstractEvent.class.getName() + "'.", e);
 					}
-		
+
 					final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
 					event.read(deserializer);
-		
+
 					return event;
 				}
 				catch (Exception e) {
-					throw new RuntimeException("Error while deserializing or instantiating event.", e);
+					throw new IOException("Error while deserializing or instantiating event.", e);
 				}
 			} 
 			else {
-				throw new RuntimeException("Corrupt byte stream for event");
+				throw new IOException("Corrupt byte stream for event");
 			}
 		}
 		finally {
@@ -143,7 +156,7 @@ public class EventSerializer {
 	// Buffer helpers
 	// ------------------------------------------------------------------------
 
-	public static Buffer toBuffer(AbstractEvent event) {
+	public static Buffer toBuffer(AbstractEvent event) throws IOException {
 		final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
 
 		MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
@@ -154,7 +167,7 @@ public class EventSerializer {
 		return buffer;
 	}
 
-	public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) {
+	public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
 		return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 3a24181..6f6001b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -463,7 +463,7 @@ abstract class NettyMessage {
 		}
 
 		@Override
-		public void readFrom(ByteBuf buffer) {
+		public void readFrom(ByteBuf buffer) throws IOException {
 			// TODO Directly deserialize fromNetty's buffer
 			int length = buffer.readInt();
 			ByteBuffer serializedEvent = ByteBuffer.allocate(length);

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..b703acb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public void finish() {
+	public void finish() throws IOException {
 		final NotificationListener listener;
 
 		synchronized (buffers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index f8bba1a..7c581df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -47,6 +47,27 @@ public interface StatefulTask<T extends StateHandle<?>> {
 	 */
 	boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
 
+	/**
+	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+	 * barriers on all input streams.
+	 *
+	 * @param checkpointId The ID of the checkpoint, incrementing.
+	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 *
+	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+	 */
+	void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception;
+
+	/**
+	 * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
+	 * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+	 * 
+	 * <p>This requires implementing tasks to forward a
+	 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
+	 * 
+	 * @param checkpointId The ID of the checkpoint to be aborted.
+	 */
+	void abortCheckpointOnBarrier(long checkpointId) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ddfd758..d47a0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -28,34 +29,30 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class EventSerializerTest {
 
 	@Test
-	public void testSerializeDeserializeEvent() {
-		try {
-			AbstractEvent[] events = {
-					EndOfPartitionEvent.INSTANCE,
-					EndOfSuperstepEvent.INSTANCE,
-					new CheckpointBarrier(1678L, 4623784L),
-					new TestTaskEvent(Math.random(), 12361231273L)
-			};
-			
-			for (AbstractEvent evt : events) {
-				ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
-				assertTrue(serializedEvent.hasRemaining());
-
-				AbstractEvent deserialized = 
-						EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
-				assertNotNull(deserialized);
-				assertEquals(evt, deserialized);
-			}
-			
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+	public void testSerializeDeserializeEvent() throws Exception {
+		AbstractEvent[] events = {
+				EndOfPartitionEvent.INSTANCE,
+				EndOfSuperstepEvent.INSTANCE,
+				new CheckpointBarrier(1678L, 4623784L),
+				new TestTaskEvent(Math.random(), 12361231273L),
+				new CancelCheckpointMarker(287087987329842L)
+		};
+		
+		for (AbstractEvent evt : events) {
+			ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+			assertTrue(serializedEvent.hasRemaining());
+
+			AbstractEvent deserialized = 
+					EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+			assertNotNull(deserialized);
+			assertEquals(evt, deserialized);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..4dfaf95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -450,6 +450,16 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should not be called!");
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {
+			throw new UnsupportedOperationException("should not be called!");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
 				completedCheckpointsLatch.countDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 0c0d064..5b344eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -221,6 +221,16 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("Should not be called");
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {
+			throw new UnsupportedOperationException("Should not be called");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (checkpointId != lastCheckpointId && this.error == null) {
 				this.error = new Exception("calls out of order");

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..36de717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,42 +17,43 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayDeque;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
  * all inputs have received the barrier for a given checkpoint.
  * 
  * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
  * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
- * the blocks are released.</p>
+ * the blocks are released.
  */
 @Internal
 public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-	
+
 	/** The gate that the buffer draws its input from */
 	private final InputGate inputGate;
 
 	/** Flags that indicate whether a channel is currently blocked/buffered */
 	private final boolean[] blockedChannels;
-	
+
 	/** The total number of channels that this buffer handles data from */
 	private final int totalNumberOfInputChannels;
-	
+
 	/** To utility to write blocked data to a file channel */
 	private final BufferSpiller bufferSpiller;
 
@@ -65,17 +66,24 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
 
 	/** Handler that receives the checkpoint notifications */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask<?> toNotifyOnCheckpoint;
 
 	/** The ID of the checkpoint for which we expect barriers */
 	private long currentCheckpointId = -1L;
 
-	/** The number of received barriers (= number of blocked/buffered channels) */
+	/** The number of received barriers (= number of blocked/buffered channels)
+	 * IMPORTANT: A canceled checkpoint must always have 0 barriers */
 	private int numBarriersReceived;
-	
+
 	/** The number of already closed channels */
 	private int numClosedChannels;
-	
+
+	/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
+	private long startOfAlignmentTimestamp;
+
+	/** The time (in nanoseconds) that the latest alignment took */
+	private long latestAlignmentDurationNanos;
+
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
 
@@ -90,7 +98,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		this.inputGate = inputGate;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-		
+
 		this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
 		this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
 	}
@@ -100,7 +108,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
 			BufferOrEvent next;
@@ -114,7 +122,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					return getNextNonBlocked();
 				}
 			}
-			
+
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
@@ -129,27 +137,29 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 						processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
 					}
 				}
+				else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+					processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
+				}
 				else {
 					if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
-						numClosedChannels++;
-						// no chance to complete this checkpoint
-						releaseBlocks();
+						processEndOfPartition(next.getChannelIndex());
 					}
 					return next;
 				}
 			}
 			else if (!endOfStream) {
-				// end of stream. we feed the data that is still buffered
+				// end of input stream. stream continues with the buffered data
 				endOfStream = true;
-				releaseBlocks();
+				releaseBlocksAndResetBarriers();
 				return getNextNonBlocked();
 			}
 			else {
+				// final end of both input and buffered data
 				return null;
 			}
 		}
 	}
-	
+
 	private void completeBufferedSequence() throws IOException {
 		currentBuffered.cleanup();
 		currentBuffered = queuedBuffered.pollFirst();
@@ -157,66 +167,175 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			currentBuffered.open();
 		}
 	}
-	
-	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+
+	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
+		// fast path for single channel cases
+		if (totalNumberOfInputChannels == 1) {
+			if (barrierId > currentCheckpointId) {
+				// new checkpoint
+				currentCheckpointId = barrierId;
+				notifyCheckpoint(receivedBarrier);
+			}
+			return;
+		}
+
+		// -- general code path for multiple input channels --
+
 		if (numBarriersReceived > 0) {
-			// subsequent barrier of a checkpoint.
+			// this is only true if some alignment is already in progress and was not canceled
+
 			if (barrierId == currentCheckpointId) {
 				// regular case
 				onBarrier(channelIndex);
 			}
 			else if (barrierId > currentCheckpointId) {
-				// we did not complete the current checkpoint
+				// we did not complete the current checkpoint, another started before
 				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
 						"Skipping current checkpoint.", barrierId, currentCheckpointId);
 
-				releaseBlocks();
-				currentCheckpointId = barrierId;
-				onBarrier(channelIndex);
+				// let the task know we are not completing this
+				notifyAbort(currentCheckpointId);
+
+				// abort the current checkpoint
+				releaseBlocksAndResetBarriers();
+
+				// begin the new checkpoint
+				beginNewAlignment(barrierId, channelIndex);
 			}
 			else {
-				// ignore trailing barrier from aborted checkpoint
+				// ignore trailing barrier from an earlier checkpoint (obsolete now)
 				return;
 			}
-			
 		}
 		else if (barrierId > currentCheckpointId) {
 			// first barrier of a new checkpoint
-			currentCheckpointId = barrierId;
-			onBarrier(channelIndex);
+			beginNewAlignment(barrierId, channelIndex);
 		}
 		else {
-			// trailing barrier from previous (skipped) checkpoint
+			// either the current checkpoint was canceled (numBarriers == 0) or
+			// this barrier is from an old subsumed checkpoint
 			return;
 		}
 
-		// check if we have all barriers
+		// check if we have all barriers - since canceled checkpoints always have zero barriers
+		// this can only happen on a non canceled checkpoint
 		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
+			// actually trigger checkpoint
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received all barrier, triggering checkpoint {} at {}",
+				LOG.debug("Received all barriers, triggering checkpoint {} at {}",
 						receivedBarrier.getId(), receivedBarrier.getTimestamp());
 			}
 
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
+			releaseBlocksAndResetBarriers();
+			notifyCheckpoint(receivedBarrier);
+		}
+	}
+
+	private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+		final long barrierId = cancelBarrier.getCheckpointId();
+
+		// fast path for single channel cases
+		if (totalNumberOfInputChannels == 1) {
+			if (barrierId > currentCheckpointId) {
+				// new checkpoint
+				currentCheckpointId = barrierId;
+				notifyAbort(barrierId);
 			}
-			
-			releaseBlocks();
+			return;
+		}
+
+		// -- general code path for multiple input channels --
+
+		if (numBarriersReceived > 0) {
+			// this is only true if some alignment is in progress and nothing was canceled
+
+			if (barrierId == currentCheckpointId) {
+				// cancel this alignment
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+				}
+
+				releaseBlocksAndResetBarriers();
+				notifyAbort(barrierId);
+			}
+			else if (barrierId > currentCheckpointId) {
+				// we canceled the next which also cancels the current
+				LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+				// this stops the current alignment
+				releaseBlocksAndResetBarriers();
+
+				// the next checkpoint starts as canceled
+				currentCheckpointId = barrierId;
+				startOfAlignmentTimestamp = 0L;
+				latestAlignmentDurationNanos = 0L;
+				notifyAbort(barrierId);
+			}
+
+			// else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
+
+		}
+		else if (barrierId > currentCheckpointId) {
+			// first barrier of a new checkpoint is directly a cancellation
+
+			// setting the currentCheckpointId to this checkpoint while keeping the numBarriers
+			// at zero means that no checkpoint barrier can start a new alignment
+			currentCheckpointId = barrierId;
+
+			startOfAlignmentTimestamp = 0L;
+			latestAlignmentDurationNanos = 0L;
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+			}
+
+			notifyAbort(barrierId);
 		}
+
+		// else: trailing barrier from either
+		//   - a previous (subsumed) checkpoint
+		//   - the current checkpoint if it was already canceled
 	}
-	
+
+	private void processEndOfPartition(int channel) throws Exception {
+		numClosedChannels++;
+
+		if (numBarriersReceived > 0) {
+			// let the task know we skip a checkpoint
+			notifyAbort(currentCheckpointId);
+
+			// no chance to complete this checkpoint
+			releaseBlocksAndResetBarriers();
+		}
+	}
+
+	private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+					checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+		}
+	}
+
+	private void notifyAbort(long checkpointId) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+		}
+	}
+
+
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
 		}
 	}
-	
+
 	@Override
 	public boolean isEmpty() {
 		return currentBuffered == null;
@@ -231,8 +350,20 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
 			seq.cleanup();
 		}
+		queuedBuffered.clear();
 	}
-	
+
+	private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
+		currentCheckpointId = checkpointId;
+		onBarrier(channelIndex);
+
+		startOfAlignmentTimestamp = System.nanoTime();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
+		}
+	}
+
 	/**
 	 * Checks whether the channel with the given index is blocked.
 	 * 
@@ -242,7 +373,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private boolean isBlocked(int channelIndex) {
 		return blockedChannels[channelIndex];
 	}
-	
+
 	/**
 	 * Blocks the given channel index, from which a barrier has been received.
 	 * 
@@ -251,30 +382,28 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private void onBarrier(int channelIndex) throws IOException {
 		if (!blockedChannels[channelIndex]) {
 			blockedChannels[channelIndex] = true;
+
 			numBarriersReceived++;
-			
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received barrier from channel " + channelIndex);
 			}
 		}
 		else {
-			throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
+			throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
 		}
 	}
 
 	/**
-	 * Releases the blocks on all channels. Makes sure the just written data
-	 * is the next to be consumed.
+	 * Releases the blocks on all channels and resets the barrier count.
+	 * Makes sure the just written data is the next to be consumed.
 	 */
-	private void releaseBlocks() throws IOException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Releasing blocks");
-		}
+	private void releaseBlocksAndResetBarriers() throws IOException {
+		LOG.debug("End of stream alignment, feeding buffered data back");
 
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
 		}
-		numBarriersReceived = 0;
 
 		if (currentBuffered == null) {
 			// common case: no more buffered data
@@ -295,10 +424,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentBuffered = bufferedNow;
 			}
 		}
+
+		// the next barrier that comes must assume it is the first
+		numBarriersReceived = 0;
+
+		if (startOfAlignmentTimestamp > 0) {
+			latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
+			startOfAlignmentTimestamp = 0;
+		}
 	}
 
 	// ------------------------------------------------------------------------
-	// For Testing
+	//  Properties
 	// ------------------------------------------------------------------------
 
 	/**
@@ -309,7 +446,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	public long getCurrentCheckpointId() {
 		return this.currentCheckpointId;
 	}
-	
+
+	@Override
+	public long getAlignmentDurationNanos() {
+		long start = this.startOfAlignmentTimestamp;
+		if (start <= 0) {
+			return latestAlignmentDurationNanos;
+		} else {
+			return System.nanoTime() - start;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Utilities 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..5157336 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,12 +19,12 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
-import java.io.IOException;
 import java.util.ArrayDeque;
 
 /**
@@ -34,9 +34,9 @@ import java.util.ArrayDeque;
  * 
  * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
  * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ * guarantees. It can, however, be used to gain "at least once" processing guarantees.
  * 
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
  */
 @Internal
 public class BarrierTracker implements CheckpointBarrierHandler {
@@ -57,11 +57,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 	
 	/** The listener to be notified on complete checkpoints */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask<?> toNotifyOnCheckpoint;
 	
 	/** The highest checkpoint ID encountered so far */
 	private long latestPendingCheckpointID = -1;
-	
+
+	// ------------------------------------------------------------------------
 	
 	public BarrierTracker(InputGate inputGate) {
 		this.inputGate = inputGate;
@@ -70,28 +71,33 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			BufferOrEvent next = inputGate.getNextBufferOrEvent();
-			if (next == null) {
-				return null;
-			}
-			else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+			if (next == null || next.isBuffer()) {
+				// buffer or input exhausted
 				return next;
 			}
-			else {
+			else if (next.getEvent().getClass() == CheckpointBarrier.class) {
 				processBarrier((CheckpointBarrier) next.getEvent());
 			}
+			else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+				processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+			}
+			else {
+				// some other event
+				return next;
+			}
 		}
 	}
 
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
 		}
 	}
 
@@ -105,22 +111,27 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		return pendingCheckpoints.isEmpty();
 	}
 
-	private void processBarrier(CheckpointBarrier receivedBarrier) {
+	@Override
+	public long getAlignmentDurationNanos() {
+		// this one does not do alignment at all
+		return 0L;
+	}
+
+	private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
+		final long barrierId = receivedBarrier.getId();
+
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
-			}
+			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
 			return;
 		}
-		
+
 		// general path for multiple input channels
-		final long barrierId = receivedBarrier.getId();
 
 		// find the checkpoint barrier in the queue of bending barriers
 		CheckpointBarrierCount cbc = null;
 		int pos = 0;
-		
+
 		for (CheckpointBarrierCount next : pendingCheckpoints) {
 			if (next.checkpointId == barrierId) {
 				cbc = next;
@@ -128,21 +139,21 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 			}
 			pos++;
 		}
-		
+
 		if (cbc != null) {
 			// add one to the count to that barrier and check for completion
 			int numBarriersNew = cbc.incrementBarrierCount();
 			if (numBarriersNew == totalNumberOfInputChannels) {
-				// checkpoint can be triggered
+				// checkpoint can be triggered (or is aborted and all barriers have been seen)
 				// first, remove this checkpoint and all all prior pending
 				// checkpoints (which are now subsumed)
 				for (int i = 0; i <= pos; i++) {
 					pendingCheckpoints.pollFirst();
 				}
-				
+
 				// notify the listener
-				if (checkpointHandler != null) {
-					checkpointHandler.onEvent(receivedBarrier);
+				if (!cbc.isAborted()) {
+					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
 				}
 			}
 		}
@@ -163,45 +174,104 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		}
 	}
 
+	private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception {
+		final long checkpointId = barrier.getCheckpointId();
+
+		// fast path for single channel trackers
+		if (totalNumberOfInputChannels == 1) {
+			notifyAbort(checkpointId);
+			return;
+		}
+
+		// -- general path for multiple input channels --
+
+		// find the checkpoint barrier in the queue of pending barriers
+		// while doing this we "abort" all checkpoints before that one
+		CheckpointBarrierCount cbc;
+		while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
+			pendingCheckpoints.removeFirst();
+		}
+
+		if (cbc != null && cbc.checkpointId() == checkpointId) {
+			// make sure the checkpoint is remembered as aborted
+			if (cbc.markAborted()) {
+				// this was the first time the checkpoint was aborted - notify
+				notifyAbort(checkpointId);
+			}
+
+			// we still count the barriers to be able to remove the entry once all barriers have been seen
+			if (cbc.incrementBarrierCount() == totalNumberOfInputChannels) {
+				// we can remove this entry
+				pendingCheckpoints.removeFirst();
+			}
+		}
+		else {
+			notifyAbort(checkpointId);
+
+			// first barrier for this checkpoint - remember it as aborted
+			// since we polled away all entries with lower checkpoint IDs
+			// this entry will become the new first entry
+			if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
+				CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
+				abortedMarker.markAborted();
+				pendingCheckpoints.addFirst(abortedMarker);
+			}
+		}
+	}
+
+	private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointId, timestamp);
+		}
+	}
+
+	private void notifyAbort(long checkpointId) throws Exception {
+		if (toNotifyOnCheckpoint != null) {
+			toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Simple class for a checkpoint ID with a barrier counter.
 	 */
 	private static final class CheckpointBarrierCount {
-		
+
 		private final long checkpointId;
-		
+
 		private int barrierCount;
-		
-		private CheckpointBarrierCount(long checkpointId) {
+
+		private boolean aborted;
+
+		CheckpointBarrierCount(long checkpointId) {
 			this.checkpointId = checkpointId;
 			this.barrierCount = 1;
 		}
 
+		public long checkpointId() {
+			return checkpointId;
+		}
+
 		public int incrementBarrierCount() {
 			return ++barrierCount;
 		}
-		
-		@Override
-		public int hashCode() {
-			return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; 
+
+		public boolean isAborted() {
+			return aborted;
 		}
 
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof  CheckpointBarrierCount) {
-				CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
-				return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
-			}
-			else {
-				return false;
-			}
+		public boolean markAborted() {
+			boolean firstAbort = !this.aborted;
+			this.aborted = true;
+			return firstAbort;
 		}
 
 		@Override
 		public String toString() {
-			return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
+			return isAborted() ?
+				String.format("checkpointID=%d - ABORTED", checkpointId) :
+				String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..dc8d245 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -134,7 +134,7 @@ public class BufferSpiller {
 			else {
 				contents = EventSerializer.toSerializedEvent(boe.getEvent());
 			}
-			
+
 			headBuffer.clear();
 			headBuffer.putInt(boe.getChannelIndex());
 			headBuffer.putInt(contents.remaining());

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..ca23491 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import java.io.IOException;
 
@@ -43,14 +42,14 @@ public interface CheckpointBarrierHandler {
 	 * @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during
 	 *                                        waiting for the next BufferOrEvent to become available.
 	 */
-	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+	BufferOrEvent getNextNonBlocked() throws Exception;
 
 	/**
-	 * Registers the given event handler to be notified on successful checkpoints.
-	 * 
-	 * @param checkpointHandler The handler to register.
+	 * Registers the task to be notified once all checkpoint barriers have been received for a checkpoint.
+	 *
+	 * @param task The task to notify
 	 */
-	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+	void registerCheckpointEventHandler(StatefulTask<?> task);
 
 	/**
 	 * Cleans up all internally held resources.
@@ -64,4 +63,13 @@ public interface CheckpointBarrierHandler {
 	 * @return {@code True}, if no data is buffered internally, {@code false} otherwise.
 	 */
 	boolean isEmpty();
+
+	/**
+	 * Gets the time that the latest alignment took, in nanoseconds.
+	 * If there is currently an alignment in progress, it will return the time spent in the
+	 * current alignment so far.
+	 *
+	 * @return The duration in nanoseconds
+	 */
+	long getAlignmentDurationNanos();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..7d9e4d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,7 +84,7 @@ public class StreamInputProcessor<IN> {
 
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
-								EventListener<CheckpointBarrier> checkpointListener,
+								StatefulTask<?> checkpointListener,
 								CheckpointingMode checkpointMode,
 								IOManager ioManager,
 								boolean enableWatermarkMultiplexing) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..a3ae077 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
-			EventListener<CheckpointBarrier> checkpointListener,
+			StatefulTask<?> checkpointListener,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
 			boolean enableWatermarkMultiplexing) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d18ca16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
 			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
-					getCheckpointBarrierListener(), 
+					this,
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
 					isSerializingTimestamps());

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0e24516..351acaa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -126,15 +127,32 @@ public class OperatorChain<OUT> {
 		}
 		
 	}
-	
-	
-	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
-		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
-		for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-			streamOutput.broadcastEvent(barrier);
+
+
+	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+		try {
+			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+				streamOutput.broadcastEvent(barrier);
+			}
+		}
+		catch (InterruptedException e) {
+			throw new IOException("Interrupted while broadcasting checkpoint barrier");
 		}
 	}
-	
+
+	public void broadcastCheckpointCancelMarker(long id) throws IOException {
+		try {
+			CancelCheckpointMarker barrier = new CancelCheckpointMarker(id);
+			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+				streamOutput.broadcastEvent(barrier);
+			}
+		}
+		catch (InterruptedException e) {
+			throw new IOException("Interrupted while broadcasting checkpoint cancellation");
+		}
+	}
+
 	public RecordWriterOutput<?>[] getStreamOutputs() {
 		return streamOutputs;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8f28cef..d55a9c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -580,9 +578,34 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
-	protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+	@Override
+	public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+		try {
+			performCheckpoint(checkpointId, timestamp);
+		}
+		catch (CancelTaskException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new Exception("Error while performing a checkpoint", e);
+		}
+	}
+
+	@Override
+	public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+		LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
+
+		synchronized (lock) {
+			if (isRunning) {
+				operatorChain.broadcastCheckpointCancelMarker(checkpointId);
+			}
+		}
+	}
+
+	private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-		
+
 		synchronized (lock) {
 			if (isRunning) {
 
@@ -759,23 +782,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		return getName();
 	}
 
-	protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
-		return new EventListener<CheckpointBarrier>() {
-			@Override
-			public void onEvent(CheckpointBarrier barrier) {
-				try {
-					performCheckpoint(barrier.getId(), barrier.getTimestamp());
-				}
-				catch (CancelTaskException e) {
-					throw e;
-				}
-				catch (Exception e) {
-					throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
-				}
-			}
-		};
-	}
-
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 	
 		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
 				inputDeserializer1, inputDeserializer2,
-				getCheckpointBarrierListener(),
+				this,
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
 				isSerializingTimestamps());