You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:29 UTC

[1/8] flink git commit: [FLINK-2461] [tests] Guard tests that rely on unresolvable host names with the appropriate assumption.

Repository: flink
Updated Branches:
  refs/heads/master b0f237990 -> 40eef52e9


[FLINK-2461] [tests] Guard tests that rely on unresolvable host names with the appropriate assumption.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/645d7cd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/645d7cd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/645d7cd9

Branch: refs/heads/master
Commit: 645d7cd9e437d3de301ca0ce5c9cdc87bcce494b
Parents: c58ba3d
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 2 15:54:44 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:27 2015 +0200

----------------------------------------------------------------------
 .../RemoteExecutorHostnameResolutionTest.java   | 24 ++++++++++++++++++++
 .../program/ClientHostnameResolutionTest.java   | 24 ++++++++++++++++++++
 2 files changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/645d7cd9/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index a1bd0e2..9293148 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.junit.Test;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 public class RemoteExecutorHostnameResolutionTest {
 
@@ -37,6 +39,9 @@ public class RemoteExecutorHostnameResolutionTest {
 	
 	@Test
 	public void testUnresolvableHostname1() {
+		
+		checkPreconditions();
+		
 		try {
 			RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 			exec.executePlan(getProgram());
@@ -54,6 +59,9 @@ public class RemoteExecutorHostnameResolutionTest {
 
 	@Test
 	public void testUnresolvableHostname2() {
+
+		checkPreconditions();
+		
 		try {
 			InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
 			RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
@@ -75,4 +83,20 @@ public class RemoteExecutorHostnameResolutionTest {
 		env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
 		return env.createProgramPlan();
 	}
+
+	private static void checkPreconditions() {
+		// the test can only work if the invalid URL cannot be resolves
+		// some internet providers resolve unresolvable URLs to navigational aid servers,
+		// voiding this test.
+		boolean throwsException;
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			InetAddress.getByName(nonExistingHostname);
+			throwsException = false;
+		}
+		catch (UnknownHostException e) {
+			throwsException = true;
+		}
+		assumeTrue(throwsException);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/645d7cd9/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
index 2cdb1a0..41294e6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
 import static org.junit.Assert.*;
+import static org.junit.Assume.*;
 
 /**
  * Tests that verify that the client correctly handles non-resolvable host names and does not
@@ -37,6 +39,9 @@ public class ClientHostnameResolutionTest {
 	
 	@Test
 	public void testUnresolvableHostname1() {
+		
+		checkPreconditions();
+		
 		try {
 			InetSocketAddress addr = new InetSocketAddress(nonExistingHostname, 17234);
 			new Client(addr, new Configuration(), getClass().getClassLoader(), 1);
@@ -54,6 +59,9 @@ public class ClientHostnameResolutionTest {
 
 	@Test
 	public void testUnresolvableHostname2() {
+
+		checkPreconditions();
+		
 		try {
 			Configuration config = new Configuration();
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
@@ -71,4 +79,20 @@ public class ClientHostnameResolutionTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	private static void checkPreconditions() {
+		// the test can only work if the invalid URL cannot be resolves
+		// some internet providers resolve unresolvable URLs to navigational aid servers,
+		// voiding this test.
+		boolean throwsException;
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			InetAddress.getByName(nonExistingHostname);
+			throwsException = false;
+		}
+		catch (UnknownHostException e) {
+			throwsException = true;
+		}
+		assumeTrue(throwsException);
+	}
 }


[7/8] flink git commit: [FLINK-2427] [streaming] Make the BarrierBuffer more robust against lost/missing checkpoint barriers.

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
deleted file mode 100644
index b6cd656..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class SpillingBufferOrEventTest {
-	
-	private static IOManager IO_MANAGER;
-	
-	@BeforeClass
-	public static void createIOManager() {
-		IO_MANAGER = new IOManagerAsync();
-	}
-	
-	@AfterClass
-	public static void shutdownIOManager() {
-		IO_MANAGER.shutdown();
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testSpilling() throws IOException, InterruptedException {
-		BufferSpiller bsp = new BufferSpiller(IO_MANAGER);
-		SpillReader spr = new SpillReader();
-
-		BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
-		BufferPool pool2 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
-
-		Buffer b1 = pool1.requestBuffer();
-		b1.getMemorySegment().putInt(0, 10000);
-		BufferOrEvent boe1 = new BufferOrEvent(b1, 2);
-		SpillingBufferOrEvent sboe1 = new SpillingBufferOrEvent(boe1, bsp, spr);
-
-		assertTrue(sboe1.isSpilled());
-
-		Buffer b2 = pool2.requestBuffer();
-		b2.getMemorySegment().putInt(0, 10000);
-		BufferOrEvent boe2 = new BufferOrEvent(b2, 4);
-		SpillingBufferOrEvent sboe2 = new SpillingBufferOrEvent(boe2, bsp, spr);
-
-		assertTrue(sboe2.isSpilled());
-
-		Buffer b3 = pool1.requestBuffer();
-		b3.getMemorySegment().putInt(0, 50000);
-		BufferOrEvent boe3 = new BufferOrEvent(b3, 0);
-		SpillingBufferOrEvent sboe3 = new SpillingBufferOrEvent(boe3, bsp, spr);
-
-		assertTrue(sboe3.isSpilled());
-
-		Buffer b4 = pool2.requestBuffer();
-		b4.getMemorySegment().putInt(0, 60000);
-		BufferOrEvent boe4 = new BufferOrEvent(b4, 0);
-		SpillingBufferOrEvent sboe4 = new SpillingBufferOrEvent(boe4, bsp, spr);
-
-		assertTrue(sboe4.isSpilled());
-
-		bsp.close();
-
-		spr.setSpillFile(bsp.getSpillFile());
-
-		Buffer b1ret = sboe1.getBufferOrEvent().getBuffer();
-		assertEquals(10000, b1ret.getMemorySegment().getInt(0));
-		assertEquals(2, sboe1.getBufferOrEvent().getChannelIndex());
-		b1ret.recycle();
-
-		Buffer b2ret = sboe2.getBufferOrEvent().getBuffer();
-		assertEquals(10000, b2ret.getMemorySegment().getInt(0));
-		assertEquals(4, sboe2.getBufferOrEvent().getChannelIndex());
-		b2ret.recycle();
-
-		Buffer b3ret = sboe3.getBufferOrEvent().getBuffer();
-		assertEquals(50000, b3ret.getMemorySegment().getInt(0));
-		assertEquals(0, sboe3.getBufferOrEvent().getChannelIndex());
-		b3ret.recycle();
-
-		Buffer b4ret = sboe4.getBufferOrEvent().getBuffer();
-		assertEquals(60000, b4ret.getMemorySegment().getInt(0));
-		b4ret.recycle();
-
-		spr.close();
-		bsp.getSpillFile().delete();
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index b5bece7..fb3beea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -26,10 +26,13 @@ import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.types.LongValue;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -96,7 +99,7 @@ public class StreamRecordWriterTest {
 		when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
 			@Override
 			public Buffer answer(InvocationOnMock invocation) {
-				return new Buffer(new MemorySegment(new byte[4096]), DummyBufferRecycler.INSTANCE);
+				return new Buffer(new MemorySegment(new byte[4096]), FreeingBufferRecycler.INSTANCE);
 			}
 		});
 		
@@ -108,7 +111,6 @@ public class StreamRecordWriterTest {
 		return mockWriter;
 	}
 	
-	
 	// ------------------------------------------------------------------------
 	
 	private static class FailingWriter<T extends IOReadableWritable> extends StreamRecordWriter<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
new file mode 100644
index 0000000..4a77757
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A simple task event, used for validation of buffer or event blocking/buffering.
+ */
+public class TestEvent extends AbstractEvent {
+
+	private long magicNumber;
+
+	private byte[] payload;
+
+	public TestEvent() {}
+
+	public TestEvent(long magicNumber, byte[] payload) {
+		this.magicNumber = magicNumber;
+		this.payload = payload;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Serialization
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(magicNumber);
+		out.writeInt(payload.length);
+		out.write(payload);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.magicNumber = in.readLong();
+		this.payload = new byte[in.readInt()];
+		in.read(this.payload);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Standard utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return Long.valueOf(magicNumber).hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj.getClass() == TestEvent.class) {
+			TestEvent that = (TestEvent) obj;
+			return this.magicNumber == that.magicNumber && Arrays.equals(this.payload, that.payload);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("TestEvent %d (%s)", magicNumber, StringUtils.byteToHexString(payload));
+	}
+}
\ No newline at end of file


[5/8] flink git commit: [FLINK-2438] [runtime] Improve channel event serialization performance.

Posted by se...@apache.org.
[FLINK-2438] [runtime] Improve channel event serialization performance.

Because channel events may become very frequent now (frequent at-least-once checkpointing), their serialization perfomance starts to matter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af88aa09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af88aa09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af88aa09

Branch: refs/heads/master
Commit: af88aa09ec94d4d11f38a7134d36793420d7d19d
Parents: aa0105a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 30 19:02:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/event/AbstractEvent.java      |  28 ++++
 .../flink/runtime/event/RuntimeEvent.java       |  24 ++++
 .../apache/flink/runtime/event/TaskEvent.java   |  25 ++++
 .../flink/runtime/event/task/AbstractEvent.java |  28 ----
 .../flink/runtime/event/task/RuntimeEvent.java  |  24 ----
 .../flink/runtime/event/task/TaskEvent.java     |  25 ----
 .../runtime/io/network/TaskEventDispatcher.java |   2 +-
 .../io/network/api/CheckpointBarrier.java       |  98 ++++++++++++++
 .../io/network/api/EndOfPartitionEvent.java     |   2 +-
 .../io/network/api/EndOfSuperstepEvent.java     |   2 +-
 .../io/network/api/TaskEventHandler.java        |   2 +-
 .../io/network/api/reader/AbstractReader.java   |   4 +-
 .../io/network/api/reader/ReaderBase.java       |   2 +-
 .../api/serialization/EventSerializer.java      | 132 ++++++++++++++-----
 .../io/network/api/writer/RecordWriter.java     |   2 +-
 .../api/writer/ResultPartitionWriter.java       |   4 +-
 .../network/buffer/FreeingBufferRecycler.java   |  43 ++++++
 .../runtime/io/network/netty/NettyMessage.java  |   2 +-
 .../network/netty/PartitionRequestClient.java   |   2 +-
 .../netty/PartitionRequestClientHandler.java    |   5 +-
 .../partition/consumer/BufferOrEvent.java       |   2 +-
 .../partition/consumer/InputChannel.java        |   2 +-
 .../network/partition/consumer/InputGate.java   |   2 +-
 .../partition/consumer/LocalInputChannel.java   |   2 +-
 .../partition/consumer/RemoteInputChannel.java  |   2 +-
 .../partition/consumer/SingleInputGate.java     |   4 +-
 .../partition/consumer/UnionInputGate.java      |   2 +-
 .../partition/consumer/UnknownInputChannel.java |   2 +-
 .../iterative/concurrent/SuperstepBarrier.java  |   2 +-
 .../event/IterationEventWithAggregators.java    |   2 +-
 .../iterative/event/TerminationEvent.java       |   2 +-
 .../task/IterationSynchronizationSinkTask.java  |   2 +-
 .../iterative/task/SyncEventHandler.java        |   2 +-
 .../flink/runtime/event/task/EventList.java     |   3 +-
 .../runtime/event/task/IntegerTaskEvent.java    |   1 +
 .../runtime/event/task/StringTaskEvent.java     |   1 +
 .../flink/runtime/event/task/TaskEventTest.java |   3 +-
 .../network/api/reader/AbstractReaderTest.java  |   9 +-
 .../io/network/api/reader/BufferReaderTest.java |   2 +-
 .../api/serialization/EventSerializerTest.java  |  39 ++++--
 .../partition/PipelinedSubpartitionTest.java    |   2 +-
 .../partition/consumer/InputChannelTest.java    |   2 +-
 .../partition/consumer/SingleInputGateTest.java |   2 +-
 .../io/network/util/TestConsumerCallback.java   |   2 +-
 .../network/util/TestSubpartitionConsumer.java  |   2 +-
 .../runtime/io/network/util/TestTaskEvent.java  |   2 +-
 .../concurrent/SuperstepBarrierTest.java        |   2 +-
 .../util/event/TaskEventHandlerTest.java        |   2 +-
 .../streaming/runtime/io/BarrierBuffer.java     |   2 +-
 .../streaming/runtime/io/BarrierTracker.java    |   2 +-
 .../streaming/runtime/io/BufferSpiller.java     |   3 +-
 .../runtime/io/CheckpointBarrierHandler.java    |  14 +-
 .../runtime/io/FreeingBufferRecycler.java       |  37 ------
 .../runtime/io/RecordWriterOutput.java          |   4 +-
 .../runtime/io/StreamInputProcessor.java        |   4 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   4 +-
 .../runtime/tasks/CheckpointBarrier.java        |  97 --------------
 .../streaming/runtime/tasks/OutputHandler.java  |   1 +
 .../streaming/runtime/tasks/StreamTask.java     |   1 +
 .../consumer/StreamTestSingleInputGate.java     |   3 +-
 .../io/BarrierBufferMassiveRandomTest.java      |   4 +-
 .../streaming/runtime/io/BarrierBufferTest.java |   5 +-
 .../runtime/io/BarrierTrackerTest.java          |   5 +-
 .../streaming/runtime/io/BufferSpillerTest.java |   1 +
 .../runtime/io/StreamRecordWriterTest.java      |   1 +
 .../flink/streaming/runtime/io/TestEvent.java   |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   1 +
 .../tasks/OneInputStreamTaskTestHarness.java    |   2 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |   2 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   3 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |   2 +-
 .../manual/StreamingScalabilityAndLatency.java  |   3 +-
 73 files changed, 435 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
new file mode 100644
index 0000000..a20aad9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * This type of event can be used to exchange notification messages between
+ * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
+ * at runtime using the communication channels.
+ */
+public abstract class AbstractEvent implements IOReadableWritable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
new file mode 100644
index 0000000..6d712ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+/**
+ * Subclasses of this event are recognized as events exchanged by the core runtime.
+ */
+public abstract class RuntimeEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
new file mode 100644
index 0000000..d37d031
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+/**
+ * Subclasses of this event are recognized as custom events that are not part of the core
+ * flink runtime.
+ */
+public abstract class TaskEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
deleted file mode 100644
index 244d672..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * This type of event can be used to exchange notification messages between
- * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
- * at runtime using the communication channels.
- */
-public abstract class AbstractEvent implements IOReadableWritable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
deleted file mode 100644
index 8c44073..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * Subclasses of this event are recognized as events exchanged by the core runtime.
- */
-public abstract class RuntimeEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
deleted file mode 100644
index 01ecce2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * Subclasses of this event are recognized as custom events that are not part of the core
- * flink runtime.
- */
-public abstract class TaskEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 82793e2..eddba8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network;
 
 import com.google.common.collect.Maps;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
new file mode 100644
index 0000000..59f56b0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+/**
+ * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
+ * barriers are emitted by the sources when instructed to do so by the JobManager. When
+ * operators receive a CheckpointBarrier on one of its inputs, it knows that this is the point 
+ * between the pre-checkpoint and post-checkpoint data.
+ * 
+ * <p>Once an operator has received a checkpoint barrier from all its input channels, it
+ * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
+ * behavior and broadcast the barrier to downstream operators.</p>
+ * 
+ * <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint
+ * is complete (exactly once)</p>
+ * 
+ * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
+ */
+public class CheckpointBarrier extends RuntimeEvent {
+
+	private long id;
+	private long timestamp;
+
+	public CheckpointBarrier() {}
+
+	public CheckpointBarrier(long id, long timestamp) {
+		this.id = id;
+		this.timestamp = timestamp;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id);
+		out.writeLong(timestamp);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = in.readLong();
+		timestamp = in.readLong();
+	}
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (other == null || !(other instanceof CheckpointBarrier)) {
+			return false;
+		}
+		else {
+			CheckpointBarrier that = (CheckpointBarrier) other;
+			return that.id == this.id && that.timestamp == this.timestamp;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index c2b6fa4..293287b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
  * This event marks a subpartition as fully consumed.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
index 162afb7..7f51187 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
  * Marks the end of a superstep of one particular iteration superstep.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
index ccd0feb..d2dc46b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 90564a8..84189be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 9f8ae20..a1d705f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.api.reader;
 import java.io.IOException;
 
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 76c88c1..b23b83b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -19,63 +19,122 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
+/**
+ * Utility class to serialize and deserialize task events.
+ */
 public class EventSerializer {
+	
+	private static final int END_OF_PARTITION_EVENT = 0;
 
-	public final static BufferRecycler RECYCLER = new BufferRecycler() {
-		@Override
-		public void recycle(MemorySegment memorySegment) {
-			memorySegment.free();
-		}
-	};
-
-	public static ByteBuffer toSerializedEvent(AbstractEvent event) {
-		try {
-			final DataOutputSerializer serializer = new DataOutputSerializer(128);
+	private static final int CHECKPOINT_BARRIER_EVENT = 1;
 
-			serializer.writeUTF(event.getClass().getName());
-			event.write(serializer);
+	private static final int END_OF_SUPERSTEP_EVENT = 2;
 
-			return serializer.wrapAsByteBuffer();
+	private static final int OTHER_EVENT = 3;
+	
+	// ------------------------------------------------------------------------
+	
+	public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+		final Class<?> eventClass = event.getClass();
+		if (eventClass == EndOfPartitionEvent.class) {
+			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
+		}
+		else if (eventClass == CheckpointBarrier.class) {
+			CheckpointBarrier barrier = (CheckpointBarrier) event;
+			
+			ByteBuffer buf = ByteBuffer.allocate(20);
+			buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+			buf.putLong(4, barrier.getId());
+			buf.putLong(12, barrier.getTimestamp());
+			return buf;
 		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while serializing event.", e);
+		else if (eventClass == EndOfSuperstepEvent.class) {
+			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
+		}
+		else {
+			try {
+				final DataOutputSerializer serializer = new DataOutputSerializer(128);
+				serializer.writeInt(OTHER_EVENT);
+				serializer.writeUTF(event.getClass().getName());
+				event.write(serializer);
+	
+				return serializer.wrapAsByteBuffer();
+			}
+			catch (IOException e) {
+				throw new RuntimeException("Error while serializing event.", e);
+			}
 		}
 	}
 
 	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+		if (buffer.remaining() < 4) {
+			throw new RuntimeException("Incomplete event");
+		}
+		
+		final ByteOrder bufferOrder = buffer.order();
+		buffer.order(ByteOrder.BIG_ENDIAN);
+		
 		try {
-			final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-
-			final String className = deserializer.readUTF();
-
-			final Class<? extends AbstractEvent> clazz;
-			try {
-				clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+			int type = buffer.getInt();
+				
+			if (type == END_OF_PARTITION_EVENT) {
+				return EndOfPartitionEvent.INSTANCE;
 			}
-			catch (ClassNotFoundException e) {
-				throw new RuntimeException("Could not load event class '" + className + "'.", e);
+			else if (type == CHECKPOINT_BARRIER_EVENT) {
+				long id = buffer.getLong();
+				long timestamp = buffer.getLong();
+				return new CheckpointBarrier(id, timestamp);
 			}
-			catch (ClassCastException e) {
-				throw new RuntimeException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+			else if (type == END_OF_SUPERSTEP_EVENT) {
+				return EndOfSuperstepEvent.INSTANCE;
+			}
+			else if (type == OTHER_EVENT) {
+				try {
+					final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
+		
+					final String className = deserializer.readUTF();
+		
+					final Class<? extends AbstractEvent> clazz;
+					try {
+						clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+					}
+					catch (ClassNotFoundException e) {
+						throw new RuntimeException("Could not load event class '" + className + "'.", e);
+					}
+					catch (ClassCastException e) {
+						throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+								+ AbstractEvent.class.getName() + "'.", e);
+					}
+		
+					final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+					event.read(deserializer);
+		
+					return event;
+				}
+				catch (Exception e) {
+					throw new RuntimeException("Error while deserializing or instantiating event.", e);
+				}
+			} 
+			else {
+				throw new RuntimeException("Corrupt byte stream for event");
 			}
-
-			final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
-			event.read(deserializer);
-
-			return event;
 		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while deserializing event.", e);
+		finally {
+			buffer.order(bufferOrder);
 		}
 	}
 
@@ -86,7 +145,8 @@ public class EventSerializer {
 	public static Buffer toBuffer(AbstractEvent event) {
 		final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
 
-		final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()), RECYCLER, false);
+		final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()),
+				FreeingBufferRecycler.INSTANCE, false);
 		buffer.setSize(serializedEvent.remaining());
 
 		return buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 885c316..2ae61ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 1192dbb..79c21c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
new file mode 100644
index 0000000..fdce883
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A simple buffer recycler that frees the memory segments.
+ */
+public class FreeingBufferRecycler implements BufferRecycler {
+	
+	public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
+	
+	// ------------------------------------------------------------------------
+	
+	// Not instantiable
+	private FreeingBufferRecycler() {}
+
+	/**
+	 * Frees the given memory segment.
+	 * @param memorySegment The memory segment to be recycled.
+	 */
+	@Override
+	public void recycle(MemorySegment memorySegment) {
+		memorySegment.free();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 1540369..3a24181 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 78f6398..f6120d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.netty;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 51b436b..3b7d921 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.io.network.netty;
 import com.google.common.collect.Maps;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.TransportException;
@@ -300,7 +301,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 				byte[] byteArray = new byte[bufferOrEvent.getSize()];
 				bufferOrEvent.getNettyBuffer().readBytes(byteArray);
 
-				Buffer buffer = new Buffer(new MemorySegment(byteArray), EventSerializer.RECYCLER, false);
+				Buffer buffer = new Buffer(new MemorySegment(byteArray), FreeingBufferRecycler.INSTANCE, false);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index d2f3035..55e5767 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 import static com.google.common.base.Preconditions.checkArgument;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3998279..d282db5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c4f9dc4..1f42cfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index e72f612..ff12153 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b70c3a8..be2509f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 80a79d2..896fa9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 730ead2..5599687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index e4b9e57..cdf28be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
index c91be1a..cc5d3c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.concurrent;
 
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index bc815dc..e259523 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
index 9e74c34..28181e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 
 /**
  * Signals that the iteration is completely executed, participating tasks must terminate now

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 5eccd7b..fed0a17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.types.IntValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 8cce6ef..d7549d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.task;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.types.Value;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
index 1f97a15..c055a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
@@ -19,10 +19,11 @@
 
 package org.apache.flink.runtime.event.task;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.util.SerializableArrayList;
 
 /**
- * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.task.AbstractEvent}
+ * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.AbstractEvent}
  * objects.
  * 
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
index 648dacc..cc67482 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.TaskEvent;
 
 /**
  * This class provides a simple implementation of an event that holds an integer value.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
index 87f2e91..9095cc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.util.StringUtils;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
index 1ed8e39..b508923 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
@@ -25,12 +25,13 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
 
 /**
  * This class contains serialization tests concerning task events derived from
- * {@link org.apache.flink.runtime.event.task.AbstractEvent}.
+ * {@link org.apache.flink.runtime.event.AbstractEvent}.
  * 
  */
 public class TaskEventTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 14bf022..6853722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -70,7 +69,7 @@ public class AbstractReaderTest {
 	public void testEndOfPartitionEvent() throws Exception {
 		final AbstractReader reader = new MockReader(createInputGate(1));
 
-		assertTrue(reader.handleEvent(new EndOfPartitionEvent()));
+		assertTrue(reader.handleEvent(EndOfPartitionEvent.INSTANCE));
 	}
 
 	/**
@@ -95,7 +94,7 @@ public class AbstractReaderTest {
 		}
 
 		try {
-			reader.handleEvent(new EndOfSuperstepEvent());
+			reader.handleEvent(EndOfSuperstepEvent.INSTANCE);
 
 			fail("Did not throw expected exception when handling end of superstep event with non-iterative reader.");
 		}
@@ -122,7 +121,7 @@ public class AbstractReaderTest {
 			// All good, expected exception.
 		}
 
-		EndOfSuperstepEvent eos = new EndOfSuperstepEvent();
+		EndOfSuperstepEvent eos = EndOfSuperstepEvent.INSTANCE;
 
 		// One end of superstep event for each input channel. The superstep finishes with the last
 		// received event.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index e1f8fd8..8519ac6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 5a20a4b..ddfd758 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,25 +18,44 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 public class EventSerializerTest {
 
 	@Test
 	public void testSerializeDeserializeEvent() {
-
-		TestTaskEvent expected = new TestTaskEvent(Math.random(), 12361231273L);
-
-		ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(expected);
-
-		AbstractEvent actual = EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
-
-		assertEquals(expected, actual);
+		try {
+			AbstractEvent[] events = {
+					EndOfPartitionEvent.INSTANCE,
+					EndOfSuperstepEvent.INSTANCE,
+					new CheckpointBarrier(1678L, 4623784L),
+					new TestTaskEvent(Math.random(), 12361231273L)
+			};
+			
+			for (AbstractEvent evt : events) {
+				ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+				assertTrue(serializedEvent.hasRemaining());
+
+				AbstractEvent deserialized = 
+						EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+				assertNotNull(deserialized);
+				assertEquals(evt, deserialized);
+			}
+			
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 74549df..8750a1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index e95c774..9717530 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2454399..82cc730 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
index 52083c4..0d66f13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 import java.util.concurrent.atomic.AtomicInteger;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 2766e53..18e0d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.util;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
index 0b29032..091d5d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index a6c0d72..2f26670 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Random;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
index 5c6aeb7..cb76276 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.util.event;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.event.task.IntegerTaskEvent;
 import org.apache.flink.runtime.event.task.StringTaskEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0441937..b7766ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index a0b924f..119fb23 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 import java.util.ArrayDeque;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5f9a162..2bad197 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -27,10 +27,11 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.util.StringUtils;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 02dd33d..791fd40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 
@@ -43,8 +43,18 @@ public interface CheckpointBarrierHandler {
 	 */
 	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
 
+	/**
+	 * Registers the given event handler to be notified on successful checkpoints.
+	 * 
+	 * @param checkpointHandler The handler to register.
+	 */
 	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
-	
+
+	/**
+	 * Cleans up all internally held resources.
+	 * 
+	 * @throws IOException Thrown, if the cleanup of I/O resources failed.
+	 */
 	void cleanup() throws IOException;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
deleted file mode 100644
index 27e37a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-/**
- * A simple buffer recycler that only frees the memory segments.
- */
-public class FreeingBufferRecycler implements BufferRecycler {
-	
-	public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
-	
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void recycle(MemorySegment memorySegment) {
-		memorySegment.free();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index de8c205..f0f18b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
@@ -111,7 +111,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 		recordWriter.clearBuffers();
 	}
 
-	public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+	public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
 		recordWriter.broadcastEvent(barrier);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index f7d7fb0..4ad5b45 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ae97974..e3d2911 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 


[3/8] flink git commit: [runtime] Cleanup channel events. Add comments and move some classes to test scope.

Posted by se...@apache.org.
[runtime] Cleanup channel events. Add comments and move some classes to test scope.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa0105aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa0105aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa0105aa

Branch: refs/heads/master
Commit: aa0105aa309da0dc8570b47880bb0624114e8231
Parents: 9311b9a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 30 18:26:32 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/event/task/EventList.java     | 35 -------
 .../runtime/event/task/IntegerTaskEvent.java    | 96 --------------------
 .../flink/runtime/event/task/RuntimeEvent.java  |  6 +-
 .../runtime/event/task/StringTaskEvent.java     | 96 --------------------
 .../flink/runtime/event/task/TaskEvent.java     |  7 +-
 .../io/network/api/EndOfPartitionEvent.java     | 13 ++-
 .../io/network/api/EndOfSuperstepEvent.java     | 31 ++++++-
 .../flink/runtime/event/task/EventList.java     | 35 +++++++
 .../runtime/event/task/IntegerTaskEvent.java    | 96 ++++++++++++++++++++
 .../runtime/event/task/StringTaskEvent.java     | 96 ++++++++++++++++++++
 10 files changed, 274 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventList.java
deleted file mode 100644
index 1f97a15..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventList.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.event.task;
-
-import org.apache.flink.runtime.util.SerializableArrayList;
-
-/**
- * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.task.AbstractEvent}
- * objects.
- * 
- */
-public class EventList extends SerializableArrayList<AbstractEvent> {
-
-	/**
-	 * Generated serial UID.
-	 */
-	private static final long serialVersionUID = 4788250278393432776L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
deleted file mode 100644
index 648dacc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.event.task;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * This class provides a simple implementation of an event that holds an integer value.
- * 
- */
-public class IntegerTaskEvent extends TaskEvent {
-
-	/**
-	 * The integer value transported by this integer task event.
-	 */
-	private int value = -1;
-
-	/**
-	 * Default constructor (should only be used for deserialization).
-	 */
-	public IntegerTaskEvent() {
-		// default constructor implementation.
-		// should only be used for deserialization
-	}
-
-	/**
-	 * Constructs a new integer task event.
-	 * 
-	 * @param value
-	 *        the integer value to be transported inside this integer task event
-	 */
-	public IntegerTaskEvent(final int value) {
-		this.value = value;
-	}
-
-	/**
-	 * Returns the stored integer value.
-	 * 
-	 * @return the stored integer value or <code>-1</code> if no value has been set
-	 */
-	public int getInteger() {
-		return this.value;
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		out.writeInt(this.value);
-	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		this.value = in.readInt();
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		return this.value;
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!(obj instanceof IntegerTaskEvent)) {
-			return false;
-		}
-
-		final IntegerTaskEvent taskEvent = (IntegerTaskEvent) obj;
-
-		return (this.value == taskEvent.getInteger());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
index cd19bc4..8c44073 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
@@ -18,5 +18,7 @@
 
 package org.apache.flink.runtime.event.task;
 
-public abstract class RuntimeEvent extends AbstractEvent {
-}
+/**
+ * Subclasses of this event are recognized as events exchanged by the core runtime.
+ */
+public abstract class RuntimeEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
deleted file mode 100644
index 87f2e91..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.StringUtils;
-
-/**
- * This class provides a simple implementation of an event that holds a string value.
- */
-public class StringTaskEvent extends TaskEvent {
-
-	/**
-	 * The string encapsulated by this event.
-	 */
-	private String message;
-
-	/**
-	 * The default constructor implementation. It should only be used for deserialization.
-	 */
-	public StringTaskEvent() {}
-
-	/**
-	 * Constructs a new string task event with the given string message.
-	 * 
-	 * @param message
-	 *        the string message that shall be stored in this event
-	 */
-	public StringTaskEvent(final String message) {
-		this.message = message;
-	}
-
-	/**
-	 * Returns the stored string.
-	 * 
-	 * @return the stored string or <code>null</code> if no string is set
-	 */
-	public String getString() {
-		return this.message;
-	}
-
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		StringUtils.writeNullableString(this.message, out);
-	}
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		this.message = StringUtils.readNullableString(in);
-	}
-
-
-	@Override
-	public int hashCode() {
-		if (this.message == null) {
-			return 0;
-		}
-
-		return this.message.hashCode();
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-		if (!(obj instanceof StringTaskEvent)) {
-			return false;
-		}
-
-		final StringTaskEvent ste = (StringTaskEvent) obj;
-		if (this.message == null) {
-			return ste.getString() == null;
-		}
-
-		return this.message.equals(ste.getString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
index 9501b95..01ecce2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
@@ -18,5 +18,8 @@
 
 package org.apache.flink.runtime.event.task;
 
-public abstract class TaskEvent extends AbstractEvent {
-}
+/**
+ * Subclasses of this event are recognized as custom events that are not part of the core
+ * flink runtime.
+ */
+public abstract class TaskEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index 3ecdb94..c2b6fa4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -22,11 +22,20 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.task.RuntimeEvent;
 
-
+/**
+ * This event marks a subpartition as fully consumed.
+ */
 public class EndOfPartitionEvent extends RuntimeEvent {
 
+	/** The singleton instance of this event */
 	public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
 	
+	// ------------------------------------------------------------------------
+
+	// not instantiable
+	private EndOfPartitionEvent() {}
+	
+	// ------------------------------------------------------------------------
 	
 	@Override
 	public void read(DataInputView in) {
@@ -38,6 +47,8 @@ public class EndOfPartitionEvent extends RuntimeEvent {
 		// Nothing to do here
 	}
 
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public int hashCode() {
 		return 1965146673;

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
index 5d0199c..162afb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
@@ -22,20 +22,41 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.task.RuntimeEvent;
 
-import java.io.IOException;
-
 /**
- * Marks the end of a superstep of one particular iteration head
+ * Marks the end of a superstep of one particular iteration superstep.
  */
 public class EndOfSuperstepEvent extends RuntimeEvent {
 
+	/** The singleton instance of this event */
 	public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
 
+	// ------------------------------------------------------------------------
+	
+	// not instantiable
+	private EndOfSuperstepEvent() {}
+	
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void write(DataOutputView out) {}
+
+	@Override
+	public void read(DataInputView in) {}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return 41;
+	}
+
 	@Override
-	public void write(DataOutputView out) throws IOException {
+	public boolean equals(Object obj) {
+		return obj != null && obj.getClass() == EndOfSuperstepEvent.class;
 	}
 
 	@Override
-	public void read(DataInputView in) throws IOException {
+	public String toString() {
+		return getClass().getSimpleName();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
new file mode 100644
index 0000000..1f97a15
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.event.task;
+
+import org.apache.flink.runtime.util.SerializableArrayList;
+
+/**
+ * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.task.AbstractEvent}
+ * objects.
+ * 
+ */
+public class EventList extends SerializableArrayList<AbstractEvent> {
+
+	/**
+	 * Generated serial UID.
+	 */
+	private static final long serialVersionUID = 4788250278393432776L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
new file mode 100644
index 0000000..648dacc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.event.task;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * This class provides a simple implementation of an event that holds an integer value.
+ * 
+ */
+public class IntegerTaskEvent extends TaskEvent {
+
+	/**
+	 * The integer value transported by this integer task event.
+	 */
+	private int value = -1;
+
+	/**
+	 * Default constructor (should only be used for deserialization).
+	 */
+	public IntegerTaskEvent() {
+		// default constructor implementation.
+		// should only be used for deserialization
+	}
+
+	/**
+	 * Constructs a new integer task event.
+	 * 
+	 * @param value
+	 *        the integer value to be transported inside this integer task event
+	 */
+	public IntegerTaskEvent(final int value) {
+		this.value = value;
+	}
+
+	/**
+	 * Returns the stored integer value.
+	 * 
+	 * @return the stored integer value or <code>-1</code> if no value has been set
+	 */
+	public int getInteger() {
+		return this.value;
+	}
+
+
+	@Override
+	public void write(final DataOutputView out) throws IOException {
+		out.writeInt(this.value);
+	}
+
+
+	@Override
+	public void read(final DataInputView in) throws IOException {
+		this.value = in.readInt();
+	}
+
+
+	@Override
+	public int hashCode() {
+
+		return this.value;
+	}
+
+
+	@Override
+	public boolean equals(final Object obj) {
+
+		if (!(obj instanceof IntegerTaskEvent)) {
+			return false;
+		}
+
+		final IntegerTaskEvent taskEvent = (IntegerTaskEvent) obj;
+
+		return (this.value == taskEvent.getInteger());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa0105aa/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
new file mode 100644
index 0000000..87f2e91
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event.task;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * This class provides a simple implementation of an event that holds a string value.
+ */
+public class StringTaskEvent extends TaskEvent {
+
+	/**
+	 * The string encapsulated by this event.
+	 */
+	private String message;
+
+	/**
+	 * The default constructor implementation. It should only be used for deserialization.
+	 */
+	public StringTaskEvent() {}
+
+	/**
+	 * Constructs a new string task event with the given string message.
+	 * 
+	 * @param message
+	 *        the string message that shall be stored in this event
+	 */
+	public StringTaskEvent(final String message) {
+		this.message = message;
+	}
+
+	/**
+	 * Returns the stored string.
+	 * 
+	 * @return the stored string or <code>null</code> if no string is set
+	 */
+	public String getString() {
+		return this.message;
+	}
+
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		StringUtils.writeNullableString(this.message, out);
+	}
+
+	@Override
+	public void read(final DataInputView in) throws IOException {
+		this.message = StringUtils.readNullableString(in);
+	}
+
+
+	@Override
+	public int hashCode() {
+		if (this.message == null) {
+			return 0;
+		}
+
+		return this.message.hashCode();
+	}
+
+
+	@Override
+	public boolean equals(final Object obj) {
+		if (!(obj instanceof StringTaskEvent)) {
+			return false;
+		}
+
+		final StringTaskEvent ste = (StringTaskEvent) obj;
+		if (this.message == null) {
+			return ste.getString() == null;
+		}
+
+		return this.message.equals(ste.getString());
+	}
+}


[6/8] flink git commit: [hotfix] [streaming] Fix race in stream tasks when canceling tasks early.

Posted by se...@apache.org.
[hotfix] [streaming] Fix race in stream tasks when canceling tasks early.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40eef52e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40eef52e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40eef52e

Branch: refs/heads/master
Commit: 40eef52e9f68c3c7727e9b9493959d5fd36d7f70
Parents: af88aa0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 30 21:09:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/OneInputStreamTask.java     | 2 --
 .../flink/streaming/runtime/tasks/SourceStreamTask.java       | 2 --
 .../flink/streaming/runtime/tasks/StreamIterationHead.java    | 1 -
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java  | 7 +++++--
 .../flink/streaming/runtime/tasks/TwoInputStreamTask.java     | 2 --
 5 files changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 605b8f5..6136f24 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -61,8 +61,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	public void invoke() throws Exception {
-		this.isRunning = true;
-
 		boolean operatorOpen = false;
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1940c11..4b25577 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,8 +45,6 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 	public void invoke() throws Exception {
 		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
 
-		this.isRunning = true;
-
 		boolean operatorOpen = false;
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 1736e52..2911f44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -72,7 +72,6 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke() throws Exception {
-		isRunning = true;
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Iteration source {} invoked", getName());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aabc95d..88813d0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -66,7 +66,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	protected boolean hasChainedOperators;
 
-	protected volatile boolean isRunning = false;
+	// needs to be initialized to true, so that early cancel() before invoke() behaves correctly
+	protected volatile boolean isRunning = true;
 
 	protected List<StreamingRuntimeContext> contexts;
 
@@ -229,10 +230,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@Override
 	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
 
+		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+		
 		synchronized (checkpointLock) {
 			if (isRunning) {
 				try {
-					LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+					
 
 					// We wrap the states of the chained operators in a list, marking non-stateful oeprators with null
 					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 99c053b..8cf5a40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -85,8 +85,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	public void invoke() throws Exception {
-		this.isRunning = true;
-
 		boolean operatorOpen = false;
 
 		if (LOG.isDebugEnabled()) {


[2/8] flink git commit: [hotfix] Fix SourceStreamTaskTest to switch source to running before checkpointing

Posted by se...@apache.org.
[hotfix] Fix SourceStreamTaskTest to switch source to running before checkpointing


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c58ba3da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c58ba3da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c58ba3da

Branch: refs/heads/master
Commit: c58ba3da90884846a52b174098c782eb08308c25
Parents: b0f2379
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 2 15:51:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:27 2015 +0200

----------------------------------------------------------------------
 .../runtime/tasks/SourceStreamTaskTest.java     | 72 +++++++++++---------
 1 file changed, 39 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c58ba3da/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0f6e5f1..232485d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -15,21 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.tasks;
 
+package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -98,38 +98,46 @@ public class SourceStreamTaskTest {
 		final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint
 		final int SOURCE_READ_DELAY = 1; // in ms
 
-
-		final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-		final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
-		final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
-		streamConfig.setStreamOperator(sourceOperator);
-
-
 		ExecutorService executor = Executors.newFixedThreadPool(10);
-		Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
-		for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
-			checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
-		}
-
-		testHarness.invoke();
-		testHarness.waitForTaskCompletion();
-
-		// Get the result from the checkpointers, if these threw an exception it
-		// will be rethrown here
-		for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
-			if (!checkpointerResults[i].isDone()) {
-				checkpointerResults[i].cancel(true);
+		try {
+			final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+			final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
+			final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
+	
+			StreamConfig streamConfig = testHarness.getStreamConfig();
+			StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
+			streamConfig.setStreamOperator(sourceOperator);
+			
+			// prepare the 
+			
+			Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
+	
+			// invoke this first, so the tasks are actually running when the checkpoints are scheduled
+			testHarness.invoke();
+			
+			for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
+				checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
 			}
-			if (!checkpointerResults[i].isCancelled()) {
-				checkpointerResults[i].get();
+			
+			testHarness.waitForTaskCompletion();
+	
+			// Get the result from the checkpointers, if these threw an exception it
+			// will be rethrown here
+			for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
+				if (!checkpointerResults[i].isDone()) {
+					checkpointerResults[i].cancel(true);
+				}
+				if (!checkpointerResults[i].isCancelled()) {
+					checkpointerResults[i].get();
+				}
 			}
+	
+			List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+			Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
+		}
+		finally {
+			executor.shutdown();
 		}
-
-		List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-		Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
 	}
 
 	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
@@ -267,9 +275,7 @@ public class SourceStreamTaskTest {
 		}
 
 		@Override
-		public void cancel() {
-
-		}
+		public void cancel() {}
 	}
 }
 


[8/8] flink git commit: [FLINK-2427] [streaming] Make the BarrierBuffer more robust against lost/missing checkpoint barriers.

Posted by se...@apache.org.
[FLINK-2427] [streaming] Make the BarrierBuffer more robust against lost/missing checkpoint barriers.

Checkpoint barriers are now tolerated to be lost (as may happen if the checkpoint triggering actor
messages are lost). This is realized by allowing the BarrierBuffer to maintain multiple queues of blocked inputs.

The patch also reworks the buffer spilling logic, to increase I/O efficiency, and reduce the main memory
footprint in cases where the buffers have little contents (low flush timeouts).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9311b9a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9311b9a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9311b9a9

Branch: refs/heads/master
Commit: 9311b9a9da57796e1eb91aa0ec5fa8948b732a47
Parents: 645d7cd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 29 13:09:03 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200

----------------------------------------------------------------------
 .../network/partition/consumer/InputGate.java   |   1 +
 .../partition/consumer/SingleInputGate.java     |  10 +
 .../partition/consumer/UnionInputGate.java      |  13 +
 .../runtime/util/DataInputDeserializer.java     |   8 +-
 .../streaming/runtime/io/BarrierBuffer.java     | 108 +++--
 .../streaming/runtime/io/BarrierTracker.java    |  19 +-
 .../streaming/runtime/io/BufferSpiller.java     | 371 ++++++++++++--
 .../runtime/io/FreeingBufferRecycler.java       |  37 ++
 .../flink/streaming/runtime/io/SpillReader.java |  78 ---
 .../runtime/io/SpillingBufferOrEvent.java       |  66 ---
 .../consumer/StreamTestSingleInputGate.java     |   2 +
 .../io/BarrierBufferMassiveRandomTest.java      |  11 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 205 +++++++-
 .../runtime/io/BarrierTrackerTest.java          |   9 +-
 .../streaming/runtime/io/BufferSpillerTest.java | 390 +++++++++++++++
 .../runtime/io/DummyBufferRecycler.java         |  34 --
 .../io/SpilledBufferOrEventSequenceTest.java    | 482 +++++++++++++++++++
 .../runtime/io/SpillingBufferOrEventTest.java   | 115 -----
 .../runtime/io/StreamRecordWriterTest.java      |   6 +-
 .../flink/streaming/runtime/io/TestEvent.java   |  88 ++++
 20 files changed, 1650 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index af089fc..c4f9dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -37,4 +37,5 @@ public interface InputGate {
 
 	void registerListener(EventListener<InputGate> listener);
 
+	int getPageSize();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0aebcae..80a79d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -211,6 +211,16 @@ public class SingleInputGate implements InputGate {
 		return bufferPool;
 	}
 
+	@Override
+	public int getPageSize() {
+		if (bufferPool != null) {
+			return bufferPool.getMemorySegmentSize();
+		}
+		else {
+			throw new IllegalStateException("Input gate has not been initialized with buffers.");
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 1f974de..730ead2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -182,6 +182,19 @@ public class UnionInputGate implements InputGate {
 		inputGateListener.registerListener(listener);
 	}
 
+	@Override
+	public int getPageSize() {
+		int pageSize = -1;
+		for (InputGate gate : inputGates) {
+			if (pageSize == -1) {
+				pageSize = gate.getPageSize();
+			} else if (gate.getPageSize() != pageSize) {
+				throw new IllegalStateException("Found input gates with different page sizes.");
+			}
+		}
+		return pageSize;
+	}
+
 	/**
 	 * Data availability listener at all unioned input gates.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 9915aba..e8e8f6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -109,7 +109,7 @@ public class DataInputDeserializer implements DataInputView {
 	@Override
 	public char readChar() throws IOException {
 		if (this.position < this.end - 1) {
-			return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
+			return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff));
 		} else {
 			throw new EOFException();
 		}
@@ -205,7 +205,7 @@ public class DataInputDeserializer implements DataInputView {
 	@Override
 	public short readShort() throws IOException {
 		if (position >= 0 && position < this.end - 1) {
-			return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
+			return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff));
 		} else {
 			throw new EOFException();
 		}
@@ -271,7 +271,7 @@ public class DataInputDeserializer implements DataInputView {
 				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
 				}
-				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
 				break;
 			default:
 				/* 10xx xxxx, 1111 xxxx */
@@ -294,7 +294,7 @@ public class DataInputDeserializer implements DataInputView {
 	@Override
 	public int readUnsignedShort() throws IOException {
 		if (this.position < this.end - 1) {
-			return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
+			return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff);
 		} else {
 			throw new EOFException();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 466b8f7..0441937 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
@@ -50,12 +49,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	
 	/** The total number of channels that this buffer handles data from */
 	private final int totalNumberOfInputChannels;
-
-	private final SpillReader spillReader;
-	private final BufferSpiller bufferSpiller;
 	
-	private ArrayDeque<SpillingBufferOrEvent> nonProcessed;
-	private ArrayDeque<SpillingBufferOrEvent> blockedNonProcessed;
+	/** To utility to write blocked data to a file channel */
+	private final BufferSpiller bufferSpiller;
+
+	/** The pending blocked buffer/event sequences. Must be consumed before requesting
+	 * further data from the input gate. */
+	private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
+
+	/** The sequence of buffers/events that has been unblocked and must now be consumed
+	 * before requesting further data from the input gate */
+	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
 
 	/** Handler that receives the checkpoint notifications */
 	private EventListener<CheckpointBarrier> checkpointHandler;
@@ -69,17 +73,21 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
 
-	
+
+	/**
+	 * 
+	 * @param inputGate Teh input gate to draw the buffers and events from.
+	 * @param ioManager The I/O manager that gives access to the temp directories.
+	 * 
+	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
+	 */
 	public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
 		this.inputGate = inputGate;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
 		
-		this.nonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
-		this.blockedNonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
-		
-		this.bufferSpiller = new BufferSpiller(ioManager);
-		this.spillReader = new SpillReader();
+		this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
+		this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -90,15 +98,25 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
-			final SpillingBufferOrEvent nextBuffered = nonProcessed.pollFirst();
-			final BufferOrEvent next = nextBuffered == null ?
-					inputGate.getNextBufferOrEvent() :
-					nextBuffered.getBufferOrEvent();
+			BufferOrEvent next;
+			if (currentBuffered != null) {
+				next = currentBuffered.getNext();
+				if (next == null) {
+					currentBuffered = queuedBuffered.pollFirst();
+					if (currentBuffered != null) {
+						currentBuffered.open();
+					}
+					return getNextNonBlocked();
+				}
+			}
+			else {
+				next = inputGate.getNextBufferOrEvent();
+			}
 			
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
-					blockedNonProcessed.add(new SpillingBufferOrEvent(next, bufferSpiller, spillReader));
+					bufferSpiller.add(next);
 				}
 				else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
 					return next;
@@ -181,25 +199,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	
 	@Override
 	public boolean isEmpty() {
-		return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
+		return currentBuffered == null;
 	}
 
 	@Override
 	public void cleanup() throws IOException {
 		bufferSpiller.close();
-		File spillfile1 = bufferSpiller.getSpillFile();
-		if (spillfile1 != null) {
-			if (!spillfile1.delete()) {
-				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile1.getAbsolutePath());
-			}
+		if (currentBuffered != null) {
+			currentBuffered.cleanup();
 		}
-
-		spillReader.close();
-		File spillfile2 = spillReader.getSpillFile();
-		if (spillfile2 != null) {
-			if (!spillfile2.delete()) {
-				LOG.warn("Cannot remove barrier buffer spill file: " + spillfile2.getAbsolutePath());
-			}
+		for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
+			seq.cleanup();
 		}
 	}
 	
@@ -233,7 +243,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	}
 
 	/**
-	 * Releases the blocks on all channels.
+	 * Releases the blocks on all channels. Makes sure the just written data
+	 * is the next to be consumed.
 	 */
 	private void releaseBlocks() throws IOException {
 		if (LOG.isDebugEnabled()) {
@@ -244,27 +255,36 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			blockedChannels[i] = false;
 		}
 		numReceivedBarriers = 0;
-		
-		if (nonProcessed.isEmpty()) {
-			// swap the queues
-			ArrayDeque<SpillingBufferOrEvent> empty = nonProcessed;
-			nonProcessed = blockedNonProcessed;
-			blockedNonProcessed = empty;
+
+		if (currentBuffered == null) {
+			// common case: no more buffered data
+			currentBuffered = bufferSpiller.rollOver();
+			if (currentBuffered != null) {
+				currentBuffered.open();
+			}
 		}
 		else {
-			throw new IllegalStateException("Unconsumed data from previous checkpoint alignment " +
-					"when starting next checkpoint alignment");
+			// uncommon case: buffered data pending
+			// push back the pending data
+			queuedBuffered.addFirst(currentBuffered);
+			
+			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one 
+			currentBuffered = bufferSpiller.rollOverWithNewBuffer();
+			if (currentBuffered != null) {
+				currentBuffered.open();
+			}
 		}
-		
-		// roll over the spill files
-		spillReader.setSpillFile(bufferSpiller.getSpillFile());
-		bufferSpiller.resetSpillFile();
 	}
 
 	// ------------------------------------------------------------------------
 	// For Testing
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Gets the ID defining the current pending, or just completed, checkpoint.
+	 * 
+	 * @return The ID of the pending of completed checkpoint. 
+	 */
 	public long getCurrentCheckpointId() {
 		return this.currentCheckpointId;
 	}
@@ -275,6 +295,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	
 	@Override
 	public String toString() {
-		return "Non-Processed: " + nonProcessed + " | Blocked: " + blockedNonProcessed;
+		return String.format("last checkpoint: %d, current barriers: %d", currentCheckpointId, numReceivedBarriers);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 6b24556..a0b924f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -28,26 +28,39 @@ import java.util.ArrayDeque;
 
 /**
  * The BarrierTracker keeps track of what checkpoint barriers have been received from
- * which input channels. 
+ * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
+ * it notifies its listener of a completed checkpoint.
  * 
  * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
  * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
  * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ * 
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
  */
 public class BarrierTracker implements CheckpointBarrierHandler {
 
+	/** The tracker tracks a maximum number of checkpoints, for which some, but not all
+	 * barriers have yet arrived. */
 	private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
 	
+	/** The input gate, to draw the buffers and events from */
 	private final InputGate inputGate;
 	
+	/** The number of channels. Once that many barriers have been received for a checkpoint,
+	 * the checkpoint is considered complete. */
 	private final int totalNumberOfInputChannels;
-	
+
+	/** All checkpoints for which some (but not all) barriers have been received,
+	 * and that are not yet known to be subsumed by newer checkpoints */
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 	
+	/** The listener to be notified on complete checkpoints */
 	private EventListener<CheckpointBarrier> checkpointHandler;
 	
+	/** The highest checkpoint ID encountered so far */
 	private long latestPendingCheckpointID = -1;
 	
+	
 	public BarrierTracker(InputGate inputGate) {
 		this.inputGate = inputGate;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
@@ -149,8 +162,6 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	// ------------------------------------------------------------------------
-	//  
-	// ------------------------------------------------------------------------
 
 	/**
 	 * Simple class for a checkpoint ID with a barrier counter.

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index fda612e..5f9a162 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -20,80 +20,389 @@ package org.apache.flink.streaming.runtime.io;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.util.StringUtils;
 
+/**
+ * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file.
+ * After a number of elements have been spilled, the spiller can "roll over": It presents the spilled
+ * elements as a readable sequence, and opens a new spill file.
+ * 
+ * <p>This implementation buffers data effectively in the OS cache, which gracefully extends to the
+ * disk. Most data is written and re-read milliseconds later. The file is deleted after the read.
+ * Consequently, in most cases, the data will never actually hit the physical disks.</p>
+ * 
+ * <p>IMPORTANT: The SpilledBufferOrEventSequences created by this spiller all reuse the same
+ * reading memory (to reduce overhead) and can consequently not be read concurrently.</p>
+ */
 public class BufferSpiller {
-	
-	/** The random number generator for temp file names */
-	private static final Random RND = new Random();
 
 	/** The counter that selects the next directory to spill into */
 	private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
 	
+	/** The size of the buffer with which data is read back in */
+	private static final int READ_BUFFER_SIZE = 1024 * 1024;
 	
 	/** The directories to spill to */
 	private final File tempDir;
-
-	private File spillFile;
 	
-	private FileChannel spillingChannel;
+	/** The name prefix for spill files */
+	private final String spillFilePrefix;
+	
+	/** The buffer used for bulk reading data (used in the SpilledBufferOrEventSequence) */
+	private final ByteBuffer readBuffer;
 	
+	/** The buffer that encodes the spilled header */
+	private final ByteBuffer headBuffer;
 	
+	/** The reusable array that holds header and contents buffers */
+	private final ByteBuffer[] sources;
+	
+	/** The file that we currently spill to */
+	private File currentSpillFile;
+	
+	/** The channel of the file we currently spill to */
+	private FileChannel currentChannel;
 
-	public BufferSpiller(IOManager ioManager) throws IOException {
+	/** The page size, to let this reader instantiate properly sized memory segments */
+	private final int pageSize;
+	
+	/** A counter, to created numbered spill files */
+	private int fileCounter;
+	
+	/** A flag to check whether the spiller has written since the last roll over */
+	private boolean hasWritten;
+	
+	/**
+	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
+	 * 
+	 * @param ioManager The I/O manager for access to teh temp directories.
+	 * @param pageSize The page size used to re-create spilled buffers.
+	 * @throws IOException Thrown if the temp files for spilling cannot be initialized.
+	 */
+	public BufferSpiller(IOManager ioManager, int pageSize) throws IOException {
+		this.pageSize = pageSize;
+		
+		this.readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
+		this.readBuffer.order(ByteOrder.LITTLE_ENDIAN);
+		
+		this.headBuffer = ByteBuffer.allocateDirect(16);
+		this.headBuffer.order(ByteOrder.LITTLE_ENDIAN);
+		
+		this.sources = new ByteBuffer[] { this.headBuffer, null };
+		
 		File[] tempDirs = ioManager.getSpillingDirectories();
 		this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
+		
+		byte[] rndBytes = new byte[32];
+		new Random().nextBytes(rndBytes);
+		this.spillFilePrefix = StringUtils.byteToHexString(rndBytes) + '.';
+		
+		// prepare for first contents
 		createSpillingChannel();
 	}
 
 	/**
-	 * Dumps the contents of the buffer to disk and recycles the buffer.
+	 * Adds a buffer or event to the sequence of spilled buffers and events.
+	 * 
+	 * @param boe The buffer or event to add and spill.
+	 * @throws IOException Thrown, if the buffer of event could not be spilled.
 	 */
-	public void spill(Buffer buffer) throws IOException {
+	public void add(BufferOrEvent boe) throws IOException {
+		hasWritten = true;
 		try {
-			spillingChannel.write(buffer.getNioBuffer());
-			buffer.recycle();
+			ByteBuffer contents;
+			if (boe.isBuffer()) {
+				Buffer buf = boe.getBuffer();
+				contents = buf.getMemorySegment().wrap(0, buf.getSize());
+			}
+			else {
+				contents = EventSerializer.toSerializedEvent(boe.getEvent());
+			}
+			
+			headBuffer.clear();
+			headBuffer.putInt(boe.getChannelIndex());
+			headBuffer.putInt(contents.remaining());
+			headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
+			headBuffer.flip();
+			
+			sources[1] = contents;
+			currentChannel.write(sources);
 		}
-		catch (IOException e) {
-			close();
-			throw e;
+		finally {
+			if (boe.isBuffer()) {
+				boe.getBuffer().recycle();
+			}
 		}
 	}
 
-	@SuppressWarnings("resource")
-	private void createSpillingChannel() throws IOException {
-		this.spillFile = new File(tempDir, randomString(RND) + ".buffer");
-		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+	/**
+	 * Starts a new sequence of spilled buffers and event and returns the current sequence of spilled buffers
+	 * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
+	 * last call to this method.
+	 * 
+	 * <p>NOTE: The SpilledBufferOrEventSequences created by this method all reuse the same
+	 * reading memory (to reduce overhead) and can consequently not be read concurrently with each other.
+	 * To create a sequence that can be read concurrently with the previous SpilledBufferOrEventSequence, use the
+	 * {@link #rollOverWithNewBuffer()} method.</p>
+	 * 
+	 * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
+	 * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
+	 *                     file could be created.
+	 */
+	public SpilledBufferOrEventSequence rollOver() throws IOException {
+		return rollOverInternal(false);
 	}
 
+	/**
+	 * Starts a new sequence of spilled buffers and event and returns the current sequence of spilled buffers
+	 * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
+	 * last call to this method.
+	 * 
+	 * <p>The SpilledBufferOrEventSequence returned by this method is safe for concurrent consumption with
+	 * any previously returned sequence.</p>
+	 *
+	 * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
+	 * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
+	 *                     file could be created.
+	 */
+	public SpilledBufferOrEventSequence rollOverWithNewBuffer() throws IOException {
+		return rollOverInternal(true);
+	}
+	
+	private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
+		if (!hasWritten) {
+			return null;
+		}
+		
+		ByteBuffer buf;
+		if (newBuffer) {
+			buf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
+			buf.order(ByteOrder.LITTLE_ENDIAN);
+		} else {
+			buf = readBuffer;
+		}
+		
+		// create a reader for the spilled data
+		currentChannel.position(0L);
+		SpilledBufferOrEventSequence seq = 
+				new SpilledBufferOrEventSequence(currentSpillFile, currentChannel, buf, pageSize);
+		
+		// create ourselves a new spill file
+		createSpillingChannel();
+		
+		hasWritten = false;
+		return seq;
+	}
 
-
+	/**
+	 * Cleans up the current spilling channel and file.
+	 * 
+	 * Does not clean up the SpilledBufferOrEventSequences generated by calls to 
+	 * {@link #rollOver()}.
+	 * 
+	 * @throws IOException Thrown if channel closing or file deletion fail.
+	 */
 	public void close() throws IOException {
-		if (spillingChannel != null && spillingChannel.isOpen()) {
-			spillingChannel.close();
+		currentChannel.close();
+		if (!currentSpillFile.delete()) {
+			throw new IOException("Cannot delete spill file");
 		}
 	}
 
-	public void resetSpillFile() throws IOException {
-		close();
-		createSpillingChannel();
-	}
+	// ------------------------------------------------------------------------
+	//  For testing
+	// ------------------------------------------------------------------------
 
-	public File getSpillFile() {
-		return spillFile;
+	File getCurrentSpillFile() {
+		return currentSpillFile;
+	}
+	
+	FileChannel getCurrentChannel() {
+		return currentChannel;
 	}
 	
 	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	@SuppressWarnings("resource")
+	private void createSpillingChannel() throws IOException {
+		currentSpillFile = new File(tempDir, spillFilePrefix + (fileCounter++) +".buffer");
+		currentChannel = new RandomAccessFile(currentSpillFile, "rw").getChannel();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This class represents a sequence of spilled buffers and events, created by the
+	 * {@link BufferSpiller}. The sequence of buffers and events can be read back using the
+	 * method {@link #getNext()}.
+	 */
+	public static class SpilledBufferOrEventSequence {
+		
+		/** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte) */
+		private static final int HEADER_LENGTH = 9;
+
+		/** The file containing the data */
+		private final File file;
+		
+		/** The file channel to draw the data from */
+		private final FileChannel fileChannel;
+		
+		/** The byte buffer for bulk reading */
+		private final ByteBuffer buffer;
 
-	private static String randomString(Random random) {
-		final byte[] bytes = new byte[20];
-		random.nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
+		/** The page size to instantiate properly sized memory segments */
+		private final int pageSize;
+
+		/** Flag to track whether the sequence has been opened already */
+		private boolean opened = false;
+
+		/**
+		 * Create a reader that reads a sequence of spilled buffers and events.
+		 * 
+		 * @param file The file with the data.
+		 * @param fileChannel The file channel to read the data from.
+		 * @param buffer The buffer used for bulk reading.
+		 * @param pageSize The page size to use for the created memory segments.
+		 */
+		SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
+			this.file = file;
+			this.fileChannel = fileChannel;
+			this.buffer = buffer;
+			this.pageSize = pageSize;
+		}
+
+		/**
+		 * Initializes the sequence for reading.
+		 * This method needs to be called before the first call to {@link #getNext()}. Otherwise
+		 * the results of {@link #getNext()} are not predictable.
+		 */
+		public void open() {
+			if (!opened) {
+				opened = true;
+				buffer.position(0);
+				buffer.limit(0);
+			}
+		}
+
+		/**
+		 * Gets the next BufferOrEvent from the spilled sequence, or {@code null}, if the
+		 * sequence is exhausted.
+		 *         
+		 * @return The next BufferOrEvent from the spilled sequence, or {@code null} (end of sequence).
+		 * @throws IOException Thrown, if the reads failed, of if the byte stream is corrupt.
+		 */
+		public BufferOrEvent getNext() throws IOException {
+			if (buffer.remaining() < HEADER_LENGTH) {
+				buffer.compact();
+				
+				while (buffer.position() < HEADER_LENGTH) {
+					if (fileChannel.read(buffer) == -1) {
+						if (buffer.position() == 0) {
+							// no trailing data
+							return null;
+						} else {
+							throw new IOException("Found trailing incomplete buffer or event");
+						}
+					}
+				}
+				
+				buffer.flip();
+			}
+			
+			final int channel = buffer.getInt();
+			final int length = buffer.getInt();
+			final boolean isBuffer = buffer.get() == 0;
+			
+			
+			if (isBuffer) {
+				// deserialize buffer
+				if (length > pageSize) {
+					throw new IOException(String.format(
+							"Spilled buffer (%d bytes) is larger than page size of (%d bytes)", length, pageSize));
+				}
+
+				MemorySegment seg = new MemorySegment(new byte[pageSize]);
+				
+				int segPos = 0;
+				int bytesRemaining = length;
+				
+				while (true) {
+					int toCopy = Math.min(buffer.remaining(), bytesRemaining);
+					if (toCopy > 0) {
+						seg.put(segPos, buffer, toCopy);
+						segPos += toCopy;
+						bytesRemaining -= toCopy;
+					}
+					
+					if (bytesRemaining == 0) {
+						break;
+					}
+					else {
+						buffer.clear();
+						if (fileChannel.read(buffer) == -1) {
+							throw new IOException("Found trailing incomplete buffer");
+						}
+						buffer.flip();
+					}
+				}
+				
+				
+				Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+				buf.setSize(length);
+				
+				return new BufferOrEvent(buf, channel);
+			}
+			else {
+				// deserialize event
+				if (length > buffer.capacity() - HEADER_LENGTH) {
+					throw new IOException("Event is too large");
+				}
+
+				if (buffer.remaining() < length) {
+					buffer.compact();
+
+					while (buffer.position() < length) {
+						if (fileChannel.read(buffer) == -1) {
+							throw new IOException("Found trailing incomplete event");
+						}
+					}
+
+					buffer.flip();
+				}
+
+				int oldLimit = buffer.limit();
+				buffer.limit(buffer.position() + length);
+				AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader());
+				buffer.limit(oldLimit);
+				
+				return new BufferOrEvent(evt, channel);
+			}
+		}
+
+		/**
+		 * Cleans up all file resources held by this spilled sequence.
+		 * 
+		 * @throws IOException Thrown, if file channel closing or file deletion fail. 
+		 */
+		public void cleanup() throws IOException {
+			fileChannel.close();
+			if (!file.delete()) {
+				throw new IOException("Cannot remove temp file for stream alignment writer");
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
new file mode 100644
index 0000000..27e37a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+/**
+ * A simple buffer recycler that only frees the memory segments.
+ */
+public class FreeingBufferRecycler implements BufferRecycler {
+	
+	public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
+	
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void recycle(MemorySegment memorySegment) {
+		memorySegment.free();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
deleted file mode 100644
index 356b491..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-public class SpillReader {
-
-	private FileChannel spillingChannel;
-	private File spillFile;
-
-	/**
-	 * Reads the next buffer from the spilled file.
-	 */
-	public Buffer readNextBuffer(int bufferSize) throws IOException {
-		try {
-			Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
-					new BufferRecycler() {
-
-						@Override
-						public void recycle(MemorySegment memorySegment) {
-							memorySegment.free();
-						}
-					});
-
-			spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
-
-			return buffer;
-		} catch (Exception e) {
-			close();
-			throw new IOException(e);
-		}
-	}
-
-	@SuppressWarnings("resource")
-	public void setSpillFile(File nextSpillFile) throws IOException {
-		// We can close and delete the file now
-		close();
-		if (spillFile != null) {
-			spillFile.delete();
-		}
-		this.spillFile = nextSpillFile;
-		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
-	}
-
-	public File getSpillFile() {
-		return spillFile;
-	}
-
-	public void close() throws IOException {
-		if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
-			this.spillingChannel.close();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
deleted file mode 100644
index 368e373..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-
-public class SpillingBufferOrEvent {
-
-	private BufferOrEvent boe;
-	private boolean isSpilled = false;
-
-	private SpillReader spillReader;
-
-	private int channelIndex;
-	private int bufferSize;
-
-	public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
-			throws IOException {
-
-		this.boe = boe;
-		this.channelIndex = boe.getChannelIndex();
-		this.spillReader = reader;
-
-		if (boe.isBuffer()) {
-			this.bufferSize = boe.getBuffer().getSize();
-			spiller.spill(boe.getBuffer());
-			this.boe = null;
-			this.isSpilled = true;
-		}
-	}
-
-	/**
-	 * If the buffer wasn't spilled simply returns the instance from the field,
-	 * otherwise reads it from the spill reader
-	 */
-	public BufferOrEvent getBufferOrEvent() throws IOException {
-		if (isSpilled) {
-			boe = new BufferOrEvent(spillReader.readNextBuffer(bufferSize), channelIndex);
-			this.isSpilled = false;
-			return boe;
-		} else {
-			return boe;
-		}
-	}
-
-	public boolean isSpilled() {
-		return isSpilled;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index b59ad19..4007da8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -38,6 +38,7 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -75,6 +76,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 		inputQueues = new ConcurrentLinkedQueue[numInputChannels];
 
 		setupInputChannels();
+		doReturn(bufferSize).when(inputGate).getPageSize();
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index c2df4d8..7350516 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -42,14 +42,16 @@ import org.junit.Test;
  */
 public class BarrierBufferMassiveRandomTest {
 
+	private static final int PAGE_SIZE = 1024;
+	
 	@Test
 	public void testWithTwoChannelsAndRandomBarriers() {
 		IOManager ioMan = null;
 		try {
 			ioMan = new IOManagerAsync();
 			
-			BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-			BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE).createBufferPool(100, true);
+			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE).createBufferPool(100, true);
 
 			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
 					new BufferPool[] { pool1, pool2 },
@@ -163,5 +165,10 @@ public class BarrierBufferMassiveRandomTest {
 
 		@Override
 		public void registerListener(EventListener<InputGate> listener) {}
+
+		@Override
+		public int getPageSize() {
+			return PAGE_SIZE;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index ad61c6f..b8b3a8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -49,6 +49,8 @@ import static org.junit.Assert.fail;
  */
 public class BarrierBufferTest {
 
+	private static final int PAGE_SIZE = 512;
+	
 	private static int SIZE_COUNTER = 0;
 	
 	private static IOManager IO_MANAGER;
@@ -89,6 +91,8 @@ public class BarrierBufferTest {
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+			
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -118,6 +122,8 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -157,6 +163,8 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -272,6 +280,8 @@ public class BarrierBufferTest {
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -327,6 +337,8 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+			buffer.cleanup();
+			
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -441,6 +453,8 @@ public class BarrierBufferTest {
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -516,6 +530,95 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+			
+			buffer.cleanup();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Validates that the buffer skips over the current checkpoint if it
+	 * receives a barrier from a later checkpoint on a non-blocked input.
+	 */
+	@Test
+	public void testMultiChannelJumpingOverCheckpoint() {
+		try {
+			BufferOrEvent[] sequence = {
+					// checkpoint 1 - with blocked data
+					createBuffer(0), createBuffer(2), createBuffer(0),
+					createBarrier(1, 1), createBarrier(1, 2),
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(1, 0),
+					createBuffer(1), createBuffer(0),
+
+					// checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(2),
+					createBarrier(2, 0),
+					createBuffer(2), createBuffer(0),
+					createBarrier(3, 1),
+					createBuffer(1), createBuffer(2),
+					createBarrier(3, 0),
+					createBuffer(2), createBuffer(0),
+					createBarrier(4, 2),
+
+					createBuffer(2),
+					createBuffer(1), createEndOfPartition(1),
+					createBuffer(2), createEndOfPartition(2),
+					createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+
+			// checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			check(sequence[7], buffer.getNextNonBlocked());
+			assertEquals(1L, buffer.getCurrentCheckpointId());
+
+			check(sequence[5], buffer.getNextNonBlocked());
+			check(sequence[6], buffer.getNextNonBlocked());
+			check(sequence[9], buffer.getNextNonBlocked());
+			check(sequence[10], buffer.getNextNonBlocked());
+
+			// alignment of checkpoint 2
+			check(sequence[13], buffer.getNextNonBlocked());
+			assertEquals(2L, buffer.getCurrentCheckpointId());
+			check(sequence[15], buffer.getNextNonBlocked());
+			check(sequence[19], buffer.getNextNonBlocked());
+			check(sequence[21], buffer.getNextNonBlocked());
+
+			// checkpoint 2 aborted, checkpoint 4 started. replay buffered
+			check(sequence[12], buffer.getNextNonBlocked());
+			assertEquals(4L, buffer.getCurrentCheckpointId());
+			check(sequence[16], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
+			check(sequence[22], buffer.getNextNonBlocked());
+			
+			// align remainder
+			check(sequence[25], buffer.getNextNonBlocked());
+			check(sequence[26], buffer.getNextNonBlocked());
+			check(sequence[29], buffer.getNextNonBlocked());
+			check(sequence[30], buffer.getNextNonBlocked());
+			
+			// end of input, emit remainder
+			check(sequence[24], buffer.getNextNonBlocked());
+			check(sequence[27], buffer.getNextNonBlocked());
+			check(sequence[28], buffer.getNextNonBlocked());
+
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -526,15 +629,8 @@ public class BarrierBufferTest {
 	/**
 	 * Validates that the buffer skips over a later checkpoint if it
 	 * receives a barrier from an even later checkpoint on a blocked input.
-	 * 
-	 * NOTE: This test currently fails, because the barrier buffer does not support
-	 * to unblock inputs before all previously unblocked data is consumed.
-	 * 
-	 * Since this test checks only that the buffer behaves "failsafe" in cases of
-	 * corrupt checkpoint barrier propagation (a situation that does not occur
-	 * under the current model), we ignore it for the moment.
 	 */
-//	@Test
+	@Test
 	public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
 		try {
 			BufferOrEvent[] sequence = {
@@ -551,18 +647,23 @@ public class BarrierBufferTest {
 					createBarrier(2, 0),
 					createBuffer(1), createBuffer(0),
 
+					createBarrier(3, 0), // queued barrier on blocked input
+					createBuffer(0),
+					
 					createBarrier(4, 1), // pre-mature barrier on blocked input
-					createBarrier(3, 0), // queued barrier, ignored on replay
+					createBuffer(1),
+					createBuffer(0),
+					createBuffer(2),
 
 					// complete checkpoint 2
-					createBarrier(2, 0),
+					createBarrier(2, 2),
 					createBuffer(0),
 					
 					createBarrier(3, 2), // should be ignored
 					createBuffer(2),
 					createBarrier(4, 0),
 					createBuffer(0), createBuffer(1), createBuffer(2),
-					createBarrier(4, 1),
+					createBarrier(4, 2),
 					
 					createBuffer(1), createEndOfPartition(1),
 					createBuffer(2), createEndOfPartition(2),
@@ -585,6 +686,7 @@ public class BarrierBufferTest {
 
 			// alignment of checkpoint 2
 			check(sequence[13], buffer.getNextNonBlocked());
+			check(sequence[22], buffer.getNextNonBlocked());
 			assertEquals(2L, buffer.getCurrentCheckpointId());
 
 			// checkpoint 2 completed
@@ -593,24 +695,79 @@ public class BarrierBufferTest {
 			check(sequence[16], buffer.getNextNonBlocked());
 			
 			// checkpoint 3 skipped, alignment for 4 started
-			check(sequence[20], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
 			assertEquals(4L, buffer.getCurrentCheckpointId());
-			check(sequence[22], buffer.getNextNonBlocked());
+			check(sequence[21], buffer.getNextNonBlocked());
+			check(sequence[24], buffer.getNextNonBlocked());
 			check(sequence[26], buffer.getNextNonBlocked());
-
+			check(sequence[30], buffer.getNextNonBlocked());
+			
 			// checkpoint 4 completed
-			check(sequence[24], buffer.getNextNonBlocked());
-			check(sequence[25], buffer.getNextNonBlocked());
-
+			check(sequence[20], buffer.getNextNonBlocked());
 			check(sequence[28], buffer.getNextNonBlocked());
 			check(sequence[29], buffer.getNextNonBlocked());
-			check(sequence[30], buffer.getNextNonBlocked());
-			check(sequence[31], buffer.getNextNonBlocked());
+			
 			check(sequence[32], buffer.getNextNonBlocked());
 			check(sequence[33], buffer.getNextNonBlocked());
+			check(sequence[34], buffer.getNextNonBlocked());
+			check(sequence[35], buffer.getNextNonBlocked());
+			check(sequence[36], buffer.getNextNonBlocked());
+			check(sequence[37], buffer.getNextNonBlocked());
 			
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
+
+			buffer.cleanup();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testEarlyCleanup() {
+		try {
+			BufferOrEvent[] sequence = {
+					createBuffer(0), createBuffer(1), createBuffer(2),
+					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
+
+					createBuffer(2), createBuffer(1), createBuffer(0),
+					createBarrier(2, 1),
+					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
+					createBarrier(2, 2),
+					createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
+			};
+
+			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+			buffer.registerCheckpointEventHandler(handler);
+			handler.setNextExpectedCheckpointId(1L);
+
+			// pre-checkpoint 1
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+			// pre-checkpoint 2
+			check(sequence[6], buffer.getNextNonBlocked());
+			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			check(sequence[7], buffer.getNextNonBlocked());
+			check(sequence[8], buffer.getNextNonBlocked());
+
+			// checkpoint 2 alignment
+			check(sequence[13], buffer.getNextNonBlocked());
+			check(sequence[14], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
+			check(sequence[19], buffer.getNextNonBlocked());
+
+			// end of stream: remaining buffered contents
+			buffer.getNextNonBlocked();
+			buffer.cleanup();
+
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -629,8 +786,9 @@ public class BarrierBufferTest {
 	private static BufferOrEvent createBuffer(int channel) {
 		// since we have no access to the contents, we need to use the size as an
 		// identifier to validate correctness here
-		return new BufferOrEvent(
-				new Buffer(new MemorySegment(new byte[SIZE_COUNTER++]),  DummyBufferRecycler.INSTANCE), channel);
+		Buffer buf = new Buffer(new MemorySegment(new byte[PAGE_SIZE]), FreeingBufferRecycler.INSTANCE);
+		buf.setSize(SIZE_COUNTER++);
+		return new BufferOrEvent(buf, channel);
 	}
 
 	private static BufferOrEvent createEndOfPartition(int channel) {
@@ -689,6 +847,11 @@ public class BarrierBufferTest {
 
 		@Override
 		public void registerListener(EventListener<InputGate> listener) {}
+
+		@Override
+		public int getPageSize() {
+			return PAGE_SIZE;
+		}
 	}
 
 	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b2c570e..532078c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -39,7 +39,7 @@ import static org.junit.Assert.*;
  * Tests for the behavior of the barrier tracker.
  */
 public class BarrierTrackerTest {
-
+	
 	@Test
 	public void testSingleChannelNoBarriers() {
 		try {
@@ -341,7 +341,7 @@ public class BarrierTrackerTest {
 
 	private static BufferOrEvent createBuffer(int channel) {
 		return new BufferOrEvent(
-				new Buffer(new MemorySegment(new byte[] { 1 }),  DummyBufferRecycler.INSTANCE), channel);
+				new Buffer(new MemorySegment(new byte[] { 1, 2 }), FreeingBufferRecycler.INSTANCE), channel);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -381,6 +381,11 @@ public class BarrierTrackerTest {
 
 		@Override
 		public void registerListener(EventListener<InputGate> listener) {}
+
+		@Override
+		public int getPageSize() {
+			return 2;
+		}
 	}
 
 	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
new file mode 100644
index 0000000..ae384e1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class BufferSpillerTest {
+
+	private static final int PAGE_SIZE = 4096;
+
+	private static IOManager IO_MANAGER;
+
+	private BufferSpiller spiller;
+
+
+	// ------------------------------------------------------------------------
+	//  Setup / Cleanup
+	// ------------------------------------------------------------------------
+	
+	@BeforeClass
+	public static void setupIOManager() {
+		IO_MANAGER = new IOManagerAsync();
+	}
+
+	@AfterClass
+	public static void shutdownIOManager() {
+		IO_MANAGER.shutdown();
+	}
+	
+	@Before
+	public void createSpiller() {
+		try {
+			spiller = new BufferSpiller(IO_MANAGER, PAGE_SIZE);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Cannot create BufferSpiller: " + e.getMessage());
+		}
+	}
+	
+	@After
+	public void cleanupSpiller() {
+		if (spiller != null) {
+			try {
+				spiller.close();
+			}
+			catch (Exception e) {
+				e.printStackTrace();
+				fail("Cannot properly close the BufferSpiller: " + e.getMessage());
+			}
+			
+			assertFalse(spiller.getCurrentChannel().isOpen());
+			assertFalse(spiller.getCurrentSpillFile().exists());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testRollOverEmptySequences() {
+		try {
+			assertNull(spiller.rollOver());
+			assertNull(spiller.rollOver());
+			assertNull(spiller.rollOver());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSpillAndRollOverSimple() {
+		try {
+			final Random rnd = new Random();
+			final Random bufferRnd = new Random();
+
+			final int maxNumEventsAndBuffers = 3000;
+			final int maxNumChannels = 1656;
+
+			// do multiple spilling / rolling over rounds
+			for (int round = 0; round < 5; round++) {
+				
+				final long bufferSeed = rnd.nextLong();
+				bufferRnd.setSeed(bufferSeed);
+				
+				final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
+				final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+				
+				final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
+
+				// generate sequence
+				for (int i = 0; i < numEventsAndBuffers; i++) {
+					boolean isEvent = rnd.nextDouble() < 0.05d;
+					if (isEvent) {
+						BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
+						events.add(evt);
+						spiller.add(evt);
+					}
+					else {
+						BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+						spiller.add(evt);
+					}
+				}
+
+				// reset and create reader
+				bufferRnd.setSeed(bufferSeed);
+			
+				BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
+				seq.open();
+
+				// read and validate the sequence
+
+				int numEvent = 0;
+				for (int i = 0; i < numEventsAndBuffers; i++) {
+					BufferOrEvent next = seq.getNext();
+					assertNotNull(next);
+					if (next.isEvent()) {
+						BufferOrEvent expected = events.get(numEvent++);
+						assertEquals(expected.getEvent(), next.getEvent());
+						assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+					}
+					else {
+						validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+					}
+				}
+
+				// no further data
+				assertNull(seq.getNext());
+
+				// all events need to be consumed
+				assertEquals(events.size(), numEvent);
+				
+				seq.cleanup();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSpillWhileReading() {
+		try {
+			final int sequences = 10;
+			
+			final Random rnd = new Random();
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+			
+			final SequenceConsumer consumer = new SequenceConsumer(error, sequences);
+			consumer.start();
+			
+			final int maxNumEventsAndBuffers = 30000;
+			final int maxNumChannels = 1656;
+			
+			// do multiple spilling / rolling over rounds
+			for (int round = 0; round < 2*sequences; round++) {
+
+				if (round % 2 == 1) {
+					// make this an empty sequence
+					assertNull(spiller.rollOver());
+				}
+				else {
+					// proper spilled sequence
+					final long bufferSeed = rnd.nextLong();
+					final Random bufferRnd = new Random(bufferSeed);
+					
+					final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
+					final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+	
+					final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
+	
+					// generate sequence
+					for (int i = 0; i < numEventsAndBuffers; i++) {
+						boolean isEvent = rnd.nextDouble() < 0.05d;
+						if (isEvent) {
+							BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
+							events.add(evt);
+							spiller.add(evt);
+						}
+						else {
+							BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+							spiller.add(evt);
+						}
+					}
+	
+					// reset and create reader
+					bufferRnd.setSeed(bufferSeed);
+					BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
+					
+					SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);
+					consumer.queue(stc);
+				}
+			}
+			
+			// wait for the consumer
+			consumer.join(180000);
+			assertFalse("sequence consumer did not finish its work in time", consumer.isAlive());
+			
+			// validate there was no error in the consumer
+			if (error.get() != null) {
+				Throwable t = error.get();
+				if (t instanceof Error) {
+					throw (Error) t;
+				}
+				else {
+					throw new Exception("Error while consuming the spilled records", t);
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+	
+	private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) {
+		long magicNumber = rnd.nextLong();
+		byte[] data = new byte[rnd.nextInt(1000)];
+		rnd.nextBytes(data);
+		TestEvent evt = new TestEvent(magicNumber, data);
+
+		int channelIndex = rnd.nextInt(numChannels);
+		
+		return new BufferOrEvent(evt, channelIndex);
+	}
+
+	private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
+		MemorySegment seg = new MemorySegment(new byte[PAGE_SIZE]);
+		for (int i = 0; i < size; i++) {
+			seg.put(i, (byte) i);
+		}
+		
+		Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+		buf.setSize(size);
+		return new BufferOrEvent(buf, channelIndex);
+	}
+
+	private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
+		assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
+		assertTrue("is not buffer", boe.isBuffer());
+
+		Buffer buf = boe.getBuffer();
+		assertEquals("wrong buffer size", expectedSize, buf.getSize());
+
+		MemorySegment seg = buf.getMemorySegment();
+		for (int i = 0; i < expectedSize; i++) {
+			assertEquals("wrong buffer contents", (byte) i, seg.get(i));
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Async Consumer
+	// ------------------------------------------------------------------------
+	
+	private static class SequenceToConsume {
+
+		final BufferSpiller.SpilledBufferOrEventSequence sequence;
+		final ArrayList<BufferOrEvent> events;
+		final Random bufferRnd;
+		final int numBuffersAndEvents;
+		final int numChannels;
+
+		private SequenceToConsume(Random bufferRnd, ArrayList<BufferOrEvent> events,
+									BufferSpiller.SpilledBufferOrEventSequence sequence,
+									int numBuffersAndEvents, int numChannels) {
+			this.bufferRnd = bufferRnd;
+			this.events = events;
+			this.sequence = sequence;
+			this.numBuffersAndEvents = numBuffersAndEvents;
+			this.numChannels = numChannels;
+		}
+	}
+	
+	private static class SequenceConsumer extends Thread {
+		
+		private final AtomicReference<Throwable> error;
+		private final BlockingQueue<SequenceToConsume> sequences;
+		
+		private final int numSequencesToConsume;
+		
+		private int consumedSequences;
+
+		private SequenceConsumer(AtomicReference<Throwable> error, int numSequencesToConsume) {
+			super("Sequence Consumer");
+			setDaemon(true);
+			
+			this.error = error;
+			this.numSequencesToConsume = numSequencesToConsume;
+			this.sequences = new LinkedBlockingQueue<SequenceToConsume>();
+		}
+
+
+		@Override
+		public void run() {
+			try {
+				while (consumedSequences < numSequencesToConsume) {
+					// get next sequence
+					SequenceToConsume nextSequence = sequences.take();
+				
+					// wait a bit, allow some stuff to queue up
+					Thread.sleep(50);
+
+					BufferSpiller.SpilledBufferOrEventSequence seq = nextSequence.sequence;
+					ArrayList<BufferOrEvent> events = nextSequence.events;
+					Random bufferRnd = nextSequence.bufferRnd;
+					int numBuffersAndEvents = nextSequence.numBuffersAndEvents;
+					int numChannels = nextSequence.numChannels;
+
+					// consume sequence
+					seq.open();
+					
+					int numEvent = 0;
+					for (int i = 0; i < numBuffersAndEvents; i++) {
+						BufferOrEvent next = seq.getNext();
+						assertNotNull(next);
+						if (next.isEvent()) {
+							BufferOrEvent expected = events.get(numEvent++);
+							assertEquals(expected.getEvent(), next.getEvent());
+							assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+						}
+						else {
+							validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+						}
+					}
+	
+					// no further data
+					assertNull(seq.getNext());
+	
+					// all events need to be consumed
+					assertEquals(events.size(), numEvent);
+	
+					// remove all temp files
+					seq.cleanup();
+					
+					consumedSequences++;
+				}
+				
+			}
+			catch (Throwable t) {
+				error.set(t);
+			}
+		}
+		
+		public void queue(SequenceToConsume next) {
+			sequences.add(next);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
deleted file mode 100644
index 3f815ef..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-/**
- * A BufferRecycler that does nothing.
- */
-public class DummyBufferRecycler implements BufferRecycler {
-	
-	public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
-	
-	
-	@Override
-	public void recycle(MemorySegment memorySegment) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9311b9a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
new file mode 100644
index 0000000..991b033
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.streaming.runtime.io.BufferSpiller.SpilledBufferOrEventSequence;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that validate the behavior of the {@link SpilledBufferOrEventSequence} in isolation,
+ * with respect to detecting corrupt sequences, trailing data, and interleaved buffers and events.
+ */
+public class SpilledBufferOrEventSequenceTest {
+	
+	private final ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN);
+	private final int pageSize = 32*1024;
+	
+	private File tempFile;
+	private FileChannel fileChannel;
+	
+	
+	@Before
+	public void initTempChannel() {
+		try {
+			tempFile = File.createTempFile("testdata", "tmp");
+			fileChannel = new RandomAccessFile(tempFile, "rw").getChannel();
+		}
+		catch (Exception e) {
+			cleanup();
+		}
+	}
+	
+	@After
+	public void cleanup() {
+		if (fileChannel != null) {
+			try {
+				fileChannel.close();
+			}
+			catch (IOException e) {
+				// ignore
+			}
+		}
+		if (tempFile != null) {
+			//noinspection ResultOfMethodCallIgnored
+			tempFile.delete();
+		}
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testEmptyChannel() {
+		try {
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			
+			assertNull(seq.getNext());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testIncompleteHeaderOnFirstElement() {
+		try {
+			ByteBuffer buf = ByteBuffer.allocate(7);
+			buf.order(ByteOrder.LITTLE_ENDIAN);
+			
+			fileChannel.write(buf);
+			fileChannel.position(0);
+			
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			
+			try {
+				seq.getNext();
+				fail("should fail with an exception");
+			}
+			catch (IOException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBufferSequence() {
+		try {
+			final Random rnd = new Random();
+			final long seed = rnd.nextLong();
+			
+			final int numBuffers = 325;
+			final int numChannels = 671;
+			
+			rnd.setSeed(seed);
+			
+			for (int i = 0; i < numBuffers; i++) {
+				writeBuffer(fileChannel, rnd.nextInt(pageSize) + 1, rnd.nextInt(numChannels));
+			}
+
+			fileChannel.position(0L);
+			rnd.setSeed(seed);
+
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			
+			for (int i = 0; i < numBuffers; i++) {
+				validateBuffer(seq.getNext(), rnd.nextInt(pageSize) + 1, rnd.nextInt(numChannels));
+			}
+			
+			// should have no more data
+			assertNull(seq.getNext());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBufferSequenceWithIncompleteBuffer() {
+		try {
+			writeBuffer(fileChannel, 1672, 7);
+			
+			// write an incomplete buffer
+			ByteBuffer data = ByteBuffer.allocate(615);
+			data.order(ByteOrder.LITTLE_ENDIAN);
+			
+			data.putInt(2);
+			data.putInt(999);
+			data.put((byte) 0);
+			data.position(0);
+			data.limit(312);
+			fileChannel.write(data);
+			fileChannel.position(0L);
+
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			
+			// first one is valid
+			validateBuffer(seq.getNext(), 1672, 7);
+			
+			// next one should fail
+			try {
+				seq.getNext();
+				fail("should fail with an exception");
+			}
+			catch (IOException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testEventSequence() {
+		try {
+			final Random rnd = new Random();
+			final int numEvents = 3000;
+			final int numChannels = 1656;
+			
+			final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(numEvents);
+			
+			for (int i = 0; i < numEvents; i++) {
+				events.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
+			}
+
+			fileChannel.position(0L);
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			
+			int i = 0;
+			BufferOrEvent boe;
+			while ((boe = seq.getNext()) != null) {
+				BufferOrEvent expected = events.get(i);
+				assertTrue(boe.isEvent());
+				assertEquals(expected.getEvent(), boe.getEvent());
+				assertEquals(expected.getChannelIndex(), boe.getChannelIndex());
+				i++;
+			}
+			
+			assertEquals(numEvents, i);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMixedSequence() {
+		try {
+			final Random rnd = new Random();
+			final Random bufferRnd = new Random();
+
+			final long bufferSeed = rnd.nextLong();
+			bufferRnd.setSeed(bufferSeed);
+			
+			final int numEventsAndBuffers = 3000;
+			final int numChannels = 1656;
+
+			final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
+
+			// generate sequence
+			
+			for (int i = 0; i < numEventsAndBuffers; i++) {
+				boolean isEvent = rnd.nextDouble() < 0.05d;
+				if (isEvent) {
+					events.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
+				}
+				else {
+					writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			
+			// reset and create reader
+			
+			fileChannel.position(0L);
+			bufferRnd.setSeed(bufferSeed);
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			
+			// read and validate the sequence
+			
+			int numEvent = 0;
+			for (int i = 0; i < numEventsAndBuffers; i++) {
+				BufferOrEvent next = seq.getNext();
+				if (next.isEvent()) {
+					BufferOrEvent expected = events.get(numEvent++);
+					assertEquals(expected.getEvent(), next.getEvent());
+					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+				}
+				else {
+					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			
+			// no further data
+			assertNull(seq.getNext());
+			
+			// all events need to be consumed
+			assertEquals(events.size(), numEvent);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMultipleSequences() {
+		File secondFile = null;
+		FileChannel secondChannel = null;
+		
+		try {
+			// create the second file channel
+			secondFile = File.createTempFile("testdata", "tmp");
+			secondChannel = new RandomAccessFile(secondFile, "rw").getChannel();
+			
+			final Random rnd = new Random();
+			final Random bufferRnd = new Random();
+
+			final long bufferSeed = rnd.nextLong();
+			bufferRnd.setSeed(bufferSeed);
+
+			final int numEventsAndBuffers1 = 272;
+			final int numEventsAndBuffers2 = 151;
+			
+			final int numChannels = 1656;
+
+			final ArrayList<BufferOrEvent> events1 = new ArrayList<BufferOrEvent>(128);
+			final ArrayList<BufferOrEvent> events2 = new ArrayList<BufferOrEvent>(128);
+
+			// generate sequence 1
+
+			for (int i = 0; i < numEventsAndBuffers1; i++) {
+				boolean isEvent = rnd.nextDouble() < 0.05d;
+				if (isEvent) {
+					events1.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
+				}
+				else {
+					writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+
+			// generate sequence 2
+
+			for (int i = 0; i < numEventsAndBuffers2; i++) {
+				boolean isEvent = rnd.nextDouble() < 0.05d;
+				if (isEvent) {
+					events2.add(generateAndWriteEvent(secondChannel, rnd, numChannels));
+				}
+				else {
+					writeBuffer(secondChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+
+			// reset and create reader
+
+			fileChannel.position(0L);
+			secondChannel.position(0L);
+			
+			bufferRnd.setSeed(bufferSeed);
+			
+			SpilledBufferOrEventSequence seq1 = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			SpilledBufferOrEventSequence seq2 = new SpilledBufferOrEventSequence(secondFile, secondChannel, buffer, pageSize);
+
+			// read and validate the sequence 1
+			seq1.open();
+
+			int numEvent = 0;
+			for (int i = 0; i < numEventsAndBuffers1; i++) {
+				BufferOrEvent next = seq1.getNext();
+				if (next.isEvent()) {
+					BufferOrEvent expected = events1.get(numEvent++);
+					assertEquals(expected.getEvent(), next.getEvent());
+					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+				}
+				else {
+					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			assertNull(seq1.getNext());
+			assertEquals(events1.size(), numEvent);
+
+			// read and validate the sequence 2
+			seq2.open();
+
+			numEvent = 0;
+			for (int i = 0; i < numEventsAndBuffers2; i++) {
+				BufferOrEvent next = seq2.getNext();
+				if (next.isEvent()) {
+					BufferOrEvent expected = events2.get(numEvent++);
+					assertEquals(expected.getEvent(), next.getEvent());
+					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+				}
+				else {
+					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
+				}
+			}
+			assertNull(seq2.getNext());
+			assertEquals(events2.size(), numEvent);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (secondChannel != null) {
+				try {
+					secondChannel.close();
+				}
+				catch (IOException e) {
+					// ignore here
+				}
+			}
+			if (secondFile != null) {
+				//noinspection ResultOfMethodCallIgnored
+				secondFile.delete();
+			}
+		}
+	}
+
+	@Test
+	public void testCleanup() {
+		try {
+			ByteBuffer data = ByteBuffer.allocate(157);
+			data.order(ByteOrder.LITTLE_ENDIAN);
+			
+			fileChannel.write(data);
+			fileChannel.position(54);
+			
+			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
+			seq.open();
+			seq.cleanup();
+			
+			assertFalse(fileChannel.isOpen());
+			assertFalse(tempFile.exists());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+
+	private static BufferOrEvent generateAndWriteEvent(FileChannel fileChannel, Random rnd, int numChannels) throws IOException {
+		long magicNumber = rnd.nextLong();
+		byte[] data = new byte[rnd.nextInt(1000)];
+		rnd.nextBytes(data);
+		TestEvent evt = new TestEvent(magicNumber, data);
+		
+		int channelIndex = rnd.nextInt(numChannels);
+		
+		ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+		ByteBuffer header = ByteBuffer.allocate(9);
+		header.order(ByteOrder.LITTLE_ENDIAN);
+		
+		header.putInt(channelIndex);
+		header.putInt(serializedEvent.remaining());
+		header.put((byte) 1);
+		header.flip();
+		
+		fileChannel.write(header);
+		fileChannel.write(serializedEvent);
+		return new BufferOrEvent(evt, channelIndex);
+	}
+	
+	private static void writeBuffer(FileChannel fileChannel, int size, int channelIndex) throws IOException {
+		ByteBuffer data = ByteBuffer.allocate(size + 9);
+		data.order(ByteOrder.LITTLE_ENDIAN);
+		
+		data.putInt(channelIndex);
+		data.putInt(size);
+		data.put((byte) 0);
+		for (int i = 0; i < size; i++) {
+			data.put((byte) i);
+		}
+		data.flip();
+		fileChannel.write(data);
+	}
+
+	private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
+		assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
+		assertTrue("is not buffer", boe.isBuffer());
+		
+		Buffer buf = boe.getBuffer();
+		assertEquals("wrong buffer size", expectedSize, buf.getSize());
+		
+		MemorySegment seg = buf.getMemorySegment();
+		for (int i = 0; i < expectedSize; i++) {
+			assertEquals("wrong buffer contents", (byte) i, seg.get(i));
+		}
+	}
+}


[4/8] flink git commit: [FLINK-2438] [runtime] Improve channel event serialization performance.

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
deleted file mode 100644
index d94b5b4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
-
-/**
- * Checkpoint barriers are used to synchronize checkpoints throughout the streaming topology. The
- * barriers are emitted by the sources when instructed to do so by the JobManager. When
- * operators receive a {@link CheckpointBarrier} on one of its inputs it must block processing
- * of further elements on this input until all inputs received the checkpoint barrier
- * corresponding to to that checkpoint. Once all inputs received the checkpoint barrier for
- * a checkpoint the operator is to perform the checkpoint and then broadcast the barrier to
- * downstream operators.
- *
- * <p>
- * The checkpoint barrier IDs are advancing. Once an operator receives a {@link CheckpointBarrier}
- * for a checkpoint with a higher id it is to discard all barriers that it received from previous
- * checkpoints and unblock all other inputs.
- */
-public class CheckpointBarrier extends TaskEvent {
-
-	protected long id;
-	protected long timestamp;
-
-	public CheckpointBarrier() {}
-
-	public CheckpointBarrier(long id, long timestamp) {
-		this.id = id;
-		this.timestamp = timestamp;
-	}
-
-	public long getId() {
-		return id;
-	}
-
-	public long getTimestamp() {
-		return id;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(id);
-		out.writeLong(timestamp);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		id = in.readLong();
-		timestamp = in.readLong();
-	}
-	
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof CheckpointBarrier)) {
-			return false;
-		}
-		else {
-			CheckpointBarrier that = (CheckpointBarrier) other;
-			return that.id == this.id && that.timestamp == this.timestamp;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return String.format("CheckpointBarrier %d @ %d", id, timestamp);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 84614bf..c8fa9e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2098da8..aabc95d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 4007da8..173e894 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 7350516..678b145 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Random;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index b8b3a8c..872e226 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -19,15 +19,16 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 532078c..fb61633 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,12 +19,13 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index ae384e1..05ce4be 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index fb3beea..f07e3a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.types.LongValue;
 
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
index 4a77757..286477a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.util.StringUtils;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 4399a10..296324a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index a8029e6..4f07fdb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index df0c9ee..bbc64f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -18,14 +18,13 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 435831f..0f372cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 3c7204d..f87d7ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,7 +36,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index f37eb66..2b20101 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -45,7 +45,7 @@ import java.util.List;
  * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
  * thread to finish. Use {@link #processElement}
  * to send elements to the task. Use
- * {@link #processEvent(org.apache.flink.runtime.event.task.AbstractEvent)} to send events to the task.
+ * {@link #processEvent(org.apache.flink.runtime.event.AbstractEvent)} to send events to the task.
  * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
  * that data entry is finished.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index a34ec15..8d41292 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -74,7 +75,7 @@ public class StreamingScalabilityAndLatency {
 		env.getConfig().enableObjectReuse();
 
 		env.setBufferTimeout(5L);
-//		env.enableCheckpointing(1000);
+		env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
 
 		env
 			.addSource(new TimeStampingSource())