You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:29 UTC

[02/10] flink git commit: [FLINK-7748][network] Properly use the TaskEventDispatcher for subscribing to events

[FLINK-7748][network] Properly use the TaskEventDispatcher for subscribing to events

Previously, the ResultPartitionWriter implemented the EventListener interface
and was used for event registration, although event publishing was already
handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
both, event registration and publishing.

It also adds the TaskEventDispatcher to the Environment information for a task
to be able to work with it (only IterationHeadTask so far).

This closes #4761.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/175e1b38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/175e1b38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/175e1b38

Branch: refs/heads/master
Commit: 175e1b3871b13fee3e423aef87cb45ceed409783
Parents: c5efb1f
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Aug 29 18:24:00 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 17:01:14 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/execution/Environment.java    |   3 +
 .../runtime/io/network/NetworkEnvironment.java  |   4 +-
 .../runtime/io/network/TaskEventDispatcher.java | 110 +++++++++---
 .../api/writer/ResultPartitionWriter.java       |  20 +--
 .../iterative/task/IterationHeadTask.java       |   8 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  10 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  28 ++-
 .../io/network/TaskEventDispatcherTest.java     | 180 +++++++++++++++++++
 .../operators/testutils/DummyEnvironment.java   |   5 +
 .../operators/testutils/MockEnvironment.java    |   8 +
 .../taskexecutor/TaskExecutorITCase.java        |   3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   3 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +
 .../flink/runtime/taskmanager/TaskTest.java     |   7 +
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   3 +
 .../tasks/InterruptSensitiveRestoreTest.java    |   3 +
 .../runtime/tasks/StreamMockEnvironment.java    |   8 +
 .../tasks/StreamTaskTerminationTest.java        |   3 +
 .../streaming/runtime/tasks/StreamTaskTest.java |   3 +
 .../tasks/TaskCheckpointingBehaviourTest.java   |   3 +
 20 files changed, 358 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 203ee85..ad66c57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -209,4 +210,6 @@ public interface Environment {
 	InputGate getInputGate(int index);
 
 	InputGate[] getAllInputGates();
+
+	TaskEventDispatcher getTaskEventDispatcher();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 71d0386..f2619e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -214,7 +214,7 @@ public class NetworkEnvironment {
 				}
 
 				// Register writer with task event dispatcher
-				taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
+				taskEventDispatcher.registerPartition(writer.getPartitionId());
 			}
 
 			// Setup the buffer pool for each buffer reader
@@ -266,7 +266,7 @@ public class NetworkEnvironment {
 			ResultPartitionWriter[] writers = task.getAllWriters();
 			if (writers != null) {
 				for (ResultPartitionWriter writer : writers) {
-					taskEventDispatcher.unregisterWriter(writer);
+					taskEventDispatcher.unregisterPartition(writer.getPartitionId());
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 8816e32..1ec4ade 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -19,70 +19,126 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.TaskEventHandler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The task event dispatcher dispatches events flowing backwards from a consuming task to the task
  * producing the consumed result.
  *
- * <p> Backwards events only work for tasks, which produce pipelined results, where both the
+ * <p>Backwards events only work for tasks, which produce pipelined results, where both the
  * producing and consuming task are running at the same time.
  */
 public class TaskEventDispatcher {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskEventDispatcher.class);
 
-	private final Map<ResultPartitionID, ResultPartitionWriter> registeredWriters = Maps.newHashMap();
+	private final Map<ResultPartitionID, TaskEventHandler> registeredHandlers = new HashMap<>();
 
-	public void registerWriterForIncomingTaskEvents(ResultPartitionID partitionId, ResultPartitionWriter writer) {
-		synchronized (registeredWriters) {
-			if (registeredWriters.put(partitionId, writer) != null) {
-				throw new IllegalStateException("Already registered at task event dispatcher.");
+	/**
+	 * Registers the given partition for incoming task events allowing calls to {@link
+	 * #subscribeToEvent(ResultPartitionID, EventListener, Class)}.
+	 *
+	 * @param partitionId
+	 * 		the partition ID
+	 */
+	public void registerPartition(ResultPartitionID partitionId) {
+		checkNotNull(partitionId);
+
+		synchronized (registeredHandlers) {
+			LOG.debug("registering {}", partitionId);
+			if (registeredHandlers.put(partitionId, new TaskEventHandler()) != null) {
+				throw new IllegalStateException(
+					"Partition " + partitionId + " already registered at task event dispatcher.");
 			}
 		}
 	}
 
-	public void unregisterWriter(ResultPartitionWriter writer) {
-		synchronized (registeredWriters) {
-			registeredWriters.remove(writer.getPartitionId());
+	/**
+	 * Removes the given partition from listening to incoming task events, thus forbidding calls to
+	 * {@link #subscribeToEvent(ResultPartitionID, EventListener, Class)}.
+	 *
+	 * @param partitionId
+	 * 		the partition ID
+	 */
+	public void unregisterPartition(ResultPartitionID partitionId) {
+		checkNotNull(partitionId);
+
+		synchronized (registeredHandlers) {
+			LOG.debug("unregistering {}", partitionId);
+			// NOTE: tolerate un-registration of non-registered task (unregister is always called
+			//       in the cleanup phase of a task even if it never came to the registration - see
+			//       Task.java)
+			registeredHandlers.remove(partitionId);
 		}
 	}
 
 	/**
-	 * Publishes the event to the registered {@link ResultPartitionWriter} instances.
-	 * <p>
-	 * This method is either called directly from a {@link LocalInputChannel} or the network I/O
+	 * Subscribes a listener to this dispatcher for events on a partition.
+	 *
+	 * @param partitionId
+	 * 		ID of the partition to subscribe for (must be registered via {@link
+	 * 		#registerPartition(ResultPartitionID)} first!)
+	 * @param eventListener
+	 * 		the event listener to subscribe
+	 * @param eventType
+	 * 		event type to subscribe to
+	 */
+	public void subscribeToEvent(
+			ResultPartitionID partitionId,
+			EventListener<TaskEvent> eventListener,
+			Class<? extends TaskEvent> eventType) {
+		checkNotNull(partitionId);
+		checkNotNull(eventListener);
+		checkNotNull(eventType);
+
+		TaskEventHandler taskEventHandler = registeredHandlers.get(partitionId);
+		if (taskEventHandler == null) {
+			throw new IllegalStateException(
+				"Partition " + partitionId + " not registered at task event dispatcher.");
+		}
+		taskEventHandler.subscribe(eventListener, eventType);
+	}
+
+	/**
+	 * Publishes the event to the registered {@link EventListener} instances.
+	 *
+	 * <p>This method is either called directly from a {@link LocalInputChannel} or the network I/O
 	 * thread on behalf of a {@link RemoteInputChannel}.
+	 *
+	 * @return whether the event was published to a registered event handler (initiated via {@link
+	 * #registerPartition(ResultPartitionID)}) or not
 	 */
 	public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
-		EventListener<TaskEvent> listener = registeredWriters.get(partitionId);
+		checkNotNull(partitionId);
+		checkNotNull(event);
 
-		if (listener != null) {
-			listener.onEvent(event);
+		TaskEventHandler taskEventHandler = registeredHandlers.get(partitionId);
+
+		if (taskEventHandler != null) {
+			taskEventHandler.publish(event);
 			return true;
 		}
 
 		return false;
 	}
 
-	public void clearAll() {
-		synchronized (registeredWriters) {
-			registeredWriters.clear();
-		}
-	}
-
 	/**
-	 * Returns the number of currently registered writers.
+	 * Removes all registered event handlers.
 	 */
-	int getNumberOfRegisteredWriters() {
-		synchronized (registeredWriters) {
-			return registeredWriters.size();
+	public void clearAll() {
+		synchronized (registeredHandlers) {
+			registeredHandlers.clear();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 57c7098..777c7ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.TaskEventHandler;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 
@@ -34,12 +31,10 @@ import java.io.IOException;
  * The {@link ResultPartitionWriter} is the runtime API for producing results. It
  * supports two kinds of data to be sent: buffers and events.
  */
-public class ResultPartitionWriter implements EventListener<TaskEvent> {
+public class ResultPartitionWriter {
 
 	private final ResultPartition partition;
 
-	private final TaskEventHandler taskEventHandler = new TaskEventHandler();
-
 	public ResultPartitionWriter(ResultPartition partition) {
 		this.partition = partition;
 	}
@@ -94,17 +89,4 @@ public class ResultPartitionWriter implements EventListener<TaskEvent> {
 			eventBuffer.recycle();
 		}
 	}
-
-	// ------------------------------------------------------------------------
-	// Event handling
-	// ------------------------------------------------------------------------
-
-	public void subscribeToEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
-		taskEventHandler.subscribe(eventListener, eventType);
-	}
-
-	@Override
-	public void onEvent(TaskEvent event) {
-		taskEventHandler.publish(event);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index b673ba0..65dd8ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -27,10 +27,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
 import org.apache.flink.runtime.iterative.concurrent.Broker;
@@ -223,8 +225,10 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 
 	private SuperstepBarrier initSuperstepBarrier() {
 		SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
-		this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
-		this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
+		TaskEventDispatcher taskEventDispatcher = getEnvironment().getTaskEventDispatcher();
+		ResultPartitionID partitionId = toSync.getPartitionId();
+		taskEventDispatcher.subscribeToEvent(partitionId, barrier, AllWorkersDoneEvent.class);
+		taskEventDispatcher.subscribeToEvent(partitionId, barrier, TerminationEvent.class);
 		return barrier;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 92b5886..60738f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -69,6 +70,8 @@ public class RuntimeEnvironment implements Environment {
 
 	private final ResultPartitionWriter[] writers;
 	private final InputGate[] inputGates;
+
+	private final TaskEventDispatcher taskEventDispatcher;
 	
 	private final CheckpointResponder checkpointResponder;
 
@@ -101,6 +104,7 @@ public class RuntimeEnvironment implements Environment {
 			Map<String, Future<Path>> distCacheEntries,
 			ResultPartitionWriter[] writers,
 			InputGate[] inputGates,
+			TaskEventDispatcher taskEventDispatcher,
 			CheckpointResponder checkpointResponder,
 			TaskManagerRuntimeInfo taskManagerInfo,
 			TaskMetricGroup metrics,
@@ -123,6 +127,7 @@ public class RuntimeEnvironment implements Environment {
 		this.distCacheEntries = checkNotNull(distCacheEntries);
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
+		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
 		this.checkpointResponder = checkNotNull(checkpointResponder);
 		this.taskManagerInfo = checkNotNull(taskManagerInfo);
 		this.containingTask = containingTask;
@@ -237,6 +242,11 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskEventDispatcher getTaskEventDispatcher() {
+		return taskEventDispatcher;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
 		acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2cb356c..e54adb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -667,12 +667,28 @@ public class Task implements Runnable, TaskActions {
 					.createKvStateTaskRegistry(jobId, getJobVertexId());
 
 			Environment env = new RuntimeEnvironment(
-				jobId, vertexId, executionId, executionConfig, taskInfo,
-				jobConfiguration, taskConfiguration, userCodeClassLoader,
-				memoryManager, ioManager, broadcastVariableManager,
-				accumulatorRegistry, kvStateRegistry, inputSplitProvider,
-				distributedCacheEntries, writers, inputGates,
-				checkpointResponder, taskManagerConfig, metrics, this);
+				jobId,
+				vertexId,
+				executionId,
+				executionConfig,
+				taskInfo,
+				jobConfiguration,
+				taskConfiguration,
+				userCodeClassLoader,
+				memoryManager,
+				ioManager,
+				broadcastVariableManager,
+				accumulatorRegistry,
+				kvStateRegistry,
+				inputSplitProvider,
+				distributedCacheEntries,
+				writers,
+				inputGates,
+				network.getTaskEventDispatcher(),
+				checkpointResponder,
+				taskManagerConfig,
+				metrics,
+				this);
 
 			// let the task code create its readers and writers
 			invokable.setEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java
new file mode 100644
index 0000000..41201cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Basic tests for {@link TaskEventDispatcher}.
+ */
+public class TaskEventDispatcherTest extends TestLogger {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	@Test
+	public void registerPartitionTwice() throws Exception {
+		ResultPartitionID partitionId = new ResultPartitionID();
+		TaskEventDispatcher ted = new TaskEventDispatcher();
+		ted.registerPartition(partitionId);
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("already registered at task event dispatcher");
+
+		ted.registerPartition(partitionId);
+	}
+
+	@Test
+	public void subscribeToEventNotRegistered() throws Exception {
+		TaskEventDispatcher ted = new TaskEventDispatcher();
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("not registered at task event dispatcher");
+
+		ted.subscribeToEvent(new ResultPartitionID(), new ZeroShotEventListener(), TaskEvent.class);
+	}
+
+	/**
+	 * Tests {@link TaskEventDispatcher#publish(ResultPartitionID, TaskEvent)} and {@link TaskEventDispatcher#subscribeToEvent(ResultPartitionID, EventListener, Class)} methods.
+	 */
+	@Test
+	public void publishSubscribe() throws Exception {
+		ResultPartitionID partitionId1 = new ResultPartitionID();
+		ResultPartitionID partitionId2 = new ResultPartitionID();
+		TaskEventDispatcher ted = new TaskEventDispatcher();
+
+		AllWorkersDoneEvent event1 = new AllWorkersDoneEvent();
+		TerminationEvent event2 = new TerminationEvent();
+		assertFalse(ted.publish(partitionId1, event1));
+
+		ted.registerPartition(partitionId1);
+		ted.registerPartition(partitionId2);
+
+		// no event listener subscribed yet, but the event is forwarded to a TaskEventHandler
+		assertTrue(ted.publish(partitionId1, event1));
+
+		OneShotEventListener eventListener1a = new OneShotEventListener(event1);
+		ZeroShotEventListener eventListener1b = new ZeroShotEventListener();
+		ZeroShotEventListener eventListener2 = new ZeroShotEventListener();
+		OneShotEventListener eventListener3 = new OneShotEventListener(event2);
+		ted.subscribeToEvent(partitionId1, eventListener1a, AllWorkersDoneEvent.class);
+		ted.subscribeToEvent(partitionId2, eventListener1b, AllWorkersDoneEvent.class);
+		ted.subscribeToEvent(partitionId1, eventListener2, TaskEvent.class);
+		ted.subscribeToEvent(partitionId1, eventListener3, TerminationEvent.class);
+
+		assertTrue(ted.publish(partitionId1, event1));
+		assertTrue("listener should have fired for AllWorkersDoneEvent", eventListener1a.fired);
+		assertFalse("listener should not have fired for AllWorkersDoneEvent", eventListener3.fired);
+
+		// publish another event, verify that only the right subscriber is called
+		assertTrue(ted.publish(partitionId1, event2));
+		assertTrue("listener should have fired for TerminationEvent", eventListener3.fired);
+	}
+
+	@Test
+	public void unregisterPartition() throws Exception {
+		ResultPartitionID partitionId1 = new ResultPartitionID();
+		ResultPartitionID partitionId2 = new ResultPartitionID();
+		TaskEventDispatcher ted = new TaskEventDispatcher();
+
+		AllWorkersDoneEvent event = new AllWorkersDoneEvent();
+		assertFalse(ted.publish(partitionId1, event));
+
+		ted.registerPartition(partitionId1);
+		ted.registerPartition(partitionId2);
+
+		OneShotEventListener eventListener1a = new OneShotEventListener(event);
+		ZeroShotEventListener eventListener1b = new ZeroShotEventListener();
+		OneShotEventListener eventListener2 = new OneShotEventListener(event);
+		ted.subscribeToEvent(partitionId1, eventListener1a, AllWorkersDoneEvent.class);
+		ted.subscribeToEvent(partitionId2, eventListener1b, AllWorkersDoneEvent.class);
+		ted.subscribeToEvent(partitionId1, eventListener2, AllWorkersDoneEvent.class);
+
+		ted.unregisterPartition(partitionId2);
+
+		// publis something for partitionId1 triggering all according listeners
+		assertTrue(ted.publish(partitionId1, event));
+		assertTrue("listener should have fired for AllWorkersDoneEvent", eventListener1a.fired);
+		assertTrue("listener should have fired for AllWorkersDoneEvent", eventListener2.fired);
+
+		// now publish something for partitionId2 which should not trigger any listeners
+		assertFalse(ted.publish(partitionId2, event));
+	}
+
+	@Test
+	public void clearAll() throws Exception {
+		ResultPartitionID partitionId = new ResultPartitionID();
+		TaskEventDispatcher ted = new TaskEventDispatcher();
+		ted.registerPartition(partitionId);
+
+		//noinspection unchecked
+		ZeroShotEventListener eventListener1 = new ZeroShotEventListener();
+		ted.subscribeToEvent(partitionId, eventListener1, AllWorkersDoneEvent.class);
+
+		ted.clearAll();
+
+		assertFalse(ted.publish(partitionId, new AllWorkersDoneEvent()));
+	}
+
+	/**
+	 * Event listener that expects a given {@link TaskEvent} once in its {@link #onEvent(TaskEvent)}
+	 * call and will fail for any subsequent call.
+	 *
+	 * <p>Be sure to check that {@link #fired} is <tt>true</tt> to ensure that this handle has been
+	 * called once.
+	 */
+	private static class OneShotEventListener implements EventListener<TaskEvent> {
+		private final TaskEvent expected;
+		boolean fired = false;
+
+		OneShotEventListener(TaskEvent expected) {
+			this.expected = expected;
+		}
+
+		public void onEvent(TaskEvent actual) {
+			checkState(!fired, "Should only fire once");
+			fired = true;
+			checkArgument(actual == expected,
+				"Fired on unexpected event: %s (expected: %s)", actual, expected);
+		}
+	}
+
+	/**
+	 * Event listener which ensures that it's {@link #onEvent(TaskEvent)} method is never called.
+	 */
+	private static class ZeroShotEventListener implements EventListener<TaskEvent> {
+		public void onEvent(TaskEvent actual) {
+			throw new IllegalStateException("Should never fire");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 0125a5e..718ecfe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -190,4 +191,8 @@ public class DummyEnvironment implements Environment {
 		return null;
 	}
 
+	@Override
+	public TaskEventDispatcher getTaskEventDispatcher() {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7514cc4..c8ca654 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -100,6 +101,8 @@ public class MockEnvironment implements Environment {
 
 	private final ClassLoader userCodeClassLoader;
 
+	private TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
+
 	public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration(), new ExecutionConfig());
 	}
@@ -324,6 +327,11 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskEventDispatcher getTaskEventDispatcher() {
+		return taskEventDispatcher;
+	}
+
+	@Override
 	public JobVertexID getJobVertexId() {
 		return new JobVertexID(new byte[16]);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 1f1d09d..9c12fff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.MockNetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -119,7 +120,7 @@ public class TaskExecutorITCase extends TestLogger {
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
 		final MemoryManager memoryManager = mock(MemoryManager.class);
 		final IOManager ioManager = mock(IOManager.class);
-		final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		final NetworkEnvironment networkEnvironment = MockNetworkEnvironment.getMock();
 		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
 		final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class);
 		final FileCache fileCache = mock(FileCache.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 776bdf9..6372792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -723,9 +724,11 @@ public class TaskExecutorTest extends TestLogger {
 		when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true);
 		when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 		final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
 
 		when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
+		when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5045606..f2c60de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -217,11 +218,13 @@ public class TaskAsyncCallTest {
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
+		TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
 		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
 		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
 		when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
+		when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
 		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index b089997..4fa36bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -273,10 +274,12 @@ public class TaskTest extends TestLogger {
 			ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 			ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 			PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+			TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
 			Executor executor = mock(Executor.class);
 			NetworkEnvironment network = mock(NetworkEnvironment.class);
 			when(network.getResultPartitionManager()).thenReturn(partitionManager);
 			when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+			when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 			doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
 			Task task = createTask(TestInvokableCorrect.class, blobService, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
@@ -629,6 +632,7 @@ public class TaskTest extends TestLogger {
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
 		PartitionProducerStateChecker partitionChecker = mock(PartitionProducerStateChecker.class);
+		TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
 
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
@@ -636,6 +640,7 @@ public class TaskTest extends TestLogger {
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 			.thenReturn(mock(TaskKvStateRegistry.class));
+		when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		createTask(InvokableBlockingInInvoke.class, blobService, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
@@ -933,12 +938,14 @@ public class TaskTest extends TestLogger {
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+		TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
 		Executor executor = mock(Executor.class);
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
 		when(network.getResultPartitionManager()).thenReturn(partitionManager);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
+		when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		return createTask(invokable, blobService, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 8072295..38238cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -157,6 +158,8 @@ public class JvmExitOnFatalErrorTest {
 
 				final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
 				when(networkEnvironment.createKvStateTaskRegistry(jid, jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+				TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
+				when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 				final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index c641aa8..eacded6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -175,9 +176,11 @@ public class InterruptSensitiveRestoreTest {
 			StreamStateHandle state,
 			int mode) throws IOException {
 
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
+		when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList();
 		Collection<KeyedStateHandle> keyedStateFromStream = Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 231f59e..6b6506a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -107,6 +108,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private volatile boolean wasFailedExternally = false;
 
+	private TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
+
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
 								long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.taskInfo = new TaskInfo(
@@ -304,6 +307,11 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskEventDispatcher getTaskEventDispatcher() {
+		return taskEventDispatcher;
+	}
+
+	@Override
 	public JobVertexID getJobVertexId() {
 		return new JobVertexID(new byte[16]);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 4c73e72..5480ce7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -134,8 +135,10 @@ public class StreamTaskTerminationTest extends TestLogger {
 
 		final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 		final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class);
 		when(networkEnv.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mock(TaskKvStateRegistry.class));
+		when(networkEnv.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		BlobCacheService blobService =
 			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index b31fb41..d0ea714 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -887,12 +888,14 @@ public class StreamTaskTest extends TestLogger {
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
 		when(network.getResultPartitionManager()).thenReturn(partitionManager);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
+		when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		JobInformation jobInformation = new JobInformation(
 			new JobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index d755c56..d61b95d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -210,8 +211,10 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				taskConfig);
 
 		TaskKvStateRegistry mockKvRegistry = mock(TaskKvStateRegistry.class);
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mockKvRegistry);
+		when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		BlobCacheService blobService =
 			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));