You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/02 13:05:40 UTC

[1/2] flink git commit: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnvironment independent of ActorGateway and JobManager association

Repository: flink
Updated Branches:
  refs/heads/master bc9d52391 -> 78f2a1586


http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 4597e3b..9f39de1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,39 +19,34 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.NetUtils;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import scala.Some;
-import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
-import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -61,100 +56,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class NetworkEnvironmentTest {
-
-	@Test
-	public void testAssociateDisassociate() {
-		final int BUFFER_SIZE = 1024;
-		final int NUM_BUFFERS = 20;
-
-		final int port;
-		try {
-			port = NetUtils.getAvailablePort();
-		}
-		catch (Throwable t) {
-			// ignore
-			return;
-		}
-
-		try {
-			NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration());
-			NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
-					NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
-					IOManager.IOMode.SYNC, 0, 0, 0, new Some<>(nettyConf),
-					new Tuple2<>(0, 0));
-
-			NetworkEnvironment env = new NetworkEnvironment(
-				TestingUtils.defaultExecutionContext(),
-				new FiniteDuration(30, TimeUnit.SECONDS),
-				config,
-				new InstanceConnectionInfo(InetAddress.getLocalHost(), port));
-
-			assertFalse(env.isShutdown());
-			assertFalse(env.isAssociated());
-
-			// pool must be started already
-			assertNotNull(env.getNetworkBufferPool());
-			assertEquals(NUM_BUFFERS, env.getNetworkBufferPool().getTotalNumberOfMemorySegments());
-
-			// others components are still shut down
-			assertNull(env.getConnectionManager());
-			assertNull(env.getPartitionConsumableNotifier());
-			assertNull(env.getTaskEventDispatcher());
-			assertNull(env.getPartitionManager());
-
-			// associate the environment with some mock actors
-			env.associateWithTaskManagerAndJobManager(
-					DummyActorGateway.INSTANCE,
-					DummyActorGateway.INSTANCE);
-
-			assertNotNull(env.getConnectionManager());
-			assertNotNull(env.getPartitionConsumableNotifier());
-			assertNotNull(env.getTaskEventDispatcher());
-			assertNotNull(env.getPartitionManager());
-
-			// allocate some buffer pool
-			BufferPool localPool = env.getNetworkBufferPool().createBufferPool(10, false);
-			assertNotNull(localPool);
-
-			// disassociate
-			env.disassociate();
-
-			assertNull(env.getConnectionManager());
-			assertNull(env.getPartitionConsumableNotifier());
-			assertNull(env.getTaskEventDispatcher());
-			assertNull(env.getPartitionManager());
-
-			assertNotNull(env.getNetworkBufferPool());
-			assertTrue(localPool.isDestroyed());
-
-			// associate once again
-			env.associateWithTaskManagerAndJobManager(
-					DummyActorGateway.INSTANCE,
-					DummyActorGateway.INSTANCE
-			);
-
-			assertNotNull(env.getConnectionManager());
-			assertNotNull(env.getPartitionConsumableNotifier());
-			assertNotNull(env.getTaskEventDispatcher());
-			assertNotNull(env.getPartitionManager());
-
-			// shutdown for good
-			env.shutdown();
-
-			assertTrue(env.isShutdown());
-			assertFalse(env.isAssociated());
-			assertNull(env.getConnectionManager());
-			assertNull(env.getPartitionConsumableNotifier());
-			assertNull(env.getTaskEventDispatcher());
-			assertNull(env.getPartitionManager());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-
 	/**
 	 * Registers a task with an eager and non-eager partition at the network
 	 * environment and verifies that there is exactly on schedule or update
@@ -164,45 +65,61 @@ public class NetworkEnvironmentTest {
 	@SuppressWarnings("unchecked")
 	public void testEagerlyDeployConsumers() throws Exception {
 		// Mock job manager => expected interactions will be verified
-		ActorGateway jobManager = mock(ActorGateway.class);
+		final ActorGateway jobManager = mock(ActorGateway.class);
 		when(jobManager.ask(anyObject(), any(FiniteDuration.class)))
 				.thenReturn(new Promise.DefaultPromise<>().future());
 
 		// Network environment setup
 		NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
-				20,
-				1024,
-				MemoryType.HEAP,
-				IOManager.IOMode.SYNC,
-				0,
-				0,
-				0,
-				Some.<NettyConfig>empty(),
-				new Tuple2<>(0, 0));
+			20,
+			1024,
+			MemoryType.HEAP,
+			IOManager.IOMode.SYNC,
+			0,
+			0,
+			0,
+			Some.<NettyConfig>empty(),
+			0,
+			0);
 
 		NetworkEnvironment env = new NetworkEnvironment(
-				TestingUtils.defaultExecutionContext(),
-				new FiniteDuration(30, TimeUnit.SECONDS),
-				config,
-				new InstanceConnectionInfo(InetAddress.getLocalHost(), 12232));
-
-		// Associate the environment with the mock actors
-		env.associateWithTaskManagerAndJobManager(
-				jobManager,
-				DummyActorGateway.INSTANCE);
+			new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize(), config.memoryType()),
+			new LocalConnectionManager(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			config.ioMode(),
+			config.partitionRequestInitialBackoff(),
+			config.partitinRequestMaxBackoff());
+
+		env.start();
+
+		JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
+
+		when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenAnswer(new Answer<ResultPartitionConsumableNotifier>() {
+			@Override
+			public ResultPartitionConsumableNotifier answer(InvocationOnMock invocation) throws Throwable {
+				return new ActorGatewayResultPartitionConsumableNotifier(
+					TestingUtils.defaultExecutionContext(),
+					jobManager,
+					(Task)invocation.getArguments()[0],
+					new FiniteDuration(30, TimeUnit.SECONDS));
+			}
+		});
 
 		// Register mock task
 		JobID jobId = new JobID();
+		Task mockTask = mock(Task.class);
 
 		ResultPartition[] partitions = new ResultPartition[2];
-		partitions[0] = createPartition("p1", jobId, true, env);
-		partitions[1] = createPartition("p2", jobId, false, env);
+		partitions[0] = createPartition(mockTask, "p1", jobId, true, env, jobManagerCommunicationFactory);
+		partitions[1] = createPartition(mockTask, "p2", jobId, false, env, jobManagerCommunicationFactory);
 
 		ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
 		writers[0] = new ResultPartitionWriter(partitions[0]);
 		writers[1] = new ResultPartitionWriter(partitions[1]);
 
-		Task mockTask = mock(Task.class);
 		when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]);
 		when(mockTask.getAllWriters()).thenReturn(writers);
 		when(mockTask.getProducedPartitions()).thenReturn(partitions);
@@ -221,10 +138,12 @@ public class NetworkEnvironmentTest {
 	 * Helper to create a mock result partition.
 	 */
 	private static ResultPartition createPartition(
-			String name,
-			JobID jobId,
-			boolean eagerlyDeployConsumers,
-			NetworkEnvironment env) {
+		Task owningTask,
+		String name,
+		JobID jobId,
+		boolean eagerlyDeployConsumers,
+		NetworkEnvironment env,
+		JobManagerCommunicationFactory jobManagerCommunicationFactory) {
 
 		return new ResultPartition(
 				name,
@@ -233,8 +152,8 @@ public class NetworkEnvironmentTest {
 				ResultPartitionType.PIPELINED,
 				eagerlyDeployConsumers,
 				1,
-				env.getPartitionManager(),
-				env.getPartitionConsumableNotifier(),
+				env.getResultPartitionManager(),
+				jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(owningTask),
 				mock(IOManager.class),
 				env.getDefaultIOMode());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 0868398..8884b29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.junit.Test;
-import scala.Tuple2;
 
 import java.io.IOException;
 
@@ -103,10 +102,11 @@ public class InputChannelTest {
 
 	private InputChannel createInputChannel(int initialBackoff, int maxBackoff) {
 		return new MockInputChannel(
-				mock(SingleInputGate.class),
-				0,
-				new ResultPartitionID(),
-				new Tuple2<Integer, Integer>(initialBackoff, maxBackoff));
+			mock(SingleInputGate.class),
+			0,
+			new ResultPartitionID(),
+			initialBackoff,
+			maxBackoff);
 	}
 
 	// ---------------------------------------------------------------------------------------------
@@ -117,9 +117,10 @@ public class InputChannelTest {
 				SingleInputGate inputGate,
 				int channelIndex,
 				ResultPartitionID partitionId,
-				Tuple2<Integer, Integer> initialAndMaxBackoff) {
+				int initialBackoff,
+				int maxBackoff) {
 
-			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter());
+			super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f91a4ba..18d9073 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -267,13 +267,14 @@ public class LocalInputChannelTest {
 			throws IOException, InterruptedException {
 
 		return new LocalInputChannel(
-				inputGate,
-				0,
-				new ResultPartitionID(),
-				partitionManager,
-				mock(TaskEventDispatcher.class),
-				initialAndMaxRequestBackoff,
-				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			inputGate,
+			0,
+			new ResultPartitionID(),
+			partitionManager,
+			mock(TaskEventDispatcher.class),
+			initialAndMaxRequestBackoff._1(),
+			initialAndMaxRequestBackoff._2(),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9eb49ef..9a79ff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -299,12 +299,13 @@ public class RemoteInputChannelTest {
 				.thenReturn(partitionRequestClient);
 
 		return new RemoteInputChannel(
-				inputGate,
-				0,
-				new ResultPartitionID(),
-				mock(ConnectionID.class),
-				connectionManager,
-				initialAndMaxRequestBackoff,
-				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			inputGate,
+			0,
+			new ResultPartitionID(),
+			mock(ConnectionID.class),
+			connectionManager,
+			initialAndMaxRequestBackoff._1(),
+			initialAndMaxRequestBackoff._2(),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 05427a1..f55fee5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -128,7 +128,7 @@ public class SingleInputGateTest {
 		// Unknown
 		ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		// Set channels
 		inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
@@ -174,13 +174,15 @@ public class SingleInputGateTest {
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
 		InputChannel unknown = new UnknownInputChannel(
-				inputGate,
-				0,
-				new ResultPartitionID(),
-				partitionManager,
-				new TaskEventDispatcher(),
-				new LocalConnectionManager(),
-				new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			inputGate,
+			0,
+			new ResultPartitionID(),
+			partitionManager,
+			new TaskEventDispatcher(),
+			new LocalConnectionManager(),
+			0,
+			0,
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 
@@ -213,14 +215,15 @@ public class SingleInputGateTest {
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		InputChannel unknown = new UnknownInputChannel(
-				inputGate,
-				0,
-				new ResultPartitionID(),
-				new ResultPartitionManager(),
-				new TaskEventDispatcher(),
-				new LocalConnectionManager(),
-				new Tuple2<Integer, Integer>(0, 0),
-				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			inputGate,
+			0,
+			new ResultPartitionID(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new LocalConnectionManager(),
+			0,
+			0,
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index ab4ca3b..9501c7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -150,12 +150,14 @@ public class TaskAsyncCallTest {
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-		when(networkEnvironment.getPartitionManager()).thenReturn(partitionManager);
-		when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
 		when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
+		JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
+		when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
+
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
 				new SerializedValue<>(new ExecutionConfig()),
@@ -170,17 +172,18 @@ public class TaskAsyncCallTest {
 
 		ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;
 		return new Task(tdd,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				networkEnvironment,
-				mock(BroadcastVariableManager.class),
-				taskManagerGateway,
-				DummyActorGateway.INSTANCE,
-				new FiniteDuration(60, TimeUnit.SECONDS),
-				libCache,
-				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-				mock(TaskMetricGroup.class));
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			networkEnvironment,
+			jobManagerCommunicationFactory,
+			mock(BroadcastVariableManager.class),
+			taskManagerGateway,
+			DummyActorGateway.INSTANCE,
+			new FiniteDuration(60, TimeUnit.SECONDS),
+			libCache,
+			mock(FileCache.class),
+			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+			mock(TaskMetricGroup.class));
 	}
 
 	public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 147a3e0..3371c49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -35,8 +35,12 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -44,11 +48,11 @@ import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager;
 
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.junit.Test;
 import scala.Option;
-import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
@@ -99,32 +103,40 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
 					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0,
-					Option.<NettyConfig>empty(), new Tuple2<Integer, Integer>(0, 0));
+					Option.<NettyConfig>empty(), 0, 0);
 
 			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 
 			final MemoryManager memManager = new MemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(
-				TestingUtils.defaultExecutionContext(),
-				timeout,
-				netConf,
-				connectionInfo);
+				new NetworkBufferPool(netConf.numNetworkBuffers(), netConf.networkBufferSize(), netConf.memoryType()),
+				new LocalConnectionManager(),
+				new ResultPartitionManager(),
+				new TaskEventDispatcher(),
+				new KvStateRegistry(),
+				null,
+				netConf.ioMode(),
+				netConf.partitionRequestInitialBackoff(),
+				netConf.partitinRequestMaxBackoff());
+
+			network.start();
+
 			final int numberOfSlots = 1;
 
 			LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
 
 			// create the task manager
 			final Props tmProps = Props.create(
-					TaskManager.class,
-					tmConfig,
-					ResourceID.generate(),
-					connectionInfo,
-					memManager,
-					ioManager,
-					network,
-					numberOfSlots,
-					leaderRetrievalService);
+				TaskManager.class,
+				tmConfig,
+				ResourceID.generate(),
+				connectionInfo,
+				memManager,
+				ioManager,
+				network,
+				numberOfSlots,
+				leaderRetrievalService);
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);
 
@@ -142,9 +154,6 @@ public class TaskManagerComponentsStartupShutdownTest {
 				};
 			}};
 
-			// the components should now all be initialized
-			assertTrue(network.isAssociated());
-
 			// shut down all actors and the actor system
 			// Kill the Task down the JobManager
 			taskManager.tell(Kill.getInstance(), ActorRef.noSender());
@@ -156,7 +165,6 @@ public class TaskManagerComponentsStartupShutdownTest {
 			actorSystem = null;
 
 			// now that the TaskManager is shut down, the components should be shut down as well
-			assertFalse(network.isAssociated());
 			assertTrue(network.isShutdown());
 			assertTrue(ioManager.isProperlyShutDown());
 			assertTrue(memManager.isShutdown());

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index e23aba7..53fa7c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,12 +28,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -47,15 +45,11 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import scala.Option;
-import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -548,69 +542,6 @@ public class TaskManagerRegistrationTest extends TestLogger {
 		}};
 	}
 
-
-	@Test
-	public void testStartupWhenNetworkStackFailsToInitialize() {
-
-		ServerSocket blocker = null;
-
-		try {
-			blocker = new ServerSocket(0, 50, InetAddress.getByName("localhost"));
-
-			final Configuration cfg = new Configuration();
-			cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort());
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
-
-			new JavaTestKit(actorSystem) {{
-				ActorRef taskManager = null;
-				ActorRef jobManager = null;
-				ActorRef resourceManager = null;
-
-				try {
-					// a simple JobManager
-					jobManager = startJobManager(config);
-
-					resourceManager = startResourceManager(config, jobManager);
-
-					// start a task manager with a configuration that provides a blocked port
-					taskManager = TaskManager.startTaskManagerComponentsAndActor(
-							cfg, ResourceID.generate(), actorSystem, "localhost",
-							NONE_STRING, // no actor name -> random
-							new Some<LeaderRetrievalService>(new StandaloneLeaderRetrievalService(jobManager.path().toString())),
-							false, // init network stack !!!
-							TaskManager.class);
-
-					watch(taskManager);
-
-					expectTerminated(timeout, taskManager);
-				}
-				catch (Exception e) {
-					e.printStackTrace();
-					fail(e.getMessage());
-				} finally {
-					stopActor(taskManager);
-					stopActor(jobManager);
-				}
-			}};
-		}
-		catch (Exception e) {
-			// does not work, skip test
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			if (blocker != null) {
-				try {
-					blocker.close();
-				}
-				catch (IOException e) {
-					// ignore, best effort
-				}
-			}
-		}
-	}
-
 	@Test
 	public void testCheckForValidRegistrationSessionIDs() {
 		new JavaTestKit(actorSystem) {{

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 149df6e..686de76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -25,9 +25,10 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.StartupUtils;
-import org.junit.Before;
 import org.junit.Test;
+import scala.Option;
 
 import java.io.File;
 import java.io.IOException;
@@ -179,4 +180,43 @@ public class TaskManagerStartupTest {
 			fail(e.getMessage());
 		}
 	}
+
+	/**
+	 * Tests that the task manager start-up fails if the network stack cannot be initialized.
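+	 * @throws Exception an {@link IOException} is expected, since the configured data port is already bound by a blocking socket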
+	 */
+	@Test(expected = IOException.class)
+	public void testStartupWhenNetworkStackFailsToInitialize() throws Exception {
+
+		ServerSocket blocker = null;
+
+		try {
+			blocker = new ServerSocket(0, 50, InetAddress.getByName("localhost"));
+
+			final Configuration cfg = new Configuration();
+			cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort());
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+
+			TaskManager.startTaskManagerComponentsAndActor(
+				cfg,
+				ResourceID.generate(),
+				null,
+				"localhost",
+				Option.<String>empty(),
+				Option.<LeaderRetrievalService>empty(),
+				false,
+				TaskManager.class);
+		}
+		finally {
+			if (blocker != null) {
+				try {
+					blocker.close();
+				}
+				catch (IOException e) {
+					// ignore, best effort
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 54cd7c6..0e53673 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -990,11 +990,8 @@ public class TaskManagerTest extends TestLogger {
 
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
-				final int dataPort = NetUtils.getAvailablePort();
 				final Configuration config = new Configuration();
 
-				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
-
 				taskManager = TestingUtils.createTaskManager(
 						system,
 						jobManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index fea21be..cfa7fb6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -66,10 +66,20 @@ public class TaskStopTest {
 		when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
 		when(tddMock.getInvokableClassName()).thenReturn("className");
 
-		task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class),
-				mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class),
-				mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class),
-				mock(TaskManagerRuntimeInfo.class), mock(TaskMetricGroup.class));
+		task = new Task(
+			tddMock,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			mock(JobManagerCommunicationFactory.class),
+			mock(BroadcastVariableManager.class),
+			mock(ActorGateway.class),
+			mock(ActorGateway.class),
+			mock(FiniteDuration.class),
+			mock(LibraryCacheManager.class),
+			mock(FileCache.class),
+			mock(TaskManagerRuntimeInfo.class),
+			mock(TaskMetricGroup.class));
 		Field f = task.getClass().getDeclaredField("invokable");
 		f.setAccessible(true);
 		f.set(task, taskMock);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index f145b48..9e8f8f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -244,12 +244,14 @@ public class TaskTest {
 			ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 			ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 			NetworkEnvironment network = mock(NetworkEnvironment.class);
-			when(network.getPartitionManager()).thenReturn(partitionManager);
-			when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+			when(network.getResultPartitionManager()).thenReturn(partitionManager);
 			when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 			doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
+
+			JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
+			when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
 			
-			Task task = createTask(TestInvokableCorrect.class, libCache, network);
+			Task task = createTask(TestInvokableCorrect.class, libCache, network, jobManagerCommunicationFactory);
 
 			task.registerExecutionListener(listenerGateway);
 
@@ -598,18 +600,22 @@ public class TaskTest {
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getPartitionManager()).thenReturn(partitionManager);
-		when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+		JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
+		when(network.getResultPartitionManager()).thenReturn(partitionManager);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
+
+		when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
 		
-		return createTask(invokable, libCache, network);
+		return createTask(invokable, libCache, network, jobManagerCommunicationFactory);
 	}
 	
-	private Task createTask(Class<? extends AbstractInvokable> invokable,
-							LibraryCacheManager libCache,
-							NetworkEnvironment networkEnvironment) {
+	private Task createTask(
+		Class<? extends AbstractInvokable> invokable,
+		LibraryCacheManager libCache,
+		NetworkEnvironment networkEnvironment,
+		JobManagerCommunicationFactory jobManagerCommunicationFactory) {
 		
 		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable);
 		
@@ -618,6 +624,7 @@ public class TaskTest {
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				networkEnvironment,
+			jobManagerCommunicationFactory,
 				mock(BroadcastVariableManager.class),
 				taskManagerGateway,
 				jobManagerGateway,

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index f8b4063..e1c9407 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -41,13 +41,12 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -148,19 +147,20 @@ public class InterruptSensitiveRestoreTest {
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
 		return new Task(
-				tdd,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				networkEnvironment,
-				mock(BroadcastVariableManager.class),
-				mock(ActorGateway.class),
-				mock(ActorGateway.class),
-				new FiniteDuration(10, TimeUnit.SECONDS),
-				new FallbackLibraryCacheManager(),
-				new FileCache(new Configuration()),
-				new TaskManagerRuntimeInfo(
-						"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
-				new UnregisteredTaskMetricsGroup());
+			tdd,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			networkEnvironment,
+			mock(JobManagerCommunicationFactory.class),
+			mock(BroadcastVariableManager.class),
+			mock(ActorGateway.class),
+			mock(ActorGateway.class),
+			new FiniteDuration(10, TimeUnit.SECONDS),
+			new FallbackLibraryCacheManager(),
+			new FileCache(new Configuration()),
+			new TaskManagerRuntimeInfo(
+					"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+			new UnregisteredTaskMetricsGroup());
 		
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 408b5b1..0a9d2fa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -81,9 +82,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class StreamTaskTest {
 
@@ -228,12 +226,14 @@ public class StreamTaskTest {
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getPartitionManager()).thenReturn(partitionManager);
-		when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+		when(network.getResultPartitionManager()).thenReturn(partitionManager);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
+		JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
+		when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
+
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
 				new SerializedValue<>(new ExecutionConfig()),
@@ -248,18 +248,19 @@ public class StreamTaskTest {
 				0);
 
 		return new Task(
-				tdd,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				network,
-				mock(BroadcastVariableManager.class),
-				new DummyGateway(),
-				new DummyGateway(),
-				new FiniteDuration(60, TimeUnit.SECONDS),
-				libCache,
-				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-				mock(TaskMetricGroup.class));
+			tdd,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			network,
+			jobManagerCommunicationFactory,
+			mock(BroadcastVariableManager.class),
+			new DummyGateway(),
+			new DummyGateway(),
+			new FiniteDuration(60, TimeUnit.SECONDS),
+			libCache,
+			mock(FileCache.class),
+			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+			mock(TaskMetricGroup.class));
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 82dbd1f..8915bff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -233,13 +233,19 @@ public class RescalingITCase extends TestLogger {
 
 			cluster.submitJobDetached(jobGraph);
 
-			Future<Object> allTasksRunning = jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobID), deadline.timeLeft());
+			Object savepointResponse = null;
 
-			Await.ready(allTasksRunning, deadline.timeLeft());
+			// we might be too early for taking a savepoint if the operators have not been started yet
+			while (deadline.hasTimeLeft()) {
 
-			Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
+				Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
+
+				savepointResponse = Await.result(savepointPathFuture, deadline.timeLeft());
 
-			Object savepointResponse = Await.result(savepointPathFuture, deadline.timeLeft());
+				if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
+					break;
+				}
+			}
 
 			assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 8b39f52..107801d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -18,17 +18,13 @@
 
 package org.apache.flink.yarn
 
-import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
-import org.apache.flink.runtime.util.ProcessShutDownThread
-
-import scala.concurrent.duration._
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
 
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.


[2/2] flink git commit: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnvironment independent of ActorGateway and JobManager association

Posted by tr...@apache.org.
[FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnvironment independent of ActorGateway and JobManager association

Makes the NetworkEnvironment independent of the JobManager association. This means that the
NetworkEnvironment, and with it the ConnectionManager, is started before the TaskManager actor
is executed. Furthermore, the ConnectionManager keeps running even in case of a JobManager
disassociation. In the wake of remodelling this behaviour, the PartitionStateChecker and
the ResultPartitionConsumableNotifier which depend on the JobManager association were moved
out of the NetworkEnvironment. They are now contained in the SlotEnvironment which will be
set up when the TaskManager connects to a JobManager. The SlotEnvironment contains all
information related to the associated JobManager. Since all slots are implicitly associated
with the JobManager which is the leader, we only create one SlotEnvironment which is shared
by all Tasks.

Introduce SlotEnvironment to accommodate the PartitionStateChecker and ResultPartitionConsumableNotifier

Remove the PartitionStateChecker and the ResultPartitionConsumableNotifier from the
NetworkEnvironment. Start the NetworkEnvironment when the TaskManager components are
created. Keep the NetworkEnvironment running also when the JobManager is disassociated.

Fix CassandraConnectorITCase

Remove ExecutionContext from TaskManager; Rename SlotEnvironment into JobManagerConnection

Introduce JobManagerCommunicationFactory to generate job manager specific communication components

This closes #2449.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78f2a158
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78f2a158
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78f2a158

Branch: refs/heads/master
Commit: 78f2a15867734055a9712ea3a27f54d9bed3e43b
Parents: bc9d523
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 31 09:33:46 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Sep 2 15:05:21 2016 +0200

----------------------------------------------------------------------
 .../instance/InstanceConnectionInfo.java        |   7 +-
 .../flink/runtime/io/network/ConnectionID.java  |   2 +
 .../runtime/io/network/ConnectionManager.java   |   2 +
 .../io/network/LocalConnectionManager.java      |   5 +
 .../runtime/io/network/NetworkEnvironment.java  | 514 ++++---------------
 .../runtime/io/network/netty/NettyConfig.java   |   2 +-
 .../network/netty/NettyConnectionManager.java   |   9 +
 .../runtime/io/network/netty/NettyServer.java   |  10 +
 .../io/network/partition/ResultPartition.java   |  25 +-
 .../partition/ResultPartitionManager.java       |   2 +
 .../partition/consumer/InputChannel.java        |   8 +-
 .../partition/consumer/LocalInputChannel.java   |   8 +-
 .../partition/consumer/RemoteInputChannel.java  |   8 +-
 .../partition/consumer/SingleInputGate.java     |  32 +-
 .../partition/consumer/UnknownInputChannel.java |  17 +-
 .../flink/runtime/query/KvStateRegistry.java    |   7 +
 ...orGatewayJobManagerCommunicationFactory.java |  61 +++
 .../ActorGatewayKvStateRegistryListener.java    |  82 +++
 .../ActorGatewayPartitionStateChecker.java      |  59 +++
 ...atewayResultPartitionConsumableNotifier.java |  82 +++
 .../JobManagerCommunicationFactory.java         |  47 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  28 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +
 .../NetworkEnvironmentConfiguration.scala       |   3 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 152 ++++--
 .../testingUtils/TestingTaskManagerLike.scala   |   2 +-
 .../io/network/NetworkEnvironmentTest.java      | 189 ++-----
 .../partition/consumer/InputChannelTest.java    |  15 +-
 .../consumer/LocalInputChannelTest.java         |  15 +-
 .../consumer/RemoteInputChannelTest.java        |  15 +-
 .../partition/consumer/SingleInputGateTest.java |  35 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  29 +-
 ...askManagerComponentsStartupShutdownTest.java |  46 +-
 .../TaskManagerRegistrationTest.java            |  69 ---
 .../taskmanager/TaskManagerStartupTest.java     |  42 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   3 -
 .../flink/runtime/taskmanager/TaskStopTest.java |  18 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  25 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  30 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  35 +-
 .../test/checkpointing/RescalingITCase.java     |  14 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   6 +-
 42 files changed, 932 insertions(+), 834 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index eb87292..2830f04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * for data exchange. This class also contains utilities to work with the
  * TaskManager's host name, which is used to localize work assignments.
  */
-public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, java.io.Serializable {
+public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, Serializable {
 
 	private static final long serialVersionUID = -8254407801276350716L;
 	
@@ -77,7 +78,9 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		if (inetAddress == null) {
 			throw new IllegalArgumentException("Argument inetAddress must not be null");
 		}
-		if (dataPort <= 0) {
+
+		// -1 indicates a local instance connection info
+		if (dataPort != -1 && dataPort <= 0) {
 			throw new IllegalArgumentException("Argument dataPort must be greater than zero");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index c15e72e..0569dae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -37,6 +37,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ConnectionID implements Serializable {
 
+	private static final long serialVersionUID = -8068626194818666857L;
+
 	private final InetSocketAddress address;
 
 	private final int connectionIndex;

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 2f535fe..02deb9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -46,6 +46,8 @@ public interface ConnectionManager {
 
 	int getNumberOfActiveConnections();
 
+	int getDataPort();
+
 	void shutdown() throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 410f8ab..4f51a56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -48,5 +48,10 @@ public class LocalConnectionManager implements ConnectionManager {
 	}
 
 	@Override
+	public int getDataPort() {
+		return -1;
+	}
+
+	@Override
 	public void shutdown() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 844bc2d..b221ec7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -18,60 +18,34 @@
 
 package org.apache.flink.runtime.io.network;
 
-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The network environment contains
  * the data structures that keep track of all intermediate results and all data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network buffer pool.
- * All other components (netty, intermediate result managers, ...) are only created once the
- * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {
 
@@ -79,69 +53,61 @@ public class NetworkEnvironment {
 
 	private final Object lock = new Object();
 
-	private final NetworkEnvironmentConfiguration configuration;
-
-	private final FiniteDuration jobManagerTimeout;
-
 	private final NetworkBufferPool networkBufferPool;
 
-	private ConnectionManager connectionManager;
+	private final ConnectionManager connectionManager;
 
-	private ResultPartitionManager partitionManager;
+	private final ResultPartitionManager resultPartitionManager;
 
-	private TaskEventDispatcher taskEventDispatcher;
-
-	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
-	private PartitionStateChecker partitionStateChecker;
+	private final TaskEventDispatcher taskEventDispatcher;
 
 	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
-	private KvStateServer kvStateServer;
+	private final KvStateServer kvStateServer;
 
 	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
-	private KvStateRegistry kvStateRegistry;
+	private final KvStateRegistry kvStateRegistry;
 
-	private boolean isShutdown;
+	private final IOManager.IOMode defaultIOMode;
 
-	/**
-	 * ExecutionEnvironment which is used to execute remote calls with the
-	 * {@link JobManagerResultPartitionConsumableNotifier}
-	 */
-	private final ExecutionContext executionContext;
+	private final int partitionRequestInitialBackoff;
 
-	private final InstanceConnectionInfo connectionInfo;
+	private final int partitionRequestMaxBackoff;
+
+	private boolean isShutdown;
 
-	/**
-	 * Initializes all network I/O components.
-	 */
 	public NetworkEnvironment(
-			ExecutionContext executionContext,
-			FiniteDuration jobManagerTimeout,
-			NetworkEnvironmentConfiguration config,
-			InstanceConnectionInfo connectionInfo) throws IOException {
-
-		this.executionContext = executionContext;
-		this.configuration = checkNotNull(config);
-		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
-		this.connectionInfo = checkNotNull(connectionInfo);
-
-		// create the network buffers - this is the operation most likely to fail upon
-		// mis-configuration, so we do this first
-		try {
-			networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(),
-					config.networkBufferSize(), config.memoryType());
-		}
-		catch (Throwable t) {
-			throw new IOException("Cannot allocate network buffer pool: " + t.getMessage(), t);
-		}
+		NetworkBufferPool networkBufferPool,
+		ConnectionManager connectionManager,
+		ResultPartitionManager resultPartitionManager,
+		TaskEventDispatcher taskEventDispatcher,
+		KvStateRegistry kvStateRegistry,
+		KvStateServer kvStateServer,
+		IOMode defaultIOMode,
+		int partitionRequestInitialBackoff,
+		int partitionRequestMaxBackoff) {
+
+		this.networkBufferPool = checkNotNull(networkBufferPool);
+		this.connectionManager = checkNotNull(connectionManager);
+		this.resultPartitionManager = checkNotNull(resultPartitionManager);
+		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
+		this.kvStateRegistry = checkNotNull(kvStateRegistry);
+
+		this.kvStateServer = kvStateServer;
+
+		this.defaultIOMode = defaultIOMode;
+
+		this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
+		this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
+
+		isShutdown = false;
 	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Properties
 	// --------------------------------------------------------------------------------------------
 
-	public ResultPartitionManager getPartitionManager() {
-		return partitionManager;
+	public ResultPartitionManager getResultPartitionManager() {
+		return resultPartitionManager;
 	}
 
 	public TaskEventDispatcher getTaskEventDispatcher() {
@@ -157,187 +123,27 @@ public class NetworkEnvironment {
 	}
 
 	public IOMode getDefaultIOMode() {
-		return configuration.ioMode();
+		return defaultIOMode;
 	}
 
-	public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
-		return partitionConsumableNotifier;
+	public int getPartitionRequestInitialBackoff() {
+		return partitionRequestInitialBackoff;
 	}
 
-	public PartitionStateChecker getPartitionStateChecker() {
-		return partitionStateChecker;
+	public int getPartitionRequestMaxBackoff() {
+		return partitionRequestMaxBackoff;
 	}
 
-	public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
-		return configuration.partitionRequestInitialAndMaxBackoff();
+	public KvStateRegistry getKvStateRegistry() {
+		return kvStateRegistry;
 	}
 
-	public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
-		return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
+	public KvStateServer getKvStateServer() {
+		return kvStateServer;
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Association / Disassociation with JobManager / TaskManager
-	// --------------------------------------------------------------------------------------------
-
-	public boolean isAssociated() {
-		return partitionConsumableNotifier != null;
-	}
-
-	/**
-	 * This associates the network environment with a TaskManager and JobManager.
-	 * This will actually start the network components.
-	 *
-	 * @param jobManagerGateway Gateway to the JobManager.
-	 * @param taskManagerGateway Gateway to the TaskManager.
-	 *
-	 * @throws IOException Thrown if the network subsystem (Netty) cannot be properly started.
-	 */
-	public void associateWithTaskManagerAndJobManager(
-			ActorGateway jobManagerGateway,
-			ActorGateway taskManagerGateway) throws IOException
-	{
-		checkNotNull(jobManagerGateway);
-		checkNotNull(taskManagerGateway);
-
-		synchronized (lock) {
-			if (isShutdown) {
-				throw new IllegalStateException("environment is shut down");
-			}
-
-			if (this.partitionConsumableNotifier == null &&
-				this.partitionManager == null &&
-				this.taskEventDispatcher == null &&
-				this.connectionManager == null &&
-				this.kvStateRegistry == null &&
-				this.kvStateServer == null)
-			{
-				// good, not currently associated. start the individual components
-
-				LOG.debug("Starting result partition manager and network connection manager");
-				this.partitionManager = new ResultPartitionManager();
-				this.taskEventDispatcher = new TaskEventDispatcher();
-				this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
-					executionContext,
-					jobManagerGateway,
-					taskManagerGateway,
-					jobManagerTimeout);
-
-				this.partitionStateChecker = new JobManagerPartitionStateChecker(
-						jobManagerGateway, taskManagerGateway);
-
-				// -----  Network connections  -----
-				final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
-				connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get())
-															: new LocalConnectionManager();
-
-				try {
-					LOG.debug("Starting network connection manager");
-					connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
-				}
-				catch (Throwable t) {
-					throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
-				}
-
-				try {
-					kvStateRegistry = new KvStateRegistry();
-
-					if (nettyConfig.isDefined()) {
-						int numNetworkThreads = configuration.queryServerNetworkThreads();
-						if (numNetworkThreads == 0) {
-							numNetworkThreads = nettyConfig.get().getNumberOfSlots();
-						}
-
-						int numQueryThreads = configuration.queryServerNetworkThreads();
-						if (numQueryThreads == 0) {
-							numQueryThreads = nettyConfig.get().getNumberOfSlots();
-						}
-
-						kvStateServer = new KvStateServer(
-								connectionInfo.address(),
-								configuration.queryServerPort(),
-								numNetworkThreads,
-								numQueryThreads,
-								kvStateRegistry,
-								new DisabledKvStateRequestStats());
-
-						kvStateServer.start();
-
-						KvStateRegistryListener listener = new JobManagerKvStateRegistryListener(
-								jobManagerGateway,
-								kvStateServer.getAddress());
-
-						kvStateRegistry.registerListener(listener);
-					}
-				} catch (Throwable t) {
-					throw new IOException("Failed to instantiate KvState management components: "
-							+ t.getMessage(), t);
-				}
-			}
-			else {
-				throw new IllegalStateException(
-						"Network Environment is already associated with a JobManager/TaskManager");
-			}
-		}
-	}
-
-	public void disassociate() throws IOException {
-		synchronized (lock) {
-			if (!isAssociated()) {
-				return;
-			}
-
-			LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");
-
-			// Shut down KvStateRegistry
-			kvStateRegistry = null;
-
-			// Shut down KvStateServer
-			if (kvStateServer != null) {
-				try {
-					kvStateServer.shutDown();
-				} catch (Throwable t) {
-					throw new IOException("Cannot shutdown KvStateNettyServer", t);
-				}
-				kvStateServer = null;
-			}
-
-			// terminate all network connections
-			if (connectionManager != null) {
-				try {
-					LOG.debug("Shutting down network connection manager");
-					connectionManager.shutdown();
-					connectionManager = null;
-				}
-				catch (Throwable t) {
-					throw new IOException("Cannot shutdown network connection manager", t);
-				}
-			}
-
-			// shutdown all intermediate results
-			if (partitionManager != null) {
-				try {
-					LOG.debug("Shutting down intermediate result partition manager");
-					partitionManager.shutdown();
-					partitionManager = null;
-				}
-				catch (Throwable t) {
-					throw new IOException("Cannot shutdown partition manager", t);
-				}
-			}
-
-			partitionConsumableNotifier = null;
-
-			partitionStateChecker = null;
-
-			if (taskEventDispatcher != null) {
-				taskEventDispatcher.clearAll();
-				taskEventDispatcher = null;
-			}
-
-			// make sure that the global buffer pool re-acquires all buffers
-			networkBufferPool.destroyAllBufferPools();
-		}
+	public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
+		return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -358,9 +164,6 @@ public class NetworkEnvironment {
 			if (isShutdown) {
 				throw new IllegalStateException("NetworkEnvironment is shut down");
 			}
-			if (!isAssociated()) {
-				throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
-			}
 
 			for (int i = 0; i < producedPartitions.length; i++) {
 				final ResultPartition partition = producedPartitions[i];
@@ -373,17 +176,15 @@ public class NetworkEnvironment {
 					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
 					partition.registerBufferPool(bufferPool);
 
-					partitionManager.registerResultPartition(partition);
-				}
-				catch (Throwable t) {
+					resultPartitionManager.registerResultPartition(partition);
+				} catch (Throwable t) {
 					if (bufferPool != null) {
 						bufferPool.lazyDestroy();
 					}
 
 					if (t instanceof IOException) {
 						throw (IOException) t;
-					}
-					else {
+					} else {
 						throw new IOException(t.getMessage(), t);
 					}
 				}
@@ -401,31 +202,18 @@ public class NetworkEnvironment {
 				try {
 					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
 					gate.setBufferPool(bufferPool);
-				}
-				catch (Throwable t) {
+				} catch (Throwable t) {
 					if (bufferPool != null) {
 						bufferPool.lazyDestroy();
 					}
 
 					if (t instanceof IOException) {
 						throw (IOException) t;
-					}
-					else {
+					} else {
 						throw new IOException(t.getMessage(), t);
 					}
 				}
 			}
-
-			// Copy the reference to prevent races with concurrent shut downs
-			jobManagerNotifier = partitionConsumableNotifier;
-		}
-
-		for (ResultPartition partition : producedPartitions) {
-			// Eagerly notify consumers if required.
-			if (partition.getEagerlyDeployConsumers()) {
-				jobManagerNotifier.notifyPartitionConsumable(
-						partition.getJobId(), partition.getPartitionId());
-			}
 		}
 	}
 
@@ -436,13 +224,13 @@ public class NetworkEnvironment {
 		final ExecutionAttemptID executionId = task.getExecutionId();
 
 		synchronized (lock) {
-			if (isShutdown || !isAssociated()) {
+			if (isShutdown) {
 				// no need to do anything when we are not operational
 				return;
 			}
 
 			if (task.isCanceledOrFailed()) {
-				partitionManager.releasePartitionsProducedBy(executionId, task.getFailureCause());
+				resultPartitionManager.releasePartitionsProducedBy(executionId, task.getFailureCause());
 			}
 
 			ResultPartitionWriter[] writers = task.getAllWriters();
@@ -476,6 +264,31 @@ public class NetworkEnvironment {
 		}
 	}
 
+	/**
+	 * Starts the connection manager and, if one was supplied, the KvState server; fails with IOException.
+	 */
+	public void start() throws IOException {
+		synchronized (lock) {
+			Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
+			LOG.info("Starting the network environment and its components.");
+			try {
+				LOG.debug("Starting network connection manager");
+				connectionManager.start(resultPartitionManager, taskEventDispatcher, networkBufferPool);
+			} catch (IOException t) {
+				throw new IOException("Failed to instantiate network connection manager.", t);
+			}
+			if (kvStateServer != null) {
+				try {
+					LOG.debug("Starting the KvState server.");
+					kvStateServer.start();
+				} catch (InterruptedException ie) {
+					Thread.currentThread().interrupt(); // keep the interruption visible to callers
+					throw new IOException("Failed to start the KvState server.", ie);
+				}
+			}
+		}
+	}
+
 	/**
 	 * Tries to shut down all network I/O components.
 	 */
@@ -485,20 +298,45 @@ public class NetworkEnvironment {
 				return;
 			}
 
-			// shut down all connections and free all intermediate result partitions
+			LOG.info("Shutting down the network environment and its components.");
+
+			if (kvStateServer != null) {
+				try {
+					kvStateServer.shutDown();
+				} catch (Throwable t) {
+					LOG.warn("Cannot shut down KvState server.", t);
+				}
+			}
+
+			// terminate all network connections
 			try {
-				disassociate();
+				LOG.debug("Shutting down network connection manager");
+				connectionManager.shutdown();
 			}
 			catch (Throwable t) {
-				LOG.warn("Network services did not shut down properly: " + t.getMessage(), t);
+				LOG.warn("Cannot shut down the network connection manager.", t);
 			}
 
+			// shutdown all intermediate results
+			try {
+				LOG.debug("Shutting down intermediate result partition manager");
+				resultPartitionManager.shutdown();
+			}
+			catch (Throwable t) {
+				LOG.warn("Cannot shut down the result partition manager.", t);
+			}
+
+			taskEventDispatcher.clearAll();
+
+			// make sure that the global buffer pool re-acquires all buffers
+			networkBufferPool.destroyAllBufferPools();
+
 			// destroy the buffer pool
 			try {
 				networkBufferPool.destroy();
 			}
 			catch (Throwable t) {
-				LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
+				LOG.warn("Network buffer pool did not shut down properly.", t);
 			}
 
 			isShutdown = true;
@@ -506,138 +344,8 @@ public class NetworkEnvironment {
 	}
 
 	public boolean isShutdown() {
-		return isShutdown;
-	}
-
-	/**
-	 * Notifies the job manager about consumable partitions.
-	 */
-	private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
-
-		/**
-		 * {@link ExecutionContext} which is used for the failure handler of {@link ScheduleOrUpdateConsumers}
-		 * messages.
-		 */
-		private final ExecutionContext executionContext;
-
-		private final ActorGateway jobManager;
-
-		private final ActorGateway taskManager;
-
-		private final FiniteDuration jobManagerMessageTimeout;
-
-		public JobManagerResultPartitionConsumableNotifier(
-			ExecutionContext executionContext,
-			ActorGateway jobManager,
-			ActorGateway taskManager,
-			FiniteDuration jobManagerMessageTimeout) {
-
-			this.executionContext = executionContext;
-			this.jobManager = jobManager;
-			this.taskManager = taskManager;
-			this.jobManagerMessageTimeout = jobManagerMessageTimeout;
-		}
-
-		@Override
-		public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId) {
-
-			final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
-
-			Future<Object> futureResponse = jobManager.ask(msg, jobManagerMessageTimeout);
-
-			futureResponse.onFailure(new OnFailure() {
-				@Override
-				public void onFailure(Throwable failure) {
-					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
-
-					// Fail task at the TaskManager
-					FailTask failMsg = new FailTask(
-							partitionId.getProducerId(),
-							new RuntimeException("Could not notify JobManager to schedule or update consumers",
-									failure));
-
-					taskManager.tell(failMsg);
-				}
-			}, executionContext);
-		}
-	}
-
-	private static class JobManagerPartitionStateChecker implements PartitionStateChecker {
-
-		private final ActorGateway jobManager;
-
-		private final ActorGateway taskManager;
-
-		public JobManagerPartitionStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
-			this.jobManager = jobManager;
-			this.taskManager = taskManager;
-		}
-
-		@Override
-		public void triggerPartitionStateCheck(
-				JobID jobId,
-				ExecutionAttemptID executionAttemptID,
-				IntermediateDataSetID resultId,
-				ResultPartitionID partitionId) {
-
-			RequestPartitionState msg = new RequestPartitionState(
-					jobId, partitionId, executionAttemptID, resultId);
-
-			jobManager.tell(msg, taskManager);
-		}
-	}
-
-	/**
-	 * Simple {@link KvStateRegistry} listener, which forwards registrations to
-	 * the JobManager.
-	 */
-	private static class JobManagerKvStateRegistryListener implements KvStateRegistryListener {
-
-		private ActorGateway jobManager;
-
-		private KvStateServerAddress kvStateServerAddress;
-
-		public JobManagerKvStateRegistryListener(
-				ActorGateway jobManager,
-				KvStateServerAddress kvStateServerAddress) {
-
-			this.jobManager = Preconditions.checkNotNull(jobManager, "JobManager");
-			this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
-		}
-
-		@Override
-		public void notifyKvStateRegistered(
-				JobID jobId,
-				JobVertexID jobVertexId,
-				int keyGroupIndex,
-				String registrationName,
-				KvStateID kvStateId) {
-
-			Object msg = new KvStateMessage.NotifyKvStateRegistered(
-					jobId,
-					jobVertexId,
-					keyGroupIndex,
-					registrationName,
-					kvStateId,
-					kvStateServerAddress);
-
-			jobManager.tell(msg);
-		}
-
-		@Override
-		public void notifyKvStateUnregistered(
-				JobID jobId,
-				JobVertexID jobVertexId,
-				int keyGroupIndex,
-				String registrationName) {
-
-			Object msg = new KvStateMessage.NotifyKvStateUnregistered(
-					jobId,
-					jobVertexId,
-					keyGroupIndex,
-					registrationName);
-
-			jobManager.tell(msg);
+		synchronized (lock) {
+			return isShutdown;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 6806136..c178f2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -76,7 +76,7 @@ public class NettyConfig {
 
 		this.serverAddress = checkNotNull(serverAddress);
 
-		checkArgument(serverPort > 0 && serverPort <= 65536, "Invalid port number.");
+		checkArgument(serverPort >= 0 && serverPort <= 65535, "Invalid port number.");
 		this.serverPort = serverPort;
 
 		checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index d278b3c..abee2a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -71,6 +71,15 @@ public class NettyConnectionManager implements ConnectionManager {
 	}
 
 	@Override
+	public int getDataPort() {
+		if (server != null && server.getLocalAddress() != null) {
+			return server.getLocalAddress().getPort();
+		} else {
+			return -1;
+		}
+	}
+
+	@Override
 	public void shutdown() {
 		client.shutdown();
 		server.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 036fe22..a93e90c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.ThreadFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -50,8 +51,11 @@ class NettyServer {
 
 	private ChannelFuture bindFuture;
 
+	private InetSocketAddress localAddress;
+
 	NettyServer(NettyConfig config) {
 		this.config = checkNotNull(config);
+		localAddress = null;
 	}
 
 	void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
@@ -128,6 +132,8 @@ class NettyServer {
 
 		bindFuture = bootstrap.bind().syncUninterruptibly();
 
+		localAddress = (InetSocketAddress) bindFuture.channel().localAddress();
+
 		long end = System.currentTimeMillis();
 		LOG.info("Successful initialization (took {} ms). Listening on SocketAddress {}.", (end - start), bindFuture.channel().localAddress().toString());
 	}
@@ -140,6 +146,10 @@ class NettyServer {
 		return bootstrap;
 	}
 
+	public InetSocketAddress getLocalAddress() {
+		return localAddress;
+	}
+
 	void shutdown() {
 		long start = System.currentTimeMillis();
 		if (bindFuture != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 7c109f3..7bcdd31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -92,7 +92,7 @@ public class ResultPartition implements BufferPoolOwner {
 	 * <p>If <code>true</code>, the consumers are deployed as soon as the
 	 * runtime result is registered at the result manager of the task manager.
 	 */
-	private final boolean eagerlyDeployConsumers;
+	private final boolean doEagerDeployment;
 
 	/** The subpartitions of this partition. At least one. */
 	private final ResultSubpartition[] subpartitions;
@@ -133,7 +133,7 @@ public class ResultPartition implements BufferPoolOwner {
 			JobID jobId,
 			ResultPartitionID partitionId,
 			ResultPartitionType partitionType,
-			boolean eagerlyDeployConsumers,
+			boolean doEagerDeployment,
 			int numberOfSubpartitions,
 			ResultPartitionManager partitionManager,
 			ResultPartitionConsumableNotifier partitionConsumableNotifier,
@@ -144,7 +144,7 @@ public class ResultPartition implements BufferPoolOwner {
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
-		this.eagerlyDeployConsumers = eagerlyDeployConsumers;
+		this.doEagerDeployment = doEagerDeployment;
 		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
 		this.partitionManager = checkNotNull(partitionManager);
 		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
@@ -211,16 +211,6 @@ public class ResultPartition implements BufferPoolOwner {
 		return subpartitions.length;
 	}
 
-	/**
-	 * Returns whether consumers should be deployed eagerly (as soon as they
-	 * are registered at the result manager of the task manager).
-	 *
-	 * @return Whether consumers should be deployed eagerly
-	 */
-	public boolean getEagerlyDeployConsumers() {
-		return eagerlyDeployConsumers;
-	}
-
 	public BufferProvider getBufferProvider() {
 		return bufferPool;
 	}
@@ -357,6 +347,15 @@ public class ResultPartition implements BufferPoolOwner {
 	}
 
 	/**
+	 * Deploys consumers if eager deployment is activated
+	 */
+	public void deployConsumers() {
+		if (doEagerDeployment) {
+			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
+		}
+	}
+
+	/**
 	 * Releases buffers held by this result partition.
 	 *
 	 * <p> This is a callback from the buffer pool, which is registered for result partitions, which

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 9da3e14..6edae6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -58,6 +58,8 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 				throw new IllegalStateException("Result partition already registered.");
 			}
 
+			partition.deployConsumers();
+
 			LOG.debug("Registered {}.", partition);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 5d82903..35094e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -71,13 +70,14 @@ public abstract class InputChannel {
 			SingleInputGate inputGate,
 			int channelIndex,
 			ResultPartitionID partitionId,
-			Tuple2<Integer, Integer> initialAndMaxBackoff,
+			int initialBackoff,
+			int maxBackoff,
 			Counter numBytesIn) {
 
 		checkArgument(channelIndex >= 0);
 
-		int initial = initialAndMaxBackoff._1();
-		int max = initialAndMaxBackoff._2();
+		int initial = initialBackoff;
+		int max = maxBackoff;
 
 		checkArgument(initial >= 0 && initial <= max);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 6fcd2f9..a8aae7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.Timer;
@@ -72,7 +71,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			IOMetricGroup metrics) {
 
 		this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
-				new Tuple2<Integer, Integer>(0, 0), metrics);
+				0, 0, metrics);
 	}
 
 	LocalInputChannel(
@@ -81,10 +80,11 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			ResultPartitionID partitionId,
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher,
-			Tuple2<Integer, Integer> initialAndMaxBackoff,
+			int initialBackoff,
+			int maxBackoff,
 			IOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocalCounter());
+		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 1cd042c..a12d2a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -85,7 +84,7 @@ public class RemoteInputChannel extends InputChannel {
 			IOMetricGroup metrics) {
 
 		this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
-				new Tuple2<Integer, Integer>(0, 0), metrics);
+				0, 0, metrics);
 	}
 
 	public RemoteInputChannel(
@@ -94,10 +93,11 @@ public class RemoteInputChannel extends InputChannel {
 			ResultPartitionID partitionId,
 			ConnectionID connectionId,
 			ConnectionManager connectionManager,
-			Tuple2<Integer, Integer> initialAndMaxBackoff,
+			int initialBackoff,
+			int maxBackoff,
 			IOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter());
+		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter());
 
 		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 351181a..aaf8887 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -505,6 +505,7 @@ public class SingleInputGate implements InputGate {
 			ExecutionAttemptID executionId,
 			InputGateDeploymentDescriptor igdd,
 			NetworkEnvironment networkEnvironment,
+			PartitionStateChecker partitionStateChecker,
 			IOMetricGroup metrics) {
 
 		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
@@ -516,7 +517,7 @@ public class SingleInputGate implements InputGate {
 
 		final SingleInputGate inputGate = new SingleInputGate(
 				owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
-				icdd.length, networkEnvironment.getPartitionStateChecker(), metrics);
+				icdd.length, partitionStateChecker, metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
@@ -528,27 +529,30 @@ public class SingleInputGate implements InputGate {
 
 			if (partitionLocation.isLocal()) {
 				inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
-						networkEnvironment.getPartitionManager(),
-						networkEnvironment.getTaskEventDispatcher(),
-						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
-						metrics
+					networkEnvironment.getResultPartitionManager(),
+					networkEnvironment.getTaskEventDispatcher(),
+					networkEnvironment.getPartitionRequestInitialBackoff(),
+					networkEnvironment.getPartitionRequestMaxBackoff(),
+					metrics
 				);
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
-						partitionLocation.getConnectionId(),
-						networkEnvironment.getConnectionManager(),
-						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
-						metrics
+					partitionLocation.getConnectionId(),
+					networkEnvironment.getConnectionManager(),
+					networkEnvironment.getPartitionRequestInitialBackoff(),
+					networkEnvironment.getPartitionRequestMaxBackoff(),
+					metrics
 				);
 			}
 			else if (partitionLocation.isUnknown()) {
 				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
-						networkEnvironment.getPartitionManager(),
-						networkEnvironment.getTaskEventDispatcher(),
-						networkEnvironment.getConnectionManager(),
-						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
-						metrics
+					networkEnvironment.getResultPartitionManager(),
+					networkEnvironment.getTaskEventDispatcher(),
+					networkEnvironment.getConnectionManager(),
+					networkEnvironment.getPartitionRequestInitialBackoff(),
+					networkEnvironment.getPartitionRequestMaxBackoff(),
+					metrics
 				);
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index cc91e83..27ecc70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import scala.Tuple2;
 
 import java.io.IOException;
 
@@ -46,7 +45,9 @@ public class UnknownInputChannel extends InputChannel {
 	private final ConnectionManager connectionManager;
 
 	/** Initial and maximum backoff (in ms) after failed partition requests. */
-	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
+	private final int initialBackoff;
+
+	private final int maxBackoff;
 
 	private final IOMetricGroup metrics;
 
@@ -57,16 +58,18 @@ public class UnknownInputChannel extends InputChannel {
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher,
 			ConnectionManager connectionManager,
-			Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff,
+			int initialBackoff,
+			int maxBackoff,
 			IOMetricGroup metrics) {
 
-		super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff, null);
+		super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
 		this.connectionManager = checkNotNull(connectionManager);
-		this.partitionRequestInitialAndMaxBackoff = checkNotNull(partitionRequestInitialAndMaxBackoff);
 		this.metrics = checkNotNull(metrics);
+		this.initialBackoff = initialBackoff;
+		this.maxBackoff = maxBackoff;
 	}
 
 	@Override
@@ -117,10 +120,10 @@ public class UnknownInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	public RemoteInputChannel toRemoteInputChannel(ConnectionID producerAddress) {
-		return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff, metrics);
+		return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, initialBackoff, maxBackoff, metrics);
 	}
 
 	public LocalInputChannel toLocalInputChannel() {
-		return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff, metrics);
+		return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, initialBackoff, maxBackoff, metrics);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 5213fe9..f19c123 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -59,6 +59,13 @@ public class KvStateRegistry {
 	}
 
 	/**
+	 * Unregisters the listener from the registry.
+	 */
+	public void unregisterListener() {
+		listener.set(null);
+	}
+
+	/**
 	 * Registers the KvState instance identified by the given 4-tuple of JobID,
 	 * JobVertexID, key group index, and registration name.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
new file mode 100644
index 0000000..4697c79
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Factory implementation which generates {@link ActorGateway} based job manager communication
+ * components.
+ */
+public class ActorGatewayJobManagerCommunicationFactory implements JobManagerCommunicationFactory {
+	private final ExecutionContext executionContext;
+	private final ActorGateway jobManagerGateway;
+	private final ActorGateway taskManagerGateway;
+	private final FiniteDuration jobManagerMessageTimeout;
+
+	public ActorGatewayJobManagerCommunicationFactory(
+		ExecutionContext executionContext,
+		ActorGateway jobManagerGateway,
+		ActorGateway taskManagerGateway,
+		FiniteDuration jobManagerMessageTimeout) {
+
+		this.executionContext = Preconditions.checkNotNull(executionContext);
+		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
+		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+		this.jobManagerMessageTimeout = Preconditions.checkNotNull(jobManagerMessageTimeout);
+	}
+
+	public PartitionStateChecker createPartitionStateChecker() {
+		return new ActorGatewayPartitionStateChecker(jobManagerGateway, taskManagerGateway);
+	}
+
+	public ResultPartitionConsumableNotifier createResultPartitionConsumableNotifier(Task owningTask) {
+		return new ActorGatewayResultPartitionConsumableNotifier(
+			executionContext,
+			jobManagerGateway,
+			owningTask,
+			jobManagerMessageTimeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
new file mode 100644
index 0000000..2d69938
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This implementation uses {@link ActorGateway} to forward key-value state notifications to the job
+ * manager. The notifications are wrapped in an actor message and sent to the given actor gateway.
+ */
+public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListener {
+
+	private ActorGateway jobManager;
+
+	private KvStateServerAddress kvStateServerAddress;
+
+	public ActorGatewayKvStateRegistryListener(
+		ActorGateway jobManager,
+		KvStateServerAddress kvStateServerAddress) {
+
+		this.jobManager = Preconditions.checkNotNull(jobManager, "JobManager");
+		this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
+	}
+
+	@Override
+	public void notifyKvStateRegistered(
+		JobID jobId,
+		JobVertexID jobVertexId,
+		int keyGroupIndex,
+		String registrationName,
+		KvStateID kvStateId) {
+
+		Object msg = new KvStateMessage.NotifyKvStateRegistered(
+			jobId,
+			jobVertexId,
+			keyGroupIndex,
+			registrationName,
+			kvStateId,
+			kvStateServerAddress);
+
+		jobManager.tell(msg);
+	}
+
+	@Override
+	public void notifyKvStateUnregistered(
+		JobID jobId,
+		JobVertexID jobVertexId,
+		int keyGroupIndex,
+		String registrationName) {
+
+		Object msg = new KvStateMessage.NotifyKvStateUnregistered(
+			jobId,
+			jobVertexId,
+			keyGroupIndex,
+			registrationName);
+
+		jobManager.tell(msg);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
new file mode 100644
index 0000000..e7c6690
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+/**
+ * This implementation uses {@link ActorGateway} to trigger the partition state check at the job
+ * manager.
+ */
+public class ActorGatewayPartitionStateChecker implements PartitionStateChecker {
+
+	private final ActorGateway jobManager;
+
+	private final ActorGateway taskManager;
+
+	public ActorGatewayPartitionStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
+		this.jobManager = jobManager;
+		this.taskManager = taskManager;
+	}
+
+	@Override
+	public void triggerPartitionStateCheck(
+		JobID jobId,
+		ExecutionAttemptID executionAttemptID,
+		IntermediateDataSetID resultId,
+		ResultPartitionID partitionId) {
+
+		JobManagerMessages.RequestPartitionState msg = new JobManagerMessages.RequestPartitionState(
+			jobId,
+			partitionId,
+			executionAttemptID,
+			resultId);
+
+		jobManager.tell(msg, taskManager);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..b91120b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This implementation uses {@link ActorGateway} to notify the job manager about consumable
+ * partitions.
+ */
+public class ActorGatewayResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ActorGatewayResultPartitionConsumableNotifier.class);
+
+	/**
+	 * {@link ExecutionContext} which is used for the failure handler of
+	 * {@link JobManagerMessages.ScheduleOrUpdateConsumers} messages.
+	 */
+	private final ExecutionContext executionContext;
+
+	private final ActorGateway jobManager;
+
+	private final Task owningTask;
+
+	private final FiniteDuration jobManagerMessageTimeout;
+
+	public ActorGatewayResultPartitionConsumableNotifier(
+		ExecutionContext executionContext,
+		ActorGateway jobManager,
+		Task owningTask,
+		FiniteDuration jobManagerMessageTimeout) {
+
+		this.executionContext = Preconditions.checkNotNull(executionContext);
+		this.jobManager = Preconditions.checkNotNull(jobManager);
+		this.owningTask = Preconditions.checkNotNull(owningTask);
+		this.jobManagerMessageTimeout = Preconditions.checkNotNull(jobManagerMessageTimeout);
+	}
+
+	@Override
+	public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId) {
+
+		final JobManagerMessages.ScheduleOrUpdateConsumers msg = new JobManagerMessages.ScheduleOrUpdateConsumers(jobId, partitionId);
+
+		Future<Object> futureResponse = jobManager.ask(msg, jobManagerMessageTimeout);
+
+		futureResponse.onFailure(new OnFailure() {
+			@Override
+			public void onFailure(Throwable failure) {
+				LOG.error("Could not schedule or update consumers at the JobManager.", failure);
+
+				owningTask.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers", failure));
+			}
+		}, executionContext);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
new file mode 100644
index 0000000..64cfcb1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+
+/**
+ * Factory to generate job manager specific communication components.
+ */
+public interface JobManagerCommunicationFactory {
+
+	/**
+	 * Creates a {@link PartitionStateChecker} which communicates with the associated job manager of
+	 * this instance.
+	 *
+	 * @return PartitionStateChecker which communicates with the associated job manager of this
+	 * 			instance
+	 */
+	PartitionStateChecker createPartitionStateChecker();
+
+	/**
+	 * Creates a {@link ResultPartitionConsumableNotifier} which communicates with the associated
+	 * job manager of this instance.
+	 *
+	 * @param owningTask Task which is associated with the ResultPartitionConsumableNotifier
+	 * @return ResultPartitionConsumableNotifier which communicates with the associated job manager
+	 * 			of this instance
+	 */
+	ResultPartitionConsumableNotifier createResultPartitionConsumableNotifier(Task owningTask);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 73601c4..d09e03c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -91,10 +93,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * with the JobManager.
  *
  * <p>The Flink operators (implemented as subclasses of
- * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable} have only data
- * readers, -writers, and certain event callbacks. The task connects those to the
- * network stack and actor messages, and tracks the state of the execution and
- * handles exceptions.
+ * {@link AbstractInvokable}) have only data readers, -writers, and certain event callbacks.
+ * The task connects those to the network stack and actor messages, and tracks the state
+ * of the execution and handles exceptions.
  *
  * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
  * are the first attempt to execute the task, or a repeated attempt. All of that
@@ -247,6 +248,7 @@ public class Task implements Runnable {
 				MemoryManager memManager,
 				IOManager ioManager,
 				NetworkEnvironment networkEnvironment,
+				JobManagerCommunicationFactory jobManagerCommunicationFactory,
 				BroadcastVariableManager bcVarManager,
 				ActorGateway taskManagerActor,
 				ActorGateway jobManagerActor,
@@ -302,6 +304,9 @@ public class Task implements Runnable {
 		this.producedPartitions = new ResultPartition[partitions.size()];
 		this.writers = new ResultPartitionWriter[partitions.size()];
 
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier =
+			jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(this);
+
 		for (int i = 0; i < this.producedPartitions.length; i++) {
 			ResultPartitionDeploymentDescriptor desc = partitions.get(i);
 			ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
@@ -313,8 +318,8 @@ public class Task implements Runnable {
 					desc.getPartitionType(),
 					desc.getEagerlyDeployConsumers(),
 					desc.getNumberOfSubpartitions(),
-					networkEnvironment.getPartitionManager(),
-					networkEnvironment.getPartitionConsumableNotifier(),
+					networkEnvironment.getResultPartitionManager(),
+					resultPartitionConsumableNotifier,
 					ioManager,
 					networkEnvironment.getDefaultIOMode());
 
@@ -325,10 +330,17 @@ public class Task implements Runnable {
 		this.inputGates = new SingleInputGate[consumedPartitions.size()];
 		this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
 
+		PartitionStateChecker partitionStateChecker = jobManagerCommunicationFactory.createPartitionStateChecker();
+
 		for (int i = 0; i < this.inputGates.length; i++) {
 			SingleInputGate gate = SingleInputGate.create(
-					taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment, 
-					metricGroup.getIOMetricGroup());
+				taskNameWithSubtaskAndId,
+				jobId,
+				executionId,
+				consumedPartitions.get(i),
+				networkEnvironment,
+				partitionStateChecker,
+				metricGroup.getIOMetricGroup());
 
 			this.inputGates[i] = gate;
 			inputGatesById.put(gate.getConsumedResultId(), gate);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 407fa01..0c62c69 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1472,6 +1472,9 @@ class JobManager(
         currentJobs.get(msg.getJobId) match {
           case Some((graph, _)) =>
             try {
+              log.debug(s"Lookup key-value state for job ${msg.getJobId} with registration " +
+                         s"name ${msg.getRegistrationName}.")
+
               val registry = graph.getKvStateLocationRegistry
               val location = registry.getKvStateLocation(msg.getRegistrationName)
               if (location == null) {
@@ -1493,6 +1496,9 @@ class JobManager(
         currentJobs.get(msg.getJobId) match {
           case Some((graph, _)) =>
             try {
+              log.debug(s"Key value state registered for job ${msg.getJobId} under " +
+                         s"name ${msg.getRegistrationName}.")
+
               graph.getKvStateLocationRegistry.notifyKvStateRegistered(
                 msg.getJobVertexId,
                 msg.getKeyGroupIndex,

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 0788d7c..893eaa8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -31,4 +31,5 @@ case class NetworkEnvironmentConfiguration(
   queryServerNetworkThreads: Int,
   queryServerQueryThreads: Int,
   nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000))
+  partitionRequestInitialBackoff: Int = 500,
+  partitinRequestMaxBackoff: Int = 3000)

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 72ec2ac..3154826 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -53,8 +53,10 @@ import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceConnectionInfo, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.io.network.netty.NettyConfig
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
+import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher}
+import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -67,6 +69,8 @@ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
 import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
+import org.apache.flink.runtime.query.KvStateRegistry
+import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.util._
@@ -76,7 +80,6 @@ import org.apache.flink.util.{MathUtils, NetUtils}
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 import scala.util.{Failure, Success}
 
@@ -193,6 +196,8 @@ class TaskManager(
   private var scheduledTaskManagerRegistration: Option[Cancellable] = None
   private var currentRegistrationRun: UUID = UUID.randomUUID()
 
+  private var jobManagerConnectionFactory: Option[JobManagerCommunicationFactory] = None
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -395,13 +400,11 @@ class TaskManager(
         // discards intermediate result partitions of a task execution on this TaskManager
         case FailIntermediateResultPartitions(executionID) =>
           log.info("Discarding the results produced by task execution " + executionID)
-          if (network.isAssociated) {
-            try {
-              network.getPartitionManager.releasePartitionsProducedBy(executionID)
-            } catch {
-              case t: Throwable => killTaskManagerFatal(
-              "Fatal leak: Unable to release intermediate result partition data", t)
-            }
+          try {
+            network.getResultPartitionManager.releasePartitionsProducedBy(executionID)
+          } catch {
+            case t: Throwable => killTaskManagerFatal(
+            "Fatal leak: Unable to release intermediate result partition data", t)
           }
 
         // notifies the TaskManager that the state of a task has changed.
@@ -916,25 +919,33 @@ class TaskManager(
       "starting network stack and library cache.")
 
     // sanity check that the JobManager dependent components are not set up currently
-    if (network.isAssociated || blobService.isDefined) {
+    if (jobManagerConnectionFactory.isDefined || blobService.isDefined) {
       throw new IllegalStateException("JobManager-specific components are already initialized.")
     }
 
     currentJobManager = Some(jobManager)
     instanceID = id
 
-    // start the network stack, now that we have the JobManager actor reference
-    try {
-      network.associateWithTaskManagerAndJobManager(
-        new AkkaActorGateway(jobManager, leaderSessionID.orNull),
-        new AkkaActorGateway(self, leaderSessionID.orNull)
-      )
-    }
-    catch {
-      case e: Exception =>
-        val message = "Could not start network environment."
-        log.error(message, e)
-        throw new RuntimeException(message, e)
+    val jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID.orNull)
+    val taskmanagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
+
+    jobManagerConnectionFactory = Some(
+      new ActorGatewayJobManagerCommunicationFactory(
+        context.dispatcher,
+        jobManagerGateway,
+        taskmanagerGateway,
+        config.timeout))
+
+
+    val kvStateServer = network.getKvStateServer()
+
+    if (kvStateServer != null) {
+      val kvStateRegistry = network.getKvStateRegistry()
+
+      kvStateRegistry.registerListener(
+        new ActorGatewayKvStateRegistryListener(
+          jobManagerGateway,
+          kvStateServer.getAddress))
     }
 
     // start a blob service, if a blob server is specified
@@ -1031,8 +1042,12 @@ class TaskManager(
     }
     blobService = None
 
-    // disassociate the network environment
-    network.disassociate()
+    // drop the JobManager connection factory and unregister the KvState listener
+    jobManagerConnectionFactory = None
+
+    if (network.getKvStateRegistry != null) {
+      network.getKvStateRegistry.unregisterListener()
+    }
     
     // stop the metrics reporters
     metricsRegistry.shutdown()
@@ -1092,6 +1107,13 @@ class TaskManager(
         case None => throw new IllegalStateException("There is no valid library cache manager.")
       }
 
+      val jmFactory = jobManagerConnectionFactory match {
+        case Some(factory) => factory
+        case None =>
+          throw new IllegalStateException("TaskManager is not associated with a JobManager and, " +
+                                            "thus, the SlotEnvironment has not been initialized.")
+      }
+
       val slot = tdd.getTargetSlotNumber
       if (slot < 0 || slot >= numberOfSlots) {
         throw new IllegalArgumentException(s"Target slot $slot does not exist on TaskManager.")
@@ -1117,6 +1139,7 @@ class TaskManager(
         memoryManager,
         ioManager,
         network,
+        jmFactory,
         bcVarManager,
         selfGateway,
         jobManagerGateway,
@@ -1796,7 +1819,7 @@ object TaskManager {
 
     val (taskManagerConfig : TaskManagerConfiguration,      
       netConfig: NetworkEnvironmentConfiguration,
-      connectionInfo: InstanceConnectionInfo,
+      taskManagerAddress: InetSocketAddress,
       memType: MemoryType
     ) = parseTaskManagerConfiguration(
       configuration,
@@ -1806,14 +1829,64 @@ object TaskManager {
     // pre-start checks
     checkTempDirs(taskManagerConfig.tmpDirPaths)
 
-    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+    val networkBufferPool = new NetworkBufferPool(
+      netConfig.numNetworkBuffers,
+      netConfig.networkBufferSize,
+      netConfig.memoryType)
+
+    val connectionManager = netConfig.nettyConfig match {
+      case Some(nettyConfig) => new NettyConnectionManager(nettyConfig)
+      case None => new LocalConnectionManager()
+    }
+
+    val resultPartitionManager = new ResultPartitionManager()
+    val taskEventDispatcher = new TaskEventDispatcher()
+
+    val kvStateRegistry = new KvStateRegistry()
+
+    val kvStateServer = netConfig.nettyConfig match {
+      case Some(nettyConfig) =>
+
+        val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) {
+          nettyConfig.getNumberOfSlots
+        } else {
+          netConfig.queryServerNetworkThreads
+        }
+
+        val numQueryThreads = if (netConfig.queryServerQueryThreads == 0) {
+          nettyConfig.getNumberOfSlots
+        } else {
+          netConfig.queryServerQueryThreads
+        }
+
+        new KvStateServer(
+          taskManagerAddress.getAddress(),
+          netConfig.queryServerPort,
+          numNetworkThreads,
+          numQueryThreads,
+          kvStateRegistry,
+          new DisabledKvStateRequestStats())
+
+      case None => null
+    }
 
     // we start the network first, to make sure it can allocate its buffers first
     val network = new NetworkEnvironment(
-      executionContext,
-      taskManagerConfig.timeout,
-      netConfig,
-      connectionInfo)
+      networkBufferPool,
+      connectionManager,
+      resultPartitionManager,
+      taskEventDispatcher,
+      kvStateRegistry,
+      kvStateServer,
+      netConfig.ioMode,
+      netConfig.partitionRequestInitialBackoff,
+      netConfig.partitinRequestMaxBackoff)
+
+    network.start()
+
+    val connectionInfo = new InstanceConnectionInfo(
+      taskManagerAddress.getAddress(),
+      network.getConnectionManager().getDataPort())
 
     // computing the amount of memory to use depends on how much memory is available
     // it strictly needs to happen AFTER the network stack has been initialized
@@ -1991,7 +2064,7 @@ object TaskManager {
       localTaskManagerCommunication: Boolean)
     : (TaskManagerConfiguration,
      NetworkEnvironmentConfiguration,
-     InstanceConnectionInfo,
+     InetSocketAddress,
      MemoryType) = {
 
     // ------- read values from the config and check them ---------
@@ -2000,16 +2073,13 @@ object TaskManager {
     // ----> hosts / ports for communication and data exchange
 
     val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) match {
-      case 0 => NetUtils.getAvailablePort()
-      case x => x
-    }
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
 
-    checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+    checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
       "Leave config parameter empty or use 0 to let the system choose a port automatically.")
 
     val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
-    val connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport)
+    val taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport)
 
     // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
 
@@ -2076,8 +2146,8 @@ object TaskManager {
     } else {
       Some(
         new NettyConfig(
-          connectionInfo.address(),
-          connectionInfo.dataPort(),
+          taskManagerInetSocketAddress.getAddress(),
+          taskManagerInetSocketAddress.getPort(),
           pageSize,
           slots,
           configuration)
@@ -2206,7 +2276,7 @@ object TaskManager {
       maxRegistrationPause,
       refusedRegistrationPause)
 
-    (taskManagerConfig, networkConfig, connectionInfo, memType)
+    (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
index b41db31..a6963fe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -108,7 +108,7 @@ trait TestingTaskManagerLike extends FlinkActor {
       )
 
     case RequestNumActiveConnections =>
-      val numActive = if (network.isAssociated) {
+      val numActive = if (!network.isShutdown) {
         network.getConnectionManager.getNumberOfActiveConnections
       } else {
         0