You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:33 UTC

[5/8] flink git commit: [FLINK-2438] [runtime] Improve channel event serialization performance.

[FLINK-2438] [runtime] Improve channel event serialization performance.

Because channel events may become very frequent now (frequent at-least-once checkpointing), their serialization performance starts to matter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af88aa09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af88aa09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af88aa09

Branch: refs/heads/master
Commit: af88aa09ec94d4d11f38a7134d36793420d7d19d
Parents: aa0105a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 30 19:02:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/event/AbstractEvent.java      |  28 ++++
 .../flink/runtime/event/RuntimeEvent.java       |  24 ++++
 .../apache/flink/runtime/event/TaskEvent.java   |  25 ++++
 .../flink/runtime/event/task/AbstractEvent.java |  28 ----
 .../flink/runtime/event/task/RuntimeEvent.java  |  24 ----
 .../flink/runtime/event/task/TaskEvent.java     |  25 ----
 .../runtime/io/network/TaskEventDispatcher.java |   2 +-
 .../io/network/api/CheckpointBarrier.java       |  98 ++++++++++++++
 .../io/network/api/EndOfPartitionEvent.java     |   2 +-
 .../io/network/api/EndOfSuperstepEvent.java     |   2 +-
 .../io/network/api/TaskEventHandler.java        |   2 +-
 .../io/network/api/reader/AbstractReader.java   |   4 +-
 .../io/network/api/reader/ReaderBase.java       |   2 +-
 .../api/serialization/EventSerializer.java      | 132 ++++++++++++++-----
 .../io/network/api/writer/RecordWriter.java     |   2 +-
 .../api/writer/ResultPartitionWriter.java       |   4 +-
 .../network/buffer/FreeingBufferRecycler.java   |  43 ++++++
 .../runtime/io/network/netty/NettyMessage.java  |   2 +-
 .../network/netty/PartitionRequestClient.java   |   2 +-
 .../netty/PartitionRequestClientHandler.java    |   5 +-
 .../partition/consumer/BufferOrEvent.java       |   2 +-
 .../partition/consumer/InputChannel.java        |   2 +-
 .../network/partition/consumer/InputGate.java   |   2 +-
 .../partition/consumer/LocalInputChannel.java   |   2 +-
 .../partition/consumer/RemoteInputChannel.java  |   2 +-
 .../partition/consumer/SingleInputGate.java     |   4 +-
 .../partition/consumer/UnionInputGate.java      |   2 +-
 .../partition/consumer/UnknownInputChannel.java |   2 +-
 .../iterative/concurrent/SuperstepBarrier.java  |   2 +-
 .../event/IterationEventWithAggregators.java    |   2 +-
 .../iterative/event/TerminationEvent.java       |   2 +-
 .../task/IterationSynchronizationSinkTask.java  |   2 +-
 .../iterative/task/SyncEventHandler.java        |   2 +-
 .../flink/runtime/event/task/EventList.java     |   3 +-
 .../runtime/event/task/IntegerTaskEvent.java    |   1 +
 .../runtime/event/task/StringTaskEvent.java     |   1 +
 .../flink/runtime/event/task/TaskEventTest.java |   3 +-
 .../network/api/reader/AbstractReaderTest.java  |   9 +-
 .../io/network/api/reader/BufferReaderTest.java |   2 +-
 .../api/serialization/EventSerializerTest.java  |  39 ++++--
 .../partition/PipelinedSubpartitionTest.java    |   2 +-
 .../partition/consumer/InputChannelTest.java    |   2 +-
 .../partition/consumer/SingleInputGateTest.java |   2 +-
 .../io/network/util/TestConsumerCallback.java   |   2 +-
 .../network/util/TestSubpartitionConsumer.java  |   2 +-
 .../runtime/io/network/util/TestTaskEvent.java  |   2 +-
 .../concurrent/SuperstepBarrierTest.java        |   2 +-
 .../util/event/TaskEventHandlerTest.java        |   2 +-
 .../streaming/runtime/io/BarrierBuffer.java     |   2 +-
 .../streaming/runtime/io/BarrierTracker.java    |   2 +-
 .../streaming/runtime/io/BufferSpiller.java     |   3 +-
 .../runtime/io/CheckpointBarrierHandler.java    |  14 +-
 .../runtime/io/FreeingBufferRecycler.java       |  37 ------
 .../runtime/io/RecordWriterOutput.java          |   4 +-
 .../runtime/io/StreamInputProcessor.java        |   4 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   4 +-
 .../runtime/tasks/CheckpointBarrier.java        |  97 --------------
 .../streaming/runtime/tasks/OutputHandler.java  |   1 +
 .../streaming/runtime/tasks/StreamTask.java     |   1 +
 .../consumer/StreamTestSingleInputGate.java     |   3 +-
 .../io/BarrierBufferMassiveRandomTest.java      |   4 +-
 .../streaming/runtime/io/BarrierBufferTest.java |   5 +-
 .../runtime/io/BarrierTrackerTest.java          |   5 +-
 .../streaming/runtime/io/BufferSpillerTest.java |   1 +
 .../runtime/io/StreamRecordWriterTest.java      |   1 +
 .../flink/streaming/runtime/io/TestEvent.java   |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   1 +
 .../tasks/OneInputStreamTaskTestHarness.java    |   2 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |   2 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   3 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |   2 +-
 .../manual/StreamingScalabilityAndLatency.java  |   3 +-
 73 files changed, 435 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
new file mode 100644
index 0000000..a20aad9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * This type of event can be used to exchange notification messages between
+ * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
+ * at runtime using the communication channels.
+ */
+public abstract class AbstractEvent implements IOReadableWritable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
new file mode 100644
index 0000000..6d712ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+/**
+ * Subclasses of this event are recognized as events exchanged by the core runtime.
+ */
+public abstract class RuntimeEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
new file mode 100644
index 0000000..d37d031
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+/**
+ * Subclasses of this event are recognized as custom events that are not part of the core
+ * flink runtime.
+ */
+public abstract class TaskEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
deleted file mode 100644
index 244d672..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * This type of event can be used to exchange notification messages between
- * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
- * at runtime using the communication channels.
- */
-public abstract class AbstractEvent implements IOReadableWritable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
deleted file mode 100644
index 8c44073..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * Subclasses of this event are recognized as events exchanged by the core runtime.
- */
-public abstract class RuntimeEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
deleted file mode 100644
index 01ecce2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * Subclasses of this event are recognized as custom events that are not part of the core
- * flink runtime.
- */
-public abstract class TaskEvent extends AbstractEvent {}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 82793e2..eddba8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network;
 
 import com.google.common.collect.Maps;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
new file mode 100644
index 0000000..59f56b0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+/**
+ * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
+ * barriers are emitted by the sources when instructed to do so by the JobManager. When an
+ * operator receives a CheckpointBarrier on one of its inputs, it knows that this is the point
+ * between the pre-checkpoint and post-checkpoint data.
+ * 
+ * <p>Once an operator has received a checkpoint barrier from all its input channels, it
+ * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
+ * behavior and broadcast the barrier to downstream operators.</p>
+ * 
+ * <p>Depending on the semantic guarantees, an operator may hold off post-checkpoint data until
+ * the checkpoint is complete (exactly once).</p>
+ * 
+ * <p>The checkpoint barrier IDs are strictly monotonically increasing.</p>
+ */
+public class CheckpointBarrier extends RuntimeEvent {
+
+	private long id;
+	private long timestamp;
+
+	public CheckpointBarrier() {}
+
+	public CheckpointBarrier(long id, long timestamp) {
+		this.id = id;
+		this.timestamp = timestamp;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id);
+		out.writeLong(timestamp);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = in.readLong();
+		timestamp = in.readLong();
+	}
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (other == null || !(other instanceof CheckpointBarrier)) {
+			return false;
+		}
+		else {
+			CheckpointBarrier that = (CheckpointBarrier) other;
+			return that.id == this.id && that.timestamp == this.timestamp;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index c2b6fa4..293287b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
  * This event marks a subpartition as fully consumed.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
index 162afb7..7f51187 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
  * Marks the end of a superstep of one particular iteration superstep.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
index ccd0feb..d2dc46b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 90564a8..84189be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 9f8ae20..a1d705f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.api.reader;
 import java.io.IOException;
 
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 76c88c1..b23b83b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -19,63 +19,122 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
+/**
+ * Utility class to serialize and deserialize task events.
+ */
 public class EventSerializer {
+	
+	private static final int END_OF_PARTITION_EVENT = 0;
 
-	public final static BufferRecycler RECYCLER = new BufferRecycler() {
-		@Override
-		public void recycle(MemorySegment memorySegment) {
-			memorySegment.free();
-		}
-	};
-
-	public static ByteBuffer toSerializedEvent(AbstractEvent event) {
-		try {
-			final DataOutputSerializer serializer = new DataOutputSerializer(128);
+	private static final int CHECKPOINT_BARRIER_EVENT = 1;
 
-			serializer.writeUTF(event.getClass().getName());
-			event.write(serializer);
+	private static final int END_OF_SUPERSTEP_EVENT = 2;
 
-			return serializer.wrapAsByteBuffer();
+	private static final int OTHER_EVENT = 3;
+	
+	// ------------------------------------------------------------------------
+	
+	public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+		final Class<?> eventClass = event.getClass();
+		if (eventClass == EndOfPartitionEvent.class) {
+			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
+		}
+		else if (eventClass == CheckpointBarrier.class) {
+			CheckpointBarrier barrier = (CheckpointBarrier) event;
+			
+			ByteBuffer buf = ByteBuffer.allocate(20);
+			buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+			buf.putLong(4, barrier.getId());
+			buf.putLong(12, barrier.getTimestamp());
+			return buf;
 		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while serializing event.", e);
+		else if (eventClass == EndOfSuperstepEvent.class) {
+			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
+		}
+		else {
+			try {
+				final DataOutputSerializer serializer = new DataOutputSerializer(128);
+				serializer.writeInt(OTHER_EVENT);
+				serializer.writeUTF(event.getClass().getName());
+				event.write(serializer);
+	
+				return serializer.wrapAsByteBuffer();
+			}
+			catch (IOException e) {
+				throw new RuntimeException("Error while serializing event.", e);
+			}
 		}
 	}
 
 	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+		if (buffer.remaining() < 4) {
+			throw new RuntimeException("Incomplete event");
+		}
+		
+		final ByteOrder bufferOrder = buffer.order();
+		buffer.order(ByteOrder.BIG_ENDIAN);
+		
 		try {
-			final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-
-			final String className = deserializer.readUTF();
-
-			final Class<? extends AbstractEvent> clazz;
-			try {
-				clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+			int type = buffer.getInt();
+				
+			if (type == END_OF_PARTITION_EVENT) {
+				return EndOfPartitionEvent.INSTANCE;
 			}
-			catch (ClassNotFoundException e) {
-				throw new RuntimeException("Could not load event class '" + className + "'.", e);
+			else if (type == CHECKPOINT_BARRIER_EVENT) {
+				long id = buffer.getLong();
+				long timestamp = buffer.getLong();
+				return new CheckpointBarrier(id, timestamp);
 			}
-			catch (ClassCastException e) {
-				throw new RuntimeException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+			else if (type == END_OF_SUPERSTEP_EVENT) {
+				return EndOfSuperstepEvent.INSTANCE;
+			}
+			else if (type == OTHER_EVENT) {
+				try {
+					final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
+		
+					final String className = deserializer.readUTF();
+		
+					final Class<? extends AbstractEvent> clazz;
+					try {
+						clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+					}
+					catch (ClassNotFoundException e) {
+						throw new RuntimeException("Could not load event class '" + className + "'.", e);
+					}
+					catch (ClassCastException e) {
+						throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+								+ AbstractEvent.class.getName() + "'.", e);
+					}
+		
+					final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+					event.read(deserializer);
+		
+					return event;
+				}
+				catch (Exception e) {
+					throw new RuntimeException("Error while deserializing or instantiating event.", e);
+				}
+			} 
+			else {
+				throw new RuntimeException("Corrupt byte stream for event");
 			}
-
-			final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
-			event.read(deserializer);
-
-			return event;
 		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while deserializing event.", e);
+		finally {
+			buffer.order(bufferOrder);
 		}
 	}
 
@@ -86,7 +145,8 @@ public class EventSerializer {
 	public static Buffer toBuffer(AbstractEvent event) {
 		final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
 
-		final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()), RECYCLER, false);
+		final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()),
+				FreeingBufferRecycler.INSTANCE, false);
 		buffer.setSize(serializedEvent.remaining());
 
 		return buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 885c316..2ae61ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 1192dbb..79c21c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
new file mode 100644
index 0000000..fdce883
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A simple buffer recycler that frees every memory segment handed to it for recycling.
+ */
+public class FreeingBufferRecycler implements BufferRecycler {
+	
+	public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
+	
+	// ------------------------------------------------------------------------
+	
+	// Singleton: not instantiable from outside this class, use INSTANCE instead
+	private FreeingBufferRecycler() {}
+
+	/**
+	 * Frees the given memory segment.
+	 * @param memorySegment The memory segment to be recycled.
+	 */
+	@Override
+	public void recycle(MemorySegment memorySegment) {
+		memorySegment.free();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 1540369..3a24181 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 78f6398..f6120d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.netty;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 51b436b..3b7d921 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.io.network.netty;
 import com.google.common.collect.Maps;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.TransportException;
@@ -300,7 +301,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 				byte[] byteArray = new byte[bufferOrEvent.getSize()];
 				bufferOrEvent.getNettyBuffer().readBytes(byteArray);
 
-				Buffer buffer = new Buffer(new MemorySegment(byteArray), EventSerializer.RECYCLER, false);
+				Buffer buffer = new Buffer(new MemorySegment(byteArray), FreeingBufferRecycler.INSTANCE, false);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index d2f3035..55e5767 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 import static com.google.common.base.Preconditions.checkArgument;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3998279..d282db5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c4f9dc4..1f42cfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index e72f612..ff12153 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b70c3a8..be2509f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 80a79d2..896fa9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 730ead2..5599687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index e4b9e57..cdf28be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
index c91be1a..cc5d3c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.concurrent;
 
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index bc815dc..e259523 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
index 9e74c34..28181e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 
 /**
  * Signals that the iteration is completely executed, participating tasks must terminate now

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 5eccd7b..fed0a17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.types.IntValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 8cce6ef..d7549d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.task;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.types.Value;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
index 1f97a15..c055a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
@@ -19,10 +19,11 @@
 
 package org.apache.flink.runtime.event.task;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.util.SerializableArrayList;
 
 /**
- * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.task.AbstractEvent}
+ * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.AbstractEvent}
  * objects.
  * 
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
index 648dacc..cc67482 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.TaskEvent;
 
 /**
  * This class provides a simple implementation of an event that holds an integer value.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
index 87f2e91..9095cc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.util.StringUtils;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
index 1ed8e39..b508923 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
@@ -25,12 +25,13 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
 
 /**
  * This class contains serialization tests concerning task events derived from
- * {@link org.apache.flink.runtime.event.task.AbstractEvent}.
+ * {@link org.apache.flink.runtime.event.AbstractEvent}.
  * 
  */
 public class TaskEventTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 14bf022..6853722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -70,7 +69,7 @@ public class AbstractReaderTest {
 	public void testEndOfPartitionEvent() throws Exception {
 		final AbstractReader reader = new MockReader(createInputGate(1));
 
-		assertTrue(reader.handleEvent(new EndOfPartitionEvent()));
+		assertTrue(reader.handleEvent(EndOfPartitionEvent.INSTANCE));
 	}
 
 	/**
@@ -95,7 +94,7 @@ public class AbstractReaderTest {
 		}
 
 		try {
-			reader.handleEvent(new EndOfSuperstepEvent());
+			reader.handleEvent(EndOfSuperstepEvent.INSTANCE);
 
 			fail("Did not throw expected exception when handling end of superstep event with non-iterative reader.");
 		}
@@ -122,7 +121,7 @@ public class AbstractReaderTest {
 			// All good, expected exception.
 		}
 
-		EndOfSuperstepEvent eos = new EndOfSuperstepEvent();
+		EndOfSuperstepEvent eos = EndOfSuperstepEvent.INSTANCE;
 
 		// One end of superstep event for each input channel. The superstep finishes with the last
 		// received event.

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index e1f8fd8..8519ac6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 5a20a4b..ddfd758 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,25 +18,44 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 public class EventSerializerTest {
 
 	@Test
 	public void testSerializeDeserializeEvent() {
-
-		TestTaskEvent expected = new TestTaskEvent(Math.random(), 12361231273L);
-
-		ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(expected);
-
-		AbstractEvent actual = EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
-
-		assertEquals(expected, actual);
+		try {
+			AbstractEvent[] events = {
+					EndOfPartitionEvent.INSTANCE,
+					EndOfSuperstepEvent.INSTANCE,
+					new CheckpointBarrier(1678L, 4623784L),
+					new TestTaskEvent(Math.random(), 12361231273L)
+			};
+			
+			for (AbstractEvent evt : events) {
+				ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+				assertTrue(serializedEvent.hasRemaining());
+
+				AbstractEvent deserialized = 
+						EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+				assertNotNull(deserialized);
+				assertEquals(evt, deserialized);
+			}
+			
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 74549df..8750a1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index e95c774..9717530 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2454399..82cc730 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
index 52083c4..0d66f13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 import java.util.concurrent.atomic.AtomicInteger;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 2766e53..18e0d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.util;
 
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
index 0b29032..091d5d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index a6c0d72..2f26670 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Random;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
index 5c6aeb7..cb76276 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.util.event;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.event.task.IntegerTaskEvent;
 import org.apache.flink.runtime.event.task.StringTaskEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0441937..b7766ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index a0b924f..119fb23 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 import java.util.ArrayDeque;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5f9a162..2bad197 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -27,10 +27,11 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.util.StringUtils;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 02dd33d..791fd40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 
@@ -43,8 +43,18 @@ public interface CheckpointBarrierHandler {
 	 */
 	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
 
+	/**
+	 * Registers the given event handler to be notified on successful checkpoints.
+	 * 
+	 * @param checkpointHandler The handler to register.
+	 */
 	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
-	
+
+	/**
+	 * Cleans up all internally held resources.
+	 * 
+	 * @throws IOException Thrown if the cleanup of I/O resources fails.
+	 */
 	void cleanup() throws IOException;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
deleted file mode 100644
index 27e37a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-/**
- * A simple buffer recycler that only frees the memory segments.
- */
-public class FreeingBufferRecycler implements BufferRecycler {
-	
-	public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
-	
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void recycle(MemorySegment memorySegment) {
-		memorySegment.free();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index de8c205..f0f18b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
@@ -111,7 +111,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 		recordWriter.clearBuffers();
 	}
 
-	public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+	public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
 		recordWriter.broadcastEvent(barrier);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index f7d7fb0..4ad5b45 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ae97974..e3d2911 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;