You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:36 UTC

[8/8] flink git commit: [FLINK-2427] [streaming] Make the BarrierBuffer more robust against lost/missing checkpoint barriers.

[FLINK-2427] [streaming] Make the BarrierBuffer more robust against lost/missing checkpoint barriers.

Checkpoint barriers are now tolerated to be lost (as may happen if the checkpoint triggering actor
messages are lost). This is realized by allowing the BarrierBuffer to maintain multiple queues of blocked inputs.

The patch also reworks the buffer spilling logic, to increase I/O efficiency, and reduce the main memory
footprint in cases where the buffers have little contents (low flush timeouts).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9311b9a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9311b9a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9311b9a9

Branch: refs/heads/master
Commit: 9311b9a9da57796e1eb91aa0ec5fa8948b732a47
Parents: 645d7cd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 29 13:09:03 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200

----------------------------------------------------------------------
 .../network/partition/consumer/InputGate.java   |   1 +
 .../partition/consumer/SingleInputGate.java     |  10 +
 .../partition/consumer/UnionInputGate.java      |  13 +
 .../runtime/util/DataInputDeserializer.java     |   8 +-
 .../streaming/runtime/io/BarrierBuffer.java     | 108 +++--
 .../streaming/runtime/io/BarrierTracker.java    |  19 +-
 .../streaming/runtime/io/BufferSpiller.java     | 371 ++++++++++++--
 .../runtime/io/FreeingBufferRecycler.java       |  37 ++
 .../flink/streaming/runtime/io/SpillReader.java |  78 ---
 .../runtime/io/SpillingBufferOrEvent.java       |  66 ---
 .../consumer/StreamTestSingleInputGate.java     |   2 +
 .../io/BarrierBufferMassiveRandomTest.java      |  11 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 205 +++++++-
 .../runtime/io/BarrierTrackerTest.java          |   9 +-
 .../streaming/runtime/io/BufferSpillerTest.java | 390 +++++++++++++++
 .../runtime/io/DummyBufferRecycler.java         |  34 --
 .../io/SpilledBufferOrEventSequenceTest.java    | 482 +++++++++++++++++++
 .../runtime/io/SpillingBufferOrEventTest.java   | 115 -----
 .../runtime/io/StreamRecordWriterTest.java      |   6 +-
 .../flink/streaming/runtime/io/TestEvent.java   |  88 ++++
 20 files changed, 1650 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index af089fc..c4f9dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -37,4 +37,5 @@ public interface InputGate {
 
 	void registerListener(EventListener<InputGate> listener);
 
+	int getPageSize();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0aebcae..80a79d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -211,6 +211,16 @@ public class SingleInputGate implements InputGate {
 		return bufferPool;
 	}
 
+	@Override
+	public int getPageSize() {
+		if (bufferPool != null) {
+			return bufferPool.getMemorySegmentSize();
+		}
+		else {
+			throw new IllegalStateException("Input gate has not been initialized with buffers.");
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 1f974de..730ead2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -182,6 +182,19 @@ public class UnionInputGate implements InputGate {
 		inputGateListener.registerListener(listener);
 	}
 
+	@Override
+	public int getPageSize() {
+		int pageSize = -1;
+		for (InputGate gate : inputGates) {
+			if (pageSize == -1) {
+				pageSize = gate.getPageSize();
+			} else if (gate.getPageSize() != pageSize) {
+				throw new IllegalStateException("Found input gates with different page sizes.");
+			}
+		}
+		return pageSize;
+	}
+
 	/**
 	 * Data availability listener at all unioned input gates.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 9915aba..e8e8f6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -109,7 +109,7 @@ public class DataInputDeserializer implements DataInputView {
 	@Override
 	public char readChar() throws IOException {
 		if (this.position < this.end - 1) {
-			return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
+			return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff));
 		} else {
 			throw new EOFException();
 		}
@@ -205,7 +205,7 @@ public class DataInputDeserializer implements DataInputView {
 	@Override
 	public short readShort() throws IOException {
 		if (position >= 0 && position < this.end - 1) {
-			return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
+			return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff));
 		} else {
 			throw new EOFException();
 		}
@@ -271,7 +271,7 @@ public class DataInputDeserializer implements DataInputView {
 				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
 				}
-				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
 				break;
 			default:
 				/* 10xx xxxx, 1111 xxxx */
@@ -294,7 +294,7 @@ public class DataInputDeserializer implements DataInputView {
 	@Override
 	public int readUnsignedShort() throws IOException {
 		if (this.position < this.end - 1) {
-			return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
+			return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff);
 		} else {
 			throw new EOFException();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 466b8f7..0441937 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
@@ -50,12 +49,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	
 	/** The total number of channels that this buffer handles data from */
 	private final int totalNumberOfInputChannels;
-
-	private final SpillReader spillReader;
-	private final BufferSpiller bufferSpiller;
 	
-	private ArrayDeque<SpillingBufferOrEvent> nonProcessed;
-	private ArrayDeque<SpillingBufferOrEvent> blockedNonProcessed;
+	/** The utility to write blocked data to a file channel */
+	private final BufferSpiller bufferSpiller;
+
+	/** The pending blocked buffer/event sequences. Must be consumed before requesting
+	 * further data from the input gate. */
+	private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
+
+	/** The sequence of buffers/events that has been unblocked and must now be consumed
+	 * before requesting further data from the input gate */
+	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
 
 	/** Handler that receives the checkpoint notifications */
 	private EventListener<CheckpointBarrier> checkpointHandler;
@@ -69,17 +73,21 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
 
-	
+
+	/**
+	 * 
+	 * @param inputGate The input gate to draw the buffers and events from.
+	 * @param ioManager The I/O manager that gives access to the temp directories.
+	 * 
+	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
+	 */
 	public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
 		this.inputGate = inputGate;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
 		
-		this.nonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
-		this.blockedNonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
-		
-		this.bufferSpiller = new BufferSpiller(ioManager);
-		this.spillReader = new SpillReader();
+		this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
+		this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -90,15 +98,25 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
-			final SpillingBufferOrEvent nextBuffered = nonProcessed.pollFirst();
-			final BufferOrEvent next = nextBuffered == null ?
-					inputGate.getNextBufferOrEvent() :
-					nextBuffered.getBufferOrEvent();
+			BufferOrEvent next;
+			if (currentBuffered != null) {
+				next = currentBuffered.getNext();
+				if (next == null) {
+					currentBuffered = queuedBuffered.pollFirst();
+					if (currentBuffered != null) {
+						currentBuffered.open();
+					}
+					return getNextNonBlocked();
+				}
+			}
+			else {
+				next = inputGate.getNextBufferOrEvent();
+			}
 			
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
-					blockedNonProcessed.add(new SpillingBufferOrEvent(next, bufferSpiller, spillReader));
+					bufferSpiller.add(next);
 				}
 				else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
 					return next;
@@ -181,25 +199,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	
 	@Override
 	public boolean isEmpty() {
-		return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
+		return currentBuffered == null;
 	}
 
 	@Override
 	public void cleanup() throws IOException {
 		bufferSpiller.close();
-		File spillfile1 = bufferSpiller.getSpillFile();
-		if (spillfile1 != null) {
-			if (!spillfile1.delete()) {
-				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile1.getAbsolutePath());
-			}
+		if (currentBuffered != null) {
+			currentBuffered.cleanup();
 		}
-
-		spillReader.close();
-		File spillfile2 = spillReader.getSpillFile();
-		if (spillfile2 != null) {
-			if (!spillfile2.delete()) {
-				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile2.getAbsolutePath());
-			}
+		for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
+			seq.cleanup();
 		}
 	}
 	
@@ -233,7 +243,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	}
 
 	/**
-	 * Releases the blocks on all channels.
+	 * Releases the blocks on all channels. Makes sure the just written data
+	 * is the next to be consumed.
 	 */
 	private void releaseBlocks() throws IOException {
 		if (LOG.isDebugEnabled()) {
@@ -244,27 +255,36 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			blockedChannels[i] = false;
 		}
 		numReceivedBarriers = 0;
-		
-		if (nonProcessed.isEmpty()) {
-			// swap the queues
-			ArrayDeque<SpillingBufferOrEvent> empty = nonProcessed;
-			nonProcessed = blockedNonProcessed;
-			blockedNonProcessed = empty;
+
+		if (currentBuffered == null) {
+			// common case: no more buffered data
+			currentBuffered = bufferSpiller.rollOver();
+			if (currentBuffered != null) {
+				currentBuffered.open();
+			}
 		}
 		else {
-			throw new IllegalStateException("Unconsumed data from previous checkpoint alignment " +
-					"when starting next checkpoint alignment");
+			// uncommon case: buffered data pending
+			// push back the pending data
+			queuedBuffered.addFirst(currentBuffered);
+			
+			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one 
+			currentBuffered = bufferSpiller.rollOverWithNewBuffer();
+			if (currentBuffered != null) {
+				currentBuffered.open();
+			}
 		}
-		
-		// roll over the spill files
-		spillReader.setSpillFile(bufferSpiller.getSpillFile());
-		bufferSpiller.resetSpillFile();
 	}
 
 	// ------------------------------------------------------------------------
 	// For Testing
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Gets the ID defining the current pending, or just completed, checkpoint.
+	 * 
+	 * @return The ID of the pending or completed checkpoint.
+	 */
 	public long getCurrentCheckpointId() {
 		return this.currentCheckpointId;
 	}
@@ -275,6 +295,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	
 	@Override
 	public String toString() {
-		return "Non-Processed: " + nonProcessed + " | Blocked: " + blockedNonProcessed;
+		return String.format("last checkpoint: %d, current barriers: %d", currentCheckpointId, numReceivedBarriers);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 6b24556..a0b924f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -28,26 +28,39 @@ import java.util.ArrayDeque;
 
 /**
  * The BarrierTracker keeps track of what checkpoint barriers have been received from
- * which input channels. 
+ * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
+ * it notifies its listener of a completed checkpoint.
  * 
  * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
  * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
  * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ * 
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
  */
 public class BarrierTracker implements CheckpointBarrierHandler {
 
+	/** The tracker tracks a maximum number of checkpoints, for which some, but not all
+	 * barriers have yet arrived. */
 	private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
 	
+	/** The input gate, to draw the buffers and events from */
 	private final InputGate inputGate;
 	
+	/** The number of channels. Once that many barriers have been received for a checkpoint,
+	 * the checkpoint is considered complete. */
 	private final int totalNumberOfInputChannels;
-	
+
+	/** All checkpoints for which some (but not all) barriers have been received,
+	 * and that are not yet known to be subsumed by newer checkpoints */
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 	
+	/** The listener to be notified on complete checkpoints */
 	private EventListener<CheckpointBarrier> checkpointHandler;
 	
+	/** The highest checkpoint ID encountered so far */
 	private long latestPendingCheckpointID = -1;
 	
+	
 	public BarrierTracker(InputGate inputGate) {
 		this.inputGate = inputGate;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
@@ -149,8 +162,6 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	// ------------------------------------------------------------------------
-	//  
-	// ------------------------------------------------------------------------
 
 	/**
 	 * Simple class for a checkpoint ID with a barrier counter.

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index fda612e..5f9a162 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -20,80 +20,389 @@ package org.apache.flink.streaming.runtime.io;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.util.StringUtils;
 
+/**
+ * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file.
+ * After a number of elements have been spilled, the spiller can "roll over": It presents the spilled
+ * elements as a readable sequence, and opens a new spill file.
+ * 
+ * <p>This implementation buffers data effectively in the OS cache, which gracefully extends to the
+ * disk. Most data is written and re-read milliseconds later. The file is deleted after the read.
+ * Consequently, in most cases, the data will never actually hit the physical disks.</p>
+ * 
+ * <p>IMPORTANT: The SpilledBufferOrEventSequences created by this spiller all reuse the same
+ * reading memory (to reduce overhead) and can consequently not be read concurrently.</p>
+ */
 public class BufferSpiller {
-	
-	/** The random number generator for temp file names */
-	private static final Random RND = new Random();
 
 	/** The counter that selects the next directory to spill into */
 	private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
 	
+	/** The size of the buffer with which data is read back in */
+	private static final int READ_BUFFER_SIZE = 1024 * 1024;
 	
 	/** The directories to spill to */
 	private final File tempDir;
-
-	private File spillFile;
 	
-	private FileChannel spillingChannel;
+	/** The name prefix for spill files */
+	private final String spillFilePrefix;
+	
+	/** The buffer used for bulk reading data (used in the SpilledBufferOrEventSequence) */
+	private final ByteBuffer readBuffer;
 	
+	/** The buffer that encodes the spilled header */
+	private final ByteBuffer headBuffer;
 	
+	/** The reusable array that holds header and contents buffers */
+	private final ByteBuffer[] sources;
+	
+	/** The file that we currently spill to */
+	private File currentSpillFile;
+	
+	/** The channel of the file we currently spill to */
+	private FileChannel currentChannel;
 
-	public BufferSpiller(IOManager ioManager) throws IOException {
+	/** The page size, to let this reader instantiate properly sized memory segments */
+	private final int pageSize;
+	
+	/** A counter, to create numbered spill files */
+	private int fileCounter;
+	
+	/** A flag to check whether the spiller has written since the last roll over */
+	private boolean hasWritten;
+	
+	/**
+	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
+	 * 
+	 * @param ioManager The I/O manager for access to the temp directories.
+	 * @param pageSize The page size used to re-create spilled buffers.
+	 * @throws IOException Thrown if the temp files for spilling cannot be initialized.
+	 */
+	public BufferSpiller(IOManager ioManager, int pageSize) throws IOException {
+		this.pageSize = pageSize;
+		
+		this.readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
+		this.readBuffer.order(ByteOrder.LITTLE_ENDIAN);
+		
+		this.headBuffer = ByteBuffer.allocateDirect(16);
+		this.headBuffer.order(ByteOrder.LITTLE_ENDIAN);
+		
+		this.sources = new ByteBuffer[] { this.headBuffer, null };
+		
 		File[] tempDirs = ioManager.getSpillingDirectories();
 		this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
+		
+		byte[] rndBytes = new byte[32];
+		new Random().nextBytes(rndBytes);
+		this.spillFilePrefix = StringUtils.byteToHexString(rndBytes) + '.';
+		
+		// prepare for first contents
 		createSpillingChannel();
 	}
 
 	/**
-	 * Dumps the contents of the buffer to disk and recycles the buffer.
+	 * Adds a buffer or event to the sequence of spilled buffers and events.
+	 * 
+	 * @param boe The buffer or event to add and spill.
+	 * @throws IOException Thrown, if the buffer or event could not be spilled.
 	 */
-	public void spill(Buffer buffer) throws IOException {
+	public void add(BufferOrEvent boe) throws IOException {
+		hasWritten = true;
 		try {
-			spillingChannel.write(buffer.getNioBuffer());
-			buffer.recycle();
+			ByteBuffer contents;
+			if (boe.isBuffer()) {
+				Buffer buf = boe.getBuffer();
+				contents = buf.getMemorySegment().wrap(0, buf.getSize());
+			}
+			else {
+				contents = EventSerializer.toSerializedEvent(boe.getEvent());
+			}
+			
+			headBuffer.clear();
+			headBuffer.putInt(boe.getChannelIndex());
+			headBuffer.putInt(contents.remaining());
+			headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
+			headBuffer.flip();
+			
+			sources[1] = contents;
+			currentChannel.write(sources);
 		}
-		catch (IOException e) {
-			close();
-			throw e;
+		finally {
+			if (boe.isBuffer()) {
+				boe.getBuffer().recycle();
+			}
 		}
 	}
 
-	@SuppressWarnings("resource")
-	private void createSpillingChannel() throws IOException {
-		this.spillFile = new File(tempDir, randomString(RND) + ".buffer");
-		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+	/**
+	 * Starts a new sequence of spilled buffers and events and returns the current sequence of spilled buffers
+	 * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
+	 * last call to this method.
+	 * 
+	 * <p>NOTE: The SpilledBufferOrEventSequences created by this method all reuse the same
+	 * reading memory (to reduce overhead) and can consequently not be read concurrently with each other.
+	 * To create a sequence that can be read concurrently with the previous SpilledBufferOrEventSequence, use the
+	 * {@link #rollOverWithNewBuffer()} method.</p>
+	 * 
+	 * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
+	 * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
+	 *                     file could be created.
+	 */
+	public SpilledBufferOrEventSequence rollOver() throws IOException {
+		return rollOverInternal(false);
 	}
 
+	/**
+	 * Starts a new sequence of spilled buffers and events and returns the current sequence of spilled buffers
+	 * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
+	 * last call to this method.
+	 * 
+	 * <p>The SpilledBufferOrEventSequence returned by this method is safe for concurrent consumption with
+	 * any previously returned sequence.</p>
+	 *
+	 * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
+	 * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
+	 *                     file could be created.
+	 */
+	public SpilledBufferOrEventSequence rollOverWithNewBuffer() throws IOException {
+		return rollOverInternal(true);
+	}
+	
+	private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
+		if (!hasWritten) {
+			return null;
+		}
+		
+		ByteBuffer buf;
+		if (newBuffer) {
+			buf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
+			buf.order(ByteOrder.LITTLE_ENDIAN);
+		} else {
+			buf = readBuffer;
+		}
+		
+		// create a reader for the spilled data
+		currentChannel.position(0L);
+		SpilledBufferOrEventSequence seq = 
+				new SpilledBufferOrEventSequence(currentSpillFile, currentChannel, buf, pageSize);
+		
+		// create ourselves a new spill file
+		createSpillingChannel();
+		
+		hasWritten = false;
+		return seq;
+	}
 
-
+	/**
+	 * Cleans up the current spilling channel and file.
+	 * 
+	 * Does not clean up the SpilledBufferOrEventSequences generated by calls to 
+	 * {@link #rollOver()}.
+	 * 
+	 * @throws IOException Thrown if channel closing or file deletion fail.
+	 */
 	public void close() throws IOException {
-		if (spillingChannel != null && spillingChannel.isOpen()) {
-			spillingChannel.close();
+		currentChannel.close();
+		if (!currentSpillFile.delete()) {
+			throw new IOException("Cannot delete spill file");
 		}
 	}
 
-	public void resetSpillFile() throws IOException {
-		close();
-		createSpillingChannel();
-	}
+	// ------------------------------------------------------------------------
+	//  For testing
+	// ------------------------------------------------------------------------
 
-	public File getSpillFile() {
-		return spillFile;
+	File getCurrentSpillFile() {
+		return currentSpillFile;
+	}
+	
+	FileChannel getCurrentChannel() {
+		return currentChannel;
 	}
 	
 	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	@SuppressWarnings("resource")
+	private void createSpillingChannel() throws IOException {
+		currentSpillFile = new File(tempDir, spillFilePrefix + (fileCounter++) +".buffer");
+		currentChannel = new RandomAccessFile(currentSpillFile, "rw").getChannel();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This class represents a sequence of spilled buffers and events, created by the
+	 * {@link BufferSpiller}. The sequence of buffers and events can be read back using the
+	 * method {@link #getNext()}.
+	 */
+	public static class SpilledBufferOrEventSequence {
+		
+		/** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte) */
+		private static final int HEADER_LENGTH = 9;
+
+		/** The file containing the data */
+		private final File file;
+		
+		/** The file channel to draw the data from */
+		private final FileChannel fileChannel;
+		
+		/** The byte buffer for bulk reading */
+		private final ByteBuffer buffer;
 
-	private static String randomString(Random random) {
-		final byte[] bytes = new byte[20];
-		random.nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
+		/** The page size to instantiate properly sized memory segments */
+		private final int pageSize;
+
+		/** Flag to track whether the sequence has been opened already */
+		private boolean opened = false;
+
+		/**
+		 * Create a reader that reads a sequence of spilled buffers and events.
+		 * 
+		 * @param file The file with the data.
+		 * @param fileChannel The file channel to read the data from.
+		 * @param buffer The buffer used for bulk reading.
+		 * @param pageSize The page size to use for the created memory segments.
+		 */
+		SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
+			this.file = file;
+			this.fileChannel = fileChannel;
+			this.buffer = buffer;
+			this.pageSize = pageSize;
+		}
+
+		/**
+		 * Initializes the sequence for reading.
+		 * This method needs to be called before the first call to {@link #getNext()}. Otherwise
+		 * the results of {@link #getNext()} are not predictable.
+		 */
+		public void open() {
+			if (!opened) {
+				opened = true;
+				buffer.position(0);
+				buffer.limit(0);
+			}
+		}
+
+		/**
+		 * Gets the next BufferOrEvent from the spilled sequence, or {@code null}, if the
+		 * sequence is exhausted.
+		 *         
+		 * @return The next BufferOrEvent from the spilled sequence, or {@code null} (end of sequence).
+		 * @throws IOException Thrown, if the reads failed, or if the byte stream is corrupt.
+		 */
+		public BufferOrEvent getNext() throws IOException {
+			if (buffer.remaining() < HEADER_LENGTH) {
+				buffer.compact();
+				
+				while (buffer.position() < HEADER_LENGTH) {
+					if (fileChannel.read(buffer) == -1) {
+						if (buffer.position() == 0) {
+							// no trailing data
+							return null;
+						} else {
+							throw new IOException("Found trailing incomplete buffer or event");
+						}
+					}
+				}
+				
+				buffer.flip();
+			}
+			
+			final int channel = buffer.getInt();
+			final int length = buffer.getInt();
+			final boolean isBuffer = buffer.get() == 0;
+			
+			
+			if (isBuffer) {
+				// deserialize buffer
+				if (length > pageSize) {
+					throw new IOException(String.format(
+							"Spilled buffer (%d bytes) is larger than page size of (%d bytes)", length, pageSize));
+				}
+
+				MemorySegment seg = new MemorySegment(new byte[pageSize]);
+				
+				int segPos = 0;
+				int bytesRemaining = length;
+				
+				while (true) {
+					int toCopy = Math.min(buffer.remaining(), bytesRemaining);
+					if (toCopy > 0) {
+						seg.put(segPos, buffer, toCopy);
+						segPos += toCopy;
+						bytesRemaining -= toCopy;
+					}
+					
+					if (bytesRemaining == 0) {
+						break;
+					}
+					else {
+						buffer.clear();
+						if (fileChannel.read(buffer) == -1) {
+							throw new IOException("Found trailing incomplete buffer");
+						}
+						buffer.flip();
+					}
+				}
+				
+				
+				Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+				buf.setSize(length);
+				
+				return new BufferOrEvent(buf, channel);
+			}
+			else {
+				// deserialize event
+				if (length > buffer.capacity() - HEADER_LENGTH) {
+					throw new IOException("Event is too large");
+				}
+
+				if (buffer.remaining() < length) {
+					buffer.compact();
+
+					while (buffer.position() < length) {
+						if (fileChannel.read(buffer) == -1) {
+							throw new IOException("Found trailing incomplete event");
+						}
+					}
+
+					buffer.flip();
+				}
+
+				int oldLimit = buffer.limit();
+				buffer.limit(buffer.position() + length);
+				AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader());
+				buffer.limit(oldLimit);
+				
+				return new BufferOrEvent(evt, channel);
+			}
+		}
+
+		/**
+		 * Cleans up all file resources held by this spilled sequence.
+		 * 
+		 * @throws IOException Thrown, if file channel closing or file deletion fail. 
+		 */
+		public void cleanup() throws IOException {
+			fileChannel.close();
+			if (!file.delete()) {
+				throw new IOException("Cannot remove temp file for stream alignment writer");
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
new file mode 100644
index 0000000..27e37a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+/**
+ * A {@link BufferRecycler} that does no pooling: recycling a segment just calls {@code free()} on it.
+ */
+public class FreeingBufferRecycler implements BufferRecycler {
+	
+	/** Shared instance; the recycler declares no instance fields, so one instance can be reused. */
+	public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
+	
+	@Override
+	public void recycle(final MemorySegment memorySegment) {
+		// nothing is handed back to a pool, the segment is freed directly
+		memorySegment.free();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
deleted file mode 100644
index 356b491..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-public class SpillReader {
-
-	private FileChannel spillingChannel;
-	private File spillFile;
-
-	/**
-	 * Reads the next buffer from the spilled file.
-	 */
-	public Buffer readNextBuffer(int bufferSize) throws IOException {
-		try {
-			Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
-					new BufferRecycler() {
-
-						@Override
-						public void recycle(MemorySegment memorySegment) {
-							memorySegment.free();
-						}
-					});
-
-			spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
-
-			return buffer;
-		} catch (Exception e) {
-			close();
-			throw new IOException(e);
-		}
-	}
-
-	@SuppressWarnings("resource")
-	public void setSpillFile(File nextSpillFile) throws IOException {
-		// We can close and delete the file now
-		close();
-		if (spillFile != null) {
-			spillFile.delete();
-		}
-		this.spillFile = nextSpillFile;
-		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
-	}
-
-	public File getSpillFile() {
-		return spillFile;
-	}
-
-	public void close() throws IOException {
-		if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
-			this.spillingChannel.close();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
deleted file mode 100644
index 368e373..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-
-public class SpillingBufferOrEvent {
-
-	private BufferOrEvent boe;
-	private boolean isSpilled = false;
-
-	private SpillReader spillReader;
-
-	private int channelIndex;
-	private int bufferSize;
-
-	public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
-			throws IOException {
-
-		this.boe = boe;
-		this.channelIndex = boe.getChannelIndex();
-		this.spillReader = reader;
-
-		if (boe.isBuffer()) {
-			this.bufferSize = boe.getBuffer().getSize();
-			spiller.spill(boe.getBuffer());
-			this.boe = null;
-			this.isSpilled = true;
-		}
-	}
-
-	/**
-	 * If the buffer wasn't spilled simply returns the instance from the field,
-	 * otherwise reads it from the spill reader
-	 */
-	public BufferOrEvent getBufferOrEvent() throws IOException {
-		if (isSpilled) {
-			boe = new BufferOrEvent(spillReader.readNextBuffer(bufferSize), channelIndex);
-			this.isSpilled = false;
-			return boe;
-		} else {
-			return boe;
-		}
-	}
-
-	public boolean isSpilled() {
-		return isSpilled;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index b59ad19..4007da8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -38,6 +38,7 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -75,6 +76,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 		inputQueues = new ConcurrentLinkedQueue[numInputChannels];
 
 		setupInputChannels();
+		doReturn(bufferSize).when(inputGate).getPageSize();
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index c2df4d8..7350516 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -42,14 +42,16 @@ import org.junit.Test;
  */
 public class BarrierBufferMassiveRandomTest {
 
+	private static final int PAGE_SIZE = 1024;
+	
 	@Test
 	public void testWithTwoChannelsAndRandomBarriers() {
 		IOManager ioMan = null;
 		try {
 			ioMan = new IOManagerAsync();
 			
-			BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-			BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE).createBufferPool(100, true);
+			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE).createBufferPool(100, true);
 
 			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
 					new BufferPool[] { pool1, pool2 },
@@ -163,5 +165,10 @@ public class BarrierBufferMassiveRandomTest {
 
 		@Override
 		public void registerListener(EventListener<InputGate> listener) {}
+
+		/** Reports PAGE_SIZE, the segment size the buffer pools of this test are created with. */
+		@Override
+		public int getPageSize() {
+			return PAGE_SIZE;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index ad61c6f..b8b3a8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -49,6 +49,8 @@ import static org.junit.Assert.fail;
  */
 public class BarrierBufferTest {
 
+	private static final int PAGE_SIZE = 512;
+	
 	private static int SIZE_COUNTER = 0;
 	
 	private static IOManager IO_MANAGER;
@@ -89,6 +91,8 @@ public class BarrierBufferTest {
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+			
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -118,6 +122,8 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -157,6 +163,8 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -272,6 +280,8 @@ public class BarrierBufferTest {
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -327,6 +337,8 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+			buffer.cleanup();
+			
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -441,6 +453,8 @@ public class BarrierBufferTest {
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -516,6 +530,95 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+			
+			buffer.cleanup();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Validates that the buffer skips over the current checkpoint if it
+	 * receives a barrier from a later checkpoint on a non-blocked input.
+	 */
+	@Test
+	public void testMultiChannelJumpingOverCheckpoint() {
+		try {
+			BufferOrEvent[] sequence = {
+					// checkpoint 1 - with blocked data
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(1), createBuffer(0),
+
+					// checkpoint 2 will not complete: premature barrier from checkpoint 4 arrives on a non-blocked input
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(2),
+					createBarrier(2, 0),
+					createBuffer(2), createBuffer(0),
+					createBarrier(3, 1),
+					createBuffer(1), createBuffer(2),
+					createBarrier(3, 0),
+					createBuffer(2), createBuffer(0),
+					createBarrier(4, 2),
+
+					createBuffer(2),
+					createBuffer(1), createEndOfPartition(1),
+					createBuffer(2), createEndOfPartition(2),
+					createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+
+			// checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			check(sequence[7], buffer.getNextNonBlocked());
+			assertEquals(1L, buffer.getCurrentCheckpointId());
+
+			check(sequence[5], buffer.getNextNonBlocked());
+			check(sequence[6], buffer.getNextNonBlocked());
+			check(sequence[9], buffer.getNextNonBlocked());
+			check(sequence[10], buffer.getNextNonBlocked());
+
+			// alignment of checkpoint 2
+			check(sequence[13], buffer.getNextNonBlocked());
+			assertEquals(2L, buffer.getCurrentCheckpointId());
+			check(sequence[15], buffer.getNextNonBlocked());
+			check(sequence[19], buffer.getNextNonBlocked());
+			check(sequence[21], buffer.getNextNonBlocked());
+
+			// checkpoint 2 aborted, checkpoint 4 started. replay buffered
+			check(sequence[12], buffer.getNextNonBlocked());
+			assertEquals(4L, buffer.getCurrentCheckpointId());
+			check(sequence[16], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
+			check(sequence[22], buffer.getNextNonBlocked());
+			
+			// align remainder
+			check(sequence[25], buffer.getNextNonBlocked());
+			check(sequence[26], buffer.getNextNonBlocked());
+			check(sequence[29], buffer.getNextNonBlocked());
+			check(sequence[30], buffer.getNextNonBlocked());
+			
+			// end of input, emit remainder
+			check(sequence[24], buffer.getNextNonBlocked());
+			check(sequence[27], buffer.getNextNonBlocked());
+			check(sequence[28], buffer.getNextNonBlocked());
+
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -526,15 +629,8 @@ public class BarrierBufferTest {
 	/**
 	 * Validates that the buffer skips over a later checkpoint if it
 	 * receives a barrier from an even later checkpoint on a blocked input.
-	 * 
-	 * NOTE: This test currently fails, because the barrier buffer does not support
-	 * to unblock inputs before all previously unblocked data is consumed.
-	 * 
-	 * Since this test checks only that the buffer behaves "failsafe" in cases of
-	 * corrupt checkpoint barrier propagation (a situation that does not occur
-	 * under the current model), we ignore it for the moment.
 	 */
-//	@Test
+	@Test
 	public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
 		try {
 			BufferOrEvent[] sequence = {
@@ -551,18 +647,23 @@ public class BarrierBufferTest {
 					createBarrier(2, 0),
 					createBuffer(1), createBuffer(0),
 
+					createBarrier(3, 0), // queued barrier on blocked input
+					createBuffer(0),
+					
 					createBarrier(4, 1), // pre-mature barrier on blocked input
-					createBarrier(3, 0), // queued barrier, ignored on replay
+					createBuffer(1),
+					createBuffer(0),
+					createBuffer(2),
 
 					// complete checkpoint 2
-					createBarrier(2, 0),
+					createBarrier(2, 2),
 					createBuffer(0),
 					
 					createBarrier(3, 2), // should be ignored
 					createBuffer(2),
 					createBarrier(4, 0),
 					createBuffer(0), createBuffer(1), createBuffer(2),
-					createBarrier(4, 1),
+					createBarrier(4, 2),
 					
 					createBuffer(1), createEndOfPartition(1),
 					createBuffer(2), createEndOfPartition(2),
@@ -585,6 +686,7 @@ public class BarrierBufferTest {
 
 			// alignment of checkpoint 2
 			check(sequence[13], buffer.getNextNonBlocked());
+			check(sequence[22], buffer.getNextNonBlocked());
 			assertEquals(2L, buffer.getCurrentCheckpointId());
 
 			// checkpoint 2 completed
@@ -593,24 +695,79 @@ public class BarrierBufferTest {
 			check(sequence[16], buffer.getNextNonBlocked());
 			
 			// checkpoint 3 skipped, alignment for 4 started
-			check(sequence[20], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
 			assertEquals(4L, buffer.getCurrentCheckpointId());
-			check(sequence[22], buffer.getNextNonBlocked());
+			check(sequence[21], buffer.getNextNonBlocked());
+			check(sequence[24], buffer.getNextNonBlocked());
 			check(sequence[26], buffer.getNextNonBlocked());
-
+			check(sequence[30], buffer.getNextNonBlocked());
+			
 			// checkpoint 4 completed
-			check(sequence[24], buffer.getNextNonBlocked());
-			check(sequence[25], buffer.getNextNonBlocked());
-
+			check(sequence[20], buffer.getNextNonBlocked());
 			check(sequence[28], buffer.getNextNonBlocked());
 			check(sequence[29], buffer.getNextNonBlocked());
-			check(sequence[30], buffer.getNextNonBlocked());
-			check(sequence[31], buffer.getNextNonBlocked());
+			
 			check(sequence[32], buffer.getNextNonBlocked());
 			check(sequence[33], buffer.getNextNonBlocked());
+			check(sequence[34], buffer.getNextNonBlocked());
+			check(sequence[35], buffer.getNextNonBlocked());
+			check(sequence[36], buffer.getNextNonBlocked());
+			check(sequence[37], buffer.getNextNonBlocked());
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testEarlyCleanup() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0), createBuffer(1), createBuffer(2),
+					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
+
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
+					createBarrier(2, 2),
+					createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+
+			// pre-checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+			// pre-checkpoint 2
+			check(sequence[6], buffer.getNextNonBlocked());
+			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			check(sequence[7], buffer.getNextNonBlocked());
+			check(sequence[8], buffer.getNextNonBlocked());
+
+			// checkpoint 2 alignment
+			check(sequence[13], buffer.getNextNonBlocked());
+			check(sequence[14], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
+			check(sequence[19], buffer.getNextNonBlocked());
+
+			// end of stream: remaining buffered contents
+			buffer.getNextNonBlocked();
+			buffer.cleanup();
+
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -629,8 +786,9 @@ public class BarrierBufferTest {
 	private static BufferOrEvent createBuffer(int channel) {
 		// since we have no access to the contents, we need to use the size as an
 		// identifier to validate correctness here
-		return new BufferOrEvent(
-				new Buffer(new MemorySegment(new byte[SIZE_COUNTER++]),  DummyBufferRecycler.INSTANCE), channel);
+		Buffer buf = new Buffer(new MemorySegment(new byte[PAGE_SIZE]), FreeingBufferRecycler.INSTANCE);
+		buf.setSize(SIZE_COUNTER++);
+		return new BufferOrEvent(buf, channel);
 	}
 
 	private static BufferOrEvent createEndOfPartition(int channel) {
@@ -689,6 +847,11 @@ public class BarrierBufferTest {
 
 		@Override
 		public void registerListener(EventListener<InputGate> listener) {}
+
+		/** Reports PAGE_SIZE, the size of the memory segments that createBuffer allocates. */
+		@Override
+		public int getPageSize() {
+			return PAGE_SIZE;
+		}
 	}
 
 	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b2c570e..532078c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -39,7 +39,7 @@ import static org.junit.Assert.*;
  * Tests for the behavior of the barrier tracker.
  */
 public class BarrierTrackerTest {
-
+	
 	@Test
 	public void testSingleChannelNoBarriers() {
 		try {
@@ -341,7 +341,7 @@ public class BarrierTrackerTest {
 
 	private static BufferOrEvent createBuffer(int channel) {
 		return new BufferOrEvent(
-				new Buffer(new MemorySegment(new byte[] { 1 }),  DummyBufferRecycler.INSTANCE), channel);
+				new Buffer(new MemorySegment(new byte[] { 1, 2 }), FreeingBufferRecycler.INSTANCE), channel);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -381,6 +381,11 @@ public class BarrierTrackerTest {
 
 		@Override
 		public void registerListener(EventListener<InputGate> listener) {}
+
+		/** Reports a page size of two bytes, the length of the segments that createBuffer allocates. */
+		@Override
+		public int getPageSize() {
+			return 2;
+		}
 	}
 
 	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
new file mode 100644
index 0000000..ae384e1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class BufferSpillerTest {
+
+	private static final int PAGE_SIZE = 4096;
+
+	private static IOManager IO_MANAGER;
+
+	private BufferSpiller spiller;
+
+
+	// ------------------------------------------------------------------------
+	//  Setup / Cleanup
+	// ------------------------------------------------------------------------
+	
+	/** Creates the I/O manager once; it is shared by all spillers created in this test class. */
+	@BeforeClass
+	public static void setupIOManager() {
+		IO_MANAGER = new IOManagerAsync();
+	}
+
+	/** Shuts down the shared I/O manager after the last test of this class. */
+	@AfterClass
+	public static void shutdownIOManager() {
+		IO_MANAGER.shutdown();
+	}
+	
+	/**
+	 * Creates a fresh spiller before each test, using the shared I/O manager and
+	 * PAGE_SIZE as page size. A failure to create the spiller fails the test.
+	 */
+	@Before
+	public void createSpiller() {
+		try {
+			spiller = new BufferSpiller(IO_MANAGER, PAGE_SIZE);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Cannot create BufferSpiller: " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * Closes the spiller after each test and verifies that its current channel is
+	 * closed and that its current spill file no longer exists.
+	 */
+	@After
+	public void cleanupSpiller() {
+		if (spiller == null) {
+			// no spiller to release
+			return;
+		}
+
+		try {
+			spiller.close();
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail("Cannot properly close the BufferSpiller: " + ex.getMessage());
+		}
+
+		final boolean channelStillOpen = spiller.getCurrentChannel().isOpen();
+		assertFalse(channelStillOpen);
+
+		final boolean spillFileStillExists = spiller.getCurrentSpillFile().exists();
+		assertFalse(spillFileStillExists);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Rolling over a spiller to which nothing was added must return {@code null},
+	 * also when done several times in a row.
+	 */
+	@Test
+	public void testRollOverEmptySequences() {
+		try {
+			for (int attempt = 0; attempt < 3; attempt++) {
+				assertNull(spiller.rollOver());
+			}
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail(ex.getMessage());
+		}
+	}
+
+	/**
+	 * Adds a random mix of buffers and events to the spiller, rolls it over, and checks
+	 * that the returned sequence yields the same elements in the same order. This is
+	 * repeated for several rounds on the same spiller instance.
+	 */
+	@Test
+	public void testSpillAndRollOverSimple() {
+		try {
+			final Random rnd = new Random();
+			final Random bufferRnd = new Random();
+
+			final int maxNumEventsAndBuffers = 3000;
+			final int maxNumChannels = 1656;
+
+			// do multiple spilling / rolling over rounds
+			for (int round = 0; round < 5; round++) {
+				
+				// buffers are not remembered for validation; their size and channel are
+				// re-derived below by re-seeding bufferRnd with this seed
+				final long bufferSeed = rnd.nextLong();
+				bufferRnd.setSeed(bufferSeed);
+				
+				final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
+				final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+				
+				// events (unlike buffers) are kept, to compare against what is read back
+				final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
+
+				// generate sequence
+				for (int i = 0; i < numEventsAndBuffers; i++) {
+					boolean isEvent = rnd.nextDouble() < 0.05d;
+					if (isEvent) {
+						BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
+						events.add(evt);
+						spiller.add(evt);
+					}
+					else {
+						// draw order (size first, then channel) must match the validation below
+						BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+						spiller.add(evt);
+					}
+				}
+
+				// reset and create reader
+				bufferRnd.setSeed(bufferSeed);
+			
+				BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
+				seq.open();
+
+				// read and validate the sequence
+
+				int numEvent = 0;
+				for (int i = 0; i < numEventsAndBuffers; i++) {
+					BufferOrEvent next = seq.getNext();
+					assertNotNull(next);
+					if (next.isEvent()) {
+						BufferOrEvent expected = events.get(numEvent++);
+						assertEquals(expected.getEvent(), next.getEvent());
+						assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+					}
+					else {
+						// replays the same (size, channel) draws that were used when generating the buffer
+						validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+					}
+				}
+
+				// no further data
+				assertNull(seq.getNext());
+
+				// all events need to be consumed
+				assertEquals(events.size(), numEvent);
+				
+				seq.cleanup();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Keeps spilling new sequences (alternating with empty roll-overs) while previously
+	 * rolled-over sequences are handed to a {@code SequenceConsumer} that was started
+	 * up front. The test fails if the consumer is still alive after the join timeout,
+	 * or if it reported an error.
+	 */
+	@Test
+	public void testSpillWhileReading() {
+		try {
+			final int sequences = 10;
+			
+			final Random rnd = new Random();
+			// carries a failure from the consumer back to this method
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+			
+			final SequenceConsumer consumer = new SequenceConsumer(error, sequences);
+			consumer.start();
+			
+			final int maxNumEventsAndBuffers = 30000;
+			final int maxNumChannels = 1656;
+			
+			// do multiple spilling / rolling over rounds
+			// (twice as many rounds as sequences, since every second round is an empty roll-over)
+			for (int round = 0; round < 2*sequences; round++) {
+
+				if (round % 2 == 1) {
+					// make this an empty sequence
+					assertNull(spiller.rollOver());
+				}
+				else {
+					// proper spilled sequence
+					final long bufferSeed = rnd.nextLong();
+					final Random bufferRnd = new Random(bufferSeed);
+					
+					final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
+					final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+	
+					final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
+	
+					// generate sequence
+					for (int i = 0; i < numEventsAndBuffers; i++) {
+						boolean isEvent = rnd.nextDouble() < 0.05d;
+						if (isEvent) {
+							BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
+							events.add(evt);
+							spiller.add(evt);
+						}
+						else {
+							BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+							spiller.add(evt);
+						}
+					}
+	
+					// reset and create reader
+					// (the re-seeded bufferRnd is passed along with the sequence)
+					bufferRnd.setSeed(bufferSeed);
+					BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
+					
+					SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);
+					consumer.queue(stc);
+				}
+			}
+			
+			// wait for the consumer
+			consumer.join(180000);
+			assertFalse("sequence consumer did not finish its work in time", consumer.isAlive());
+			
+			// validate there was no error in the consumer
+			if (error.get() != null) {
+				Throwable t = error.get();
+				if (t instanceof Error) {
+					// rethrown as is, so it is not caught by the catch (Exception) below
+					throw (Error) t;
+				}
+				else {
+					throw new Exception("Error while consuming the spilled records", t);
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+	
+	private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) {
+		long magicNumber = rnd.nextLong();
+		byte[] data = new byte[rnd.nextInt(1000)];
+		rnd.nextBytes(data);
+		TestEvent evt = new TestEvent(magicNumber, data);
+
+		int channelIndex = rnd.nextInt(numChannels);
+		
+		return new BufferOrEvent(evt, channelIndex);
+	}
+
+	private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
+		MemorySegment seg = new MemorySegment(new byte[PAGE_SIZE]);
+		for (int i = 0; i < size; i++) {
+			seg.put(i, (byte) i);
+		}
+		
+		Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+		buf.setSize(size);
+		return new BufferOrEvent(buf, channelIndex);
+	}
+
+	private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
+		assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
+		assertTrue("is not buffer", boe.isBuffer());
+
+		Buffer buf = boe.getBuffer();
+		assertEquals("wrong buffer size", expectedSize, buf.getSize());
+
+		MemorySegment seg = buf.getMemorySegment();
+		for (int i = 0; i < expectedSize; i++) {
+			assertEquals("wrong buffer contents", (byte) i, seg.get(i));
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Async Consumer
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Plain holder that bundles a rolled-over sequence with everything the consumer needs to
+	 * validate it: the expected events, the random generator that reproduces the buffers, the
+	 * total number of elements, and the number of channels.
+	 */
+	private static class SequenceToConsume {
+
+		/** Random generator from which buffer sizes and channel indexes are re-drawn. */
+		final Random bufferRnd;
+
+		/** The events expected in the sequence, in order. */
+		final ArrayList<BufferOrEvent> events;
+
+		/** The spilled sequence to read. */
+		final BufferSpiller.SpilledBufferOrEventSequence sequence;
+
+		/** Total number of buffers and events in the sequence. */
+		final int numBuffersAndEvents;
+
+		/** Exclusive upper bound of the channel indexes used for the buffers. */
+		final int numChannels;
+
+		private SequenceToConsume(
+				Random bufferRnd,
+				ArrayList<BufferOrEvent> events,
+				BufferSpiller.SpilledBufferOrEventSequence sequence,
+				int numBuffersAndEvents,
+				int numChannels) {
+
+			this.sequence = sequence;
+			this.events = events;
+			this.bufferRnd = bufferRnd;
+			this.numChannels = numChannels;
+			this.numBuffersAndEvents = numBuffersAndEvents;
+		}
+	}
+	
+	/**
+	 * Background thread that takes {@link SequenceToConsume} items from a queue, reads each
+	 * spilled sequence, and validates it against the expected events and buffers.
+	 *
+	 * <p>The thread ends after the configured number of sequences was consumed, or on the first
+	 * failure. A failure is handed over to the test thread through the shared error reference.
+	 * The sequence that was being read is cleaned up in both cases, so a failed validation does
+	 * not leave its spill file behind.
+	 */
+	private static class SequenceConsumer extends Thread {
+		
+		/** Receives the first failure that occurs in this thread. */
+		private final AtomicReference<Throwable> error;
+		
+		/** The sequences handed over by the test thread. */
+		private final BlockingQueue<SequenceToConsume> sequences;
+		
+		private final int numSequencesToConsume;
+		
+		/** Only accessed from within this thread. */
+		private int consumedSequences;
+
+		private SequenceConsumer(AtomicReference<Throwable> error, int numSequencesToConsume) {
+			super("Sequence Consumer");
+			setDaemon(true);
+			
+			this.error = error;
+			this.numSequencesToConsume = numSequencesToConsume;
+			this.sequences = new LinkedBlockingQueue<SequenceToConsume>();
+		}
+
+
+		@Override
+		public void run() {
+			try {
+				while (consumedSequences < numSequencesToConsume) {
+					// get next sequence
+					SequenceToConsume nextSequence = sequences.take();
+				
+					// wait a bit, allow some stuff to queue up
+					Thread.sleep(50);
+
+					BufferSpiller.SpilledBufferOrEventSequence seq = nextSequence.sequence;
+					ArrayList<BufferOrEvent> events = nextSequence.events;
+					Random bufferRnd = nextSequence.bufferRnd;
+					int numBuffersAndEvents = nextSequence.numBuffersAndEvents;
+					int numChannels = nextSequence.numChannels;
+
+					try {
+						// consume sequence
+						seq.open();
+						
+						int numEvent = 0;
+						for (int i = 0; i < numBuffersAndEvents; i++) {
+							BufferOrEvent next = seq.getNext();
+							assertNotNull(next);
+							if (next.isEvent()) {
+								BufferOrEvent expected = events.get(numEvent++);
+								assertEquals(expected.getEvent(), next.getEvent());
+								assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+							}
+							else {
+								validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+							}
+						}
+		
+						// no further data
+						assertNull(seq.getNext());
+		
+						// all events need to be consumed
+						assertEquals(events.size(), numEvent);
+					}
+					catch (Throwable failure) {
+						// release the spill file also when reading or validation failed;
+						// a problem during this cleanup must not hide the original failure
+						try {
+							seq.cleanup();
+						}
+						catch (Throwable ignored) {
+							// the original failure is the one to report
+						}
+						throw failure;
+					}
+	
+					// remove all temp files
+					seq.cleanup();
+					
+					consumedSequences++;
+				}
+				
+			}
+			catch (Throwable t) {
+				// includes assertion errors and interruption; picked up by the test thread
+				error.set(t);
+			}
+		}
+		
+		/**
+		 * Hands a sequence to this thread for validation.
+		 *
+		 * @param next the sequence and its expected contents
+		 */
+		public void queue(SequenceToConsume next) {
+			sequences.add(next);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
deleted file mode 100644
index 3f815ef..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-/**
- * A BufferRecycler that does nothing.
- */
-public class DummyBufferRecycler implements BufferRecycler {
-	
-	public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
-	
-	
-	@Override
-	public void recycle(MemorySegment memorySegment) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
new file mode 100644
index 0000000..991b033
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.streaming.runtime.io.BufferSpiller.SpilledBufferOrEventSequence;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that validate the behavior of the {@link SpilledBufferOrEventSequence} in isolation,
+ * with respect to detecting corrupt sequences, trailing data, and interleaved buffers and events.
+ */
+public class SpilledBufferOrEventSequenceTest {
+	
+	private final ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN);
+	private final int pageSize = 32*1024;
+	
+	private File tempFile;
+	private FileChannel fileChannel;
+	
+	
+	/**
+	 * Creates the temp file and opens the read/write channel on it that the tests work with.
+	 *
+	 * <p>A failure is propagated so that the test fails during setup, rather than running
+	 * against {@code null} fields and failing later with an unrelated exception.
+	 *
+	 * @throws IOException if the temp file cannot be created or opened
+	 */
+	@Before
+	public void initTempChannel() throws IOException {
+		boolean initialized = false;
+		try {
+			tempFile = File.createTempFile("testdata", "tmp");
+			fileChannel = new RandomAccessFile(tempFile, "rw").getChannel();
+			initialized = true;
+		}
+		finally {
+			if (!initialized) {
+				// release whatever was created so far, the exception itself keeps propagating
+				cleanup();
+			}
+		}
+	}
+	
+	/**
+	 * Closes the channel and deletes the temp file, as far as they were created.
+	 * Problems while closing and the result of the deletion are ignored.
+	 */
+	@After
+	public void cleanup() {
+		final FileChannel channel = fileChannel;
+		if (channel != null) {
+			try {
+				channel.close();
+			}
+			catch (IOException ignored) {
+				// nothing useful can be done about it at this point
+			}
+		}
+
+		final File file = tempFile;
+		if (file != null) {
+			//noinspection ResultOfMethodCallIgnored
+			file.delete();
+		}
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A sequence over an empty file must report no elements.
+	 */
+	@Test
+	public void testEmptyChannel() {
+		try {
+			final SpilledBufferOrEventSequence emptySequence =
+					new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			emptySequence.open();
+
+			final BufferOrEvent first = emptySequence.getNext();
+			assertNull(first);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * A file that holds only 7 bytes, fewer than the 9 header bytes the writer helpers of this
+	 * test emit per element, must make the read fail with an {@link IOException}.
+	 */
+	@Test
+	public void testIncompleteHeaderOnFirstElement() {
+		try {
+			final ByteBuffer truncatedHeader = ByteBuffer.allocate(7).order(ByteOrder.LITTLE_ENDIAN);
+
+			fileChannel.write(truncatedHeader);
+			fileChannel.position(0);
+
+			final SpilledBufferOrEventSequence sequence =
+					new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			sequence.open();
+
+			boolean readFailed = false;
+			try {
+				sequence.getNext();
+			}
+			catch (IOException expected) {
+				readFailed = true;
+			}
+			assertTrue("should fail with an exception", readFailed);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Writes a series of buffers with random sizes and channels, and reads them back. The
+	 * expected values are reproduced by re-seeding the random generator before reading.
+	 */
+	@Test
+	public void testBufferSequence() {
+		try {
+			final Random rnd = new Random();
+			final long seed = rnd.nextLong();
+
+			final int numBuffers = 325;
+			final int numChannels = 671;
+
+			// write phase
+			rnd.setSeed(seed);
+			for (int written = 0; written < numBuffers; written++) {
+				final int size = rnd.nextInt(pageSize) + 1;
+				final int channel = rnd.nextInt(numChannels);
+				writeBuffer(fileChannel, size, channel);
+			}
+
+			// rewind both the file and the random generator
+			fileChannel.position(0L);
+			rnd.setSeed(seed);
+
+			final SpilledBufferOrEventSequence sequence =
+					new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			sequence.open();
+
+			// read phase
+			for (int read = 0; read < numBuffers; read++) {
+				final BufferOrEvent next = sequence.getNext();
+				final int expectedSize = rnd.nextInt(pageSize) + 1;
+				final int expectedChannel = rnd.nextInt(numChannels);
+				validateBuffer(next, expectedSize, expectedChannel);
+			}
+
+			// should have no more data
+			assertNull(sequence.getNext());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Writes one complete buffer followed by a truncated one: the header announces 999 bytes,
+	 * but only 312 bytes (header included) reach the file. The first element must read fine,
+	 * the second read must fail with an {@link IOException}.
+	 */
+	@Test
+	public void testBufferSequenceWithIncompleteBuffer() {
+		try {
+			writeBuffer(fileChannel, 1672, 7);
+
+			// header of the second buffer: channel index 2, size 999, marker byte 0 as for buffers
+			final ByteBuffer truncated = ByteBuffer.allocate(615).order(ByteOrder.LITTLE_ENDIAN);
+			truncated.putInt(2).putInt(999).put((byte) 0);
+
+			// expose only the first 312 bytes to the write
+			truncated.position(0);
+			truncated.limit(312);
+			fileChannel.write(truncated);
+			fileChannel.position(0L);
+
+			final SpilledBufferOrEventSequence sequence =
+					new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			sequence.open();
+
+			// first one is valid
+			validateBuffer(sequence.getNext(), 1672, 7);
+
+			// next one should fail
+			boolean readFailed = false;
+			try {
+				sequence.getNext();
+			}
+			catch (IOException expected) {
+				readFailed = true;
+			}
+			assertTrue("should fail with an exception", readFailed);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Writes a series of random events and checks that they are read back in the same order,
+	 * with equal contents and channel indexes.
+	 */
+	@Test
+	public void testEventSequence() {
+		try {
+			final Random rnd = new Random();
+			final int numEvents = 3000;
+			final int numChannels = 1656;
+
+			final ArrayList<BufferOrEvent> written = new ArrayList<BufferOrEvent>(numEvents);
+			for (int k = 0; k < numEvents; k++) {
+				written.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
+			}
+
+			fileChannel.position(0L);
+			final SpilledBufferOrEventSequence sequence =
+					new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			sequence.open();
+
+			int numRead = 0;
+			for (BufferOrEvent actual = sequence.getNext(); actual != null; actual = sequence.getNext()) {
+				final BufferOrEvent expected = written.get(numRead);
+				assertTrue(actual.isEvent());
+				assertEquals(expected.getEvent(), actual.getEvent());
+				assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
+				numRead++;
+			}
+
+			assertEquals(numEvents, numRead);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Writes a random mix of events and buffers and validates that it is read back unchanged.
+	 *
+	 * <p>Events are remembered in a list. Buffers are not stored; their sizes and channels are
+	 * reproduced by replaying {@code bufferRnd} from the same seed while reading.
+	 */
+	@Test
+	public void testMixedSequence() {
+		try {
+			final Random rnd = new Random();
+			final Random bufferRnd = new Random();
+
+			final long bufferSeed = rnd.nextLong();
+			bufferRnd.setSeed(bufferSeed);
+			
+			final int numEventsAndBuffers = 3000;
+			final int numChannels = 1656;
+
+			final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
+
+			// generate sequence
+			
+			for (int i = 0; i < numEventsAndBuffers; i++) {
+				boolean isEvent = rnd.nextDouble() < 0.05d;
+				if (isEvent) {
+					events.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
+				}
+				else {
+					writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			
+			// reset and create reader
+			
+			fileChannel.position(0L);
+			bufferRnd.setSeed(bufferSeed);
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			
+			// read and validate the sequence
+			
+			int numEvent = 0;
+			for (int i = 0; i < numEventsAndBuffers; i++) {
+				BufferOrEvent next = seq.getNext();
+				// a sequence that ends too early should fail as an assertion, not as a NullPointerException
+				assertNotNull("sequence ended prematurely", next);
+				if (next.isEvent()) {
+					BufferOrEvent expected = events.get(numEvent++);
+					assertEquals(expected.getEvent(), next.getEvent());
+					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+				}
+				else {
+					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			
+			// no further data
+			assertNull(seq.getNext());
+			
+			// all events need to be consumed
+			assertEquals(events.size(), numEvent);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Writes two independent sequences into two files and reads them one after the other
+	 * through two readers that are created with the same read buffer.
+	 *
+	 * <p>Both sequences draw their buffer sizes and channels from one {@code bufferRnd}, first
+	 * for sequence 1 and then for sequence 2. Reading has to happen in that same order so that
+	 * the replay from the seed yields the expected values.
+	 */
+	@Test
+	public void testMultipleSequences() {
+		File secondFile = null;
+		FileChannel secondChannel = null;
+		
+		try {
+			// create the second file channel
+			secondFile = File.createTempFile("testdata", "tmp");
+			secondChannel = new RandomAccessFile(secondFile, "rw").getChannel();
+			
+			final Random rnd = new Random();
+			final Random bufferRnd = new Random();
+
+			final long bufferSeed = rnd.nextLong();
+			bufferRnd.setSeed(bufferSeed);
+
+			final int numEventsAndBuffers1 = 272;
+			final int numEventsAndBuffers2 = 151;
+			
+			final int numChannels = 1656;
+
+			final ArrayList<BufferOrEvent> events1 = new ArrayList<BufferOrEvent>(128);
+			final ArrayList<BufferOrEvent> events2 = new ArrayList<BufferOrEvent>(128);
+
+			// generate sequence 1
+
+			for (int i = 0; i < numEventsAndBuffers1; i++) {
+				boolean isEvent = rnd.nextDouble() < 0.05d;
+				if (isEvent) {
+					events1.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
+				}
+				else {
+					writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+
+			// generate sequence 2
+
+			for (int i = 0; i < numEventsAndBuffers2; i++) {
+				boolean isEvent = rnd.nextDouble() < 0.05d;
+				if (isEvent) {
+					events2.add(generateAndWriteEvent(secondChannel, rnd, numChannels));
+				}
+				else {
+					writeBuffer(secondChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+
+			// reset and create reader
+
+			fileChannel.position(0L);
+			secondChannel.position(0L);
+			
+			bufferRnd.setSeed(bufferSeed);
+			
+			SpilledBufferOrEventSequence seq1 = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			SpilledBufferOrEventSequence seq2 = new SpilledBufferOrEventSequence(secondFile, secondChannel, buffer, pageSize);
+
+			// read and validate the sequence 1
+			seq1.open();
+
+			int numEvent = 0;
+			for (int i = 0; i < numEventsAndBuffers1; i++) {
+				BufferOrEvent next = seq1.getNext();
+				// a sequence that ends too early should fail as an assertion, not as a NullPointerException
+				assertNotNull("sequence 1 ended prematurely", next);
+				if (next.isEvent()) {
+					BufferOrEvent expected = events1.get(numEvent++);
+					assertEquals(expected.getEvent(), next.getEvent());
+					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+				}
+				else {
+					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			assertNull(seq1.getNext());
+			assertEquals(events1.size(), numEvent);
+
+			// read and validate the sequence 2
+			seq2.open();
+
+			numEvent = 0;
+			for (int i = 0; i < numEventsAndBuffers2; i++) {
+				BufferOrEvent next = seq2.getNext();
+				assertNotNull("sequence 2 ended prematurely", next);
+				if (next.isEvent()) {
+					BufferOrEvent expected = events2.get(numEvent++);
+					assertEquals(expected.getEvent(), next.getEvent());
+					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+				}
+				else {
+					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			assertNull(seq2.getNext());
+			assertEquals(events2.size(), numEvent);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			// the second file is local to this test, so it is released here and not in cleanup()
+			if (secondChannel != null) {
+				try {
+					secondChannel.close();
+				}
+				catch (IOException e) {
+					// ignore here
+				}
+			}
+			if (secondFile != null) {
+				//noinspection ResultOfMethodCallIgnored
+				secondFile.delete();
+			}
+		}
+	}
+
+	/**
+	 * After {@code cleanup()} on an opened sequence, the channel must be closed and the file
+	 * must be gone. The file contents and the channel position are arbitrary here.
+	 */
+	@Test
+	public void testCleanup() {
+		try {
+			final ByteBuffer arbitraryData = ByteBuffer.allocate(157).order(ByteOrder.LITTLE_ENDIAN);
+
+			fileChannel.write(arbitraryData);
+			fileChannel.position(54);
+
+			final SpilledBufferOrEventSequence sequence =
+					new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			sequence.open();
+			sequence.cleanup();
+
+			final boolean channelStillOpen = fileChannel.isOpen();
+			assertFalse(channelStillOpen);
+
+			final boolean fileStillExists = tempFile.exists();
+			assertFalse(fileStillExists);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+
+	private static BufferOrEvent generateAndWriteEvent(FileChannel fileChannel, Random rnd, int numChannels) throws IOException {
+		long magicNumber = rnd.nextLong();
+		byte[] data = new byte[rnd.nextInt(1000)];
+		rnd.nextBytes(data);
+		TestEvent evt = new TestEvent(magicNumber, data);
+		
+		int channelIndex = rnd.nextInt(numChannels);
+		
+		ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+		ByteBuffer header = ByteBuffer.allocate(9);
+		header.order(ByteOrder.LITTLE_ENDIAN);
+		
+		header.putInt(channelIndex);
+		header.putInt(serializedEvent.remaining());
+		header.put((byte) 1);
+		header.flip();
+		
+		fileChannel.write(header);
+		fileChannel.write(serializedEvent);
+		return new BufferOrEvent(evt, channelIndex);
+	}
+	
+	private static void writeBuffer(FileChannel fileChannel, int size, int channelIndex) throws IOException {
+		ByteBuffer data = ByteBuffer.allocate(size + 9);
+		data.order(ByteOrder.LITTLE_ENDIAN);
+		
+		data.putInt(channelIndex);
+		data.putInt(size);
+		data.put((byte) 0);
+		for (int i = 0; i < size; i++) {
+			data.put((byte) i);
+		}
+		data.flip();
+		fileChannel.write(data);
+	}
+
+	/**
+	 * Asserts that the given element exists, is a buffer on the expected channel, has the
+	 * expected size, and holds the contents pattern {@code (byte) position}.
+	 *
+	 * @param boe the element to check; {@code null} (a sequence that ended early) fails the
+	 *            assertion instead of raising a {@code NullPointerException}
+	 * @param expectedSize the expected buffer size, also the number of bytes compared
+	 * @param expectedChannelIndex the expected channel index
+	 */
+	private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
+		// several tests pass the result of getNext() directly, which is null at the end of a sequence
+		assertNotNull("sequence ended prematurely, no buffer to validate", boe);
+
+		assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
+		assertTrue("is not buffer", boe.isBuffer());
+		
+		Buffer buf = boe.getBuffer();
+		assertEquals("wrong buffer size", expectedSize, buf.getSize());
+		
+		MemorySegment seg = buf.getMemorySegment();
+		for (int i = 0; i < expectedSize; i++) {
+			assertEquals("wrong buffer contents", (byte) i, seg.get(i));
+		}
+	}
+}