Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 00:15:43 UTC

[1/8] flink git commit: [tests] Add a manual test to evaluate impact of checkpointing on latency

Repository: flink
Updated Branches:
  refs/heads/master 571084152 -> 8f87b7164


[tests] Add a manual test to evaluate impact of checkpointing on latency


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed30ff4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed30ff4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed30ff4d

Branch: refs/heads/master
Commit: ed30ff4de46deb524d9449768d7199d99e3cc0f0
Parents: 2d237e1
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 19:33:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200

----------------------------------------------------------------------
 .../manual/StreamingScalabilityAndLatency.java  | 154 +++++++++++++++++++
 .../apache/flink/test/manual/package-info.java  |   4 +-
 2 files changed, 156 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed30ff4d/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
new file mode 100644
index 0000000..a34ec15
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+
+import static org.junit.Assert.fail;
+
+public class StreamingScalabilityAndLatency {
+	
+	public static void main(String[] args) throws Exception {
+		if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
+			throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
+		}
+		
+		final int TASK_MANAGERS = 1;
+		final int SLOTS_PER_TASK_MANAGER = 80;
+		final int PARALLELISM = TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+
+		LocalFlinkMiniCluster cluster = null;
+
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, TASK_MANAGERS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 20000);
+
+			config.setInteger("taskmanager.net.server.numThreads", 1);
+			config.setInteger("taskmanager.net.client.numThreads", 1);
+
+			cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.STREAMING);
+			
+			runPartitioningProgram(cluster.getJobManagerRPCPort(), PARALLELISM);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (cluster != null) {
+				cluster.shutdown();
+			}
+		}
+	}
+	
+	private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+		env.setParallelism(parallelism);
+		env.getConfig().enableObjectReuse();
+
+		env.setBufferTimeout(5L);
+//		env.enableCheckpointing(1000);
+
+		env
+			.addSource(new TimeStampingSource())
+			.map(new IdMapper<Tuple2<Long, Long>>())
+			.partitionByHash(0)
+			.addSink(new TimestampingSink());
+		
+		env.execute("Partitioning Program");
+	}
+	
+	public static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = -151782334777482511L;
+
+		private volatile boolean running = true;
+		
+		
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+			
+			long num = 100;
+			long counter = (long) (Math.random() * 4096);
+			
+			while (running) {
+				if (num < 100) {
+					num++;
+					ctx.collect(new Tuple2<Long, Long>(counter++, 0L));
+				}
+				else {
+					num = 0;
+					ctx.collect(new Tuple2<Long, Long>(counter++, System.currentTimeMillis()));
+				}
+
+				Thread.sleep(1);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+	
+	public static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1876986644706201196L;
+
+		private long maxLatency;
+		private long count; 
+		
+		@Override
+		public void invoke(Tuple2<Long, Long> value) {
+			long ts = value.f1;
+			if (ts != 0L) {
+				long diff = System.currentTimeMillis() - ts;
+				maxLatency = Math.max(diff, maxLatency);
+			}
+			
+			count++;
+			if (count == 5000) {
+				System.out.println("Max latency: " + maxLatency);
+				count = 0;
+				maxLatency = 0;
+			}
+		}
+	}
+
+	public static class IdMapper<T> implements MapFunction<T, T> {
+
+		private static final long serialVersionUID = -6543809409233225099L;
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+}
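
For orientation, the measurement scheme implemented above works as follows (a summary of the code, not part of the commit):

    // - the source sleeps 1 ms per record and stamps roughly one record per
    //   batch of ~100 with System.currentTimeMillis() in field f1 (0L otherwise)
    // - the sink computes (now - f1) for each stamped record and prints the
    //   maximum latency seen per window of 5000 records, then resets the maximum
    // - env.enableCheckpointing(1000) is left commented out, so running the
    //   program as-is yields the no-checkpointing baseline; re-enabling that
    //   line shows the latency impact of checkpointing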

http://git-wip-us.apache.org/repos/asf/flink/blob/ed30ff4d/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
index 893f3cc..1c5744d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
@@ -18,7 +18,7 @@
 
 /**
  * This package contains various tests that are not automatically executed, but
- * need to be manually invoked, because they are extremely heavy of require larger-than-usual
- * JVMs.
+ * need to be manually invoked, because they are extremely heavy, time intensive,
+ * or require larger-than-usual JVMs.
  */
 package org.apache.flink.test.manual;
\ No newline at end of file


[7/8] flink git commit: [FLINK-2406] [streaming] Abstract and improve stream alignment via the BarrierBuffer

Posted by se...@apache.org.
[FLINK-2406] [streaming] Abstract and improve stream alignment via the BarrierBuffer

 - Add an interface for the functionality of the barrier buffer (for later addition of other implementations)
 - Add broader tests for the BarrierBuffer, including trailing data and barrier races.
 - Checkpoint barriers are handled by the buffer directly, rather than being returned and re-injected (see the sketch below).
 - Simplify logic in the BarrierBuffer and fix certain corner cases.
 - Give access to spill directories properly via I/O manager, rather than via GlobalConfiguration singleton.
 - Rename the "BarrierBufferIOTest" to "BarrierBufferMassiveRandomTest"
 - A lot of code style/robustness fixes (properly define constants, visibility, exception signatures)
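
In effect, a stream task now drives its input through the new handler in a loop like the following condensed sketch (names taken from the StreamInputProcessor changes below; process() is a hypothetical stand-in for the actual record deserialization):

    CheckpointBarrierHandler barrierHandler = new BarrierBuffer(inputGate, ioManager);
    barrierHandler.registerCheckpointEventHandler(checkpointListener);

    BufferOrEvent bufferOrEvent;
    while ((bufferOrEvent = barrierHandler.getNextNonBlocked()) != null) {
        // barriers never surface here; the handler consumes them and notifies
        // the registered checkpoint listener once all channels are aligned
        process(bufferOrEvent);
    }
    // null signals the end of the stream; trailing buffered data indicates a bug
    if (!barrierHandler.isEmpty()) {
        throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
    }
    barrierHandler.cleanup();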


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0579f90b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0579f90b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0579f90b

Branch: refs/heads/master
Commit: 0579f90bab165a7df336163eb9d6337267020029
Parents: ed30ff4
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 26 18:58:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:07 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/disk/iomanager/IOManager.java    |  12 +-
 .../io/network/api/EndOfPartitionEvent.java     |  23 +-
 .../partition/consumer/SingleInputGate.java     |   2 +-
 .../streaming/runtime/io/BarrierBuffer.java     | 413 +++++-----
 .../streaming/runtime/io/BufferSpiller.java     |  54 +-
 .../runtime/io/CheckpointBarrierHandler.java    |  55 ++
 .../runtime/io/StreamInputProcessor.java        |  69 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  56 +-
 .../runtime/tasks/OneInputStreamTask.java       |  37 +-
 .../streaming/runtime/tasks/StreamTask.java     |  16 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  74 +-
 .../consumer/StreamTestSingleInputGate.java     |  13 -
 .../runtime/io/BarrierBufferIOTest.java         | 159 ----
 .../io/BarrierBufferMassiveRandomTest.java      | 167 ++++
 .../streaming/runtime/io/BarrierBufferTest.java | 775 +++++++++++++++----
 .../runtime/io/DummyBufferRecycler.java         |   8 +-
 .../runtime/io/SpillingBufferOrEventTest.java   |  20 +-
 17 files changed, 1282 insertions(+), 671 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index c0bd360..45d9b9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -301,9 +301,19 @@ public abstract class IOManager {
 	 * 
 	 * @return The number of temporary file directories.
 	 */
-	public int getNumberOfTempDirs() {
+	public int getNumberOfSpillingDirectories() {
 		return this.paths.length;
 	}
+
+	/**
+	 * Gets the directories that the I/O manager spills to.
+	 * 
+	 * @return The directories that the I/O manager spills to.
+	 */
+	public File[] getSpillingDirectories() {
+		return this.paths;
+	}
+	
 	
 	protected int getNextPathNum() {
 		final int next = this.nextPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index 49d7958..3ecdb94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -22,19 +22,34 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.task.RuntimeEvent;
 
-import java.io.IOException;
 
 public class EndOfPartitionEvent extends RuntimeEvent {
 
 	public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
-
+	
+	
 	@Override
-	public void read(DataInputView in) throws IOException {
+	public void read(DataInputView in) {
 		// Nothing to do here
 	}
 
 	@Override
-	public void write(DataOutputView out) throws IOException {
+	public void write(DataOutputView out) {
 		// Nothing to do here
 	}
+
+	@Override
+	public int hashCode() {
+		return 1965146673;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj != null && obj.getClass() == EndOfPartitionEvent.class;
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 78aa6f7..0aebcae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -408,7 +408,7 @@ public class SingleInputGate implements InputGate {
 
 		// Sanity check that notifications only happen when data is available
 		if (buffer == null) {
-			throw new IllegalStateException("Bug in input gate/channel logic: input gate got" +
+			throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
 					"notified by channel about available data, but none was available.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 40e84fc..466b8f7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -19,263 +19,262 @@ package org.apache.flink.streaming.runtime.io;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+import java.util.ArrayDeque;
 
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The barrier buffer is responsible for implementing the blocking behaviour described
- * here: {@link CheckpointBarrier}.
- *
- * <p>
- * To avoid back-pressuring the
- * readers, we buffer up the new data received from the blocked channels until
- * the blocks are released.
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ * 
+ * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
+ * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
+ * the blocks are released.</p>
  */
-public class BarrierBuffer {
+public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+	
+	/** The gate that the buffer draws its input from */
+	private final InputGate inputGate;
+
+	/** Flags that indicate whether a channel is currently blocked/buffered */
+	private final boolean[] blockedChannels;
+	
+	/** The total number of channels that this buffer handles data from */
+	private final int totalNumberOfInputChannels;
+
+	private final SpillReader spillReader;
+	private final BufferSpiller bufferSpiller;
+	
+	private ArrayDeque<SpillingBufferOrEvent> nonProcessed;
+	private ArrayDeque<SpillingBufferOrEvent> blockedNonProcessed;
+
+	/** Handler that receives the checkpoint notifications */
+	private EventListener<CheckpointBarrier> checkpointHandler;
+
+	/** The ID of the checkpoint for which we expect barriers */
+	private long currentCheckpointId = -1L;
+
+	/** The number of received barriers (= number of blocked/buffered channels) */
+	private long numReceivedBarriers;
+	
+	/** Flag to indicate whether we have drawn all available input */
+	private boolean endOfStream;
+
+	
+	public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+		this.inputGate = inputGate;
+		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
+		
+		this.nonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+		this.blockedNonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+		
+		this.bufferSpiller = new BufferSpiller(ioManager);
+		this.spillReader = new SpillReader();
+	}
 
-	private Queue<SpillingBufferOrEvent> nonProcessed = new LinkedList<SpillingBufferOrEvent>();
-	private Queue<SpillingBufferOrEvent> blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
-	private Set<Integer> blockedChannels = new HashSet<Integer>();
-	private int totalNumberOfInputChannels;
-
-	private CheckpointBarrier currentBarrier;
-
-	private AbstractReader reader;
-
-	private InputGate inputGate;
-
-	private SpillReader spillReader;
-	private BufferSpiller bufferSpiller;
-
-	private boolean inputFinished = false;
+	// ------------------------------------------------------------------------
+	//  Buffer and barrier handling
+	// ------------------------------------------------------------------------
 
-	private BufferOrEvent endOfStreamEvent = null;
+	@Override
+	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+		while (true) {
+			// process buffered BufferOrEvents before grabbing new ones
+			final SpillingBufferOrEvent nextBuffered = nonProcessed.pollFirst();
+			final BufferOrEvent next = nextBuffered == null ?
+					inputGate.getNextBufferOrEvent() :
+					nextBuffered.getBufferOrEvent();
+			
+			if (next != null) {
+				if (isBlocked(next.getChannelIndex())) {
+					// if the channel is blocked, we just store the BufferOrEvent
+					blockedNonProcessed.add(new SpillingBufferOrEvent(next, bufferSpiller, spillReader));
+				}
+				else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+					return next;
+				}
+				else if (!endOfStream) {
+					// process barriers only if there is a chance of the checkpoint completing
+					processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
+				}
+			}
+			else if (!endOfStream) {
+				// end of stream. we feed the data that is still buffered
+				endOfStream = true;
+				releaseBlocks();
+				return getNextNonBlocked();
+			}
+			else {
+				return null;
+			}
+		}
+	}
+	
+	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+		final long barrierId = receivedBarrier.getId();
+
+		if (numReceivedBarriers > 0) {
+			// subsequent barrier of a checkpoint.
+			if (barrierId == currentCheckpointId) {
+				// regular case
+				onBarrier(channelIndex);
+			}
+			else if (barrierId > currentCheckpointId) {
+				// we did not complete the current checkpoint
+				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+				releaseBlocks();
+				currentCheckpointId = barrierId;
+				onBarrier(channelIndex);
+			}
+			else {
+				// ignore trailing barrier from aborted checkpoint
+				return;
+			}
+			
+		}
+		else if (barrierId > currentCheckpointId) {
+			// first barrier of a new checkpoint
+			currentCheckpointId = barrierId;
+			onBarrier(channelIndex);
+		}
+		else {
+			// trailing barrier from previous (skipped) checkpoint
+			return;
+		}
 
-	private long lastCheckpointId = Long.MIN_VALUE;
+		// check if we have all barriers
+		if (numReceivedBarriers == totalNumberOfInputChannels) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Received all barriers, triggering checkpoint {} at {}",
+						receivedBarrier.getId(), receivedBarrier.getTimestamp());
+			}
 
-	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
-		this.inputGate = inputGate;
-		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
-		this.reader = reader;
-		try {
-			this.bufferSpiller = new BufferSpiller();
-			this.spillReader = new SpillReader();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
+			if (checkpointHandler != null) {
+				checkpointHandler.onEvent(receivedBarrier);
+			}
+			
+			releaseBlocks();
 		}
-
+	}
+	
+	@Override
+	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
+		if (this.checkpointHandler == null) {
+			this.checkpointHandler = checkpointHandler;
+		}
+		else {
+			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+		}
+	}
+	
+	@Override
+	public boolean isEmpty() {
+		return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
 	}
 
-	/**
-	 * Get then next non-blocked non-processed {@link BufferOrEvent}. Returns null if
-	 * none available.
-	 * 
-	 * @throws IOException
-	 */
-	private BufferOrEvent getNonProcessed() throws IOException {
-		SpillingBufferOrEvent nextNonProcessed;
-
-		while ((nextNonProcessed = nonProcessed.poll()) != null) {
-			BufferOrEvent boe = nextNonProcessed.getBufferOrEvent();
-			if (isBlocked(boe.getChannelIndex())) {
-				blockedNonProcessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
-			} else {
-				return boe;
+	@Override
+	public void cleanup() throws IOException {
+		bufferSpiller.close();
+		File spillfile1 = bufferSpiller.getSpillFile();
+		if (spillfile1 != null) {
+			if (!spillfile1.delete()) {
+				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile1.getAbsolutePath());
 			}
 		}
 
-		return null;
+		spillReader.close();
+		File spillfile2 = spillReader.getSpillFile();
+		if (spillfile2 != null) {
+			if (!spillfile2.delete()) {
+				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile2.getAbsolutePath());
+			}
+		}
 	}
-
+	
 	/**
 	 * Checks whether the channel with the given index is blocked.
 	 * 
-	 * @param channelIndex The channel index to check
+	 * @param channelIndex The channel index to check.
+	 * @return True if the channel is blocked, false if not.
 	 */
 	private boolean isBlocked(int channelIndex) {
-		return blockedChannels.contains(channelIndex);
+		return blockedChannels[channelIndex];
 	}
-
-	/**
-	 * Checks whether all channels are blocked meaning that barriers have been
-	 * received from all channels
-	 */
-	private boolean isAllBlocked() {
-		return blockedChannels.size() == totalNumberOfInputChannels;
-	}
-
-	/**
-	 * Returns the next non-blocked {@link BufferOrEvent}. This is a blocking operator.
-	 */
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
-		// If there are non-processed buffers from the previously blocked ones,
-		// we get the next
-		BufferOrEvent bufferOrEvent = getNonProcessed();
-
-		if (bufferOrEvent != null) {
-			return bufferOrEvent;
-		} else if (blockedNonProcessed.isEmpty() && inputFinished) {
-			return endOfStreamEvent;
-		} else {
-			// If no non-processed, get new from input
-			while (true) {
-				if (!inputFinished) {
-					// We read the next buffer from the inputgate
-					bufferOrEvent = inputGate.getNextBufferOrEvent();
-
-					if (!bufferOrEvent.isBuffer()
-							&& bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
-						if (inputGate.isFinished()) {
-							// store the event for later if the channel is
-							// closed
-							endOfStreamEvent = bufferOrEvent;
-							inputFinished = true;
-						}
-
-					} else {
-						if (isBlocked(bufferOrEvent.getChannelIndex())) {
-							// If channel blocked we just store it
-							blockedNonProcessed.add(new SpillingBufferOrEvent(bufferOrEvent,
-									bufferSpiller, spillReader));
-						} else {
-							return bufferOrEvent;
-						}
-					}
-				} else {
-					actOnAllBlocked();
-					return getNextNonBlocked();
-				}
-			}
-		}
-	}
-
+	
 	/**
 	 * Blocks the given channel index, from which a barrier has been received.
 	 * 
-	 * @param channelIndex
-	 *            The channel index to block.
+	 * @param channelIndex The channel index to block.
 	 */
-	private void blockChannel(int channelIndex) {
-		if (!blockedChannels.contains(channelIndex)) {
-			blockedChannels.add(channelIndex);
+	private void onBarrier(int channelIndex) throws IOException {
+		if (!blockedChannels[channelIndex]) {
+			blockedChannels[channelIndex] = true;
+			numReceivedBarriers++;
+			
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Channel blocked with index: " + channelIndex);
-			}
-			if (isAllBlocked()) {
-				actOnAllBlocked();
+				LOG.debug("Received barrier from channel " + channelIndex);
 			}
-
-		} else {
-			throw new RuntimeException("Tried to block an already blocked channel");
+		}
+		else {
+			throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
 		}
 	}
 
 	/**
 	 * Releases the blocks on all channels.
 	 */
-	private void releaseBlocks() {
-		if (!nonProcessed.isEmpty()) {
-			// sanity check
-			throw new RuntimeException("Error in barrier buffer logic");
-		}
-		nonProcessed = blockedNonProcessed;
-		blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
-		try {
-			spillReader.setSpillFile(bufferSpiller.getSpillFile());
-			bufferSpiller.resetSpillFile();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		blockedChannels.clear();
-		currentBarrier = null;
+	private void releaseBlocks() throws IOException {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("All barriers received, blocks released");
+			LOG.debug("Releasing blocks");
 		}
-	}
 
-	/**
-	 * Method that is executed once the barrier has been received from all
-	 * channels.
-	 */
-	private void actOnAllBlocked() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Publishing barrier to the vertex");
+		for (int i = 0; i < blockedChannels.length; i++) {
+			blockedChannels[i] = false;
 		}
-
-		if (currentBarrier != null && !inputFinished) {
-			reader.publish(currentBarrier);
-			lastCheckpointId = currentBarrier.getId();
+		numReceivedBarriers = 0;
+		
+		if (nonProcessed.isEmpty()) {
+			// swap the queues
+			ArrayDeque<SpillingBufferOrEvent> empty = nonProcessed;
+			nonProcessed = blockedNonProcessed;
+			blockedNonProcessed = empty;
 		}
-
-		releaseBlocks();
-	}
-
-	/**
-	 * Processes one {@link org.apache.flink.streaming.runtime.tasks.CheckpointBarrier}
-	 * 
-	 * @param bufferOrEvent The {@link BufferOrEvent} containing the checkpoint barrier
-	 */
-	public void processBarrier(BufferOrEvent bufferOrEvent) {
-		CheckpointBarrier receivedBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
-
-		if (receivedBarrier.getId() < lastCheckpointId) {
-			// a barrier from an old checkpoint, ignore these
-			return;
-		}
-
-		if (currentBarrier == null) {
-			this.currentBarrier = receivedBarrier;
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Checkpoint barrier received start waiting for checkpoint: {}", receivedBarrier);
-			}
-		} else if (receivedBarrier.getId() > currentBarrier.getId()) {
-			// we have a barrier from a more recent checkpoint, free all locks and start with
-			// this newer checkpoint
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Checkpoint barrier received while waiting on checkpoint {}. Restarting waiting with checkpoint {}: ", currentBarrier, receivedBarrier);
-			}
-			releaseBlocks();
-			currentBarrier = receivedBarrier;
-
+		else {
+			throw new IllegalStateException("Unconsumed data from previous checkpoint alignment " +
+					"when starting next checkpoint alignment");
 		}
-		blockChannel(bufferOrEvent.getChannelIndex());
+		
+		// roll over the spill files
+		spillReader.setSpillFile(bufferSpiller.getSpillFile());
+		bufferSpiller.resetSpillFile();
 	}
 
-	public void cleanup() throws IOException {
-		bufferSpiller.close();
-		File spillfile1 = bufferSpiller.getSpillFile();
-		if (spillfile1 != null) {
-			spillfile1.delete();
-		}
+	// ------------------------------------------------------------------------
+	// For Testing
+	// ------------------------------------------------------------------------
 
-		spillReader.close();
-		File spillfile2 = spillReader.getSpillFile();
-		if (spillfile2 != null) {
-			spillfile2.delete();
-		}
+	public long getCurrentCheckpointId() {
+		return this.currentCheckpointId;
 	}
-
+	
+	// ------------------------------------------------------------------------
+	// Utilities 
+	// ------------------------------------------------------------------------
+	
+	@Override
 	public String toString() {
-		return nonProcessed.toString() + blockedNonProcessed.toString();
-	}
-
-	public boolean isEmpty() {
-		return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
+		return "Non-Processed: " + nonProcessed + " | Blocked: " + blockedNonProcessed;
 	}
-
 }
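
The rewritten processBarrier() above also covers barrier races. A short worked trace for a two-channel input (illustrative only, following the code above):

    // expecting barriers, no channel blocked yet, currentCheckpointId = -1
    // channel 0: barrier(5) -> first barrier of a new checkpoint:
    //                          currentCheckpointId = 5, block channel 0
    // channel 1: barrier(6) -> newer barrier while still aligning: warn, release
    //                          the blocks (checkpoint 5 is skipped),
    //                          currentCheckpointId = 6, block channel 1
    // channel 0: barrier(5) -> trailing barrier from the skipped checkpoint: ignored
    // channel 0: barrier(6) -> all barriers for 6 received: notify the checkpoint
    //                          handler, then release the blocks and replay the
    //                          buffered data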

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 0d57d05..fda612e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -22,28 +22,33 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.StringUtils;
 
 public class BufferSpiller {
+	
+	/** The random number generator for temp file names */
+	private static final Random RND = new Random();
 
-	protected static Random rnd = new Random();
+	/** The counter that selects the next directory to spill into */
+	private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
+	
+	
+	/** The directory to spill to */
+	private final File tempDir;
 
 	private File spillFile;
-	protected FileChannel spillingChannel;
-	private String tempDir;
-
-	public BufferSpiller() throws IOException {
-		String tempDirString = GlobalConfiguration.getString(
-				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
-		String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
-
-		tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
-
+	
+	private FileChannel spillingChannel;
+	
+	
+
+	public BufferSpiller(IOManager ioManager) throws IOException {
+		File[] tempDirs = ioManager.getSpillingDirectories();
+		this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
 		createSpillingChannel();
 	}
 
@@ -54,24 +59,20 @@ public class BufferSpiller {
 		try {
 			spillingChannel.write(buffer.getNioBuffer());
 			buffer.recycle();
-		} catch (IOException e) {
+		}
+		catch (IOException e) {
 			close();
-			throw new IOException(e);
+			throw e;
 		}
-
 	}
 
 	@SuppressWarnings("resource")
 	private void createSpillingChannel() throws IOException {
-		this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+		this.spillFile = new File(tempDir, randomString(RND) + ".buffer");
 		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
 	}
 
-	private static String randomString(Random random) {
-		final byte[] bytes = new byte[20];
-		random.nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
-	}
+
 
 	public void close() throws IOException {
 		if (spillingChannel != null && spillingChannel.isOpen()) {
@@ -87,5 +88,12 @@ public class BufferSpiller {
 	public File getSpillFile() {
 		return spillFile;
 	}
+	
+	// ------------------------------------------------------------------------
 
+	private static String randomString(Random random) {
+		final byte[] bytes = new byte[20];
+		random.nextBytes(bytes);
+		return StringUtils.byteToHexString(bytes);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
new file mode 100644
index 0000000..02dd33d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * The CheckpointBarrierHandler reacts to checkpoint barriers arriving from the input channels.
+ * Different implementations may either simply track barriers, or block certain inputs on
+ * barriers.
+ */
+public interface CheckpointBarrierHandler {
+
+	/**
+	 * Returns the next {@link BufferOrEvent} that the operator may consume.
+	 * This call blocks until the next BufferOrEvent is available, or until the stream
+	 * has been determined to be finished.
+	 * 
+	 * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
+	 * @throws java.io.IOException Thrown, if the network or local disk I/O fails.
+	 * @throws java.lang.InterruptedException Thrown, if the thread is interrupted while blocking during
+	 *                                        waiting for the next BufferOrEvent to become available.
+	 */
+	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+
+	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+	
+	void cleanup() throws IOException;
+
+	/**
+	 * Checks if the barrier handler has buffered any data internally.
+	 * @return True, if no data is buffered internally, false otherwise.
+	 */
+	boolean isEmpty();
+}
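
Since the interface leaves room for implementations that merely track barriers, a minimal tracking-only variant could look roughly like the sketch below (hypothetical, for illustration only; not part of this commit; imports match those used in BarrierBuffer.java above, and each channel is assumed to deliver each barrier at most once per checkpoint):

    import java.io.IOException;

    import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
    import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    import org.apache.flink.runtime.util.event.EventListener;
    import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;

    public class BarrierTracker implements CheckpointBarrierHandler {

        private final InputGate inputGate;
        private EventListener<CheckpointBarrier> checkpointHandler;
        private long currentCheckpointId = -1L;
        private int numReceivedBarriers;

        public BarrierTracker(InputGate inputGate) {
            this.inputGate = inputGate;
        }

        @Override
        public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
            while (true) {
                BufferOrEvent next = inputGate.getNextBufferOrEvent();
                if (next == null || next.isBuffer() ||
                        next.getEvent().getClass() != CheckpointBarrier.class) {
                    // nothing is ever blocked or buffered; buffers pass through directly
                    return next;
                }
                CheckpointBarrier barrier = (CheckpointBarrier) next.getEvent();
                if (barrier.getId() > currentCheckpointId) {
                    // first barrier of a new checkpoint resets the count
                    currentCheckpointId = barrier.getId();
                    numReceivedBarriers = 0;
                }
                if (barrier.getId() == currentCheckpointId &&
                        ++numReceivedBarriers == inputGate.getNumberOfInputChannels()) {
                    // one barrier per channel has arrived: notify and move on
                    if (checkpointHandler != null) {
                        checkpointHandler.onEvent(barrier);
                    }
                }
            }
        }

        @Override
        public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> handler) {
            this.checkpointHandler = handler;
        }

        @Override
        public void cleanup() {
            // no spill files, nothing to clean up
        }

        @Override
        public boolean isEmpty() {
            // nothing is buffered internally
            return true;
        }
    }

Such a tracker never blocks or spills, at the cost of letting records from fast channels overtake the barrier, so it cannot provide the strict alignment that the blocking BarrierBuffer gives.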

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 9db0178..4d60375 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -40,6 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +69,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 
 	private boolean isFinished;
 
-	private final BarrierBuffer barrierBuffer;
+	private final CheckpointBarrierHandler barrierHandler;
 
 	private final long[] watermarks;
 	private long lastEmittedWatermark;
@@ -74,10 +77,17 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 	private final DeserializationDelegate<Object> deserializationDelegate;
 
 	@SuppressWarnings("unchecked")
-	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
+	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
+								EventListener<CheckpointBarrier> checkpointListener,
+								IOManager ioManager,
+								boolean enableWatermarkMultiplexing) throws IOException {
+		
 		super(InputGateUtil.createInputGate(inputGates));
 
-		barrierBuffer = new BarrierBuffer(inputGate, this);
+		this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		if (checkpointListener != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		}
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
@@ -101,8 +111,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 		}
 		lastEmittedWatermark = Long.MIN_VALUE;
 	}
-
-	@SuppressWarnings("unchecked")
+	
+	
 	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
 		if (isFinished) {
 			return false;
@@ -137,8 +147,10 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 							}
 						}
 						continue;
-					} else {
+					}
+					else {
 						// now we can do the actual processing
+						@SuppressWarnings("unchecked")
 						StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
 						StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
 						if (ctx != null) {
@@ -150,32 +162,26 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 				}
 			}
 
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
-			if (bufferOrEvent.isBuffer()) {
-				currentChannel = bufferOrEvent.getChannelIndex();
-				currentRecordDeserializer = recordDeserializers[currentChannel];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
-
-				if (event instanceof CheckpointBarrier) {
-					barrierBuffer.processBarrier(bufferOrEvent);
-				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							if (!barrierBuffer.isEmpty()) {
-								throw new RuntimeException("BarrierBuffer should be empty at this point");
-							}
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
+			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+			if (bufferOrEvent != null) {
+				if (bufferOrEvent.isBuffer()) {
+					currentChannel = bufferOrEvent.getChannelIndex();
+					currentRecordDeserializer = recordDeserializers[currentChannel];
+					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+				}
+				else {
+					// Event received
+					final AbstractEvent event = bufferOrEvent.getEvent();
+					handleEvent(event);
 				}
 			}
+			else {
+				isFinished = true;
+				if (!barrierHandler.isEmpty()) {
+					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
+				}
+				return false;
+			}
 		}
 	}
 
@@ -195,7 +201,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 		}
 	}
 
+	@Override
 	public void cleanup() throws IOException {
-		barrierBuffer.cleanup();
+		barrierHandler.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e235ffe..9668c7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -68,7 +70,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 
 	private boolean isFinished;
 
-	private final BarrierBuffer barrierBuffer;
+	private final CheckpointBarrierHandler barrierHandler;
 
 	private final long[] watermarks1;
 	private long lastEmittedWatermark1;
@@ -87,11 +89,17 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
-			boolean enableWatermarkMultiplexing) {
+			EventListener<CheckpointBarrier> checkpointListener,
+			IOManager ioManager,
+			boolean enableWatermarkMultiplexing) throws IOException {
 		
 		super(InputGateUtil.createInputGate(inputGates1, inputGates2));
 
-		barrierBuffer = new BarrierBuffer(inputGate, this);
+		this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		if (checkpointListener != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		}
+
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
@@ -186,32 +194,26 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 				}
 			}
 
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
-			if (bufferOrEvent.isBuffer()) {
-				currentChannel = bufferOrEvent.getChannelIndex();
-				currentRecordDeserializer = recordDeserializers[currentChannel];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
+			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+			if (bufferOrEvent != null) {
 
-				if (event instanceof CheckpointBarrier) {
-					barrierBuffer.processBarrier(bufferOrEvent);
+				if (bufferOrEvent.isBuffer()) {
+					currentChannel = bufferOrEvent.getChannelIndex();
+					currentRecordDeserializer = recordDeserializers[currentChannel];
+					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+	
 				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							if (!barrierBuffer.isEmpty()) {
-								throw new RuntimeException("BarrierBuffer should be empty at this point");
-							}
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
+					// Event received
+					final AbstractEvent event = bufferOrEvent.getEvent();
+					handleEvent(event);
+				}
+			}
+			else {
+				isFinished = true;
+				if (!barrierHandler.isEmpty()) {
+					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
 				}
+				return false;
 			}
 		}
 	}
@@ -270,6 +272,6 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 
 	@Override
 	public void cleanup() throws IOException {
-		barrierBuffer.cleanup();
+		barrierHandler.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 9d6e88e..d078320 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -34,22 +34,27 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	public void registerInputOutput() {
-		super.registerInputOutput();
-
-		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		if (numberOfInputs > 0) {
-			InputGate[] inputGates = getEnvironment().getAllInputGates();
-			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, getExecutionConfig().areTimestampsEnabled());
-
-			inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
-
-			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-
-			inputProcessor.setReporter(reporter);
+		try {
+			super.registerInputOutput();
+			
+			TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
+			int numberOfInputs = configuration.getNumberOfInputs();
+	
+			if (numberOfInputs > 0) {
+				InputGate[] inputGates = getEnvironment().getAllInputGates();
+				inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
+						getCheckpointBarrierListener(), 
+						getEnvironment().getIOManager(),
+						getExecutionConfig().areTimestampsEnabled());
+	
+				// make sure that stream tasks report their I/O statistics
+				AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+				AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+				inputProcessor.setReporter(reporter);
+			}
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 75bdd57..d829833 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
@@ -74,7 +73,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	protected ClassLoader userClassLoader;
 	
-	private EventListener<TaskEvent> checkpointBarrierListener;
+	private EventListener<CheckpointBarrier> checkpointBarrierListener;
 
 	public StreamTask() {
 		streamOperator = null;
@@ -106,7 +105,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			streamOperator.setup(outputHandler.getOutput(), headContext);
 		}
 
-		hasChainedOperators = !(outputHandler.getChainedOperators().size() == 1);
+		hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
 	}
 
 	public String getName() {
@@ -199,7 +198,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		this.isRunning = false;
 	}
 
-	public EventListener<TaskEvent> getCheckpointBarrierListener() {
+	public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
 		return this.checkpointBarrierListener;
 	}
 
@@ -211,7 +210,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@Override
 	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
 
-		// We retrieve end restore the states for the chained oeprators.
+		// We retrieve and restore the states for the chained operators.
 		List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle.getState();
 
 		// We restore all stateful chained operators
@@ -306,13 +305,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	// ------------------------------------------------------------------------
 
-	private class CheckpointBarrierListener implements EventListener<TaskEvent> {
+	private class CheckpointBarrierListener implements EventListener<CheckpointBarrier> {
 
 		@Override
-		public void onEvent(TaskEvent event) {
+		public void onEvent(CheckpointBarrier barrier) {
 			try {
-				CheckpointBarrier sStep = (CheckpointBarrier) event;
-				triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
+				triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
 			}
 			catch (Exception e) {
 				throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index f981cd5..b4667b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -34,44 +34,52 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
 
-	StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
 
 	@Override
 	public void registerInputOutput() {
-		super.registerInputOutput();
-
-		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
-		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = inEdges.get(i).getTypeNumber();
-			InputGate reader = getEnvironment().getInputGate(i);
-			switch (inputType) {
-				case 1:
-					inputList1.add(reader);
-					break;
-				case 2:
-					inputList2.add(reader);
-					break;
-				default:
-					throw new RuntimeException("Invalid input type number: " + inputType);
+		try {
+			super.registerInputOutput();
+	
+			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+	
+			int numberOfInputs = configuration.getNumberOfInputs();
+	
+			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+	
+			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
+	
+			for (int i = 0; i < numberOfInputs; i++) {
+				int inputType = inEdges.get(i).getTypeNumber();
+				InputGate reader = getEnvironment().getInputGate(i);
+				switch (inputType) {
+					case 1:
+						inputList1.add(reader);
+						break;
+					case 2:
+						inputList2.add(reader);
+						break;
+					default:
+						throw new RuntimeException("Invalid input type number: " + inputType);
+				}
 			}
+	
+			this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
+					inputDeserializer1, inputDeserializer2,
+					getCheckpointBarrierListener(),
+					getEnvironment().getIOManager(),
+					getExecutionConfig().areTimestampsEnabled());
+
+			// make sure that stream tasks report their I/O statistics
+			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+			this.inputProcessor.setReporter(reporter);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
 		}
-
-		inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, getExecutionConfig().areTimestampsEnabled());
-
-		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-		inputProcessor.setReporter(reporter);
-
-		inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index c479f95..b59ad19 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -20,11 +20,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -220,14 +217,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			return isEvent;
 		}
 	}
-
-	public static class DummyEvent extends TaskEvent {
-		@Override
-		public void write(DataOutputView out) throws IOException {
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
deleted file mode 100644
index d8a3696..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-
-public class BarrierBufferIOTest {
-
-	@Test
-	public void IOTest() throws IOException, InterruptedException {
-
-		BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-		BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-
-		MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
-				new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
-		// new BarrierSimulator[] { new CountBarrier(1000), new
-		// CountBarrier(1000) });
-
-		BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
-				new BarrierBufferTest.MockReader(myIG));
-
-		try {
-			// long time = System.currentTimeMillis();
-			for (int i = 0; i < 2000000; i++) {
-				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
-				if (boe.isBuffer()) {
-					boe.getBuffer().recycle();
-				} else {
-					barrierBuffer.processBarrier(boe);
-				}
-			}
-			// System.out.println("Ran for " + (System.currentTimeMillis() -
-			// time));
-		} catch (Exception e) {
-			fail();
-		} finally {
-			barrierBuffer.cleanup();
-		}
-	}
-
-	private static class RandomBarrier implements BarrierGenerator {
-		private static Random rnd = new Random();
-
-		double threshold;
-
-		public RandomBarrier(double expectedEvery) {
-			threshold = 1 / expectedEvery;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return rnd.nextDouble() < threshold;
-		}
-	}
-
-	private static class CountBarrier implements BarrierGenerator {
-
-		long every;
-		long c = 0;
-
-		public CountBarrier(long every) {
-			this.every = every;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return c++ % every == 0;
-		}
-	}
-
-	protected static class MockInputGate implements InputGate {
-
-		private int numChannels;
-		private BufferPool[] bufferPools;
-		private int[] currentBarriers;
-		BarrierGenerator[] barrierGens;
-		int currentChannel = 0;
-		long c = 0;
-
-		public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
-			this.numChannels = bufferPools.length;
-			this.currentBarriers = new int[numChannels];
-			this.bufferPools = bufferPools;
-			this.barrierGens = barrierGens;
-		}
-
-		@Override
-		public int getNumberOfInputChannels() {
-			return numChannels;
-		}
-
-		@Override
-		public boolean isFinished() {
-			return false;
-		}
-
-		@Override
-		public void requestPartitions() throws IOException, InterruptedException {
-		}
-
-		@Override
-		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-			currentChannel = (currentChannel + 1) % numChannels;
-
-			if (barrierGens[currentChannel].isNextBarrier()) {
-				return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel],
-						currentChannel);
-			} else {
-				Buffer buffer = bufferPools[currentChannel].requestBuffer();
-				buffer.getMemorySegment().putLong(0, c++);
-
-				return new BufferOrEvent(buffer, currentChannel);
-			}
-
-		}
-
-		@Override
-		public void sendTaskEvent(TaskEvent event) throws IOException {
-		}
-
-		@Override
-		public void registerListener(EventListener<InputGate> listener) {
-		}
-
-	}
-
-	protected interface BarrierGenerator {
-		public boolean isNextBarrier();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
new file mode 100644
index 0000000..c2df4d8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import org.junit.Test;
+
+/**
+ * The test generates two random streams (input channels) which independently
+ * and randomly generate checkpoint barriers. The two streams are very
+ * unaligned, putting heavy work on the BarrierBuffer.
+ */
+public class BarrierBufferMassiveRandomTest {
+
+	@Test
+	public void testWithTwoChannelsAndRandomBarriers() {
+		IOManager ioMan = null;
+		try {
+			ioMan = new IOManagerAsync();
+			
+			BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+			BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+
+			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
+					new BufferPool[] { pool1, pool2 },
+					new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
+	
+			BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, ioMan);
+			
+			for (int i = 0; i < 2000000; i++) {
+				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+				if (boe.isBuffer()) {
+					boe.getBuffer().recycle();
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mocks and Generators
+	// ------------------------------------------------------------------------
+	
+	protected interface BarrierGenerator {
+		public boolean isNextBarrier();
+	}
+
+	protected static class RandomBarrier implements BarrierGenerator {
+		
+		private static final Random rnd = new Random();
+
+		private final double threshold;
+
+		public RandomBarrier(double expectedEvery) {
+			threshold = 1 / expectedEvery;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return rnd.nextDouble() < threshold;
+		}
+	}
+
+	private static class CountBarrier implements BarrierGenerator {
+
+		private final long every;
+		private long c = 0;
+
+		public CountBarrier(long every) {
+			this.every = every;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return c++ % every == 0;
+		}
+	}
+
+	protected static class RandomGeneratingInputGate implements InputGate {
+
+		private final int numChannels;
+		private final BufferPool[] bufferPools;
+		private final int[] currentBarriers;
+		private final BarrierGenerator[] barrierGens;
+		private int currentChannel = 0;
+		private long c = 0;
+
+		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this.numChannels = bufferPools.length;
+			this.currentBarriers = new int[numChannels];
+			this.bufferPools = bufferPools;
+			this.barrierGens = barrierGens;
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return false;
+		}
+
+		@Override
+		public void requestPartitions() {}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			currentChannel = (currentChannel + 1) % numChannels;
+
+			if (barrierGens[currentChannel].isNextBarrier()) {
+				return new BufferOrEvent(
+						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
+							currentChannel);
+			} else {
+				Buffer buffer = bufferPools[currentChannel].requestBuffer();
+				buffer.getMemorySegment().putLong(0, c++);
+				return new BufferOrEvent(buffer, currentChannel);
+			}
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) {}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {}
+	}
+}


[4/8] flink git commit: [FLINK-2421] [streaming] Add tests for basic utility behavior of StreamRecordSerializer (fixed in previous commit).

Posted by se...@apache.org.
[FLINK-2421] [streaming] Add tests for basic utility behavior of StreamRecordSerializer (fixed in previous commit).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d237e18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d237e18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d237e18

Branch: refs/heads/master
Commit: 2d237e18a2f7cf21721340933c505bb518c4fc66
Parents: acae9ff
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 18:08:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200

----------------------------------------------------------------------
 .../StreamRecordSerializerTest.java             | 68 ++++++++++++++++++++
 1 file changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d237e18/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
new file mode 100644
index 0000000..d48f7f4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class StreamRecordSerializerTest {
+	
+	@Test
+	public void testDeepDuplication() {
+		try {
+			@SuppressWarnings("unchecked")
+			TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class);
+			@SuppressWarnings("unchecked")
+			TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class);
+			
+			when(serializer1.duplicate()).thenReturn(serializer2);
+			
+			StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(serializer1);
+			assertEquals(serializer1, streamRecSer.getContainedTypeSerializer());
+			
+			StreamRecordSerializer<Long> copy = streamRecSer.duplicate();
+			assertNotEquals(copy, streamRecSer);
+			assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBasicProperties() {
+		try {
+			StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
+			
+			assertFalse(streamRecSer.isImmutableType());
+			assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass());
+			assertEquals(LongSerializer.INSTANCE.getLength(), streamRecSer.getLength());
+			
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


[5/8] flink git commit: [hotfix] Fix generics for stream record and watermark multiplexing.

Posted by se...@apache.org.
[hotfix] Fix generics for stream record and watermark multiplexing.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acae9ff2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acae9ff2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acae9ff2

Branch: refs/heads/master
Commit: acae9ff2583384dada84b40a89d3a068e3b2a00c
Parents: 8ba3213
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 18:07:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/plugable/SerializationDelegate.java |  7 +-
 .../runtime/io/RecordWriterOutput.java          | 21 ++---
 .../runtime/io/StreamInputProcessor.java        | 25 +++---
 .../runtime/io/StreamRecordWriter.java          |  2 +-
 .../runtime/io/StreamTwoInputProcessor.java     | 57 +++++++------
 .../MultiplexingStreamRecordSerializer.java     | 35 ++++++--
 .../runtime/streamrecord/StreamRecord.java      | 16 ++--
 .../streamrecord/StreamRecordSerializer.java    | 65 ++++++++-------
 .../streaming/runtime/tasks/OutputHandler.java  | 85 +++++++++++---------
 .../runtime/tasks/StreamTaskTestHarness.java    |  7 +-
 10 files changed, 185 insertions(+), 135 deletions(-)
----------------------------------------------------------------------

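The multiplexing fix matters because records and watermarks share a single output channel, so the serializer has to tag which kind of element follows. A rough sketch of the idea, assuming one plausible wire format (the committed MultiplexingStreamRecordSerializer in the diff below is authoritative):

    // Sketch only: IS_WATERMARK (Long.MIN_VALUE) acts as a sentinel tag,
    // letting one stream carry both records and watermarks.
    public void serialize(Object value, DataOutputView target) throws IOException {
        if (value instanceof StreamRecord) {
            @SuppressWarnings("unchecked")
            StreamRecord<T> record = (StreamRecord<T>) value;
            target.writeLong(record.getTimestamp());   // assumed never to be IS_WATERMARK
            typeSerializer.serialize(record.getValue(), target);
        }
        else if (value instanceof Watermark) {
            target.writeLong(IS_WATERMARK);            // watermark tag
            target.writeLong(((Watermark) value).getTimestamp());
        }
        else {
            throw new IllegalArgumentException("Unknown element type: " + value);
        }
    }
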

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
index 3cbaac3..91b6dd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
@@ -26,7 +26,12 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
+/**
+ * The serialization delegate exposes an arbitrary element as a {@link IOReadableWritable} for
+ * serialization, with the help of a type serializer.
+ * 
+ * @param <T> The type to be represented as an IOReadableWritable.
+ */
 public class SerializationDelegate<T> implements IOReadableWritable {
 	
 	private T instance;

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index f7d8d47..de8c205 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -39,32 +39,33 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
 
-	private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
-	private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
+	private RecordWriter<SerializationDelegate<Object>> recordWriter;
+	
+	private SerializationDelegate<Object> serializationDelegate;
 
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
-			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
+			RecordWriter<?> recordWriter,
 			TypeSerializer<OUT> outSerializer,
 			boolean enableWatermarkMultiplexing) {
+		
 		Preconditions.checkNotNull(recordWriter);
 
-		this.recordWriter = recordWriter;
+		this.recordWriter = (RecordWriter<SerializationDelegate<Object>>) recordWriter;
 
-		StreamRecordSerializer<OUT> outRecordSerializer;
+		TypeSerializer<Object> outRecordSerializer;
 		if (enableWatermarkMultiplexing) {
 			outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
 		} else {
-			outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer);
+			outRecordSerializer = (TypeSerializer<Object>) (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
 		}
 
 		if (outSerializer != null) {
-			serializationDelegate = new SerializationDelegate(outRecordSerializer);
+			serializationDelegate = new SerializationDelegate<Object>(outRecordSerializer);
 		}
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public void collect(StreamRecord<OUT> record) {
 		serializationDelegate.setInstance(record);
 
@@ -79,9 +80,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked,rawtypes")
 	public void emitWatermark(Watermark mark) {
-		((SerializationDelegate)serializationDelegate).setInstance(mark);
+		serializationDelegate.setInstance(mark);
+		
 		try {
 			recordWriter.broadcastEmit(serializationDelegate);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 4c40e5f..9db0178 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -46,9 +46,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
  *
- * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
+ * <p>This also keeps track of {@link Watermark} events and forwards them to event subscribers
+ * once the {@link Watermark} from all inputs advances.</p>
  * 
  * @param <IN> The type of the record that can be read with this record reader.
  */
@@ -63,33 +62,35 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
-	int currentChannel = -1;
+	private int currentChannel = -1;
 
 	private boolean isFinished;
 
 	private final BarrierBuffer barrierBuffer;
 
-	private long[] watermarks;
+	private final long[] watermarks;
 	private long lastEmittedWatermark;
 
-	private DeserializationDelegate<Object> deserializationDelegate;
+	private final DeserializationDelegate<Object> deserializationDelegate;
 
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
 		super(InputGateUtil.createInputGate(inputGates));
 
 		barrierBuffer = new BarrierBuffer(inputGate, this);
-
-		StreamRecordSerializer<IN> inputRecordSerializer;
+		
 		if (enableWatermarkMultiplexing) {
-			inputRecordSerializer = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+			this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(ser);
 		} else {
-			inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
+			StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
+			this.deserializationDelegate = (NonReusingDeserializationDelegate<Object>)
+					(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
 		}
-		this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(inputRecordSerializer);
-
+		
 		// Initialize one deserializer per input channel
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+		
 		for (int i = 0; i < recordDeserializers.length; i++) {
 			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index b0e2532..321f3b4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -58,7 +58,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		
 		super(writer, channelSelector);
 		
-		checkArgument(timeout < 0);
+		checkArgument(timeout >= 0);
 		
 		if (timeout == 0) {
 			flushAlways = true;

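The one-character flip above fixes an inverted precondition: the writer is meant to accept non-negative flush timeouts, with zero meaning flush on every record. Spelled out (the behavior for positive timeouts is an assumption based on the flushAlways flag):

    checkArgument(timeout >= 0, "The flush timeout must not be negative");

    if (timeout == 0) {
        flushAlways = true;   // flush after every emitted record
    }
    // assumed: a positive timeout instead flushes at most every `timeout` ms
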
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 82e7936..e235ffe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -64,64 +64,70 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
-	int currentChannel = -1;
+	private int currentChannel = -1;
 
 	private boolean isFinished;
 
 	private final BarrierBuffer barrierBuffer;
 
-	private long[] watermarks1;
+	private final long[] watermarks1;
 	private long lastEmittedWatermark1;
 
-	private long[] watermarks2;
+	private final long[] watermarks2;
 	private long lastEmittedWatermark2;
 
-	private int numInputChannels1;
-	private int numInputChannels2;
+	private final int numInputChannels1;
 
-	private DeserializationDelegate<Object> deserializationDelegate1;
-	private DeserializationDelegate<Object> deserializationDelegate2;
+	private final DeserializationDelegate<Object> deserializationDelegate1;
+	private final DeserializationDelegate<Object> deserializationDelegate2;
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	public StreamTwoInputProcessor(
 			Collection<InputGate> inputGates1,
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
 			boolean enableWatermarkMultiplexing) {
+		
 		super(InputGateUtil.createInputGate(inputGates1, inputGates2));
 
 		barrierBuffer = new BarrierBuffer(inputGate, this);
-
-		StreamRecordSerializer<IN1> inputRecordSerializer1;
+		
 		if (enableWatermarkMultiplexing) {
-			inputRecordSerializer1 = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
-		} else {
-			inputRecordSerializer1 = new StreamRecordSerializer<IN1>(inputSerializer1);
+			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
+			this.deserializationDelegate1 = new NonReusingDeserializationDelegate<Object>(ser);
 		}
-		this.deserializationDelegate1 = new NonReusingDeserializationDelegate(inputRecordSerializer1);
-
-		StreamRecordSerializer<IN2> inputRecordSerializer2;
+		else {
+			StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
+			this.deserializationDelegate1 = (DeserializationDelegate<Object>)
+					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
+		}
+		
 		if (enableWatermarkMultiplexing) {
-			inputRecordSerializer2 = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
-		} else {
-			inputRecordSerializer2 = new StreamRecordSerializer<IN2>(inputSerializer2);
+			MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
+			this.deserializationDelegate2 = new NonReusingDeserializationDelegate<Object>(ser);
+		}
+		else {
+			StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
+			this.deserializationDelegate2 = (DeserializationDelegate<Object>)
+					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
 		}
-		this.deserializationDelegate2 = new NonReusingDeserializationDelegate(inputRecordSerializer2);
 
 		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
-				.getNumberOfInputChannels()];
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+		
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
 		}
 
 		// determine which unioned channels belong to input 1 and which belong to input 2
-		numInputChannels1 = 0;
+		int numInputChannels1 = 0;
 		for (InputGate gate: inputGates1) {
 			numInputChannels1 += gate.getNumberOfInputChannels();
 		}
-		numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
+		
+		this.numInputChannels1 = numInputChannels1;
+		int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
 
 		watermarks1 = new long[numInputChannels1];
 		for (int i = 0; i < numInputChannels1; i++) {
@@ -262,6 +268,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 		}
 	}
 
+	@Override
 	public void cleanup() throws IOException {
 		barrierBuffer.cleanup();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 715f0d2..075c4fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.streamrecord;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -35,17 +36,36 @@ import java.io.IOException;
  *
  * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
  */
-public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSerializer<T> {
-
-	private final long IS_WATERMARK = Long.MIN_VALUE;
+public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Object> {
 
 	private static final long serialVersionUID = 1L;
 
+	private static final long IS_WATERMARK = Long.MIN_VALUE;
+	
+	protected final TypeSerializer<T> typeSerializer;
+
+	
 	public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
-		super(serializer);
-		if (serializer instanceof MultiplexingStreamRecordSerializer) {
+		if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
 			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
 		}
+		this.typeSerializer = Preconditions.checkNotNull(serializer);
+	}
+	
+	
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<Object> duplicate() {
+		return this;
+	}
+
+	@Override
+	public Object createInstance() {
+		return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
 	}
 
 	@Override
@@ -81,6 +101,11 @@ public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSer
 	}
 
 	@Override
+	public int getLength() {
+		return 0;
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	public void serialize(Object value, DataOutputView target) throws IOException {
 		if (value instanceof StreamRecord) {

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 6521e7f..92ce66f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -19,12 +19,13 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 /**
  * One value in a data stream. This stores the value and the associated timestamp.
+ * 
+ * @param <T> The type encapsulated with the stream record.
  */
 public class StreamRecord<T> {
-
-	// We store it as Object so that we can reuse a StreamElement for emitting
-	// elements of a different type while still reusing the timestamp.
-	private Object value;
+	
+	private T value;
+	
 	private long timestamp;
 
 	/**
@@ -52,9 +53,8 @@ public class StreamRecord<T> {
 	/**
 	 * Returns the value wrapped in this stream value.
 	 */
-	@SuppressWarnings("unchecked")
 	public T getValue() {
-		return (T) value;
+		return value;
 	}
 
 	/**
@@ -74,7 +74,7 @@ public class StreamRecord<T> {
 	 */
 	@SuppressWarnings("unchecked")
 	public <X> StreamRecord<X> replace(X element) {
-		this.value = element;
+		this.value = (T) element;
 		return (StreamRecord<X>) this;
 	}
 
@@ -90,7 +90,7 @@ public class StreamRecord<T> {
 	@SuppressWarnings("unchecked")
 	public <X> StreamRecord<X> replace(X value, long timestamp) {
 		this.timestamp = timestamp;
-		this.value = value;
+		this.value = (T) value;
 		return (StreamRecord<X>) this;
 	}
 

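Making the value field a proper T (with the unchecked cast confined to replace) keeps the object-reuse pattern intact. A minimal illustrative sketch of that pattern:

    // Sketch: one StreamRecord instance reused for successive elements,
    // even of a different type, so the wrapper object is never reallocated.
    StreamRecord<String> rec = new StreamRecord<String>("first", 1L);
    StreamRecord<Integer> reused = rec.replace(42);   // same object, new value
    reused = reused.replace(43, 2L);                  // new value and timestamp
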
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2619891..e58d3c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -38,11 +38,12 @@ import org.apache.flink.core.memory.DataOutputView;
  *
  * @param <T> The type of value in the {@link StreamRecord}
  */
-public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
 
 	private static final long serialVersionUID = 1L;
 
-	protected final TypeSerializer<T> typeSerializer;
+	private final TypeSerializer<T> typeSerializer;
+	
 
 	public StreamRecordSerializer(TypeSerializer<T> serializer) {
 		if (serializer instanceof StreamRecordSerializer) {
@@ -51,19 +52,36 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
 		this.typeSerializer = Preconditions.checkNotNull(serializer);
 	}
 
+	public TypeSerializer<T> getContainedTypeSerializer() {
+		return this.typeSerializer;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  General serializer and type utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamRecordSerializer<T> duplicate() {
+		TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
+		return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
+	}
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public StreamRecordSerializer<T> duplicate() {
-		return this;
+	public int getLength() {
+		return typeSerializer.getLength();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Type serialization, copying, instantiation
+	// ------------------------------------------------------------------------
+
 	@Override
-	public Object createInstance() {
+	public StreamRecord<T> createInstance() {
 		try {
 			return new StreamRecord<T>(typeSerializer.createInstance());
 		} catch (Exception e) {
@@ -72,46 +90,31 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
 	}
 	
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object copy(Object from) {
-		StreamRecord<T> fromRecord = (StreamRecord<T>) from;
-		return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
+	public StreamRecord<T> copy(StreamRecord<T> from) {
+		return new StreamRecord<T>(typeSerializer.copy(from.getValue()), from.getTimestamp());
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object copy(Object from, Object reuse) {
-		StreamRecord<T> fromRecord = (StreamRecord<T>) from;
-		StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
-
-		reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), 0);
+	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
+		reuse.replace(typeSerializer.copy(from.getValue(), reuse.getValue()), 0);
 		return reuse;
 	}
 
 	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void serialize(Object value, DataOutputView target) throws IOException {
-		StreamRecord<T> record = (StreamRecord<T>) value;
-		typeSerializer.serialize(record.getValue(), target);
+	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
+		typeSerializer.serialize(value.getValue(), target);
 	}
 	
 	@Override
-	public Object deserialize(DataInputView source) throws IOException {
+	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
 		T element = typeSerializer.deserialize(source);
 		return new StreamRecord<T>(element, 0);
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object deserialize(Object reuse, DataInputView source) throws IOException {
-		StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
-		T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
-		reuseRecord.replace(element, 0);
+	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
+		T element = typeSerializer.deserialize(reuse.getValue(), source);
+		reuse.replace(element, 0);
 		return reuse;
 	}
 

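The reworked duplicate() only allocates when the wrapped serializer is itself stateful; this is the contract that the StreamRecordSerializerTest added in commit [4/8] above pins down. A short usage sketch:

    StreamRecordSerializer<Long> ser =
            new StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
    StreamRecordSerializer<Long> copy = ser.duplicate();
    // LongSerializer is stateless and duplicates to itself, so copy == ser here;
    // a stateful inner serializer would yield an independent, thread-safe copy.
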
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index aa55151..84614bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -44,28 +44,30 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class OutputHandler<OUT> {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
 
-	private StreamTask<OUT, ?> vertex;
-	private StreamConfig configuration;
-	private ClassLoader cl;
-	private Output<StreamRecord<OUT>> outerOutput;
+	private final StreamTask<OUT, ?> vertex;
+	
+	/** The classloader used to access all user code */
+	private final ClassLoader userCodeClassloader;
+	
+	
+	private final Output<StreamRecord<OUT>> outerOutput;
 
-	public List<StreamOperator<?>> chainedOperators;
+	public final List<StreamOperator<?>> chainedOperators;
 
-	private Map<StreamEdge, RecordWriterOutput<?>> outputMap;
+	private final Map<StreamEdge, RecordWriterOutput<?>> outputMap;
 
-	private Map<Integer, StreamConfig> chainedConfigs;
-	private List<StreamEdge> outEdgesInOrder;
+	private final Map<Integer, StreamConfig> chainedConfigs;
 
-	/**
-	 * Counters for the number of records emitted and bytes written.
-	 */
-	protected AccumulatorRegistry.Reporter reporter;
+	/** Counters for the number of records emitted and bytes written. */
+	protected final AccumulatorRegistry.Reporter reporter;
 
 
 	public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
@@ -73,17 +75,17 @@ public class OutputHandler<OUT> {
 
 		// Initialize some fields
 		this.vertex = vertex;
-		this.configuration = new StreamConfig(vertex.getTaskConfiguration());
+		StreamConfig configuration = new StreamConfig(vertex.getTaskConfiguration());
 		this.chainedOperators = new ArrayList<StreamOperator<?>>();
 		this.outputMap = new HashMap<StreamEdge, RecordWriterOutput<?>>();
-		this.cl = vertex.getUserCodeClassLoader();
+		this.userCodeClassloader = vertex.getUserCodeClassLoader();
 
 		// We read the chained configs, and the order of record writer
-		// registrations by outputname
-		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
+		// registrations by output name
+		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
 		this.chainedConfigs.put(configuration.getVertexID(), configuration);
 
-		this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
 
 		this.reporter = reporter;
 
@@ -133,25 +135,24 @@ public class OutputHandler<OUT> {
 	 * @return Returns the output for the chain starting from the given
 	 * config
 	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
+	@SuppressWarnings("unchecked")
 	private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
-
-
+		
 		// We create a wrapper that will encapsulate the chained operators and
 		// network outputs
 
-		OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
+		OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(userCodeClassloader);
 		CollectorWrapper wrapper = new CollectorWrapper(outputSelectorWrapper);
 
 		// Create collectors for the network outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
+		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(userCodeClassloader)) {
 			Output<?> output = outputMap.get(outputEdge);
 
 			wrapper.addCollector(output, outputEdge);
 		}
 
 		// Create collectors for the chained outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
+		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(userCodeClassloader)) {
 			Integer outputId = outputEdge.getTargetId();
 
 			Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
@@ -163,11 +164,12 @@ public class OutputHandler<OUT> {
 			// The current task is the first chained task at this vertex so we
 			// return the wrapper
 			return (Output<StreamRecord<X>>) wrapper;
-		} else {
+		}
+		else {
 			// The current task is a part of the chain so we get the chainable
 			// operator which will be returned and set it up using the wrapper
 			OneInputStreamOperator chainableOperator =
-					chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader());
+					chainedTaskConfig.getStreamOperator(userCodeClassloader);
 
 			StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
 			vertex.contexts.add(chainedContext);
@@ -177,14 +179,20 @@ public class OutputHandler<OUT> {
 			chainedOperators.add(chainableOperator);
 			if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
 				return new ChainingOutput<X>(chainableOperator);
-			} else {
-				StreamRecordSerializer serializerIn1;
+			}
+			else {
+				TypeSerializer<X> typeSer = chainedTaskConfig.getTypeSerializerIn1(userCodeClassloader);
+				TypeSerializer<StreamRecord<X>> inSerializer;
+				
 				if (vertex.getExecutionConfig().areTimestampsEnabled()) {
-					serializerIn1 = new MultiplexingStreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
-				} else {
-					serializerIn1 = new StreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
+					inSerializer = (TypeSerializer<StreamRecord<X>>) 
+							(TypeSerializer<?>) new MultiplexingStreamRecordSerializer<X>(typeSer);
 				}
-				return new CopyingChainingOutput<X>(chainableOperator, (TypeSerializer<StreamRecord<X>>) serializerIn1);
+				else {
+					inSerializer = new StreamRecordSerializer<X>(typeSer);
+				}
+				
+				return new CopyingChainingOutput<X>(chainableOperator, inSerializer);
 			}
 		}
 
@@ -244,14 +252,14 @@ public class OutputHandler<OUT> {
 	}
 
 	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-		protected OneInputStreamOperator<T, ?> operator;
+		
+		protected final OneInputStreamOperator<T, ?> operator;
 
 		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
 			this.operator = operator;
 		}
 
 		@Override
-		@SuppressWarnings("unchecked")
 		public void collect(StreamRecord<T> record) {
 			try {
 				operator.getRuntimeContext().setNextInput(record);
@@ -268,7 +276,8 @@ public class OutputHandler<OUT> {
 		public void emitWatermark(Watermark mark) {
 			try {
 				operator.processWatermark(mark);
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Could not forward element to operator: {}", e);
 				}
@@ -280,10 +289,12 @@ public class OutputHandler<OUT> {
 		public void close() {
 			try {
 				operator.close();
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Could not forward close call to operator.", e);
 				}
+				throw new RuntimeException(e);
 			}
 		}
 	}
@@ -298,12 +309,12 @@ public class OutputHandler<OUT> {
 		}
 
 		@Override
-		@SuppressWarnings("unchecked")
 		public void collect(StreamRecord<T> record) {
 			try {
 				operator.getRuntimeContext().setNextInput(record);
 				operator.processElement(serializer.copy(record));
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Could not forward element to operator.", e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 283243e..435831f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 
@@ -79,9 +78,8 @@ public class StreamTaskTestHarness<OUT> {
 
 	private AbstractInvokable task;
 
-	private TypeInformation<OUT> outputType;
 	private TypeSerializer<OUT> outputSerializer;
-	private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
+	private TypeSerializer<Object> outputStreamRecordSerializer;
 
 	private ConcurrentLinkedQueue<Object> outputList;
 
@@ -114,7 +112,6 @@ public class StreamTaskTestHarness<OUT> {
 		streamConfig.setChainStart();
 		streamConfig.setBufferTimeout(0);
 
-		this.outputType = outputType;
 		outputSerializer = outputType.createSerializer(executionConfig);
 		outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
 	}
@@ -127,7 +124,7 @@ public class StreamTaskTestHarness<OUT> {
 
 	@SuppressWarnings("unchecked")
 	private void initializeOutput() {
-		outputList = new ConcurrentLinkedQueue();
+		outputList = new ConcurrentLinkedQueue<Object>();
 
 		mockEnv.addOutput(outputList, outputStreamRecordSerializer);
 


[3/8] flink git commit: [FLINK-2418] [streaming] Add an end-to-end exactly-once test for Checkpointed functions.

Posted by se...@apache.org.
[FLINK-2418] [streaming] Add an end-to-end exactly-once test for Checkpointed functions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0556efb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0556efb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0556efb

Branch: refs/heads/master
Commit: a0556efb233f15c6985d17886372a8b4b00392b2
Parents: 5710841
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 15:07:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200

----------------------------------------------------------------------
 .../checkpointing/StateCheckpoinedITCase.java   | 433 +++++++++++++++++++
 1 file changed, 433 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0556efb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
new file mode 100644
index 0000000..0693665
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ * 
+ * The test triggers a failure after a while and verifies that, after completion, the
+ * state reflects the "exactly once" semantics.
+ */
+@SuppressWarnings("serial")
+public class StateCheckpoinedITCase {
+
+	private static final int NUM_TASK_MANAGERS = 2;
+	private static final int NUM_TASK_SLOTS = 3;
+	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void startCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			
+			cluster = new ForkableFlinkMiniCluster(config, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void shutdownCluster() {
+		try {
+			cluster.shutdown();
+			cluster = null;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage());
+		}
+	}
+
+
+	/**
+	 * Runs the following program:
+	 *
+	 * <pre>
+	 *     [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+	 * </pre>
+	 */
+	@Test
+	public void runCheckpointedProgram() {
+
+		final long NUM_STRINGS = 10000000L;
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+		
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+																	"localhost", cluster.getJobManagerRPCPort());
+			env.setParallelism(PARALLELISM);
+			env.enableCheckpointing(500);
+			env.getConfig().enableSysoutLogging();
+
+			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+			
+			stream
+					// -------------- first vertex, chained to the source ----------------
+					.filter(new StringRichFilterFunction())
+
+					// -------------- second vertex - one-to-one connected ----------------
+					.map(new StringPrefixCountRichMapFunction())
+					.startNewChain()
+					.map(new StatefulCounterFunction())
+
+					// -------------- third vertex - reducer and the sink ----------------
+					.partitionByHash("prefix")
+					.flatMap(new OnceFailingAggregator(NUM_STRINGS))
+					.addSink(new ValidatingSink());
+
+			env.execute();
+			
+			long filterSum = 0;
+			for (long l : StringRichFilterFunction.counts) {
+				filterSum += l;
+			}
+
+			long mapSum = 0;
+			for (long l : StringPrefixCountRichMapFunction.counts) {
+				mapSum += l;
+			}
+
+			long countSum = 0;
+			for (long l : StatefulCounterFunction.counts) {
+				countSum += l;
+			}
+
+			// verify that we counted exactly right
+			assertEquals(NUM_STRINGS, filterSum);
+			assertEquals(NUM_STRINGS, mapSum);
+			assertEquals(NUM_STRINGS, countSum);
+
+			for (Map<Character, Long> map : ValidatingSink.maps) {
+				for (Long count : map.values()) {
+					assertEquals(NUM_STRINGS / 40, count.longValue());
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+	
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> 
+			implements CheckpointedAsynchronously<Integer>
+	{
+		private final long numElements;
+
+		private int index;
+
+		private volatile boolean isRunning = true;
+		
+		
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			final Random rnd = new Random();
+			final StringBuilder stringBuilder = new StringBuilder();
+			
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+			
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			while (isRunning && index < numElements) {
+				char first = (char) ((index % 40) + 40);
+
+				stringBuilder.setLength(0);
+				stringBuilder.append(first);
+
+				String result = randomString(stringBuilder, rnd);
+
+				synchronized (lockingObject) {
+					index += step;
+					ctx.collect(result);
+				}
+			}
+		}
+		
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		private static String randomString(StringBuilder bld, Random rnd) {
+			final int len = rnd.nextInt(10) + 5;
+
+			for (int i = 0; i < len; i++) {
+				char next = (char) (rnd.nextInt(20000) + 33);
+				bld.append(next);
+			}
+
+			return bld.toString();
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+
+	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+
+		static final long[] counts = new long[PARALLELISM];
+
+		private long count;
+
+		@Override
+		public boolean filter(String value) {
+			count++;
+			return value.length() < 100; // should always be true
+		}
+
+		@Override
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
+	}
+
+	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> 
+			implements CheckpointedAsynchronously<Long> {
+		
+		static final long[] counts = new long[PARALLELISM];
+
+		private long count;
+		
+		@Override
+		public PrefixCount map(String value) {
+			count++;
+			return new PrefixCount(value.substring(0, 1), value, 1L);
+		}
+
+		@Override
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
+	}
+	
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+
+		static final long[] counts = new long[PARALLELISM];
+		
+		private OperatorState<Long> count;
+
+		@Override
+		public PrefixCount map(PrefixCount value) throws Exception {
+			count.update(count.value() + 1);
+			return value;
+		}
+
+		@Override
+		public void open(Configuration conf) throws IOException {
+			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		}
+
+		@Override
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+		}
+		
+	}
+	
+	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 
+		implements Checkpointed<HashMap<String, PrefixCount>> {
+
+		private static volatile boolean hasFailed = false;
+
+		private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
+		
+		private final long numElements;
+		
+		private long failurePos;
+		private long count;
+		
+
+		OnceFailingAggregator(long numElements) {
+			this.numElements = numElements;
+		}
+		
+		@Override
+		public void open(Configuration parameters) {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+			failurePos = Math.abs(new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; // abs: nextLong() may be negative
+			count = 0;
+		}
+
+		@Override
+		public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+			
+			PrefixCount curr = aggregationMap.get(value.prefix);
+			if (curr == null) {
+				aggregationMap.put(value.prefix, value);
+				out.collect(value);
+			}
+			else {
+				curr.count += value.count;
+				out.collect(curr);
+			}
+		}
+
+		@Override
+		public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
+			return aggregationMap;
+		}
+
+		@Override
+		public void restoreState(HashMap<String, PrefixCount> state) {
+			aggregationMap.putAll(state);
+		}
+	}
+
+	private static class ValidatingSink extends RichSinkFunction<PrefixCount> 
+			implements Checkpointed<HashMap<Character, Long>> {
+
+		@SuppressWarnings("unchecked")
+		private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
+		
+		private HashMap<Character, Long> counts = new HashMap<Character, Long>();
+
+		@Override
+		public void invoke(PrefixCount value) {
+			Character first = value.prefix.charAt(0);
+			Long previous = counts.get(first);
+			if (previous == null) {
+				counts.put(first, value.count);
+			} else {
+				counts.put(first, Math.max(previous, value.count));
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
+		}
+
+		@Override
+		public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+			return counts;
+		}
+
+		@Override
+		public void restoreState(HashMap<Character, Long> state) {
+			counts.putAll(state);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom Type Classes
+	// --------------------------------------------------------------------------------------------
+
+	public static class PrefixCount implements Serializable {
+
+		public String prefix;
+		public String value;
+		public long count;
+
+		public PrefixCount() {}
+
+		public PrefixCount(String prefix, String value, long count) {
+			this.prefix = prefix;
+			this.value = value;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return prefix + " / " + value;
+		}
+	}
+}
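
All stateful functions above follow the same snapshot/restore contract. Boiled
down to a minimal sketch (CountingMapper and its field are illustrative names,
not part of this commit), the Checkpointed pattern the test exercises is:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

// Minimal sketch of the Checkpointed contract: keep the state in a field,
// hand it out in snapshotState(), re-install it in restoreState().
public class CountingMapper extends RichMapFunction<String, String>
		implements Checkpointed<Long> {

	private long count; // the state that must survive failures

	@Override
	public String map(String value) {
		count++; // mutation covered by the checkpointing mechanism
		return value;
	}

	@Override
	public Long snapshotState(long checkpointId, long checkpointTimestamp) {
		return count; // Long is immutable, so the returned value is a consistent snapshot
	}

	@Override
	public void restoreState(Long state) {
		count = state; // called on recovery, before the first element is processed
	}
}

The "exactly once" result then comes from pairing such snapshots with source
replay: StringGeneratingSourceFunction advances its index and emits under the
checkpoint lock, so state and emitted elements stay consistent across a restore.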


[8/8] flink git commit: [FLINK-2402] [streaming] Add a stream checkpoint barrier tracker.

Posted by se...@apache.org.
[FLINK-2402] [streaming] Add a stream checkpoint barrier tracker.

The BarrierTracker is non-blocking and only counts barriers.
That way, it does not increase the latency of records in the stream, but can only be
used to obtain "at least once" processing guarantees.
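
As a usage sketch, a consumer drives the tracker roughly as follows (the
consume() wrapper and the println are illustrative; the constructor,
registerCheckpointEventHandler and getNextNonBlocked are the API added below):

import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.io.BarrierTracker;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;

public class BarrierTrackerUsage {

	// Drains a gate through the tracker: barriers are counted and swallowed,
	// everything else is handed through without blocking any channel.
	static void consume(InputGate gate) throws Exception {
		BarrierTracker tracker = new BarrierTracker(gate);

		tracker.registerCheckpointEventHandler(new EventListener<CheckpointBarrier>() {
			@Override
			public void onEvent(CheckpointBarrier barrier) {
				// fires once barriers for this ID were seen on all channels
				System.out.println("checkpoint " + barrier.getId() + " complete");
			}
		});

		BufferOrEvent next;
		while ((next = tracker.getNextNonBlocked()) != null) {
			// buffers and non-barrier events arrive here with no added latency;
			// records that overtake a pending barrier are the reason recovery
			// gives "at least once" rather than "exactly once"
		}
	}
}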


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f87b716
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f87b716
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f87b716

Branch: refs/heads/master
Commit: 8f87b7164b644ea8f1708f7eb76567e58341b224
Parents: 0579f90
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 26 19:05:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:07 2015 +0200

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierTracker.java    | 194 +++++++++
 .../runtime/io/BarrierTrackerTest.java          | 404 +++++++++++++++++++
 2 files changed, 598 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f87b716/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
new file mode 100644
index 0000000..6b24556
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+/**
+ * The BarrierTracker keeps track of which checkpoint barriers have been received from
+ * which input channels. 
+ * 
+ * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
+ * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
+ * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ */
+public class BarrierTracker implements CheckpointBarrierHandler {
+
+	private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
+	
+	private final InputGate inputGate;
+	
+	private final int totalNumberOfInputChannels;
+	
+	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
+	
+	private EventListener<CheckpointBarrier> checkpointHandler;
+	
+	private long latestPendingCheckpointID = -1;
+	
+	public BarrierTracker(InputGate inputGate) {
+		this.inputGate = inputGate;
+		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+		this.pendingCheckpoints = new ArrayDeque<CheckpointBarrierCount>();
+	}
+
+	@Override
+	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+		while (true) {
+			BufferOrEvent next = inputGate.getNextBufferOrEvent();
+			if (next == null) {
+				return null;
+			}
+			else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+				return next;
+			}
+			else {
+				processBarrier((CheckpointBarrier) next.getEvent());
+			}
+		}
+	}
+
+	@Override
+	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
+		if (this.checkpointHandler == null) {
+			this.checkpointHandler = checkpointHandler;
+		}
+		else {
+			throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		pendingCheckpoints.clear();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return pendingCheckpoints.isEmpty();
+	}
+
+	private void processBarrier(CheckpointBarrier receivedBarrier) {
+		// fast path for single channel trackers
+		if (totalNumberOfInputChannels == 1) {
+			if (checkpointHandler != null) {
+				checkpointHandler.onEvent(receivedBarrier);
+			}
+			return;
+		}
+		
+		// general path for multiple input channels
+		final long barrierId = receivedBarrier.getId();
+
+		// find the checkpoint barrier in the queue of pending barriers
+		CheckpointBarrierCount cbc = null;
+		int pos = 0;
+		
+		for (CheckpointBarrierCount next : pendingCheckpoints) {
+			if (next.checkpointId == barrierId) {
+				cbc = next;
+				break;
+			}
+			pos++;
+		}
+		
+		if (cbc != null) {
+			// add one to the count to that barrier and check for completion
+			int numBarriersNew = cbc.incrementBarrierCount();
+			if (numBarriersNew == totalNumberOfInputChannels) {
+				// checkpoint can be triggered
+				// first, remove this checkpoint and all prior pending
+				// checkpoints (which are now subsumed)
+				for (int i = 0; i <= pos; i++) {
+					pendingCheckpoints.pollFirst();
+				}
+				
+				// notify the listener
+				if (checkpointHandler != null) {
+					checkpointHandler.onEvent(receivedBarrier);
+				}
+			}
+		}
+		else {
+			// first barrier for that checkpoint ID
+			// add it only if it is newer than the latest checkpoint.
+			// if it is not newer than the latest checkpoint ID, then there cannot be a
+			// successful checkpoint for that ID anyways
+			if (barrierId > latestPendingCheckpointID) {
+				latestPendingCheckpointID = barrierId;
+				pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
+				
+				// make sure we do not track too many checkpoints
+				if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
+					pendingCheckpoints.pollFirst();
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Simple class for a checkpoint ID with a barrier counter.
+	 */
+	private static final class CheckpointBarrierCount {
+		
+		private final long checkpointId;
+		
+		private int barrierCount;
+		
+		private CheckpointBarrierCount(long checkpointId) {
+			this.checkpointId = checkpointId;
+			this.barrierCount = 1;
+		}
+
+		public int incrementBarrierCount() {
+			return ++barrierCount;
+		}
+		
+		@Override
+		public int hashCode() {
+			return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; 
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof CheckpointBarrierCount) {
+				CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
+				return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
+			}
+			else {
+				return false;
+			}
+		}
+
+		@Override
+		public String toString() {
+			return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
+		}
+	}
+}
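
To make the subsumption rule above concrete, here is a short worked trace for
three input channels (barrier(n, ch) is a barrier for checkpoint n arriving on
channel ch):

    barrier(2, ch0)  ->  pending: {2: 1 barrier}
    barrier(3, ch1)  ->  pending: {2: 1}, {3: 1}
    barrier(3, ch0)  ->  pending: {2: 1}, {3: 2}
    barrier(3, ch2)  ->  checkpoint 3 has all 3 barriers and is announced;
                         checkpoint 2 is dropped as subsumed, pending: {}
    barrier(2, ch1)  ->  ignored, because 2 <= latestPendingCheckpointID (3)

Keeping older checkpoints pending while newer barriers arrive is exactly what
the testCompleteCheckpointsOnLateBarriers case below relies on.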

http://git-wip-us.apache.org/repos/asf/flink/blob/8f87b716/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
new file mode 100644
index 0000000..b2c570e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the behavior of the barrier tracker.
+ */
+public class BarrierTrackerTest {
+
+	@Test
+	public void testSingleChannelNoBarriers() {
+		try {
+			BufferOrEvent[] sequence = { createBuffer(0), createBuffer(0), createBuffer(0) };
+
+			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			BarrierTracker tracker = new BarrierTracker(gate);
+
+			for (BufferOrEvent boe : sequence) {
+				assertEquals(boe, tracker.getNextNonBlocked());
+			}
+			
+			assertNull(tracker.getNextNonBlocked());
+			assertNull(tracker.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMultiChannelNoBarriers() {
+		try {
+			BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
+					createBuffer(1), createBuffer(0), createBuffer(3),
+					createBuffer(1), createBuffer(1), createBuffer(2)
+			};
+
+			MockInputGate gate = new MockInputGate(4, Arrays.asList(sequence));
+			BarrierTracker tracker = new BarrierTracker(gate);
+
+			for (BufferOrEvent boe : sequence) {
+				assertEquals(boe, tracker.getNextNonBlocked());
+			}
+
+			assertNull(tracker.getNextNonBlocked());
+			assertNull(tracker.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSingleChannelWithBarriers() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0), createBuffer(0), createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
+					createBarrier(2, 0), createBarrier(3, 0),
+					createBuffer(0), createBuffer(0),
+					createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
+					createBuffer(0)
+			};
+
+			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			BarrierTracker tracker = new BarrierTracker(gate);
+
+			CheckpointSequenceValidator validator =
+					new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
+			tracker.registerCheckpointEventHandler(validator);
+			
+			for (BufferOrEvent boe : sequence) {
+				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+					assertEquals(boe, tracker.getNextNonBlocked());
+				}
+			}
+
+			assertNull(tracker.getNextNonBlocked());
+			assertNull(tracker.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSingleChannelWithSkippedBarriers() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(0), createBuffer(0),
+					createBarrier(3, 0), createBuffer(0),
+					createBarrier(4, 0), createBarrier(6, 0), createBuffer(0),
+					createBarrier(7, 0), createBuffer(0), createBarrier(10, 0),
+					createBuffer(0)
+			};
+
+			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			BarrierTracker tracker = new BarrierTracker(gate);
+
+			CheckpointSequenceValidator validator =
+					new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
+			tracker.registerCheckpointEventHandler(validator);
+
+			for (BufferOrEvent boe : sequence) {
+				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+					assertEquals(boe, tracker.getNextNonBlocked());
+				}
+			}
+
+			assertNull(tracker.getNextNonBlocked());
+			assertNull(tracker.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMultiChannelWithBarriers() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1),
+					createBarrier(1, 0),
+					
+					createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
+					createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
+					
+					createBuffer(2), createBuffer(2),
+					createBarrier(3, 2),
+					createBuffer(2), createBuffer(2),
+					createBarrier(3, 0), createBarrier(3, 1),
+					
+					createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
+					
+					createBuffer(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierTracker tracker = new BarrierTracker(gate);
+
+			CheckpointSequenceValidator validator =
+					new CheckpointSequenceValidator(1, 2, 3, 4);
+			tracker.registerCheckpointEventHandler(validator);
+
+			for (BufferOrEvent boe : sequence) {
+				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+					assertEquals(boe, tracker.getNextNonBlocked());
+				}
+			}
+
+			assertNull(tracker.getNextNonBlocked());
+			assertNull(tracker.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMultiChannelSkippingCheckpoints() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1),
+					createBarrier(1, 0),
+
+					createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
+					createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
+
+					createBuffer(2), createBuffer(2),
+					createBarrier(3, 2),
+					createBuffer(2), createBuffer(2),
+					
+					// jump to checkpoint 4
+					createBarrier(4, 0),
+					createBuffer(0), createBuffer(1), createBuffer(2),
+					createBarrier(4, 1),
+					createBuffer(1),
+					createBarrier(4, 2),
+					
+					createBuffer(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierTracker tracker = new BarrierTracker(gate);
+
+			CheckpointSequenceValidator validator =
+					new CheckpointSequenceValidator(1, 2, 4);
+			tracker.registerCheckpointEventHandler(validator);
+
+			for (BufferOrEvent boe : sequence) {
+				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+					assertEquals(boe, tracker.getNextNonBlocked());
+				}
+			}
+			
+			assertNull(tracker.getNextNonBlocked());
+			assertNull(tracker.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * This test validates that the barrier tracker does not immediately
+	 * discard a pending checkpoint as soon as it sees a barrier from a
+	 * later checkpoint from some channel.
+	 * 
+	 * This behavior is crucial, otherwise topologies where different inputs
+	 * have different latency (and that latency is close to or higher than the
+	 * checkpoint interval) may skip many checkpoints, or fail to complete a
+	 * checkpoint altogether.
+	 */
+	@Test
+	public void testCompleteCheckpointsOnLateBarriers() {
+		try {
+			BufferOrEvent[] sequence = {
+					// checkpoint 2
+					createBuffer(1), createBuffer(1), createBuffer(0), createBuffer(2),
+					createBarrier(2, 1), createBarrier(2, 0), createBarrier(2, 2),
+					
+					// incomplete checkpoint 3
+					createBuffer(1), createBuffer(0),
+					createBarrier(3, 1), createBarrier(3, 2),
+					
+					// some barriers from checkpoint 4
+					createBuffer(1), createBuffer(0),
+					createBarrier(4, 2), createBarrier(4, 1),
+					createBuffer(1), createBuffer(2),
+	
+					// last barrier from checkpoint 3
+					createBarrier(3, 0),
+					
+					// complete checkpoint 4
+					createBuffer(0), createBarrier(4, 0),
+					
+					// regular checkpoint 5
+					createBuffer(1), createBuffer(2), createBarrier(5, 1), 
+					createBuffer(0), createBarrier(5, 0),
+					createBuffer(1), createBarrier(5, 2),
+					
+					// checkpoint 6 (incomplete),
+					createBuffer(1), createBarrier(6, 1),
+					createBuffer(0), createBarrier(6, 0),
+					
+					// checkpoint 7, with early barriers for checkpoints 8 and 9
+					createBuffer(1), createBarrier(7, 1),
+					createBuffer(0), createBarrier(7, 2),
+					createBuffer(2), createBarrier(8, 2), 
+					createBuffer(0), createBarrier(8, 1),
+					createBuffer(1), createBarrier(9, 1),
+					
+					// complete checkpoint 7, first barriers from checkpoint 10
+					createBarrier(7, 0),
+					createBuffer(0), createBarrier(9, 2),
+					createBuffer(2), createBarrier(10, 2),
+					
+					// complete checkpoint 8 and 9
+					createBarrier(8, 0),
+					createBuffer(1), createBuffer(2), createBarrier(9, 0),
+					
+					// trailing data
+					createBuffer(1), createBuffer(0), createBuffer(2)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierTracker tracker = new BarrierTracker(gate);
+
+			CheckpointSequenceValidator validator =
+					new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9);
+			tracker.registerCheckpointEventHandler(validator);
+
+			for (BufferOrEvent boe : sequence) {
+				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+					assertEquals(boe, tracker.getNextNonBlocked());
+				}
+			}
+
+			assertNull(tracker.getNextNonBlocked());
+			assertNull(tracker.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+
+	private static BufferOrEvent createBarrier(long id, int channel) {
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+	}
+
+	private static BufferOrEvent createBuffer(int channel) {
+		return new BufferOrEvent(
+				new Buffer(new MemorySegment(new byte[] { 1 }), DummyBufferRecycler.INSTANCE), channel);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Testing Mocks
+	// ------------------------------------------------------------------------
+
+	private static class MockInputGate implements InputGate {
+
+		private final int numChannels;
+		private final Queue<BufferOrEvent> boes;
+
+		public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
+			this.numChannels = numChannels;
+			this.boes = new ArrayDeque<BufferOrEvent>(boes);
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return boes.isEmpty();
+		}
+
+		@Override
+		public void requestPartitions() {}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() {
+			return boes.poll();
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) {}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {}
+	}
+
+	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+
+		private final long[] checkpointIDs;
+		
+		private int i = 0;
+
+		private CheckpointSequenceValidator(long... checkpointIDs) {
+			this.checkpointIDs = checkpointIDs;
+		}
+		
+		@Override
+		public void onEvent(CheckpointBarrier barrier) {
+			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
+			assertNotNull(barrier);
+			assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
+			assertTrue(barrier.getTimestamp() > 0);
+		}
+	}
+}


[6/8] flink git commit: [FLINK-2406] [streaming] Abstract and improve stream alignment via the BarrierBuffer

Posted by se...@apache.org.
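
For contrast with the purely counting BarrierTracker above: the BarrierBuffer
gains "exactly once" alignment by blocking every channel that has already
delivered the current barrier until the barriers from all channels have
arrived. A distilled, illustrative sketch of that rule follows; it is not the
commit's implementation (which additionally queues the blocked channels' data,
spilling via the IOManager seen in the test setup below):

import java.util.HashSet;
import java.util.Set;

// Illustrative alignment core only: tracks which channels are blocked for
// the current checkpoint; the real BarrierBuffer also buffers their data.
class AlignmentSketch {

	private final int numChannels;
	private final Set<Integer> blocked = new HashSet<Integer>();
	private long currentCheckpointId = -1L;

	AlignmentSketch(int numChannels) {
		this.numChannels = numChannels;
	}

	// Returns true when barriers from all channels have arrived, i.e. the
	// checkpoint can be triggered and the blocked channels replayed.
	boolean onBarrier(long checkpointId, int channel) {
		if (checkpointId > currentCheckpointId) {
			// a newer checkpoint begins: abandon any unfinished alignment
			blocked.clear();
			currentCheckpointId = checkpointId;
		}
		blocked.add(channel); // hold back further data from this channel
		if (blocked.size() == numChannels) {
			blocked.clear(); // alignment complete, unblock all channels
			return true;
		}
		return false;
	}
}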
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cb5e046..ad61c6f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,152 +18,652 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class BarrierBufferTest {
-
-	@Test
-	public void testWithoutBarriers() throws IOException, InterruptedException {
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
 
-		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-		InputGate mockIG = new MockInputGate(1, input);
-		AbstractReader mockAR = new MockReader(mockIG);
+/**
+ * Tests for the behavior of the {@link BarrierBuffer}.
+ */
+public class BarrierBufferTest {
 
-		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+	private static int SIZE_COUNTER = 0;
+	
+	private static IOManager IO_MANAGER;
 
-		assertEquals(input.get(0), bb.getNextNonBlocked());
-		assertEquals(input.get(1), bb.getNextNonBlocked());
-		assertEquals(input.get(2), bb.getNextNonBlocked());
+	@BeforeClass
+	public static void setup() {
+		IO_MANAGER = new IOManagerAsync();
+		SIZE_COUNTER = 1;
+	}
 
-		bb.cleanup();
+	@AfterClass
+	public static void shutdownIOManager() {
+		IO_MANAGER.shutdown();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Validates that the buffer behaves correctly if no checkpoint barriers come,
+	 * for a single input channel.
+	 */
 	@Test
-	public void testOneChannelBarrier() throws IOException, InterruptedException {
+	public void testSingleChannelNoBarriers() {
+		try {
+			BufferOrEvent[] sequence = { 
+					createBuffer(0), createBuffer(0), createBuffer(0),
+					createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			for (BufferOrEvent boe : sequence) {
+				assertEquals(boe, buffer.getNextNonBlocked());
+			}
+			
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
-		input.add(createBarrier(1, 0));
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
-		input.add(createBarrier(2, 0));
-		input.add(createBuffer(0));
+	/**
+	 * Validates that the buffer behaves correctly if no checkpoint barriers come,
+	 * for an input with multiple input channels.
+	 */
+	@Test
+	public void testMultiChannelNoBarriers() {
+		try {
+			BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
+					createBuffer(1), createBuffer(0), createEndOfPartition(0),
+					createBuffer(3), createBuffer(1), createEndOfPartition(3),
+					createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)
+			};
+
+			MockInputGate gate = new MockInputGate(4, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			for (BufferOrEvent boe : sequence) {
+				assertEquals(boe, buffer.getNextNonBlocked());
+			}
+
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-		InputGate mockIG = new MockInputGate(1, input);
-		AbstractReader mockAR = new MockReader(mockIG);
+	/**
+	 * Validates that the buffer preserves the order of elements for an
+	 * input with a single input channel and checkpoint events.
+	 */
+	@Test
+	public void testSingleChannelWithBarriers() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0), createBuffer(0), createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
+					createBarrier(2, 0), createBarrier(3, 0),
+					createBuffer(0), createBuffer(0),
+					createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
+					createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+			
+			for (BufferOrEvent boe : sequence) {
+				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+					assertEquals(boe, buffer.getNextNonBlocked());
+				}
+			}
+
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
-		BufferOrEvent nextBoe;
+	/**
+	 * Validates that the buffer correctly aligns the streams for inputs with
+	 * multiple input channels, by buffering and blocking certain inputs.
+	 */
+	@Test
+	public void testMultiChannelWithBarriers() {
+		try {
+			BufferOrEvent[] sequence = {
+					// checkpoint with blocked data
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(1, 0),
+					
+					// checkpoint without blocked data
+					createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
+					createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
+					
+					// checkpoint with data only from one channel
+					createBuffer(2), createBuffer(2),
+					createBarrier(3, 2),
+					createBuffer(2), createBuffer(2),
+					createBarrier(3, 0), createBarrier(3, 1),
+					
+					// empty checkpoint
+					createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
+
+					// checkpoint with blocked data in mixed order
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(5, 1),
+					createBuffer(2), createBuffer(0), createBuffer(2), createBuffer(1),
+					createBarrier(5, 2),
+					createBuffer(1), createBuffer(0), createBuffer(2), createBuffer(1),
+					createBarrier(5, 0),
+					
+					// some trailing data
+					createBuffer(0),
+					createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+			
+			// pre checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+			// blocking while aligning for checkpoint 1
+			check(sequence[7], buffer.getNextNonBlocked());
+			assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+			// checkpoint 1 done, returning buffered data
+			check(sequence[5], buffer.getNextNonBlocked());
+			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			check(sequence[6], buffer.getNextNonBlocked());
+
+			// pre checkpoint 2
+			check(sequence[9], buffer.getNextNonBlocked());
+			check(sequence[10], buffer.getNextNonBlocked());
+			check(sequence[11], buffer.getNextNonBlocked());
+			check(sequence[12], buffer.getNextNonBlocked());
+			check(sequence[13], buffer.getNextNonBlocked());
+			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			
+			// checkpoint 2 barriers come together
+			check(sequence[17], buffer.getNextNonBlocked());
+			assertEquals(3L, handler.getNextExpectedCheckpointId());
+			check(sequence[18], buffer.getNextNonBlocked());
+
+			// checkpoint 3 starts, data buffered
+			check(sequence[20], buffer.getNextNonBlocked());
+			assertEquals(4L, handler.getNextExpectedCheckpointId());
+			check(sequence[21], buffer.getNextNonBlocked());
+
+			// checkpoint 4 happens without extra data
+			
+			// pre checkpoint 5
+			check(sequence[27], buffer.getNextNonBlocked());
+			assertEquals(5L, handler.getNextExpectedCheckpointId());
+			check(sequence[28], buffer.getNextNonBlocked());
+			check(sequence[29], buffer.getNextNonBlocked());
+			
+			// checkpoint 5 aligning
+			check(sequence[31], buffer.getNextNonBlocked());
+			check(sequence[32], buffer.getNextNonBlocked());
+			check(sequence[33], buffer.getNextNonBlocked());
+			check(sequence[37], buffer.getNextNonBlocked());
+			
+			// buffered data from checkpoint 5 alignment
+			check(sequence[34], buffer.getNextNonBlocked());
+			check(sequence[36], buffer.getNextNonBlocked());
+			check(sequence[38], buffer.getNextNonBlocked());
+			check(sequence[39], buffer.getNextNonBlocked());
+			
+			// remaining data
+			check(sequence[41], buffer.getNextNonBlocked());
+			check(sequence[42], buffer.getNextNonBlocked());
+			check(sequence[43], buffer.getNextNonBlocked());
+			check(sequence[44], buffer.getNextNonBlocked());
+			
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-		assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+	@Test
+	public void testMultiChannelTrailingBlockedData() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0), createBuffer(1), createBuffer(2),
+					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
+					
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
+					createBarrier(2, 2),
+					createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+
+			// pre-checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+			// pre-checkpoint 2
+			check(sequence[6], buffer.getNextNonBlocked());
+			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			check(sequence[7], buffer.getNextNonBlocked());
+			check(sequence[8], buffer.getNextNonBlocked());
+			
+			// checkpoint 2 alignment
+			check(sequence[13], buffer.getNextNonBlocked());
+			check(sequence[14], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
+			check(sequence[19], buffer.getNextNonBlocked());
+
+			// end of stream: remaining buffered contents
+			check(sequence[10], buffer.getNextNonBlocked());
+			check(sequence[11], buffer.getNextNonBlocked());
+			check(sequence[12], buffer.getNextNonBlocked());
+			check(sequence[16], buffer.getNextNonBlocked());
+			check(sequence[17], buffer.getNextNonBlocked());
+
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-		bb.cleanup();
+	/**
+	 * Validates that the buffer correctly aligns the streams in cases
+	 * where some channels receive barriers from multiple successive checkpoints
+	 * before the pending checkpoint is complete.
+	 */
+	@Test
+	public void testMultiChannelWithQueuedFutureBarriers() {
+		try {
+			BufferOrEvent[] sequence = {
+					// checkpoint 1 - with blocked data
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(1), createBuffer(0),
+
+					// checkpoint 2 - where future checkpoint barriers come before
+					// the current checkpoint is complete
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(2), createBarrier(2, 0),
+					createBarrier(3, 0), createBuffer(0),
+					createBarrier(3, 1), createBuffer(0), createBuffer(1), createBuffer(2),
+					createBarrier(4, 1), createBuffer(1), createBuffer(2),
+
+					// complete checkpoint 2, send a barrier for checkpoints 4 and 5
+					createBarrier(2, 2),
+					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+					createBarrier(4, 0),
+					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+					createBarrier(5, 1),
+
+					// complete checkpoint 3
+					createBarrier(3, 2),
+					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+					createBarrier(6, 1),
+					
+					// complete checkpoint 4, checkpoint 5 remains not fully triggered
+					createBarrier(4, 2),
+					createBuffer(2),
+					createBuffer(1), createEndOfPartition(1),
+					createBuffer(2), createEndOfPartition(2),
+					createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+
+			// around checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			check(sequence[7], buffer.getNextNonBlocked());
+			
+			check(sequence[5], buffer.getNextNonBlocked());
+			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			check(sequence[6], buffer.getNextNonBlocked());
+			check(sequence[9], buffer.getNextNonBlocked());
+			check(sequence[10], buffer.getNextNonBlocked());
+
+			// alignment of checkpoint 2 - buffering also some barriers for
+			// checkpoints 3 and 4
+			check(sequence[13], buffer.getNextNonBlocked());
+			check(sequence[20], buffer.getNextNonBlocked());
+			check(sequence[23], buffer.getNextNonBlocked());
+			
+			// checkpoint 2 completed
+			check(sequence[12], buffer.getNextNonBlocked());
+			check(sequence[25], buffer.getNextNonBlocked());
+			check(sequence[27], buffer.getNextNonBlocked());
+			check(sequence[30], buffer.getNextNonBlocked());
+			check(sequence[32], buffer.getNextNonBlocked());
+
+			// checkpoint 3 completed (emit buffered)
+			check(sequence[16], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
+			check(sequence[19], buffer.getNextNonBlocked());
+			check(sequence[28], buffer.getNextNonBlocked());
+			
+			// past checkpoint 3
+			check(sequence[36], buffer.getNextNonBlocked());
+			check(sequence[38], buffer.getNextNonBlocked());
+
+			// checkpoint 4 completed (emit buffered)
+			check(sequence[22], buffer.getNextNonBlocked());
+			check(sequence[26], buffer.getNextNonBlocked());
+			check(sequence[31], buffer.getNextNonBlocked());
+			check(sequence[33], buffer.getNextNonBlocked());
+			check(sequence[39], buffer.getNextNonBlocked());
+			
+			// past checkpoint 4, alignment for checkpoint 5
+			check(sequence[42], buffer.getNextNonBlocked());
+			check(sequence[45], buffer.getNextNonBlocked());
+			check(sequence[46], buffer.getNextNonBlocked());
+			check(sequence[47], buffer.getNextNonBlocked());
+			check(sequence[48], buffer.getNextNonBlocked());
+			
+			// end of input, emit remainder
+			check(sequence[37], buffer.getNextNonBlocked());
+			check(sequence[43], buffer.getNextNonBlocked());
+			check(sequence[44], buffer.getNextNonBlocked());
+			
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
+	/**
+	 * Validates that the buffer skips over the current checkpoint if it
+	 * receives a barrier from a later checkpoint on a non-blocked input.
+	 */
 	@Test
-	public void testMultiChannelBarrier() throws IOException, InterruptedException {
-
-		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
-		input.add(createBuffer(0));
-		input.add(createBuffer(1));
-		input.add(createBarrier(1, 0));
-		input.add(createBarrier(2, 0));
-		input.add(createBuffer(0));
-		input.add(createBarrier(3, 0));
-		input.add(createBuffer(0));
-		input.add(createBuffer(1));
-		input.add(createBarrier(1, 1));
-		input.add(createBuffer(0));
-		input.add(createBuffer(1));
-		input.add(createBarrier(2, 1));
-		input.add(createBarrier(3, 1));
-		input.add(createBarrier(4, 0));
-		input.add(createBuffer(0));
-		input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
-		
+	public void testMultiChannelSkippingCheckpoints() {
+		try {
+			BufferOrEvent[] sequence = {
+					// checkpoint 1 - with blocked data
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(1), createBuffer(0),
+
+					// checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(2),
+					createBarrier(2, 0),
+					createBuffer(2), createBuffer(0),
+					createBarrier(3, 2),
+					
+					createBuffer(2),
+					createBuffer(1), createEndOfPartition(1),
+					createBuffer(2), createEndOfPartition(2),
+					createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+
+			// checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			check(sequence[7], buffer.getNextNonBlocked());
+			assertEquals(1L, buffer.getCurrentCheckpointId());
+			
+			check(sequence[5], buffer.getNextNonBlocked());
+			check(sequence[6], buffer.getNextNonBlocked());
+			check(sequence[9], buffer.getNextNonBlocked());
+			check(sequence[10], buffer.getNextNonBlocked());
+
+			// alignment of checkpoint 2
+			check(sequence[13], buffer.getNextNonBlocked());
+			assertEquals(2L, buffer.getCurrentCheckpointId());
+			check(sequence[15], buffer.getNextNonBlocked());
+
+			// checkpoint 2 aborted, checkpoint 3 started
+			check(sequence[12], buffer.getNextNonBlocked());
+			assertEquals(3L, buffer.getCurrentCheckpointId());
+			check(sequence[16], buffer.getNextNonBlocked());
+			check(sequence[19], buffer.getNextNonBlocked());
+			check(sequence[20], buffer.getNextNonBlocked());
+			check(sequence[23], buffer.getNextNonBlocked());
+			check(sequence[24], buffer.getNextNonBlocked());
+
+			// end of input, emit remainder
+			check(sequence[18], buffer.getNextNonBlocked());
+			check(sequence[21], buffer.getNextNonBlocked());
+			check(sequence[22], buffer.getNextNonBlocked());
+
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-		InputGate mockIG1 = new MockInputGate(2, input);
-		AbstractReader mockAR1 = new MockReader(mockIG1);
+	/**
+	 * Validates that the buffer skips over a later checkpoint if it
+	 * receives a barrier from an even later checkpoint on a blocked input.
+	 * 
+	 * NOTE: This test currently fails, because the barrier buffer does not support
+	 * unblocking inputs before all data released by a previous unblocking has been consumed.
+	 * 
+	 * Since this test only checks that the buffer behaves fail-safe in case of
+	 * corrupt checkpoint barrier propagation (a situation that does not occur
+	 * under the current model), we ignore it for the moment.
+	 */
+//	@Test
+	public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
+		try {
+			BufferOrEvent[] sequence = {
+					// checkpoint 1 - with blocked data
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(1), createBuffer(0),
+
+					// checkpoint 2 will not complete: premature barrier from checkpoint 3
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(2),
+					createBarrier(2, 0),
+					createBuffer(1), createBuffer(0),
+
+					createBarrier(4, 1), // premature barrier on a blocked input
+					createBarrier(3, 0), // queued barrier, ignored on replay
+
+					// complete checkpoint 2
+					createBarrier(2, 0),
+					createBuffer(0),
+					
+					createBarrier(3, 2), // should be ignored
+					createBuffer(2),
+					createBarrier(4, 0),
+					createBuffer(0), createBuffer(1), createBuffer(2),
+					createBarrier(4, 1),
+					
+					createBuffer(1), createEndOfPartition(1),
+					createBuffer(2), createEndOfPartition(2),
+					createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			// checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			check(sequence[7], buffer.getNextNonBlocked());
+			assertEquals(1L, buffer.getCurrentCheckpointId());
+			check(sequence[5], buffer.getNextNonBlocked());
+			check(sequence[6], buffer.getNextNonBlocked());
+			check(sequence[9], buffer.getNextNonBlocked());
+			check(sequence[10], buffer.getNextNonBlocked());
+
+			// alignment of checkpoint 2
+			check(sequence[13], buffer.getNextNonBlocked());
+			assertEquals(2L, buffer.getCurrentCheckpointId());
+
+			// checkpoint 2 completed
+			check(sequence[12], buffer.getNextNonBlocked());
+			check(sequence[15], buffer.getNextNonBlocked());
+			check(sequence[16], buffer.getNextNonBlocked());
+			
+			// checkpoint 3 skipped, alignment for 4 started
+			check(sequence[20], buffer.getNextNonBlocked());
+			assertEquals(4L, buffer.getCurrentCheckpointId());
+			check(sequence[22], buffer.getNextNonBlocked());
+			check(sequence[26], buffer.getNextNonBlocked());
+
+			// checkpoint 4 completed
+			check(sequence[24], buffer.getNextNonBlocked());
+			check(sequence[25], buffer.getNextNonBlocked());
+
+			check(sequence[28], buffer.getNextNonBlocked());
+			check(sequence[29], buffer.getNextNonBlocked());
+			check(sequence[30], buffer.getNextNonBlocked());
+			check(sequence[31], buffer.getNextNonBlocked());
+			check(sequence[32], buffer.getNextNonBlocked());
+			check(sequence[33], buffer.getNextNonBlocked());
+			
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-		BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
-		BufferOrEvent nextBoe;
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
 
-		check(input.get(0), nextBoe = bb.getNextNonBlocked());
-		check(input.get(1), nextBoe = bb.getNextNonBlocked());
-		check(input.get(2), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		check(input.get(7), nextBoe = bb.getNextNonBlocked());
-		check(input.get(8), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		check(input.get(3), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		check(input.get(10), nextBoe = bb.getNextNonBlocked());
-		check(input.get(11), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		check(input.get(4), nextBoe = bb.getNextNonBlocked());
-		check(input.get(5), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		check(input.get(12), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		check(input.get(6), nextBoe = bb.getNextNonBlocked());
-		check(input.get(9), nextBoe = bb.getNextNonBlocked());
-		check(input.get(13), nextBoe = bb.getNextNonBlocked());
-		bb.processBarrier(nextBoe);
-		check(input.get(14), nextBoe = bb.getNextNonBlocked());
-		check(input.get(15), nextBoe = bb.getNextNonBlocked());
+	private static BufferOrEvent createBarrier(long id, int channel) {
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+	}
 
-		bb.cleanup();
+	private static BufferOrEvent createBuffer(int channel) {
+		// since we have no access to the contents, we need to use the size as an
+		// identifier to validate correctness here
+		return new BufferOrEvent(
+				new Buffer(new MemorySegment(new byte[SIZE_COUNTER++]), DummyBufferRecycler.INSTANCE), channel);
 	}
 
-	private static void check(BufferOrEvent expected, BufferOrEvent actual) {
-		assertEquals(expected.isBuffer(), actual.isBuffer());
-		assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
-		if (expected.isEvent()) {
-			assertEquals(expected.getEvent(), actual.getEvent());
+	private static BufferOrEvent createEndOfPartition(int channel) {
+		return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, channel);
+	}
+	
+	private static void check(BufferOrEvent expected, BufferOrEvent present) {
+		assertNotNull(expected);
+		assertNotNull(present);
+		assertEquals(expected.isBuffer(), present.isBuffer());
+		
+		if (expected.isBuffer()) {
+			// since we have no access to the contents, we need to use the size as an
+			// identifier to validate correctness here
+			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+		}
+		else {
+			assertEquals(expected.getEvent(), present.getEvent());
 		}
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Testing Mocks
+	// ------------------------------------------------------------------------
 
-	protected static class MockInputGate implements InputGate {
+	private static class MockInputGate implements InputGate {
 
-		private int numChannels;
-		private Queue<BufferOrEvent> boes;
+		private final int numChannels;
+		private final Queue<BufferOrEvent> boes;
 
 		public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
 			this.numChannels = numChannels;
-			this.boes = new LinkedList<BufferOrEvent>(boes);
+			this.boes = new ArrayDeque<BufferOrEvent>(boes);
 		}
 
 		@Override
@@ -176,48 +677,38 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void requestPartitions() throws IOException, InterruptedException {
-		}
+		public void requestPartitions() {}
 
 		@Override
-		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-			return boes.remove();
+		public BufferOrEvent getNextBufferOrEvent() {
+			return boes.poll();
 		}
 
 		@Override
-		public void sendTaskEvent(TaskEvent event) throws IOException {
-		}
+		public void sendTaskEvent(TaskEvent event) {}
 
 		@Override
-		public void registerListener(EventListener<InputGate> listener) {
-		}
-
+		public void registerListener(EventListener<InputGate> listener) {}
 	}
 
-	protected static class MockReader extends AbstractReader {
+	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+		
+		private long nextExpectedCheckpointId = -1L;
 
-		protected MockReader(InputGate inputGate) {
-			super(inputGate);
+		public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
+			this.nextExpectedCheckpointId = nextExpectedCheckpointId;
 		}
 
-		@Override
-		public void setReporter(AccumulatorRegistry.Reporter reporter) {
-
+		public long getNextExpectedCheckpointId() {
+			return nextExpectedCheckpointId;
 		}
-	}
-
-	protected static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
-	}
 
-	protected static BufferOrEvent createBuffer(int channel) {
-		return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
-				new BufferRecycler() {
-
-					@Override
-					public void recycle(MemorySegment memorySegment) {
-					}
-				}), channel);
+		@Override
+		public void onEvent(CheckpointBarrier barrier) {
+			assertNotNull(barrier);
+			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
+			assertTrue(barrier.getTimestamp() > 0);
+			nextExpectedCheckpointId++;
+		}
 	}
-
 }
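
For readers tracing the expected orderings in the tests above: the checks encode the alignment rule that a channel is blocked from the moment its barrier for the current checkpoint arrives until barriers from all channels have arrived, and that a barrier from a later checkpoint aborts the checkpoint currently being aligned. The following is a minimal, self-contained sketch of that rule; the names are illustrative and not part of the BarrierBuffer API, and the actual buffer additionally spills blocked data to disk and replays it before reading new input.

	import java.util.ArrayDeque;
	import java.util.HashSet;
	import java.util.Queue;
	import java.util.Set;
	
	// Illustrative sketch of the alignment rule the tests above exercise (not Flink code).
	class AlignmentRuleSketch {
	
		private final int numChannels;
		private final Set<Integer> blockedChannels = new HashSet<Integer>();
		private final Queue<Object> blockedData = new ArrayDeque<Object>();
		private long currentCheckpointId = -1L;
	
		AlignmentRuleSketch(int numChannels) {
			this.numChannels = numChannels;
		}
	
		/** Returns true if the record may be emitted immediately, false if it was buffered. */
		boolean onRecord(int channel, Object record) {
			if (blockedChannels.contains(channel)) {
				blockedData.add(record); // held back until alignment completes
				return false;
			}
			return true;
		}
	
		void onBarrier(long checkpointId, int channel) {
			if (checkpointId > currentCheckpointId) {
				// a barrier from a later checkpoint aborts the one currently aligning
				blockedChannels.clear();
				currentCheckpointId = checkpointId;
			}
			blockedChannels.add(channel);
			if (blockedChannels.size() == numChannels) {
				// alignment complete: trigger the checkpoint, unblock all channels,
				// and replay 'blockedData' before reading new input
				blockedChannels.clear();
			}
		}
	}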

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
index 23ca86d..3f815ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
@@ -25,10 +25,10 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
  * A BufferRecycler that does nothing.
  */
 public class DummyBufferRecycler implements BufferRecycler {
-
+	
 	public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
-
-
+	
+	
 	@Override
 	public void recycle(MemorySegment memorySegment) {}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
index 9934bd9..b6cd656 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -22,18 +22,36 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class SpillingBufferOrEventTest {
+	
+	private static IOManager IO_MANAGER;
+	
+	@BeforeClass
+	public static void createIOManager() {
+		IO_MANAGER = new IOManagerAsync();
+	}
+	
+	@AfterClass
+	public static void shutdownIOManager() {
+		IO_MANAGER.shutdown();
+	}
 
+	// ------------------------------------------------------------------------
+	
 	@Test
 	public void testSpilling() throws IOException, InterruptedException {
-		BufferSpiller bsp = new BufferSpiller();
+		BufferSpiller bsp = new BufferSpiller(IO_MANAGER);
 		SpillReader spr = new SpillReader();
 
 		BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);


[2/8] flink git commit: [FLINK-2420] [streaming] StreamRecordWriter properly reports exceptions on flush.

Posted by se...@apache.org.
[FLINK-2420] [streaming] StreamRecordWriter properly reports exceptions on flush.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ba32133
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ba32133
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ba32133

Branch: refs/heads/master
Commit: 8ba321332b994579f387add8bd0855bd29cb33ec
Parents: a0556ef
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 15:38:20 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/RecordWriterOutput.java          |  13 +-
 .../runtime/io/StreamRecordWriter.java          | 142 ++++++++++++++-----
 .../runtime/io/DummyBufferRecycler.java         |  34 +++++
 .../runtime/io/SpillingBufferOrEventTest.java   |   4 +-
 .../runtime/io/StreamRecordWriterTest.java      | 132 +++++++++++++++++
 .../checkpointing/StateCheckpoinedITCase.java   |   4 +-
 .../StreamCheckpointingITCase.java              |  28 ++--
 7 files changed, 296 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
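
At its core, the change hands asynchronous failures from the output-flusher thread over to the thread calling emit(): the flusher records the first Throwable it encounters, and the writer rethrows it on the next emit or on close. A minimal standalone sketch of that hand-off follows; the names are illustrative, not the actual Flink classes, which appear in the diff below.

	import java.io.IOException;
	
	// Minimal sketch of the async-error hand-off this commit implements (illustrative only).
	class AsyncFlushErrorSketch {
	
		/** First failure observed on the flusher thread; read by the writer thread. */
		private volatile Throwable flusherException;
	
		void flushLoop(long timeoutMillis) { // runs on the daemon flusher thread
			try {
				while (!Thread.currentThread().isInterrupted()) {
					Thread.sleep(timeoutMillis);
					flush(); // any failure here must reach the writer thread
				}
			}
			catch (Throwable t) {
				if (flusherException == null) {
					flusherException = t; // remember only the first failure
				}
			}
		}
	
		void emit(Object record) throws IOException {
			checkErroneous(); // surface asynchronous failures on the caller's thread
			// ... serialize and write the record
		}
	
		private void checkErroneous() throws IOException {
			if (flusherException != null) {
				throw new IOException("An exception happened while flushing the outputs", flusherException);
			}
		}
	
		private void flush() throws IOException {
			// ... flush the output buffers
		}
	}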


http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index b656bb5..f7d8d47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -94,15 +94,16 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
 	@Override
 	public void close() {
-		if (recordWriter instanceof StreamRecordWriter) {
-			((StreamRecordWriter<?>) recordWriter).close();
-		} else {
-			try {
+		try {
+			if (recordWriter instanceof StreamRecordWriter) {
+				((StreamRecordWriter<?>) recordWriter).close();
+			} else {
 				recordWriter.flush();
-			} catch (IOException e) {
-				e.printStackTrace();
 			}
 		}
+		catch (IOException e) {
+			throw new RuntimeException("Failed to flush final output", e);
+		}
 	}
 
 	public void clearBuffers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index abae9a4..b0e2532 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -23,38 +23,61 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
-	private long timeout;
-	private boolean flushAlways = false;
+import static com.google.common.base.Preconditions.checkArgument;
 
-	private OutputFlusher outputFlusher;
+/**
+ * This record writer keeps data in buffers at most for a certain timeout. It spawns a separate thread
+ * that flushes the outputs at a defined interval, to make sure data does not linger in the buffers for too long.
+ * 
+ * @param <T> The type of elements written.
+ */
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
-	public StreamRecordWriter(ResultPartitionWriter writer) {
-		this(writer, new RoundRobinChannelSelector<T>(), 1000);
-	}
+	/** Default name for the output flush thread, if no name with a task reference is given */
+	private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+	
+	
+	/** The thread that periodically flushes the output, to give an upper latency bound */
+	private final OutputFlusher outputFlusher;
+	
+	/** Flag indicating whether the output should be flushed after every element */
+	private final boolean flushAlways;
 
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
-		this(writer, channelSelector, 1000);
+	/** The exception encountered in the flushing thread */
+	private Throwable flusherException;
+	
+	
+	
+	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
+		this(writer, channelSelector, timeout, null);
 	}
-
+	
 	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
-			long timeout) {
+								long timeout, String taskName) {
+		
 		super(writer, channelSelector);
-
-		this.timeout = timeout;
+		
+		checkArgument(timeout >= 0);
+		
 		if (timeout == 0) {
 			flushAlways = true;
-		} else {
-			this.outputFlusher = new OutputFlusher();
+			outputFlusher = null;
+		}
+		else {
+			flushAlways = false;
+
+			String threadName = taskName == null ?
+								DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
+			
+			outputFlusher = new OutputFlusher(threadName, timeout);
 			outputFlusher.start();
 		}
 	}
 	
 	@Override
 	public void emit(T record) throws IOException, InterruptedException {
+		checkErroneous();
 		super.emit(record);
 		if (flushAlways) {
 			flush();
@@ -63,46 +86,101 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 
 	@Override
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		checkErroneous();
 		super.broadcastEmit(record);
 		if (flushAlways) {
 			flush();
 		}
 	}
 
-	public void close() {
-		try {
-			if (outputFlusher != null) {
+	/**
+	 * Closes the writer. This stops the flushing thread (if there is one) and flushes all pending outputs.
+	 * 
+	 * @throws IOException I/O errors may happen during the final flush of the buffers.
+	 */
+	public void close() throws IOException {
+		// propagate exceptions
+		flush();
+		
+		if (outputFlusher != null) {
+			try {
 				outputFlusher.terminate();
 				outputFlusher.join();
 			}
+			catch (InterruptedException e) {
+				// ignore on close
+			}
+		}
 
-			flush();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		} catch (InterruptedException e) {
-			// Do nothing here
+		// final check for asynchronous errors, before we exit with a green light
+		checkErroneous();
+	}
+
+	/**
+	 * Notifies the writer that the output flusher thread encountered an exception.
+	 * 
+	 * @param t The exception to report.
+	 */
+	void notifyFlusherException(Throwable t) {
+		if (this.flusherException == null) {
+			this.flusherException = t;
+		}
+	}
+	
+	private void checkErroneous() throws IOException {
+		if (flusherException != null) {
+			throw new IOException("An exception happened while flushing the outputs", flusherException);
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+	 * 
+	 * The thread is daemonic, because it is only a utility thread.
+	 */
 	private class OutputFlusher extends Thread {
+		
+		private final long timeout;
+		
 		private volatile boolean running = true;
 
+		
+		OutputFlusher(String name, long timeout) {
+			super(name);
+			setDaemon(true);
+			this.timeout = timeout;
+		}
+		
 		public void terminate() {
 			running = false;
+			interrupt();
 		}
 
 		@Override
 		public void run() {
-			while (running) {
-				try {
+			try {
+				while (running) {
+					try {
+						Thread.sleep(timeout);
+					}
+					catch (InterruptedException e) {
+						// propagate this if we are still running, because it should not happen
+						// in that case
+						if (running) {
+							throw new Exception(e);
+						}
+					}
+					
+					// any errors here should let the thread come to a halt and be
+					// recognized by the writer 
 					flush();
-					Thread.sleep(timeout);
-				} catch (InterruptedException e) {
-					// Do nothing here
-				} catch (IOException e) {
-					throw new RuntimeException(e);
 				}
 			}
+			catch (Throwable t) {
+				notifyFlusherException(t);
+			}
 		}
 	}
 }
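
From a caller's perspective (compare RecordWriterOutput above), the practical difference is that close() now throws, so both the per-record path and the shutdown path surface flusher failures. A hedged caller-side sketch, assuming the writer and record types shown in the diffs:

	import java.io.IOException;
	
	import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
	import org.apache.flink.types.LongValue;
	
	// Illustrative caller-side handling after this change (a sketch, not Flink task code).
	class WriterUsageSketch {
	
		static void writeAndClose(StreamRecordWriter<LongValue> writer, LongValue value)
				throws IOException, InterruptedException {
			try {
				writer.emit(value); // rethrows any failure recorded by the flusher thread
			}
			finally {
				writer.close();     // stops the flusher and performs the final flush; may throw
			}
		}
	}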

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
new file mode 100644
index 0000000..23ca86d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+/**
+ * A BufferRecycler that does nothing.
+ */
+public class DummyBufferRecycler implements BufferRecycler {
+
+	public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
+
+
+	@Override
+	public void recycle(MemorySegment memorySegment) {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
index e0fab17..9934bd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -26,9 +26,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.streaming.runtime.io.BufferSpiller;
-import org.apache.flink.streaming.runtime.io.SpillReader;
-import org.apache.flink.streaming.runtime.io.SpillingBufferOrEvent;
+
 import org.junit.Test;
 
 public class SpillingBufferOrEventTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
new file mode 100644
index 0000000..b5bece7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * This test uses the PowerMock runner to work around the fact that the
+ * {@link ResultPartitionWriter} class is final.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+public class StreamRecordWriterTest {
+
+	/**
+	 * Verifies that exceptions during flush from the output flush thread are
+	 * recognized in the writer.
+	 */
+	@Test
+	public void testPropagateAsyncFlushError() {
+		FailingWriter<LongValue> testWriter = null;
+		try {
+			ResultPartitionWriter mockResultPartitionWriter = getMockWriter(5);
+			
+			// test writer that flushes every 5ms and fails after 3 flushes
+			testWriter = new FailingWriter<LongValue>(mockResultPartitionWriter,
+					new RoundRobinChannelSelector<LongValue>(), 5, 3);
+			
+			try {
+				long deadline = System.currentTimeMillis() + 20000; // give up after at most 20 seconds (conservative)
+				long l = 0L;
+				
+				while (System.currentTimeMillis() < deadline) {
+					testWriter.emit(new LongValue(l++));
+				}
+				
+				fail("This should have failed with an exception");
+			}
+			catch (IOException e) {
+				assertNotNull(e.getCause());
+				assertTrue(e.getCause().getMessage().contains("Test Exception"));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (testWriter != null) {
+				try {
+					testWriter.close();
+				}
+				catch (IOException e) {
+					// ignore in tests
+				}
+			}
+		}
+	}
+	
+	private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
+		BufferProvider mockProvider = mock(BufferProvider.class);
+		when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
+			@Override
+			public Buffer answer(InvocationOnMock invocation) {
+				return new Buffer(new MemorySegment(new byte[4096]), DummyBufferRecycler.INSTANCE);
+			}
+		});
+		
+		ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
+		when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
+		when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
+		
+		
+		return mockWriter;
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	
+	private static class FailingWriter<T extends IOReadableWritable> extends StreamRecordWriter<T> {
+		
+		private int flushesBeforeException;
+		
+		private FailingWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
+								long timeout, int flushesBeforeException) {
+			super(writer, channelSelector, timeout);
+			this.flushesBeforeException = flushesBeforeException;
+		}
+
+		@Override
+		public void flush() throws IOException {
+			if (flushesBeforeException-- <= 0) {
+				throw new IOException("Test Exception");
+			}
+			super.flush();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 0693665..39ff2e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -31,8 +31,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
 import org.apache.flink.util.Collector;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -110,7 +110,7 @@ public class StateCheckpoinedITCase {
 																	"localhost", cluster.getJobManagerRPCPort());
 			env.setParallelism(PARALLELISM);
 			env.enableCheckpointing(500);
-			env.getConfig().enableSysoutLogging();
+			env.getConfig().disableSysoutLogging();
 
 			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 1730c63..438e980 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,40 +18,32 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
 import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.