You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/30 20:07:23 UTC

flink git commit: [FLINK-1801] [FLINK-1465] Network environment can start prior to TaskManager in "disassociated" mode.

Repository: flink
Updated Branches:
  refs/heads/master c89c657ae -> ee273dbe0


[FLINK-1801] [FLINK-1465] Network environment can start prior to TaskManager in "disassociated" mode.

NetworkEnvironment allocates the heavy network buffer pool on startup and supports
multiple associations / disassociations with the TaskManager actor.

Fix negative memory report by replacing overflowing ints with longs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee273dbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee273dbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee273dbe

Branch: refs/heads/master
Commit: ee273dbe01e95d2b260fa690e21e2c244a2a5711
Parents: c89c657
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 30 18:32:55 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 30 18:32:55 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/network/ConnectionManager.java   |   4 +-
 .../io/network/LocalConnectionManager.java      |  11 +-
 .../runtime/io/network/NetworkEnvironment.java  | 396 ++++++++++++-------
 .../runtime/io/network/TaskEventDispatcher.java |   6 +
 .../io/network/buffer/NetworkBufferPool.java    |  62 ++-
 .../flink/runtime/taskmanager/TaskManager.scala |  24 +-
 .../io/network/NetworkEnvironmentTest.java      | 124 ++++++
 .../io/network/buffer/LocalBufferPoolTest.java  |   1 +
 .../network/buffer/NetworkBufferPoolTest.java   | 134 +++++++
 9 files changed, 576 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 06dc151..2f535fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -30,7 +30,9 @@ import java.io.IOException;
  */
 public interface ConnectionManager {
 
-	void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException;
+	void start(ResultPartitionProvider partitionProvider,
+				TaskEventDispatcher taskEventDispatcher,
+				NetworkBufferPool networkbufferPool) throws IOException;
 
 	/**
 	 * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index af6273e..410f8ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -22,8 +22,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
-import java.io.IOException;
-
 /**
  * A connection manager implementation to bypass setup overhead for task managers running in local
  * execution mode.
@@ -31,11 +29,13 @@ import java.io.IOException;
 public class LocalConnectionManager implements ConnectionManager {
 
 	@Override
-	public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException {
+	public void start(ResultPartitionProvider partitionProvider,
+						TaskEventDispatcher taskEventDispatcher,
+						NetworkBufferPool networkbufferPool) {
 	}
 
 	@Override
-	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
+	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
 		return null;
 	}
 
@@ -48,6 +48,5 @@ public class LocalConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public void shutdown() throws IOException {
-	}
+	public void shutdown() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index e5dc8a2..faba252 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -52,207 +52,304 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpd
 import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
 
 /**
- * Network I/O components of each {@link TaskManager} instance.
+ * Network I/O components of each {@link TaskManager} instance. The network environment contains
+ * the data structures that keep track of all intermediate results and all data exchanges.
+ *
+ * When initialized, the NetworkEnvironment will allocate the network buffer pool.
+ * All other components (netty, intermediate result managers, ...) are only created once the
+ * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
+ * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
 
-	private final ActorRef taskManager;
+	private final Object lock = new Object();
 
-	private final ActorRef jobManager;
+	private final NetworkEnvironmentConfiguration configuration;
 
 	private final FiniteDuration jobManagerTimeout;
 
-	private final ResultPartitionManager partitionManager;
-
-	private final TaskEventDispatcher taskEventDispatcher;
-
 	private final NetworkBufferPool networkBufferPool;
 
-	private final ConnectionManager connectionManager;
+	private ConnectionManager connectionManager;
 
-	private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
+	private ResultPartitionManager partitionManager;
 
-	private final NetworkEnvironmentConfiguration configuration;
+	private TaskEventDispatcher taskEventDispatcher;
+
+	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
 
 	private boolean isShutdown;
 
+
 	/**
 	 * Initializes all network I/O components.
 	 */
-	public NetworkEnvironment(
-			ActorRef taskManager,
-			ActorRef jobManager,
-			FiniteDuration jobManagerTimeout,
-			NetworkEnvironmentConfiguration config) throws IOException {
-
-		this.taskManager = checkNotNull(taskManager);
-		this.jobManager = checkNotNull(jobManager);
-		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
+	public NetworkEnvironment(FiniteDuration jobManagerTimeout,
+								NetworkEnvironmentConfiguration config) throws IOException {
 
-		this.partitionManager = new ResultPartitionManager();
-		this.taskEventDispatcher = new TaskEventDispatcher();
 		this.configuration = checkNotNull(config);
+		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
 
-		// --------------------------------------------------------------------
-		// Network buffers
-		// --------------------------------------------------------------------
+		// create the network buffers - this is the operation most likely to fail upon
+		// mis-configuration, so we do this first
 		try {
 			networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
 		}
 		catch (Throwable t) {
-			throw new IOException("Failed to instantiate network buffer pool: " + t.getMessage(), t);
+			throw new IOException("Cannot allocate network buffer pool: " + t.getMessage(), t);
 		}
+	}
 
-		// --------------------------------------------------------------------
-		// Network connections
-		// --------------------------------------------------------------------
-		final Option<NettyConfig> nettyConfig = config.nettyConfig();
+	// --------------------------------------------------------------------------------------------
+	//  Properties
+	// --------------------------------------------------------------------------------------------
 
-		connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
+	public ResultPartitionManager getPartitionManager() {
+		return partitionManager;
+	}
 
-		try {
-			connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
-		}
-		catch (Throwable t) {
-			throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
-		}
+	public TaskEventDispatcher getTaskEventDispatcher() {
+		return taskEventDispatcher;
+	}
 
-		this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(this);
+	public ConnectionManager getConnectionManager() {
+		return connectionManager;
 	}
 
-	public ActorRef getTaskManager() {
-		return taskManager;
+	public NetworkBufferPool getNetworkBufferPool() {
+		return networkBufferPool;
 	}
 
-	public ActorRef getJobManager() {
-		return jobManager;
+	public IOMode getDefaultIOMode() {
+		return configuration.ioMode();
 	}
 
-	public Timeout getJobManagerTimeout() {
-		return new Timeout(jobManagerTimeout);
+	public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
+		return partitionConsumableNotifier;
 	}
 
-	public void registerTask(Task task) throws IOException {
-		final ResultPartition[] producedPartitions = task.getProducedPartitions();
-		final ResultPartitionWriter[] writers = task.getWriters();
+	// --------------------------------------------------------------------------------------------
+	//  Association / Disassociation with JobManager / TaskManager
+	// --------------------------------------------------------------------------------------------
 
-		if (writers.length != producedPartitions.length) {
-			throw new IllegalStateException("Unequal number of writers and partitions.");
-		}
+	public boolean isAssociated() {
+		return partitionConsumableNotifier != null;
+	}
 
-		for (int i = 0; i < producedPartitions.length; i++) {
-			final ResultPartition partition = producedPartitions[i];
-			final ResultPartitionWriter writer = writers[i];
+	/**
+	 * This associates the network environment with a TaskManager and JobManager.
+	 * This will actually start the network components.
+	 *
+	 * @param jobManagerRef The JobManager actor reference.
+	 * @param taskManagerRef The TaskManager actor reference.
+	 *
+	 * @throws IOException Thrown if the network subsystem (Netty) cannot be properly started.
+	 */
+	public void associateWithTaskManagerAndJobManager(ActorRef jobManagerRef, ActorRef taskManagerRef)
+			throws IOException
+	{
+		checkNotNull(jobManagerRef);
+		checkNotNull(taskManagerRef);
+
+		synchronized (lock) {
+			if (isShutdown) {
+				throw new IllegalStateException("environment is shut down");
+			}
 
-			// Buffer pool for the partition
-			BufferPool bufferPool = null;
+			if (this.partitionConsumableNotifier == null &&
+				this.partitionManager == null &&
+				this.taskEventDispatcher == null &&
+				this.connectionManager == null)
+			{
+				// good, not currently associated. start the individual components
 
-			try {
-				bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
-				partition.registerBufferPool(bufferPool);
+				this.partitionManager = new ResultPartitionManager();
+				this.taskEventDispatcher = new TaskEventDispatcher();
+				this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
+													jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout));
 
-				partitionManager.registerResultPartition(partition);
-			}
-			catch (Throwable t) {
-				if (bufferPool != null) {
-					bufferPool.lazyDestroy();
-				}
+				// -----  Network connections  -----
+				final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
+				connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get())
+															: new LocalConnectionManager();
 
-				if (t instanceof IOException) {
-					throw (IOException) t;
+				try {
+					connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
 				}
-				else {
-					throw new IOException(t.getMessage(), t);
+				catch (Throwable t) {
+					throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
 				}
 			}
-
-			// Register writer with task event dispatcher
-			taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
+			else {
+				throw new IllegalStateException(
+						"Network Environment is already associated with a JobManager/TaskManager");
+			}
 		}
+	}
 
-		// Setup the buffer pool for each buffer reader
-		final SingleInputGate[] inputGates = task.getInputGates();
+	public void disassociate() throws IOException {
+		synchronized (lock) {
+			if (!isAssociated()) {
+				return;
+			}
 
-		for (SingleInputGate gate : inputGates) {
-			BufferPool bufferPool = null;
+			LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");
 
-			try {
-				bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
-				gate.setBufferPool(bufferPool);
-			}
-			catch (Throwable t) {
-				if (bufferPool != null) {
-					bufferPool.lazyDestroy();
+			// terminate all network connections
+			if (connectionManager != null) {
+				try {
+					LOG.debug("Shutting down network connection manager");
+					connectionManager.shutdown();
+					connectionManager = null;
+				}
+				catch (Throwable t) {
+					throw new IOException("Cannot shutdown network connection manager", t);
 				}
+			}
 
-				if (t instanceof IOException) {
-					throw (IOException) t;
+			// shutdown all intermediate results
+			if (partitionManager != null) {
+				try {
+					LOG.debug("Shutting down intermediate result partition manager");
+					partitionManager.shutdown();
+					partitionManager = null;
 				}
-				else {
-					throw new IOException(t.getMessage(), t);
+				catch (Throwable t) {
+					throw new IOException("Cannot shutdown partition manager", t);
 				}
 			}
+
+			partitionConsumableNotifier = null;
+
+			if (taskEventDispatcher != null) {
+				taskEventDispatcher.clearAll();
+				taskEventDispatcher = null;
+			}
+
+			// make sure that the global buffer pool re-acquires all buffers
+			networkBufferPool.destroyAllBufferPools();
 		}
 	}
 
-	public void unregisterTask(Task task) {
-		LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
-				task.getTaskNameWithSubtasks(), task.getExecutionState());
 
-		final ExecutionAttemptID executionId = task.getExecutionId();
 
-		if (task.isCanceledOrFailed()) {
-			partitionManager.releasePartitionsProducedBy(executionId);
+	// --------------------------------------------------------------------------------------------
+	//  Task operations
+	// --------------------------------------------------------------------------------------------
+
+	public void registerTask(Task task) throws IOException {
+		final ResultPartition[] producedPartitions = task.getProducedPartitions();
+		final ResultPartitionWriter[] writers = task.getWriters();
+
+		if (writers.length != producedPartitions.length) {
+			throw new IllegalStateException("Unequal number of writers and partitions.");
 		}
 
-		ResultPartitionWriter[] writers = task.getWriters();
+		synchronized (lock) {
+			if (isShutdown) {
+				throw new IllegalStateException("NetworkEnvironment is shut down");
+			}
+			if (!isAssociated()) {
+				throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
+			}
+
+			for (int i = 0; i < producedPartitions.length; i++) {
+				final ResultPartition partition = producedPartitions[i];
+				final ResultPartitionWriter writer = writers[i];
 
-		if (writers != null) {
-			for (ResultPartitionWriter writer : task.getWriters()) {
-				taskEventDispatcher.unregisterWriter(writer);
+				// Buffer pool for the partition
+				BufferPool bufferPool = null;
+
+				try {
+					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
+					partition.registerBufferPool(bufferPool);
+
+					partitionManager.registerResultPartition(partition);
+				}
+				catch (Throwable t) {
+					if (bufferPool != null) {
+						bufferPool.lazyDestroy();
+					}
+
+					if (t instanceof IOException) {
+						throw (IOException) t;
+					}
+					else {
+						throw new IOException(t.getMessage(), t);
+					}
+				}
+
+				// Register writer with task event dispatcher
+				taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
 			}
-		}
 
-		final SingleInputGate[] inputGates = task.getInputGates();
+			// Setup the buffer pool for each buffer reader
+			final SingleInputGate[] inputGates = task.getInputGates();
 
-		if (inputGates != null) {
 			for (SingleInputGate gate : inputGates) {
+				BufferPool bufferPool = null;
+
 				try {
-					if (gate != null) {
-						gate.releaseAllResources();
-					}
+					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
+					gate.setBufferPool(bufferPool);
 				}
-				catch (IOException e) {
-					LOG.error("Error during release of reader resources: " + e.getMessage(), e);
+				catch (Throwable t) {
+					if (bufferPool != null) {
+						bufferPool.lazyDestroy();
+					}
+
+					if (t instanceof IOException) {
+						throw (IOException) t;
+					}
+					else {
+						throw new IOException(t.getMessage(), t);
+					}
 				}
 			}
 		}
 	}
 
-	public ResultPartitionManager getPartitionManager() {
-		return partitionManager;
-	}
+	public void unregisterTask(Task task) {
+		LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
+				task.getTaskNameWithSubtasks(), task.getExecutionState());
 
-	public TaskEventDispatcher getTaskEventDispatcher() {
-		return taskEventDispatcher;
-	}
+		final ExecutionAttemptID executionId = task.getExecutionId();
 
-	public ConnectionManager getConnectionManager() {
-		return connectionManager;
-	}
+		synchronized (lock) {
+			if (isShutdown || !isAssociated()) {
+				// no need to do anything when we are not operational
+				return;
+			}
 
-	public NetworkBufferPool getNetworkBufferPool() {
-		return networkBufferPool;
-	}
+			if (task.isCanceledOrFailed()) {
+				partitionManager.releasePartitionsProducedBy(executionId);
+			}
 
-	public IOMode getDefaultIOMode() {
-		return configuration.ioMode();
-	}
+			ResultPartitionWriter[] writers = task.getWriters();
 
-	public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
-		return partitionConsumableNotifier;
+			if (writers != null) {
+				for (ResultPartitionWriter writer : task.getWriters()) {
+					taskEventDispatcher.unregisterWriter(writer);
+				}
+			}
+
+			final SingleInputGate[] inputGates = task.getInputGates();
+
+			if (inputGates != null) {
+				for (SingleInputGate gate : inputGates) {
+					try {
+						if (gate != null) {
+							gate.releaseAllResources();
+						}
+					}
+					catch (IOException e) {
+						LOG.error("Error during release of reader resources: " + e.getMessage(), e);
+					}
+				}
+			}
+		}
 	}
 
 	public boolean hasReleasedAllResources() {
@@ -281,32 +378,25 @@ public class NetworkEnvironment {
 	 * Tries to shut down all network I/O components.
 	 */
 	public void shutdown() {
-		if (!isShutdown) {
+		synchronized (lock) {
+			if (isShutdown) {
+				return;
+			}
+
+			// shut down all connections and free all intermediate result partitions
 			try {
-				if (networkBufferPool != null) {
-					networkBufferPool.destroy();
-				}
+				disassociate();
 			}
 			catch (Throwable t) {
-				LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
-			}
-
-			if (partitionManager != null) {
-				try {
-					partitionManager.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.warn("Partition manager did not shut down properly: " + t.getMessage(), t);
-				}
+				LOG.warn("Network services did not shut down properly: " + t.getMessage(), t);
 			}
 
+			// destroy the buffer pool
 			try {
-				if (connectionManager != null) {
-					connectionManager.shutdown();
-				}
+				networkBufferPool.destroy();
 			}
 			catch (Throwable t) {
-				LOG.warn("Network connection manager did not shut down properly: " + t.getMessage(), t);
+				LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
 			}
 
 			isShutdown = true;
@@ -320,13 +410,20 @@ public class NetworkEnvironment {
 	/**
 	 * Notifies the job manager about consumable partitions.
 	 */
-	private static class JobManagerResultPartitionConsumableNotifier
-			implements ResultPartitionConsumableNotifier {
+	private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+		private final ActorRef jobManager;
+
+		private final ActorRef taskManager;
 
-		private final NetworkEnvironment networkEnvironment;
+		private final Timeout jobManagerMessageTimeout;
 
-		public JobManagerResultPartitionConsumableNotifier(NetworkEnvironment networkEnvironment) {
-			this.networkEnvironment = networkEnvironment;
+		public JobManagerResultPartitionConsumableNotifier(ActorRef jobManager,
+															ActorRef taskManager,
+															Timeout jobManagerMessageTimeout) {
+			this.jobManager = jobManager;
+			this.taskManager = taskManager;
+			this.jobManagerMessageTimeout = jobManagerMessageTimeout;
 		}
 
 		@Override
@@ -334,23 +431,20 @@ public class NetworkEnvironment {
 
 			final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
 
-			Future<Object> futureResponse = Patterns.ask(
-					networkEnvironment.getJobManager(),
-					msg,
-					networkEnvironment.getJobManagerTimeout());
+			Future<Object> futureResponse = Patterns.ask(jobManager, msg, jobManagerMessageTimeout);
 
 			futureResponse.onFailure(new OnFailure() {
 				@Override
-				public void onFailure(Throwable failure) throws Throwable {
+				public void onFailure(Throwable failure) {
 					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
 
 					// Fail task at the TaskManager
 					FailTask failMsg = new FailTask(
 							partitionId.getProducerId(),
-							new RuntimeException("Could not schedule or update consumers at " +
-									"the JobManager.", failure));
+							new RuntimeException("Could not notify JobManager to schedule or update consumers",
+									failure));
 
-					networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
+					taskManager.tell(failMsg, ActorRef.noSender());
 				}
 			}, AkkaUtils.globalExecutionContext());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 845f72a..82793e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -70,6 +70,12 @@ public class TaskEventDispatcher {
 		return false;
 	}
 
+	public void clearAll() {
+		synchronized (registeredWriters) {
+			registeredWriters.clear();
+		}
+	}
+
 	/**
 	 * Returns the number of currently registered writers.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index c1e62da..cb1f118 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,9 +30,12 @@ import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 
 /**
- * A fixed size pool of {@link MemorySegment} instances for the network stack.
- * <p>
- * This class is thread-safe.
+ * The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances
+ * for the network stack.
+ *
+ * The NetworkBufferPool creates {@link LocalBufferPool}s from which the individual tasks draw
+ * the buffers for the network data transfer. When new local buffer pools are created, the
+ * NetworkBufferPool dynamically redistributes the buffers between the pools.
  */
 public class NetworkBufferPool implements BufferPoolFactory {
 
@@ -62,7 +65,15 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 		this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
 		this.memorySegmentSize = segmentSize;
-		this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
+
+		final long sizeInLong = (long) segmentSize;
+
+		try {
+			this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
+		}
+		catch (OutOfMemoryError err) {
+			throw new OutOfMemoryError("Could not allocate buffer queue of length " + numberOfSegmentsToAllocate);
+		}
 
 		try {
 			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
@@ -70,15 +81,22 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			}
 		}
 		catch (OutOfMemoryError err) {
-			int requiredMb = (numberOfSegmentsToAllocate * segmentSize) >> 20;
-			int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
-			int missingMb = requiredMb - allocatedMb;
+			int allocated = availableMemorySegments.size();
+
+			// free some memory
+			availableMemorySegments.clear();
+
+			long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
+			long allocatedMb = (sizeInLong * allocated) >> 20;
+			long missingMb = requiredMb - allocatedMb;
 
-			throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " +
-					requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
+			throw new OutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool " +
+					"(required (Mb): " + requiredMb +
+					", allocated (Mb): " + allocatedMb +
+					", missing (Mb): " + missingMb + ").");
 		}
 
-		int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
+		long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;
 
 		LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
 				allocatedMb, availableMemorySegments.size(), segmentSize);
@@ -186,6 +204,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	@Override
 	public void destroyBufferPool(BufferPool bufferPool) {
+		if (!(bufferPool instanceof LocalBufferPool)) {
+			throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
+		}
+
 		synchronized (factoryLock) {
 			if (allBufferPools.remove(bufferPool)) {
 				managedBufferPools.remove(bufferPool);
@@ -201,6 +223,26 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		}
 	}
 
+	/**
+	 * Destroys all buffer pools that allocate their buffers from this
+	 * buffer pool (created via {@link #createBufferPool(int, boolean)}).
+	 */
+	public void destroyAllBufferPools() {
+		synchronized (factoryLock) {
+			// create a copy to avoid concurrent modification exceptions
+			LocalBufferPool[] poolsCopy = allBufferPools.toArray(new LocalBufferPool[allBufferPools.size()]);
+
+			for (LocalBufferPool pool : poolsCopy) {
+				pool.lazyDestroy();
+			}
+
+			// some sanity checks
+			if (allBufferPools.size() > 0 || managedBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
+				throw new IllegalStateException("NetworkBufferPool is not empty after destroying all LocalBufferPools");
+			}
+		}
+	}
+
 	// Must be called from synchronized block
 	private void redistributeBuffers() throws IOException {
 		int numManagedBufferPools = managedBufferPools.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f552717..f558d48 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -319,10 +319,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
         _ ! Heartbeat(instanceID, report)
       }
 
-
-    case LogMemoryUsage =>
-      logMemoryStats()
-
     case SendStackTrace =>
       val traces = Thread.getAllStackTraces.asScala
       val stackTraceStr = traces.map((trace: (Thread, Array[StackTraceElement])) => {
@@ -668,7 +664,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
     }
 
     try {
-      networkEnvironment = Some(new NetworkEnvironment(self, jobManager, timeout, networkConfig))
+      val env: NetworkEnvironment = new NetworkEnvironment(timeout, networkConfig)
+      env.associateWithTaskManagerAndJobManager(jobManager, self)
+      networkEnvironment = Some(env)
     } catch {
       case ioe: IOException =>
         log.error(ioe, "Failed to instantiate network environment.")
@@ -726,7 +724,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
         libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
 
         log.info("Updating FINAL execution state of {} ({}) to {}.", task.getTaskName,
-          task.getExecutionId, task.getExecutionState);
+          task.getExecutionId, task.getExecutionState)
 
         self ! UpdateTaskExecutionState(new TaskExecutionState(
           task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
@@ -746,8 +744,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
           fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
         }
       } catch {
-        case t: Throwable => log.error("Error cleaning up local files from the distributed cache" +
-          ".", t)
+        case t: Throwable => log.error(
+          "Error cleaning up local files from the distributed cache.", t)
       }
     }
 
@@ -761,16 +759,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
     task.unregisterMemoryManager(memoryManager)
   }
-
-  private def logMemoryStats(): Unit = {
-    if (log.isInfoEnabled) {
-      val memoryMXBean = ManagementFactory.getMemoryMXBean
-      val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
-
-      log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
-      log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
-    }
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
new file mode 100644
index 0000000..bb95f4b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import static org.junit.Assert.*;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Some;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+public class NetworkEnvironmentTest {
+	/** Checks repeated associate / disassociate cycles of a NetworkEnvironment and its final shutdown. */
+	@Test
+	public void testAssociateDisassociate() {
+		final int BUFFER_SIZE = 1024;
+		final int NUM_BUFFERS = 20;
+
+		final int port;
+		try {
+			port = NetUtils.getAvailablePort();
+		}
+		catch (Throwable t) {
+			// no free port could be determined: skip the test silently instead of failing it
+			return;
+		}
+
+		try {
+			NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration());
+			NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
+					NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf));
+
+			NetworkEnvironment env = new NetworkEnvironment(new FiniteDuration(30, TimeUnit.SECONDS), config);
+
+			assertFalse(env.isShutdown());
+			assertFalse(env.isAssociated());
+
+			// the network buffer pool must exist right after construction, before any association
+			assertNotNull(env.getNetworkBufferPool());
+			assertEquals(NUM_BUFFERS, env.getNetworkBufferPool().getTotalNumberOfMemorySegments());
+
+			// the other components must not exist before the first association
+			assertNull(env.getConnectionManager());
+			assertNull(env.getPartitionConsumableNotifier());
+			assertNull(env.getTaskEventDispatcher());
+			assertNull(env.getPartitionManager());
+
+			// associate the environment with some mock actors
+			ActorRef jmActor = Mockito.mock(ActorRef.class);
+			ActorRef tmActor = Mockito.mock(ActorRef.class);
+			env.associateWithTaskManagerAndJobManager(jmActor, tmActor);
+
+			assertNotNull(env.getConnectionManager());
+			assertNotNull(env.getPartitionConsumableNotifier());
+			assertNotNull(env.getTaskEventDispatcher());
+			assertNotNull(env.getPartitionManager());
+
+			// create a local buffer pool to verify below that disassociation destroys it
+			BufferPool localPool = env.getNetworkBufferPool().createBufferPool(10, false);
+			assertNotNull(localPool);
+
+			// disassociate: the components must be gone, the network buffer pool must be kept
+			env.disassociate();
+
+			assertNull(env.getConnectionManager());
+			assertNull(env.getPartitionConsumableNotifier());
+			assertNull(env.getTaskEventDispatcher());
+			assertNull(env.getPartitionManager());
+
+			assertNotNull(env.getNetworkBufferPool());
+			assertTrue(localPool.isDestroyed());
+
+			// associate once again, this time with fresh mock actors
+			jmActor = Mockito.mock(ActorRef.class);
+			tmActor = Mockito.mock(ActorRef.class);
+			env.associateWithTaskManagerAndJobManager(jmActor, tmActor);
+
+			assertNotNull(env.getConnectionManager());
+			assertNotNull(env.getPartitionConsumableNotifier());
+			assertNotNull(env.getTaskEventDispatcher());
+			assertNotNull(env.getPartitionManager());
+
+			// shutdown for good
+			env.shutdown();
+
+			assertTrue(env.isShutdown());
+			assertFalse(env.isAssociated());
+			assertNull(env.getConnectionManager());
+			assertNull(env.getPartitionConsumableNotifier());
+			assertNull(env.getTaskEventDispatcher());
+			assertNull(env.getPartitionManager());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index c9a4b6d..9f04d94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -42,6 +42,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;

http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
new file mode 100644
index 0000000..b24082e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class NetworkBufferPoolTest {
+	/** Checks that a destroyed NetworkBufferPool refuses to create new buffer pools. */
+	@Test
+	public void testCreatePoolAfterDestroy() {
+		try {
+			final int bufferSize = 128;
+			final int numBuffers = 10;
+
+			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
+			assertEquals(bufferSize, globalPool.getMemorySegmentSize());
+			assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
+			assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
+			assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
+
+			globalPool.destroy();
+
+			assertTrue(globalPool.isDestroyed());
+
+			try {
+				globalPool.createBufferPool(2, true);
+				fail("Should throw an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// expected: a destroyed pool must not create new buffer pools
+			}
+
+			try {
+				globalPool.createBufferPool(2, false);
+				fail("Should throw an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// expected: a destroyed pool must not create new buffer pools
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+	}
+	@Test
+	public void testDestroyAll() {
+		try {
+			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+
+			BufferPool fixedPool = globalPool.createBufferPool(2, true);
+			BufferPool nonFixedPool = globalPool.createBufferPool(5, false);
+
+			assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
+			assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
+
+			Buffer[] buffers = {
+					fixedPool.requestBuffer(),
+					fixedPool.requestBuffer(),
+
+					nonFixedPool.requestBuffer(),
+					nonFixedPool.requestBuffer(),
+					nonFixedPool.requestBuffer(),
+					nonFixedPool.requestBuffer(),
+					nonFixedPool.requestBuffer(),
+					nonFixedPool.requestBuffer(),
+					nonFixedPool.requestBuffer(),
+					nonFixedPool.requestBuffer()
+			};
+
+			for (Buffer b : buffers) {
+				assertNotNull(b);
+				assertNotNull(b.getMemorySegment());
+			}
+
+			assertNull(fixedPool.requestBuffer());
+			assertNull(nonFixedPool.requestBuffer());
+
+			// destroy all local buffer pools; the global pool itself must stay alive
+			globalPool.destroyAllBufferPools();
+
+			// check the destroyed status
+			assertFalse(globalPool.isDestroyed());
+			assertTrue(fixedPool.isDestroyed());
+			assertTrue(nonFixedPool.isDestroyed());
+
+			assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
+
+			// the handed-out buffers are not yet recycled, so no segment is available
+			assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());
+
+			// buffers recycled after their pool was destroyed must go back to the global pool
+			for (Buffer b : buffers) {
+				b.recycle();
+			}
+			assertEquals(globalPool.getTotalNumberOfMemorySegments(), globalPool.getNumberOfAvailableMemorySegments());
+
+			// the destroyed local pools must hand out no more buffers
+			assertNull(fixedPool.requestBuffer());
+			assertNull(nonFixedPool.requestBuffer());
+
+			// can create a new pool now
+			assertNotNull(globalPool.createBufferPool(10, false));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}