Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 00:15:49 UTC

[7/8] flink git commit: [FLINK-2406] [streaming] Abstract and improve stream alignment via the BarrierBuffer

[FLINK-2406] [streaming] Abstract and improve stream alignment via the BarrierBuffer

 - Add an interface for the functionality of the barrier buffer (for later addition of other implementations)
 - Add broader tests for the BarrierBuffer, including trailing data and barrier races.
 - Checkpoint barriers are handled by the buffer directly, rather than being returned and re-injected (see the sketch below).
 - Simplify logic in the BarrierBuffer and fix certain corner cases.
 - Give access to spill directories properly via I/O manager, rather than via GlobalConfiguration singleton.
 - Rename the "BarrierBufferIOTest" to "BarrierBufferMassiveRandomTest"
 - A lot of code style/robustness fixes (properly define constants, visibility, exception signatures)
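
As a quick orientation, the sketch below is not part of this commit; the names "gate" and "ioManager" are placeholders for whatever a stream task obtains from its environment. It shows how the reworked pieces fit together: the BarrierBuffer is constructed against the I/O manager's spill directories, a checkpoint listener is registered instead of barriers being returned and re-injected, and a null return from getNextNonBlocked() signals the end of the stream.

import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;

public class BarrierHandlingSketch {

	public static void consume(InputGate gate, IOManager ioManager) throws Exception {
		// spill directories now come from the I/O manager, not the GlobalConfiguration singleton
		CheckpointBarrierHandler handler = new BarrierBuffer(gate, ioManager);

		// barriers are no longer handed back to the caller; the buffer notifies this listener
		// once it has seen the barrier of a checkpoint on all input channels
		handler.registerCheckpointEventHandler(new EventListener<CheckpointBarrier>() {
			@Override
			public void onEvent(CheckpointBarrier barrier) {
				// trigger the checkpoint identified by barrier.getId() / barrier.getTimestamp()
			}
		});

		BufferOrEvent boe;
		while ((boe = handler.getNextNonBlocked()) != null) {
			if (boe.isBuffer()) {
				// deserialize records from boe.getBuffer(), then recycle the buffer
				boe.getBuffer().recycle();
			}
			// non-barrier events still reach the caller and are handled here
		}

		// null signals the end of the stream; nothing may remain buffered at this point
		if (!handler.isEmpty()) {
			throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
		}
		handler.cleanup();
	}
}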


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0579f90b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0579f90b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0579f90b

Branch: refs/heads/master
Commit: 0579f90bab165a7df336163eb9d6337267020029
Parents: ed30ff4
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 26 18:58:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:07 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/disk/iomanager/IOManager.java    |  12 +-
 .../io/network/api/EndOfPartitionEvent.java     |  23 +-
 .../partition/consumer/SingleInputGate.java     |   2 +-
 .../streaming/runtime/io/BarrierBuffer.java     | 413 +++++-----
 .../streaming/runtime/io/BufferSpiller.java     |  54 +-
 .../runtime/io/CheckpointBarrierHandler.java    |  55 ++
 .../runtime/io/StreamInputProcessor.java        |  69 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  56 +-
 .../runtime/tasks/OneInputStreamTask.java       |  37 +-
 .../streaming/runtime/tasks/StreamTask.java     |  16 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  74 +-
 .../consumer/StreamTestSingleInputGate.java     |  13 -
 .../runtime/io/BarrierBufferIOTest.java         | 159 ----
 .../io/BarrierBufferMassiveRandomTest.java      | 167 ++++
 .../streaming/runtime/io/BarrierBufferTest.java | 775 +++++++++++++++----
 .../runtime/io/DummyBufferRecycler.java         |   8 +-
 .../runtime/io/SpillingBufferOrEventTest.java   |  20 +-
 17 files changed, 1282 insertions(+), 671 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index c0bd360..45d9b9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -301,9 +301,19 @@ public abstract class IOManager {
 	 * 
 	 * @return The number of temporary file directories.
 	 */
-	public int getNumberOfTempDirs() {
+	public int getNumberOfSpillingDirectories() {
 		return this.paths.length;
 	}
+
+	/**
+	 * Gets the directories that the I/O manager spills to.
+	 * 
+	 * @return The directories that the I/O manager spills to.
+	 */
+	public File[] getSpillingDirectories() {
+		return this.paths;
+	}
+	
 	
 	protected int getNextPathNum() {
 		final int next = this.nextPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index 49d7958..3ecdb94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -22,19 +22,34 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.task.RuntimeEvent;
 
-import java.io.IOException;
 
 public class EndOfPartitionEvent extends RuntimeEvent {
 
 	public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
-
+	
+	
 	@Override
-	public void read(DataInputView in) throws IOException {
+	public void read(DataInputView in) {
 		// Nothing to do here
 	}
 
 	@Override
-	public void write(DataOutputView out) throws IOException {
+	public void write(DataOutputView out) {
 		// Nothing to do here
 	}
+
+	@Override
+	public int hashCode() {
+		return 1965146673;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj != null && obj.getClass() == EndOfPartitionEvent.class;
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 78aa6f7..0aebcae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -408,7 +408,7 @@ public class SingleInputGate implements InputGate {
 
 		// Sanity check that notifications only happen when data is available
 		if (buffer == null) {
-			throw new IllegalStateException("Bug in input gate/channel logic: input gate got" +
+			throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
 					"notified by channel about available data, but none was available.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 40e84fc..466b8f7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -19,263 +19,262 @@ package org.apache.flink.streaming.runtime.io;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+import java.util.ArrayDeque;
 
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The barrier buffer is responsible for implementing the blocking behaviour described
- * here: {@link CheckpointBarrier}.
- *
- * <p>
- * To avoid back-pressuring the
- * readers, we buffer up the new data received from the blocked channels until
- * the blocks are released.
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ * 
+ * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
+ * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
+ * the blocks are released.</p>
  */
-public class BarrierBuffer {
+public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+	
+	/** The gate that the buffer draws its input from */
+	private final InputGate inputGate;
+
+	/** Flags that indicate whether a channel is currently blocked/buffered */
+	private final boolean[] blockedChannels;
+	
+	/** The total number of channels that this buffer handles data from */
+	private final int totalNumberOfInputChannels;
+
+	private final SpillReader spillReader;
+	private final BufferSpiller bufferSpiller;
+	
+	private ArrayDeque<SpillingBufferOrEvent> nonProcessed;
+	private ArrayDeque<SpillingBufferOrEvent> blockedNonProcessed;
+
+	/** Handler that receives the checkpoint notifications */
+	private EventListener<CheckpointBarrier> checkpointHandler;
+
+	/** The ID of the checkpoint for which we expect barriers */
+	private long currentCheckpointId = -1L;
+
+	/** The number of received barriers (= number of blocked/buffered channels) */
+	private long numReceivedBarriers;
+	
+	/** Flag to indicate whether we have drawn all available input */
+	private boolean endOfStream;
+
+	
+	public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+		this.inputGate = inputGate;
+		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
+		
+		this.nonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+		this.blockedNonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+		
+		this.bufferSpiller = new BufferSpiller(ioManager);
+		this.spillReader = new SpillReader();
+	}
 
-	private Queue<SpillingBufferOrEvent> nonProcessed = new LinkedList<SpillingBufferOrEvent>();
-	private Queue<SpillingBufferOrEvent> blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
-	private Set<Integer> blockedChannels = new HashSet<Integer>();
-	private int totalNumberOfInputChannels;
-
-	private CheckpointBarrier currentBarrier;
-
-	private AbstractReader reader;
-
-	private InputGate inputGate;
-
-	private SpillReader spillReader;
-	private BufferSpiller bufferSpiller;
-
-	private boolean inputFinished = false;
+	// ------------------------------------------------------------------------
+	//  Buffer and barrier handling
+	// ------------------------------------------------------------------------
 
-	private BufferOrEvent endOfStreamEvent = null;
+	@Override
+	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+		while (true) {
+			// process buffered BufferOrEvents before grabbing new ones
+			final SpillingBufferOrEvent nextBuffered = nonProcessed.pollFirst();
+			final BufferOrEvent next = nextBuffered == null ?
+					inputGate.getNextBufferOrEvent() :
+					nextBuffered.getBufferOrEvent();
+			
+			if (next != null) {
+				if (isBlocked(next.getChannelIndex())) {
+				// if the channel is blocked, we just store the BufferOrEvent
+					blockedNonProcessed.add(new SpillingBufferOrEvent(next, bufferSpiller, spillReader));
+				}
+				else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+					return next;
+				}
+				else if (!endOfStream) {
+					// process barriers only if there is a chance of the checkpoint completing
+					processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
+				}
+			}
+			else if (!endOfStream) {
+				// end of stream. we feed the data that is still buffered
+				endOfStream = true;
+				releaseBlocks();
+				return getNextNonBlocked();
+			}
+			else {
+				return null;
+			}
+		}
+	}
+	
+	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+		final long barrierId = receivedBarrier.getId();
+
+		if (numReceivedBarriers > 0) {
+			// subsequent barrier of a checkpoint.
+			if (barrierId == currentCheckpointId) {
+				// regular case
+				onBarrier(channelIndex);
+			}
+			else if (barrierId > currentCheckpointId) {
+				// we did not complete the current checkpoint
+				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+				releaseBlocks();
+				currentCheckpointId = barrierId;
+				onBarrier(channelIndex);
+			}
+			else {
+				// ignore trailing barrier from aborted checkpoint
+				return;
+			}
+			
+		}
+		else if (barrierId > currentCheckpointId) {
+			// first barrier of a new checkpoint
+			currentCheckpointId = barrierId;
+			onBarrier(channelIndex);
+		}
+		else {
+			// trailing barrier from previous (skipped) checkpoint
+			return;
+		}
 
-	private long lastCheckpointId = Long.MIN_VALUE;
+		// check if we have all barriers
+		if (numReceivedBarriers == totalNumberOfInputChannels) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Received all barriers, triggering checkpoint {} at {}",
+						receivedBarrier.getId(), receivedBarrier.getTimestamp());
+			}
 
-	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
-		this.inputGate = inputGate;
-		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
-		this.reader = reader;
-		try {
-			this.bufferSpiller = new BufferSpiller();
-			this.spillReader = new SpillReader();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
+			if (checkpointHandler != null) {
+				checkpointHandler.onEvent(receivedBarrier);
+			}
+			
+			releaseBlocks();
 		}
-
+	}
+	
+	@Override
+	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
+		if (this.checkpointHandler == null) {
+			this.checkpointHandler = checkpointHandler;
+		}
+		else {
+			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+		}
+	}
+	
+	@Override
+	public boolean isEmpty() {
+		return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
 	}
 
-	/**
-	 * Get then next non-blocked non-processed {@link BufferOrEvent}. Returns null if
-	 * none available.
-	 * 
-	 * @throws IOException
-	 */
-	private BufferOrEvent getNonProcessed() throws IOException {
-		SpillingBufferOrEvent nextNonProcessed;
-
-		while ((nextNonProcessed = nonProcessed.poll()) != null) {
-			BufferOrEvent boe = nextNonProcessed.getBufferOrEvent();
-			if (isBlocked(boe.getChannelIndex())) {
-				blockedNonProcessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
-			} else {
-				return boe;
+	@Override
+	public void cleanup() throws IOException {
+		bufferSpiller.close();
+		File spillfile1 = bufferSpiller.getSpillFile();
+		if (spillfile1 != null) {
+			if (!spillfile1.delete()) {
+				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile1.getAbsolutePath());
 			}
 		}
 
-		return null;
+		spillReader.close();
+		File spillfile2 = spillReader.getSpillFile();
+		if (spillfile2 != null) {
+			if (!spillfile2.delete()) {
+				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile2.getAbsolutePath());
+			}
+		}
 	}
-
+	
 	/**
 	 * Checks whether the channel with the given index is blocked.
 	 * 
-	 * @param channelIndex The channel index to check
+	 * @param channelIndex The channel index to check.
+	 * @return True if the channel is blocked, false if not.
 	 */
 	private boolean isBlocked(int channelIndex) {
-		return blockedChannels.contains(channelIndex);
+		return blockedChannels[channelIndex];
 	}
-
-	/**
-	 * Checks whether all channels are blocked meaning that barriers have been
-	 * received from all channels
-	 */
-	private boolean isAllBlocked() {
-		return blockedChannels.size() == totalNumberOfInputChannels;
-	}
-
-	/**
-	 * Returns the next non-blocked {@link BufferOrEvent}. This is a blocking operator.
-	 */
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
-		// If there are non-processed buffers from the previously blocked ones,
-		// we get the next
-		BufferOrEvent bufferOrEvent = getNonProcessed();
-
-		if (bufferOrEvent != null) {
-			return bufferOrEvent;
-		} else if (blockedNonProcessed.isEmpty() && inputFinished) {
-			return endOfStreamEvent;
-		} else {
-			// If no non-processed, get new from input
-			while (true) {
-				if (!inputFinished) {
-					// We read the next buffer from the inputgate
-					bufferOrEvent = inputGate.getNextBufferOrEvent();
-
-					if (!bufferOrEvent.isBuffer()
-							&& bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
-						if (inputGate.isFinished()) {
-							// store the event for later if the channel is
-							// closed
-							endOfStreamEvent = bufferOrEvent;
-							inputFinished = true;
-						}
-
-					} else {
-						if (isBlocked(bufferOrEvent.getChannelIndex())) {
-							// If channel blocked we just store it
-							blockedNonProcessed.add(new SpillingBufferOrEvent(bufferOrEvent,
-									bufferSpiller, spillReader));
-						} else {
-							return bufferOrEvent;
-						}
-					}
-				} else {
-					actOnAllBlocked();
-					return getNextNonBlocked();
-				}
-			}
-		}
-	}
-
+	
 	/**
 	 * Blocks the given channel index, from which a barrier has been received.
 	 * 
-	 * @param channelIndex
-	 *            The channel index to block.
+	 * @param channelIndex The channel index to block.
 	 */
-	private void blockChannel(int channelIndex) {
-		if (!blockedChannels.contains(channelIndex)) {
-			blockedChannels.add(channelIndex);
+	private void onBarrier(int channelIndex) throws IOException {
+		if (!blockedChannels[channelIndex]) {
+			blockedChannels[channelIndex] = true;
+			numReceivedBarriers++;
+			
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Channel blocked with index: " + channelIndex);
-			}
-			if (isAllBlocked()) {
-				actOnAllBlocked();
+				LOG.debug("Received barrier from channel " + channelIndex);
 			}
-
-		} else {
-			throw new RuntimeException("Tried to block an already blocked channel");
+		}
+		else {
+			throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
 		}
 	}
 
 	/**
 	 * Releases the blocks on all channels.
 	 */
-	private void releaseBlocks() {
-		if (!nonProcessed.isEmpty()) {
-			// sanity check
-			throw new RuntimeException("Error in barrier buffer logic");
-		}
-		nonProcessed = blockedNonProcessed;
-		blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
-		try {
-			spillReader.setSpillFile(bufferSpiller.getSpillFile());
-			bufferSpiller.resetSpillFile();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		blockedChannels.clear();
-		currentBarrier = null;
+	private void releaseBlocks() throws IOException {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("All barriers received, blocks released");
+			LOG.debug("Releasing blocks");
 		}
-	}
 
-	/**
-	 * Method that is executed once the barrier has been received from all
-	 * channels.
-	 */
-	private void actOnAllBlocked() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Publishing barrier to the vertex");
+		for (int i = 0; i < blockedChannels.length; i++) {
+			blockedChannels[i] = false;
 		}
-
-		if (currentBarrier != null && !inputFinished) {
-			reader.publish(currentBarrier);
-			lastCheckpointId = currentBarrier.getId();
+		numReceivedBarriers = 0;
+		
+		if (nonProcessed.isEmpty()) {
+			// swap the queues
+			ArrayDeque<SpillingBufferOrEvent> empty = nonProcessed;
+			nonProcessed = blockedNonProcessed;
+			blockedNonProcessed = empty;
 		}
-
-		releaseBlocks();
-	}
-
-	/**
-	 * Processes one {@link org.apache.flink.streaming.runtime.tasks.CheckpointBarrier}
-	 * 
-	 * @param bufferOrEvent The {@link BufferOrEvent} containing the checkpoint barrier
-	 */
-	public void processBarrier(BufferOrEvent bufferOrEvent) {
-		CheckpointBarrier receivedBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
-
-		if (receivedBarrier.getId() < lastCheckpointId) {
-			// a barrier from an old checkpoint, ignore these
-			return;
-		}
-
-		if (currentBarrier == null) {
-			this.currentBarrier = receivedBarrier;
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Checkpoint barrier received start waiting for checkpoint: {}", receivedBarrier);
-			}
-		} else if (receivedBarrier.getId() > currentBarrier.getId()) {
-			// we have a barrier from a more recent checkpoint, free all locks and start with
-			// this newer checkpoint
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Checkpoint barrier received while waiting on checkpoint {}. Restarting waiting with checkpoint {}: ", currentBarrier, receivedBarrier);
-			}
-			releaseBlocks();
-			currentBarrier = receivedBarrier;
-
+		else {
+			throw new IllegalStateException("Unconsumed data from previous checkpoint alignment " +
+					"when starting next checkpoint alignment");
 		}
-		blockChannel(bufferOrEvent.getChannelIndex());
+		
+		// roll over the spill files
+		spillReader.setSpillFile(bufferSpiller.getSpillFile());
+		bufferSpiller.resetSpillFile();
 	}
 
-	public void cleanup() throws IOException {
-		bufferSpiller.close();
-		File spillfile1 = bufferSpiller.getSpillFile();
-		if (spillfile1 != null) {
-			spillfile1.delete();
-		}
+	// ------------------------------------------------------------------------
+	// For Testing
+	// ------------------------------------------------------------------------
 
-		spillReader.close();
-		File spillfile2 = spillReader.getSpillFile();
-		if (spillfile2 != null) {
-			spillfile2.delete();
-		}
+	public long getCurrentCheckpointId() {
+		return this.currentCheckpointId;
 	}
-
+	
+	// ------------------------------------------------------------------------
+	// Utilities 
+	// ------------------------------------------------------------------------
+	
+	@Override
 	public String toString() {
-		return nonProcessed.toString() + blockedNonProcessed.toString();
-	}
-
-	public boolean isEmpty() {
-		return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
+		return "Non-Processed: " + nonProcessed + " | Blocked: " + blockedNonProcessed;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 0d57d05..fda612e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -22,28 +22,33 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.StringUtils;
 
 public class BufferSpiller {
+	
+	/** The random number generator for temp file names */
+	private static final Random RND = new Random();
 
-	protected static Random rnd = new Random();
+	/** The counter that selects the next directory to spill into */
+	private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
+	
+	
+	/** The directory to spill to */
+	private final File tempDir;
 
 	private File spillFile;
-	protected FileChannel spillingChannel;
-	private String tempDir;
-
-	public BufferSpiller() throws IOException {
-		String tempDirString = GlobalConfiguration.getString(
-				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
-		String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
-
-		tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
-
+	
+	private FileChannel spillingChannel;
+	
+	
+
+	public BufferSpiller(IOManager ioManager) throws IOException {
+		File[] tempDirs = ioManager.getSpillingDirectories();
+		this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
 		createSpillingChannel();
 	}
 
@@ -54,24 +59,20 @@ public class BufferSpiller {
 		try {
 			spillingChannel.write(buffer.getNioBuffer());
 			buffer.recycle();
-		} catch (IOException e) {
+		}
+		catch (IOException e) {
 			close();
-			throw new IOException(e);
+			throw e;
 		}
-
 	}
 
 	@SuppressWarnings("resource")
 	private void createSpillingChannel() throws IOException {
-		this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+		this.spillFile = new File(tempDir, randomString(RND) + ".buffer");
 		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
 	}
 
-	private static String randomString(Random random) {
-		final byte[] bytes = new byte[20];
-		random.nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
-	}
+
 
 	public void close() throws IOException {
 		if (spillingChannel != null && spillingChannel.isOpen()) {
@@ -87,5 +88,12 @@ public class BufferSpiller {
 	public File getSpillFile() {
 		return spillFile;
 	}
+	
+	// ------------------------------------------------------------------------
 
+	private static String randomString(Random random) {
+		final byte[] bytes = new byte[20];
+		random.nextBytes(bytes);
+		return StringUtils.byteToHexString(bytes);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
new file mode 100644
index 0000000..02dd33d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * The CheckpointBarrierHandler reacts to checkpoint barriers arriving from the input channels.
+ * Different implementations may either simply track barriers, or block certain inputs on
+ * barriers.
+ */
+public interface CheckpointBarrierHandler {
+
+	/**
+	 * Returns the next {@link BufferOrEvent} that the operator may consume.
+	 * This call blocks until the next BufferOrEvent is available, or until the stream
+	 * has been determined to be finished.
+	 * 
+	 * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
+	 * @throws java.io.IOException Thrown, if the network or local disk I/O fails.
+	 * @throws java.lang.InterruptedException Thrown, if the thread is interrupted while blocking during
+	 *                                        waiting for the next BufferOrEvent to become available.
+	 */
+	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+
+	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+	
+	void cleanup() throws IOException;
+
+	/**
+	 * Checks if the barrier handler has buffered any data internally.
+	 * @return True, if no data is buffered internally, false otherwise.
+	 */
+	boolean isEmpty();
+}
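
To make the "simply track barriers" alternative mentioned in the Javadoc above concrete, the following is a purely hypothetical sketch (not part of this commit) of a non-blocking implementation of the interface. It only counts barriers per checkpoint and deliberately ignores skipped or overtaken checkpoints for brevity.

import java.io.IOException;

import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;

public class BarrierTrackerSketch implements CheckpointBarrierHandler {

	private final InputGate inputGate;
	private final int totalNumberOfInputChannels;

	private EventListener<CheckpointBarrier> checkpointHandler;
	private long currentCheckpointId = -1L;
	private int barriersForCurrentCheckpoint;

	public BarrierTrackerSketch(InputGate inputGate) {
		this.inputGate = inputGate;
		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
	}

	@Override
	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
		while (true) {
			BufferOrEvent next = inputGate.getNextBufferOrEvent();
			if (next == null) {
				// end of stream
				return null;
			}
			if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
				// records and other events pass through without any blocking
				return next;
			}

			// count the barriers of the current checkpoint and notify the listener
			// once every input channel has delivered its barrier
			CheckpointBarrier barrier = (CheckpointBarrier) next.getEvent();
			if (barrier.getId() > currentCheckpointId) {
				currentCheckpointId = barrier.getId();
				barriersForCurrentCheckpoint = 0;
			}
			if (barrier.getId() == currentCheckpointId
					&& ++barriersForCurrentCheckpoint == totalNumberOfInputChannels
					&& checkpointHandler != null) {
				checkpointHandler.onEvent(barrier);
			}
		}
	}

	@Override
	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
		this.checkpointHandler = checkpointHandler;
	}

	@Override
	public void cleanup() {
		// nothing is ever buffered or spilled, so there is nothing to clean up
	}

	@Override
	public boolean isEmpty() {
		return true;
	}
}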

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 9db0178..4d60375 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -40,6 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +69,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 
 	private boolean isFinished;
 
-	private final BarrierBuffer barrierBuffer;
+	private final CheckpointBarrierHandler barrierHandler;
 
 	private final long[] watermarks;
 	private long lastEmittedWatermark;
@@ -74,10 +77,17 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 	private final DeserializationDelegate<Object> deserializationDelegate;
 
 	@SuppressWarnings("unchecked")
-	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
+	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
+								EventListener<CheckpointBarrier> checkpointListener,
+								IOManager ioManager,
+								boolean enableWatermarkMultiplexing) throws IOException {
+		
 		super(InputGateUtil.createInputGate(inputGates));
 
-		barrierBuffer = new BarrierBuffer(inputGate, this);
+		this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		if (checkpointListener != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		}
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
@@ -101,8 +111,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 		}
 		lastEmittedWatermark = Long.MIN_VALUE;
 	}
-
-	@SuppressWarnings("unchecked")
+	
+	
 	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
 		if (isFinished) {
 			return false;
@@ -137,8 +147,10 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 							}
 						}
 						continue;
-					} else {
+					}
+					else {
 						// now we can do the actual processing
+						@SuppressWarnings("unchecked")
 						StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
 						StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
 						if (ctx != null) {
@@ -150,32 +162,26 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 				}
 			}
 
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
-			if (bufferOrEvent.isBuffer()) {
-				currentChannel = bufferOrEvent.getChannelIndex();
-				currentRecordDeserializer = recordDeserializers[currentChannel];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
-
-				if (event instanceof CheckpointBarrier) {
-					barrierBuffer.processBarrier(bufferOrEvent);
-				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							if (!barrierBuffer.isEmpty()) {
-								throw new RuntimeException("BarrierBuffer should be empty at this point");
-							}
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
+			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+			if (bufferOrEvent != null) {
+				if (bufferOrEvent.isBuffer()) {
+					currentChannel = bufferOrEvent.getChannelIndex();
+					currentRecordDeserializer = recordDeserializers[currentChannel];
+					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+				}
+				else {
+					// Event received
+					final AbstractEvent event = bufferOrEvent.getEvent();
+					handleEvent(event);
 				}
 			}
+			else {
+				isFinished = true;
+				if (!barrierHandler.isEmpty()) {
+					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
+				}
+				return false;
+			}
 		}
 	}
 
@@ -195,7 +201,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 		}
 	}
 
+	@Override
 	public void cleanup() throws IOException {
-		barrierBuffer.cleanup();
+		barrierHandler.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e235ffe..9668c7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -68,7 +70,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 
 	private boolean isFinished;
 
-	private final BarrierBuffer barrierBuffer;
+	private final CheckpointBarrierHandler barrierHandler;
 
 	private final long[] watermarks1;
 	private long lastEmittedWatermark1;
@@ -87,11 +89,17 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
-			boolean enableWatermarkMultiplexing) {
+			EventListener<CheckpointBarrier> checkpointListener,
+			IOManager ioManager,
+			boolean enableWatermarkMultiplexing) throws IOException {
 		
 		super(InputGateUtil.createInputGate(inputGates1, inputGates2));
 
-		barrierBuffer = new BarrierBuffer(inputGate, this);
+		this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		if (checkpointListener != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		}
+
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
@@ -186,32 +194,26 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 				}
 			}
 
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
-			if (bufferOrEvent.isBuffer()) {
-				currentChannel = bufferOrEvent.getChannelIndex();
-				currentRecordDeserializer = recordDeserializers[currentChannel];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
+			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+			if (bufferOrEvent != null) {
 
-				if (event instanceof CheckpointBarrier) {
-					barrierBuffer.processBarrier(bufferOrEvent);
+				if (bufferOrEvent.isBuffer()) {
+					currentChannel = bufferOrEvent.getChannelIndex();
+					currentRecordDeserializer = recordDeserializers[currentChannel];
+					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+	
 				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							if (!barrierBuffer.isEmpty()) {
-								throw new RuntimeException("BarrierBuffer should be empty at this point");
-							}
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
+					// Event received
+					final AbstractEvent event = bufferOrEvent.getEvent();
+					handleEvent(event);
+				}
+			}
+			else {
+				isFinished = true;
+				if (!barrierHandler.isEmpty()) {
+					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
 				}
+				return false;
 			}
 		}
 	}
@@ -270,6 +272,6 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 
 	@Override
 	public void cleanup() throws IOException {
-		barrierBuffer.cleanup();
+		barrierHandler.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 9d6e88e..d078320 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -34,22 +34,27 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	public void registerInputOutput() {
-		super.registerInputOutput();
-
-		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		if (numberOfInputs > 0) {
-			InputGate[] inputGates = getEnvironment().getAllInputGates();
-			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, getExecutionConfig().areTimestampsEnabled());
-
-			inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
-
-			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-
-			inputProcessor.setReporter(reporter);
+		try {
+			super.registerInputOutput();
+			
+			TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
+			int numberOfInputs = configuration.getNumberOfInputs();
+	
+			if (numberOfInputs > 0) {
+				InputGate[] inputGates = getEnvironment().getAllInputGates();
+				inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
+						getCheckpointBarrierListener(), 
+						getEnvironment().getIOManager(),
+						getExecutionConfig().areTimestampsEnabled());
+	
+				// make sure that stream tasks report their I/O statistics
+				AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+				AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+				inputProcessor.setReporter(reporter);
+			}
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 75bdd57..d829833 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
@@ -74,7 +73,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	protected ClassLoader userClassLoader;
 	
-	private EventListener<TaskEvent> checkpointBarrierListener;
+	private EventListener<CheckpointBarrier> checkpointBarrierListener;
 
 	public StreamTask() {
 		streamOperator = null;
@@ -106,7 +105,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			streamOperator.setup(outputHandler.getOutput(), headContext);
 		}
 
-		hasChainedOperators = !(outputHandler.getChainedOperators().size() == 1);
+		hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
 	}
 
 	public String getName() {
@@ -199,7 +198,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		this.isRunning = false;
 	}
 
-	public EventListener<TaskEvent> getCheckpointBarrierListener() {
+	public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
 		return this.checkpointBarrierListener;
 	}
 
@@ -211,7 +210,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@Override
 	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
 
-		// We retrieve end restore the states for the chained oeprators.
+		// We retrieve and restore the states for the chained operators.
 		List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle.getState();
 
 		// We restore all stateful chained operators
@@ -306,13 +305,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	// ------------------------------------------------------------------------
 
-	private class CheckpointBarrierListener implements EventListener<TaskEvent> {
+	private class CheckpointBarrierListener implements EventListener<CheckpointBarrier> {
 
 		@Override
-		public void onEvent(TaskEvent event) {
+		public void onEvent(CheckpointBarrier barrier) {
 			try {
-				CheckpointBarrier sStep = (CheckpointBarrier) event;
-				triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
+				triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
 			}
 			catch (Exception e) {
 				throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index f981cd5..b4667b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -34,44 +34,52 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
 
-	StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
 
 	@Override
 	public void registerInputOutput() {
-		super.registerInputOutput();
-
-		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
-		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = inEdges.get(i).getTypeNumber();
-			InputGate reader = getEnvironment().getInputGate(i);
-			switch (inputType) {
-				case 1:
-					inputList1.add(reader);
-					break;
-				case 2:
-					inputList2.add(reader);
-					break;
-				default:
-					throw new RuntimeException("Invalid input type number: " + inputType);
+		try {
+			super.registerInputOutput();
+	
+			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+	
+			int numberOfInputs = configuration.getNumberOfInputs();
+	
+			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+	
+			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
+	
+			for (int i = 0; i < numberOfInputs; i++) {
+				int inputType = inEdges.get(i).getTypeNumber();
+				InputGate reader = getEnvironment().getInputGate(i);
+				switch (inputType) {
+					case 1:
+						inputList1.add(reader);
+						break;
+					case 2:
+						inputList2.add(reader);
+						break;
+					default:
+						throw new RuntimeException("Invalid input type number: " + inputType);
+				}
 			}
+	
+			this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
+					inputDeserializer1, inputDeserializer2,
+					getCheckpointBarrierListener(),
+					getEnvironment().getIOManager(),
+					getExecutionConfig().areTimestampsEnabled());
+
+			// make sure that stream tasks report their I/O statistics
+			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+			this.inputProcessor.setReporter(reporter);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
 		}
-
-		inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, getExecutionConfig().areTimestampsEnabled());
-
-		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-		inputProcessor.setReporter(reporter);
-
-		inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index c479f95..b59ad19 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -20,11 +20,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -220,14 +217,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			return isEvent;
 		}
 	}
-
-	public static class DummyEvent extends TaskEvent {
-		@Override
-		public void write(DataOutputView out) throws IOException {
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
deleted file mode 100644
index d8a3696..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-
-public class BarrierBufferIOTest {
-
-	@Test
-	public void IOTest() throws IOException, InterruptedException {
-
-		BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-		BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-
-		MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
-				new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
-		// new BarrierSimulator[] { new CountBarrier(1000), new
-		// CountBarrier(1000) });
-
-		BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
-				new BarrierBufferTest.MockReader(myIG));
-
-		try {
-			// long time = System.currentTimeMillis();
-			for (int i = 0; i < 2000000; i++) {
-				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
-				if (boe.isBuffer()) {
-					boe.getBuffer().recycle();
-				} else {
-					barrierBuffer.processBarrier(boe);
-				}
-			}
-			// System.out.println("Ran for " + (System.currentTimeMillis() -
-			// time));
-		} catch (Exception e) {
-			fail();
-		} finally {
-			barrierBuffer.cleanup();
-		}
-	}
-
-	private static class RandomBarrier implements BarrierGenerator {
-		private static Random rnd = new Random();
-
-		double threshold;
-
-		public RandomBarrier(double expectedEvery) {
-			threshold = 1 / expectedEvery;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return rnd.nextDouble() < threshold;
-		}
-	}
-
-	private static class CountBarrier implements BarrierGenerator {
-
-		long every;
-		long c = 0;
-
-		public CountBarrier(long every) {
-			this.every = every;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return c++ % every == 0;
-		}
-	}
-
-	protected static class MockInputGate implements InputGate {
-
-		private int numChannels;
-		private BufferPool[] bufferPools;
-		private int[] currentBarriers;
-		BarrierGenerator[] barrierGens;
-		int currentChannel = 0;
-		long c = 0;
-
-		public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
-			this.numChannels = bufferPools.length;
-			this.currentBarriers = new int[numChannels];
-			this.bufferPools = bufferPools;
-			this.barrierGens = barrierGens;
-		}
-
-		@Override
-		public int getNumberOfInputChannels() {
-			return numChannels;
-		}
-
-		@Override
-		public boolean isFinished() {
-			return false;
-		}
-
-		@Override
-		public void requestPartitions() throws IOException, InterruptedException {
-		}
-
-		@Override
-		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-			currentChannel = (currentChannel + 1) % numChannels;
-
-			if (barrierGens[currentChannel].isNextBarrier()) {
-				return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel],
-						currentChannel);
-			} else {
-				Buffer buffer = bufferPools[currentChannel].requestBuffer();
-				buffer.getMemorySegment().putLong(0, c++);
-
-				return new BufferOrEvent(buffer, currentChannel);
-			}
-
-		}
-
-		@Override
-		public void sendTaskEvent(TaskEvent event) throws IOException {
-		}
-
-		@Override
-		public void registerListener(EventListener<InputGate> listener) {
-		}
-
-	}
-
-	protected interface BarrierGenerator {
-		public boolean isNextBarrier();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
new file mode 100644
index 0000000..c2df4d8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import org.junit.Test;
+
+/**
+ * The test generates two random streams (input channels) which independently
+ * and randomly generate checkpoint barriers. The two streams are very
+ * unaligned, putting heavy work on the BarrierBuffer.
+ */
+public class BarrierBufferMassiveRandomTest {
+
+	@Test
+	public void testWithTwoChannelsAndRandomBarriers() {
+		IOManager ioMan = null;
+		try {
+			ioMan = new IOManagerAsync();
+			
+			BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+			BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+
+			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
+					new BufferPool[] { pool1, pool2 },
+					new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
+	
+			BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, ioMan);
+			
+			for (int i = 0; i < 2000000; i++) {
+				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+				if (boe.isBuffer()) {
+					boe.getBuffer().recycle();
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mocks and Generators
+	// ------------------------------------------------------------------------
+	
+	protected interface BarrierGenerator {
+		public boolean isNextBarrier();
+	}
+
+	protected static class RandomBarrier implements BarrierGenerator {
+		
+		private static final Random rnd = new Random();
+
+		private final double threshold;
+
+		public RandomBarrier(double expectedEvery) {
+			threshold = 1 / expectedEvery;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return rnd.nextDouble() < threshold;
+		}
+	}
+
+	private static class CountBarrier implements BarrierGenerator {
+
+		private final long every;
+		private long c = 0;
+
+		public CountBarrier(long every) {
+			this.every = every;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return c++ % every == 0;
+		}
+	}
+
+	protected static class RandomGeneratingInputGate implements InputGate {
+
+		private final int numChannels;
+		private final BufferPool[] bufferPools;
+		private final int[] currentBarriers;
+		private final BarrierGenerator[] barrierGens;
+		private int currentChannel = 0;
+		private long c = 0;
+
+		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this.numChannels = bufferPools.length;
+			this.currentBarriers = new int[numChannels];
+			this.bufferPools = bufferPools;
+			this.barrierGens = barrierGens;
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return false;
+		}
+
+		@Override
+		public void requestPartitions() {}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			currentChannel = (currentChannel + 1) % numChannels;
+
+			if (barrierGens[currentChannel].isNextBarrier()) {
+				return new BufferOrEvent(
+						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
+							currentChannel);
+			} else {
+				Buffer buffer = bufferPools[currentChannel].requestBuffer();
+				buffer.getMemorySegment().putLong(0, c++);
+				return new BufferOrEvent(buffer, currentChannel);
+			}
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) {}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {}
+	}
+}
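
A note on the BarrierGenerator interface introduced in the test above: because it only requires isNextBarrier(), deterministic barrier patterns can be plugged into the same RandomGeneratingInputGate harness as well. Below is a hypothetical sketch (not part of this commit) of a generator that fires at a fixed, ascending set of positions in the per-channel request sequence; the class name and fields are made up for illustration and it would sit next to CountBarrier inside the test class.

	// Hypothetical helper, not part of this commit: a BarrierGenerator that
	// emits a barrier at a fixed, ascending set of positions in the sequence
	// of isNextBarrier() calls for its channel.
	protected static class FixedPositionBarrier implements BarrierGenerator {

		private final long[] positions;  // ascending positions at which to emit a barrier
		private int nextIndex = 0;       // index of the next position to fire
		private long count = 0;          // calls seen so far

		public FixedPositionBarrier(long... positions) {
			this.positions = positions;
		}

		@Override
		public boolean isNextBarrier() {
			boolean fire = nextIndex < positions.length && count == positions[nextIndex];
			if (fire) {
				nextIndex++;
			}
			count++;
			return fire;
		}
	}

Used, for example, as new BarrierGenerator[] { new FixedPositionBarrier(0, 100000), new FixedPositionBarrier(50000, 150000) } in place of the random generators, this would put one channel's barriers well ahead of the other's, so the BarrierBuffer has to hold back the data of the channel that is ahead until the matching barrier arrives on the slower one.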