You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/07 12:21:06 UTC

git commit: [FLINK-1063] Revert race-introducing commit

Repository: incubator-flink
Updated Branches:
  refs/heads/FLINK-1086-revert_close [created] c0c2abda5


[FLINK-1063] Revert race-introducing commit

This reverts "[FLINK-998] Close network connections when idle" [1] as
it introduced a race condition into the network stack, which might
result in a re-ordering of network envelopes.

[1] 52512636444902497e47ccbfb1cabaffb3e23343


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c0c2abda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c0c2abda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c0c2abda

Branch: refs/heads/FLINK-1086-revert_close
Commit: c0c2abda5eaaabc6291f765718d88dcbdc12d2a4
Parents: 66c1263
Author: uce <u....@fu-berlin.de>
Authored: Sat Sep 6 12:52:12 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Sun Sep 7 12:07:06 2014 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  20 +-
 .../io/network/LocalConnectionManager.java      |   9 +-
 .../io/network/NetworkConnectionManager.java    |   7 +-
 .../runtime/io/network/RemoteReceiver.java      |   2 +-
 .../network/netty/NettyConnectionManager.java   | 106 ++--
 .../network/netty/OutboundConnectionQueue.java  | 164 ++---
 .../flink/runtime/taskmanager/TaskManager.java  |  13 +-
 .../netty/NettyConnectionManagerTest.java       | 272 ++++-----
 .../netty/OutboundConnectionQueueTest.java      | 606 -------------------
 .../test/runtime/NetworkStackThroughput.java    |   1 -
 10 files changed, 210 insertions(+), 990 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2983c63..ef5ee6e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -117,9 +117,14 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NET_NUM_OUT_THREADS_KEY = "taskmanager.net.numOutThreads";
 
 	/**
-	 * The minimum time in ms a channel must be idle, before it will be closed.
+	 * The low water mark used in NettyConnectionManager for the Bootstrap.
 	 */
-	public static final String TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY = "taskmanager.net.closeAfterIdleForMs";
+	public static final String TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = "taskmanager.net.nettyLowWaterMark";
+
+	/**
+	 * The high water mark used in NettyConnectionManager for the Bootstrap.
+	 */
+	public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = "taskmanager.net.nettyHighWaterMark";
 	
 	/**
 	 * Parameter for the interval in which the TaskManager sends the periodic heart beat messages
@@ -353,9 +358,16 @@ public final class ConfigConstants {
 	public static final int DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS = -1;
 
 	/**
-	 * The minimum time in ms a channel must be idle, before it will be closed.
+	 * Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+	 * will use half of the network buffer size as the low water mark.
+	 */
+	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = -1;
+
+	/**
+	 * Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+	 * will use the network buffer size as the high water mark.
 	 */
-	public static final int DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS = 10000;
+	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
 
 	/**
 	 * The default interval for TaskManager heart beats (2000 msecs).

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 612d6d5..645f71e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
+
 package org.apache.flink.runtime.io.network;
 
 import java.io.IOException;
 
-/**
- * Local dummy network connection manager used, when the task manager
- * runs in local mode without starting up the remote network stack.
- */
 public class LocalConnectionManager implements NetworkConnectionManager {
 
 	@Override
@@ -35,10 +32,6 @@ public class LocalConnectionManager implements NetworkConnectionManager {
 	}
 
 	@Override
-	public void close(RemoteReceiver receiver) {
-	}
-
-	@Override
 	public void shutdown() throws IOException {
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
index ceef2c7..7e3e0b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
@@ -16,21 +16,16 @@
  * limitations under the License.
  */
 
+
 package org.apache.flink.runtime.io.network;
 
 import java.io.IOException;
 
-/**
- * The network connection manager is responsible to dispatch envelopes
- * to remote receivers.
- */
 public interface NetworkConnectionManager {
 
 	public void start(ChannelManager channelManager) throws IOException;
 
 	public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException;
 
-	public void close(RemoteReceiver receiver);
-
 	public void shutdown() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
index 8815ea5..23d1205 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView;
  * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
  * 
  */
-public class RemoteReceiver implements IOReadableWritable {
+public final class RemoteReceiver implements IOReadableWritable {
 
 	/**
 	 * The address of the connection to the remote {@link TaskManager}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 6d7e15c..b526ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -68,19 +68,12 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 
 	private final int highWaterMark;
 
-	private final int closeAfterIdleForMs;
-
 	private ServerBootstrap in;
 
 	private Bootstrap out;
 
-	public NettyConnectionManager(
-			InetAddress bindAddress,
-			int bindPort,
-			int bufferSize,
-			int numInThreads,
-			int numOutThreads,
-			int closeAfterIdleForMs) {
+	public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads,
+								int numOutThreads, int lowWaterMark, int highWaterMark) {
 
 		this.bindAddress = bindAddress;
 		this.bindPort = bindPort;
@@ -92,19 +85,14 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		this.numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
 		this.numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
 
-		this.lowWaterMark = bufferSize / 2;
-		this.highWaterMark = bufferSize;
-
-		this.closeAfterIdleForMs = closeAfterIdleForMs;
+		this.lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
+		this.highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
 	}
 
 	@Override
 	public void start(ChannelManager channelManager) throws IOException {
-		LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.",
-				numInThreads, numOutThreads));
-		LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.",
-				lowWaterMark, highWaterMark));
-		LOG.info(String.format("Close channels after idle for %d ms.", closeAfterIdleForMs));
+		LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
+		LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark));
 
 		final BufferProviderBroker bufferProviderBroker = channelManager;
 		final EnvelopeDispatcher envelopeDispatcher = channelManager;
@@ -112,16 +100,16 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		int numHeapArenas = 0;
 		int numDirectArenas = numInThreads + numOutThreads;
 		int pageSize = bufferSize << 1;
-		int chunkSize = 16 << 20; // 16 MB
+		int chunkSize = 16 * 1 << 20; // 16 MB
 
 		// shift pageSize maxOrder times to get to chunkSize
-		int maxOrder = (int) (Math.log(chunkSize / pageSize) / Math.log(2));
+		int maxOrder = (int) (Math.log(chunkSize/pageSize) / Math.log(2));
 
 		PooledByteBufAllocator pooledByteBufAllocator =
 				new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder);
 
 		String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, " +
-						"page size (bytes): %d, chunk size (bytes): %d.",
+				"page size (bytes): %d, chunk size (bytes): %d.",
 				numDirectArenas, numHeapArenas, pageSize, (pageSize << maxOrder));
 		LOG.info(msg);
 
@@ -198,7 +186,7 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		// 2) a channel that is in buildup (sometimes) -> attach to the future and wait for the actual channel
 		// 3) not yet existing -> establish the channel
 
-		final Object entry = outConnections.get(receiver);
+		final Object entry = this.outConnections.get(receiver);
 		final OutboundConnectionQueue channel;
 
 		if (entry != null) {
@@ -216,14 +204,14 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 			// We create a "buildup future" and atomically add it to the map.
 			// Only the thread that really added it establishes the channel.
 			// The others need to wait on that original establisher's future.
-			ChannelInBuildup inBuildup = new ChannelInBuildup(out, receiver, this, closeAfterIdleForMs);
-			Object old = outConnections.putIfAbsent(receiver, inBuildup);
+			ChannelInBuildup inBuildup = new ChannelInBuildup(this.out, receiver);
+			Object old = this.outConnections.putIfAbsent(receiver, inBuildup);
 
 			if (old == null) {
-				out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
+				this.out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
 				channel = inBuildup.waitForChannel();
 
-				Object previous = outConnections.put(receiver, channel);
+				Object previous = this.outConnections.put(receiver, channel);
 
 				if (inBuildup != previous) {
 					throw new IOException("Race condition during channel build up.");
@@ -237,19 +225,7 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 			}
 		}
 
-		if (!channel.enqueue(envelope)) {
-			// The channel has been closed, try again.
-			LOG.debug("Retry enqueue on channel: " + channel + ".");
-
-			// This will either establish a new connection or use the
-			// one, which has been established in the mean time.
-			enqueue(envelope, receiver);
-		}
-	}
-
-	@Override
-	public void close(RemoteReceiver receiver) {
-		outConnections.remove(receiver);
+		channel.enqueue(envelope);
 	}
 
 	@Override
@@ -278,9 +254,9 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 	private String getNonZeroNumQueuedEnvelopes() {
 		StringBuilder str = new StringBuilder();
 
-		str.append(String.format("==== %d outgoing connections ===\n", outConnections.size()));
+		str.append(String.format("==== %d outgoing connections ===\n", this.outConnections.size()));
 
-		for (Map.Entry<RemoteReceiver, Object> entry : outConnections.entrySet()) {
+		for (Map.Entry<RemoteReceiver, Object> entry : this.outConnections.entrySet()) {
 			RemoteReceiver receiver = entry.getKey();
 
 			Object value = entry.getValue();
@@ -316,52 +292,41 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 
 		private final RemoteReceiver receiver;
 
-		private final NetworkConnectionManager connectionManager;
-
-		private final int closeAfterIdleMs;
-
-		private ChannelInBuildup(
-				Bootstrap out,
-				RemoteReceiver receiver,
-				NetworkConnectionManager connectionManager,
-				int closeAfterIdleMs) {
-
+		private ChannelInBuildup(Bootstrap out, RemoteReceiver receiver) {
 			this.out = out;
 			this.receiver = receiver;
-			this.connectionManager = connectionManager;
-			this.closeAfterIdleMs = closeAfterIdleMs;
 		}
 
 		private void handInChannel(OutboundConnectionQueue c) {
-			synchronized (lock) {
-				channel = c;
-				lock.notifyAll();
+			synchronized (this.lock) {
+				this.channel = c;
+				this.lock.notifyAll();
 			}
 		}
 
-		private void notifyOfError(Throwable t) {
-			synchronized (lock) {
-				error = t;
-				lock.notifyAll();
+		private void notifyOfError(Throwable error) {
+			synchronized (this.lock) {
+				this.error = error;
+				this.lock.notifyAll();
 			}
 		}
 
 		private OutboundConnectionQueue waitForChannel() throws IOException {
-			synchronized (lock) {
-				while (error == null && channel == null) {
+			synchronized (this.lock) {
+				while (this.error == null && this.channel == null) {
 					try {
-						lock.wait(2000);
+						this.lock.wait(2000);
 					} catch (InterruptedException e) {
 						throw new RuntimeException("Channel buildup interrupted.");
 					}
 				}
 			}
 
-			if (error != null) {
+			if (this.error != null) {
 				throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
 			}
 
-			return channel;
+			return this.channel;
 		}
 
 		@Override
@@ -371,14 +336,13 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 					LOG.debug(String.format("Channel %s connected", future.channel()));
 				}
 
-				handInChannel(new OutboundConnectionQueue(
-						future.channel(), receiver, connectionManager, closeAfterIdleMs));
+				handInChannel(new OutboundConnectionQueue(future.channel()));
 			}
-			else if (numRetries > 0) {
-				LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", numRetries));
+			else if (this.numRetries > 0) {
+				LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", this.numRetries));
 
-				out.connect(receiver.getConnectionAddress()).addListener(this);
-				numRetries--;
+				this.out.connect(this.receiver.getConnectionAddress()).addListener(this);
+				this.numRetries--;
 			}
 			else {
 				if (future.getClass() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
index 4008e97..5a9f789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -23,137 +23,73 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
+import org.apache.flink.runtime.io.network.Envelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.NetworkConnectionManager;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
 
 import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter {
-
-	private static enum QueueEvent {
-		TRIGGER_WRITE
-	}
+public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
 
 	private static final Logger LOG = LoggerFactory.getLogger(OutboundConnectionQueue.class);
 
-	private final ChannelWriteListener writeListener = new ChannelWriteListener();
-
 	private final Channel channel;
 
-	private final Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
+	private final ArrayDeque<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
 
-	private final RemoteReceiver receiver;
-
-	private final NetworkConnectionManager connectionManager;
-
-	// Flag to indicate whether a channel close was requested. This flag is true,
-	// iff there are no queued envelopes, when the channel is idling. After a
-	// successful close request, enqueue should return false.
-	private boolean hasRequestedClose = false;
-
-	public OutboundConnectionQueue(
-			Channel channel,
-			RemoteReceiver receiver,
-			NetworkConnectionManager connectionManager,
-			int closeAfterIdleForMs) {
+	private final AtomicInteger numQueuedEnvelopes = new AtomicInteger(0);
 
+	public OutboundConnectionQueue(Channel channel) {
 		this.channel = channel;
-		this.receiver = receiver;
-		this.connectionManager = connectionManager;
 
-		channel.pipeline().addFirst("Outbound Connection Queue", this);
-		channel.pipeline().addFirst("Idle State Handler",
-				new IdleStateHandler(0, 0, closeAfterIdleForMs, TimeUnit.MILLISECONDS));
+		channel.pipeline().addFirst(this);
 	}
 
 	/**
-	 * Enqueues an envelope to be sent.
+	 * Enqueues an envelope to be sent later.
 	 * <p/>
 	 * This method is always invoked by the task thread that wants the envelope sent.
-	 * <p/>
-	 * If this method returns <code>false</code>, the channel cannot be used to enqueue
-	 * envelopes any more and the caller needs to establish a new connection to the target.
-	 * The current envelope needs to be enqueued at the new channel.
 	 *
-	 * @param env the envelope to be sent
-	 * @return true, if successfully enqueued or false, if the channel was requested to be closed
+	 * @param env The envelope to be sent.
 	 */
-	public boolean enqueue(Envelope env) {
-		boolean triggerWrite;
-
-		synchronized (channel) {
-			if (hasRequestedClose) {
-				// The caller has to ensure that the envelope gets queued to
-				// a new channel.
-				return false;
-			}
+	public void enqueue(Envelope env) {
+		// the user event trigger ensure thread-safe hand-over of the envelope
+		this.channel.pipeline().fireUserEventTriggered(env);
+	}
 
-			// Initiate envelope processing, after the queue state has
-			// changed from empty to non-empty.
-			triggerWrite = queuedEnvelopes.isEmpty();
+	@Override
+	public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnqueue) throws Exception {
+		boolean triggerWrite = this.queuedEnvelopes.isEmpty();
 
-			queuedEnvelopes.add(env);
-		}
+		this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
+		this.numQueuedEnvelopes.incrementAndGet();
 
 		if (triggerWrite) {
-			channel.pipeline().fireUserEventTriggered(QueueEvent.TRIGGER_WRITE);
+			writeAndFlushNextEnvelopeIfPossible();
 		}
+	}
 
-		return true;
+	@Override
+	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+		writeAndFlushNextEnvelopeIfPossible();
 	}
 
 	@Override
-	public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
-		if (event.getClass() == QueueEvent.class) {
-			// Initiate envelope processing, after the queue state has
-			// changed from empty to non-empty.
+	public void operationComplete(ChannelFuture future) throws Exception {
+		if (future.isSuccess()) {
 			writeAndFlushNextEnvelopeIfPossible();
 		}
-		else if (event.getClass() == IdleStateEvent.class) {
-			// Channel idle => try to close
-			boolean closeConnection = false;
-
-			// Only close the connection, if there are no queued envelopes. We have
-			// to ensure that there is no race between closing the channel and
-			// enqueuing a new envelope.
-			synchronized (channel) {
-				if (queuedEnvelopes.isEmpty() && !hasRequestedClose) {
-
-					hasRequestedClose = true;
-
-					closeConnection = true;
-
-					// Notify the connection manager that this channel has been
-					// closed.
-					connectionManager.close(receiver);
-				}
-			}
-
-			if (closeConnection) {
-				ctx.close().addListener(new ChannelCloseListener());
-			}
+		else if (future.cause() != null) {
+			exceptionOccurred(future.cause());
 		}
 		else {
-			throw new IllegalStateException("Triggered unknown event.");
+			exceptionOccurred(new Exception("Envelope send aborted."));
 		}
 	}
 
-	@Override
-	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-		writeAndFlushNextEnvelopeIfPossible();
-	}
-
 	public int getNumQueuedEnvelopes() {
-		synchronized (channel) {
-			return queuedEnvelopes.size();
-		}
+		return this.numQueuedEnvelopes.intValue();
 	}
 
 	@Override
@@ -162,48 +98,16 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter {
 	}
 
 	private void writeAndFlushNextEnvelopeIfPossible() {
-		Envelope nextEnvelope = null;
-
-		synchronized (channel) {
-			if (channel.isWritable() && !queuedEnvelopes.isEmpty()) {
-				nextEnvelope = queuedEnvelopes.poll();
-			}
-		}
-
-		if (nextEnvelope != null) {
-			channel.writeAndFlush(nextEnvelope).addListener(writeListener);
-		}
-	}
-
-	private class ChannelWriteListener implements ChannelFutureListener {
-
-		@Override
-		public void operationComplete(ChannelFuture future) throws Exception {
-			if (future.isSuccess()) {
-				writeAndFlushNextEnvelopeIfPossible();
-			}
-			else {
-				exceptionOccurred(future.cause() == null
-						? new Exception("Envelope send aborted.")
-						: future.cause());
-			}
-		}
-	}
-
-	private class ChannelCloseListener implements ChannelFutureListener {
+		if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
+			Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
+			this.numQueuedEnvelopes.decrementAndGet();
 
-		@Override
-		public void operationComplete(ChannelFuture future) throws Exception {
-			if (!future.isSuccess()) {
-				exceptionOccurred(future.cause() == null
-						? new Exception("Close failed.")
-						: future.cause());
-			}
+			this.channel.writeAndFlush(nextEnvelope).addListener(this);
 		}
 	}
 
 	private void exceptionOccurred(Throwable t) throws Exception {
-		LOG.error(String.format("Exception in Channel %s: %s", channel, t.getMessage()));
+		LOG.error("An exception occurred in Channel {}: {}", channel, t.getMessage());
 		throw new Exception(t);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 4525649..9aa39d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+
 package org.apache.flink.runtime.taskmanager;
 
 import java.io.File;
@@ -317,13 +318,17 @@ public class TaskManager implements TaskOperationProtocol {
 							ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
 							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
 
-					int closeAfterIdleForMs = GlobalConfiguration.getInteger(
-							ConfigConstants.TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY,
-							ConfigConstants.DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS);
+					int lowWaterMark = GlobalConfiguration.getInteger(
+							ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
+							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
+
+					int highWaterMark = GlobalConfiguration.getInteger(
+							ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
+							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
 
 					networkConnectionManager = new NettyConnectionManager(
 							localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(),
-							bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
+							bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
 					break;
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index cfb6ae5..dfb654d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -16,16 +16,18 @@
  * limitations under the License.
  */
 
+
 package org.apache.flink.runtime.io.network.netty;
 
+import org.junit.Assert;
+
 import org.apache.flink.runtime.io.network.ChannelManager;
 import org.apache.flink.runtime.io.network.Envelope;
 import org.apache.flink.runtime.io.network.RemoteReceiver;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.junit.Assert;
 import org.mockito.Matchers;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -36,206 +38,158 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 public class NettyConnectionManagerTest {
 
 	private final static long RANDOM_SEED = 520346508276087l;
 
-	private Random rand = new Random(RANDOM_SEED);
-
-	private NettyConnectionManager senderManager;
-
-	private NettyConnectionManager receiverManager;
+	private final static Random random = new Random(RANDOM_SEED);
 
-	private ChannelManager channelManager;
+	private final static int BIND_PORT = 20000;
 
-	private RemoteReceiver[] receivers;
+	private final static int BUFFER_SIZE = 32 * 1024;
 
-	private CountDownLatch receivedAllEnvelopesLatch;
-
-	private void initTest(
-			int numProducers,
-			final int numEnvelopesPerProducer,
-			int numInThreads,
-			int numOutThreads,
-			int closeAfterIdleForMs) throws Exception {
-
-		final InetAddress bindAddress = InetAddress.getLocalHost();
-		final int bindPort = 20000;
-		final int bufferSize = 32 * 1024;
+	public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
+		Integer[][] configs = new Integer[][]{
+				{64, 4096, 1, 1, 1},
+				{128, 2048, 1, 1, 1},
+				{256, 1024, 1, 1, 1},
+				{512, 512, 1, 1, 1},
+				{64, 4096, 4, 1, 1},
+				{128, 2048, 4, 1, 1},
+				{256, 1024, 4, 1, 1},
+				{512, 512, 4, 1, 1},
+				{64, 4096, 4, 2, 2},
+				{128, 2048, 4, 2, 2},
+				{256, 1024, 4, 2, 2},
+				{512, 512, 4, 2, 2}
+		};
 
-		senderManager = Mockito.spy(new NettyConnectionManager(
-				bindAddress, bindPort, bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs));
+		for (Integer[] params : configs) {
+			System.out.println(String.format("Running %s with config: %d sub tasks, %d envelopes to send per subtasks, "
+					+ "%d num channels, %d num in threads, %d num out threads.",
+					"testEnqueueRaceAndDeadlockFreeMultipleChannels", params[0], params[1], params[2], params[3], params[4]));
 
-		receiverManager = new NettyConnectionManager(
-				bindAddress, bindPort + 1, bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
+			long start = System.currentTimeMillis();
+			doTestEnqueueRaceAndDeadlockFreeMultipleChannels(params[0], params[1], params[2], params[3], params[4]);
+			long end = System.currentTimeMillis();
 
-		channelManager = Mockito.mock(ChannelManager.class);
+			System.out.println(String.format("Runtime: %d ms.", (end - start)));
+		}
+	}
 
-		senderManager.start(channelManager);
-		receiverManager.start(channelManager);
+	private void doTestEnqueueRaceAndDeadlockFreeMultipleChannels(
+			int numSubtasks, final int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads)
+			throws Exception {
 
-		receivers = new RemoteReceiver[numProducers];
-		for (int i = 0; i < numProducers; i++) {
-			receivers[i] = new RemoteReceiver(new InetSocketAddress(bindPort + 1), i);
-		}
+		final InetAddress localhost = InetAddress.getLocalHost();
+		final CountDownLatch latch = new CountDownLatch(numSubtasks);
 
 		// --------------------------------------------------------------------
+		// setup
+		// --------------------------------------------------------------------
+		ChannelManager channelManager = mock(ChannelManager.class);
+		doAnswer(new VerifyEnvelopes(latch, numToSendPerSubtask))
+				.when(channelManager).dispatchFromNetwork(Matchers.<Envelope>anyObject());
 
-		receivedAllEnvelopesLatch = new CountDownLatch(numProducers);
-
-		final ConcurrentMap<ChannelID, Integer> receivedSequenceNums =
-				new ConcurrentHashMap<ChannelID, Integer>();
+		final NettyConnectionManager senderConnManager = new NettyConnectionManager(localhost, BIND_PORT, BUFFER_SIZE,
+				numInThreads, numOutThreads, -1, -1);
+		senderConnManager.start(channelManager);
 
-		// Verifies that the sequence numbers of each producer are received
-		// in ascending incremental order. In addition, manages a latch to
-		// allow synchronization after all envelopes have been received.
-		Mockito.doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				final Envelope env = (Envelope) invocation.getArguments()[0];
+		NettyConnectionManager receiverConnManager = new NettyConnectionManager(localhost, BIND_PORT + 1, BUFFER_SIZE,
+				numInThreads, numOutThreads, -1, -1);
+		receiverConnManager.start(channelManager);
 
-				final int currentSeqNum = env.getSequenceNumber();
-				final ChannelID cid = env.getSource();
+		// --------------------------------------------------------------------
+		// start sender threads
+		// --------------------------------------------------------------------
+		RemoteReceiver[] receivers = new RemoteReceiver[numChannels];
 
-				if (currentSeqNum < 0 || currentSeqNum >= numEnvelopesPerProducer) {
-					Assert.fail("Received more envelopes than expected from " + cid);
-				}
+		for (int i = 0; i < numChannels; i++) {
+			receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i);
+		}
 
-				Integer previousSeqNum = receivedSequenceNums.put(cid, currentSeqNum);
+		for (int i = 0; i < numSubtasks; i++) {
+			final RemoteReceiver receiver = receivers[random.nextInt(numChannels)];
 
-				if (previousSeqNum != null) {
-					String errMsg = String.format("Received %s with unexpected sequence number.", env);
-					Assert.assertEquals(errMsg, previousSeqNum + 1, currentSeqNum);
-				}
+			final AtomicInteger seqNum = new AtomicInteger(0);
+			final JobID jobId = new JobID();
+			final ChannelID channelId = new ChannelID();
 
-				if (currentSeqNum == numEnvelopesPerProducer - 1) {
-					receivedAllEnvelopesLatch.countDown();
+			new Thread(new Runnable() {
+				@Override
+				public void run() {
+					// enqueue envelopes with ascending seq numbers
+					while (seqNum.get() < numToSendPerSubtask) {
+						try {
+							Envelope env = new Envelope(seqNum.getAndIncrement(), jobId, channelId);
+							senderConnManager.enqueue(env, receiver);
+						} catch (IOException e) {
+							throw new RuntimeException("Unexpected exception while enqueuing envelope.");
+						}
+					}
 				}
+			}).start();
+		}
 
-				return null;
-			}
-		}).when(channelManager).dispatchFromNetwork(Matchers.any(Envelope.class));
-	}
+		latch.await();
 
-	private void finishTest() throws IOException {
-		senderManager.shutdown();
-		receiverManager.shutdown();
+		senderConnManager.shutdown();
+		receiverConnManager.shutdown();
 	}
 
-	// ------------------------------------------------------------------------
+	/**
+	 * Verifies correct ordering of received envelopes (per envelope source channel ID).
+	 */
+	private class VerifyEnvelopes implements Answer<Void> {
 
-	// Verifies that there are no race conditions or dead locks with
-	// concurrent enqueues and closes.
-	public void testConcurrentEnqueueAndClose() throws Exception {
-		Integer[][] configs = new Integer[][]{
-				// No close after idle
-				{64, 4096, 1, 1, 1, 0, 0},
-				{128, 2048, 1, 1, 1, 0, 0},
-				{256, 1024, 1, 1, 1, 0, 0},
-				{512, 512, 1, 1, 1, 0, 0},
-
-				{64, 4096, 4, 1, 1, 0, 0},
-				{128, 2048, 4, 1, 1, 0, 0},
-				{256, 1024, 4, 1, 1, 0, 0},
-				{512, 512, 4, 1, 1, 0, 0},
-
-				{64, 4096, 4, 2, 2, 0, 0},
-				{128, 2048, 4, 2, 2, 0, 0},
-				{256, 1024, 4, 2, 2, 0, 0},
-				{512, 512, 4, 2, 2, 0, 0},
-				// Note: these need plenty of heap space for the threads
-				{1024, 256, 4, 2, 2, 0, 0},
-				{2048, 128, 4, 2, 2, 0, 0},
-				{4096, 64, 4, 2, 2, 0, 0},
-
-				// With close after idle
-				{4, 1024, 1, 1, 1, 40, 80},
-				{8, 1024, 1, 1, 1, 40, 80},
-				{16, 1024, 1, 1, 1, 40, 80},
-				{32, 1024, 1, 1, 1, 40, 80},
-				{64, 1024, 1, 1, 1, 40, 80},
-
-				{16, 1024, 4, 1, 1, 40, 80},
-				{32, 1024, 4, 1, 1, 40, 80},
-				{64, 1024, 4, 1, 1, 40, 80},
-
-				{16, 1024, 4, 2, 2, 40, 80},
-				{32, 1024, 4, 2, 2, 40, 80},
-				{64, 1024, 4, 2, 2, 40, 80}
-		};
+		private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
 
-		for (Integer[] params : configs) {
-			System.out.println(String.format("Running testConcurrentEnqueueAndClose with config: " +
-							"%d producers, %d envelopes per producer, %d num channels, " +
-							"%d num in threads, %d num out threads, " +
-							"%d ms min sleep time, %d ms max sleep time.",
-					params[0], params[1], params[2], params[3], params[4], params[5], params[6]));
+		private final CountDownLatch latch;
 
-			long start = System.currentTimeMillis();
-			doTestConcurrentEnqueueAndClose(params[0], params[1], params[2], params[3], params[4], params[5], params[6]);
-			long end = System.currentTimeMillis();
+		private final int numExpectedEnvelopesPerSubtask;
 
-			System.out.println(String.format("Runtime: %d ms.", (end - start)));
+		private VerifyEnvelopes(CountDownLatch latch, int numExpectedEnvelopesPerSubtask) {
+			this.latch = latch;
+			this.numExpectedEnvelopesPerSubtask = numExpectedEnvelopesPerSubtask;
 		}
-	}
-
-	private void doTestConcurrentEnqueueAndClose(
-			int numProducers,
-			final int numEnvelopesPerProducer,
-			final int numChannels,
-			int numInThreads,
-			int numOutThreads,
-			final int minSleepTimeMs,
-			final int maxSleepTimeMs) throws Exception {
-
-		// The idle time before a close is requested is 1/4th of the min sleep
-		// time of each producer. Depending on the number of concurrent producers
-		// and number of envelopes to send per producer, this will result in a
-		// variable number of close requests.
-		initTest(numProducers, numEnvelopesPerProducer, numInThreads, numOutThreads, minSleepTimeMs / 4);
-
-		// --------------------------------------------------------------------
 
-		Runnable[] producers = new Runnable[numProducers];
+		@Override
+		public Void answer(InvocationOnMock invocation) throws Throwable {
+			Envelope env = (Envelope) invocation.getArguments()[0];
 
-		for (int i = 0; i < numProducers; i++) {
-			producers[i] = new Runnable() {
-				@Override
-				public void run() {
-					final JobID jid = new JobID();
-					final ChannelID cid = new ChannelID();
-					final RemoteReceiver receiver = receivers[rand.nextInt(numChannels)];
+			ChannelID channelId = env.getSource();
+			int seqNum = env.getSequenceNumber();
 
-					for (int sequenceNum = 0; sequenceNum < numEnvelopesPerProducer; sequenceNum++) {
-						try {
-							senderManager.enqueue(new Envelope(sequenceNum, jid, cid), receiver);
+			if (seqNum == 0) {
+				Integer previousSeqNum = this.received.putIfAbsent(channelId, seqNum);
 
-							int sleepTime = rand.nextInt((maxSleepTimeMs - minSleepTimeMs) + 1) + minSleepTimeMs;
-							Thread.sleep(sleepTime);
-						} catch (Exception e) {
-							throw new RuntimeException(e);
-						}
-					}
-				}
-			};
-		}
+				String msg = String.format("Received envelope from %s before, but current seq num is 0", channelId);
+				Assert.assertNull(msg, previousSeqNum);
+			}
+			else {
+				boolean isExpectedPreviousSeqNum = this.received.replace(channelId, seqNum - 1, seqNum);
 
-		for (int i = 0; i < numProducers; i++) {
-			new Thread(producers[i], "Producer " + i).start();
-		}
+				String msg = String.format("Received seq num %d from %s, but previous was not %d.",
+						seqNum, channelId, seqNum - 1);
+				Assert.assertTrue(msg, isExpectedPreviousSeqNum);
+			}
 
-		// --------------------------------------------------------------------
+			// count down the latch if all envelopes received for this source
+			if (seqNum == numExpectedEnvelopesPerSubtask - 1) {
+				this.latch.countDown();
+			}
 
-		while (receivedAllEnvelopesLatch.getCount() != 0) {
-			receivedAllEnvelopesLatch.await();
+			return null;
 		}
-
-		finishTest();
 	}
 
 	private void runAllTests() throws Exception {
-		testConcurrentEnqueueAndClose();
+		testEnqueueRaceAndDeadlockFreeMultipleChannels();
 
 		System.out.println("Done.");
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
deleted file mode 100644
index e2f3a14..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.timeout.IdleStateEvent;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.NetworkConnectionManager;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.junit.Assert;
-import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
-
-import java.net.InetAddress;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-
-public class OutboundConnectionQueueTest {
-
-	private final static long RANDOM_SEED = 520346508276087l;
-
-	private final Object lock = new Object();
-
-	private Channel channel;
-
-	private NetworkConnectionManager connectionManager;
-
-	private RemoteReceiver receiver;
-
-	private OutboundConnectionQueue queue;
-
-	private TestControlHandler controller;
-
-	private TestVerificationHandler verifier;
-
-	private Throwable exception;
-
-	private void initTest(boolean autoTriggerWrite) {
-		controller = new TestControlHandler(autoTriggerWrite);
-		verifier = new TestVerificationHandler();
-
-		channel = Mockito.spy(new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
-			@Override
-			public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-				exception = cause;
-				super.exceptionCaught(ctx, cause);
-			}
-		}));
-
-		connectionManager = Mockito.mock(NetworkConnectionManager.class);
-
-		receiver = Mockito.mock(RemoteReceiver.class);
-
-		queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
-
-		channel.pipeline().addFirst("Test Control Handler", controller);
-		channel.pipeline().addFirst("Test Verification Handler", verifier);
-
-		exception = null;
-
-		// The testing pipeline looks as follows:
-		// - Test Verification Handler [OUT]
-		// - Test Control Handler [IN]
-		// - Idle State Handler [IN/OUT] [added by OutboundConnectionQueue]
-		// - Outbound queue (SUT) [IN] [added by OutboundConnectionQueue]
-		// - Exception setter [IN] [EmbeddedChannel constructor]
-	}
-
-	/**
-	 * Verifies that the channel is closed after an idle event, when
-	 * there are no queued envelopes.
-	 */
-	public void testClose() throws Exception {
-		initTest(false);
-
-		JobID jid = new JobID();
-		ChannelID cid = new ChannelID();
-
-		Assert.assertTrue(queue.enqueue(new Envelope(1, jid, cid)));
-		Assert.assertTrue(queue.enqueue(new Envelope(2, jid, cid)));
-		Assert.assertTrue(queue.enqueue(new Envelope(3, jid, cid)));
-
-		controller.triggerWrite();
-
-		controller.fireIdle();
-
-		verifier.waitForClose();
-
-		verifier.verifyEnvelopeReceived(cid, 3);
-
-		Mockito.verify(connectionManager, Mockito.times(1)).close(Mockito.any(RemoteReceiver.class));
-	}
-
-	/**
-	 * Verifies that the channel is not closed while there are queued
-	 * envelopes.
-	 */
-	public void testCloseWithQueuedEnvelopes() throws Exception {
-		initTest(true);
-
-		final JobID jid = new JobID();
-		final ChannelID cid = new ChannelID();
-		final CountDownLatch sync = verifier.waitForEnvelopes(3, cid);
-
-		// Make channel not writable => envelopes are queued
-		Mockito.when(channel.isWritable()).thenReturn(false);
-
-		Assert.assertTrue(queue.enqueue(new Envelope(1, jid, cid)));
-		Assert.assertTrue(queue.enqueue(new Envelope(2, jid, cid)));
-		Assert.assertTrue(queue.enqueue(new Envelope(3, jid, cid)));
-
-		// Verify idle event doesn't close channel
-		controller.fireIdle();
-
-		Mockito.verify(connectionManager, Mockito.times(0)).close(Mockito.any(RemoteReceiver.class));
-
-		Boolean hasRequestedClose = Whitebox.<Boolean>getInternalState(queue, "hasRequestedClose");
-		Assert.assertFalse("Close request while envelope in flight.", hasRequestedClose);
-
-		// Change writability of channel back to writable
-		Mockito.when(channel.isWritable()).thenReturn(true);
-		channel.pipeline().fireChannelWritabilityChanged();
-
-		// Wait for the processing of queued envelopes
-		while (sync.getCount() != 0) {
-			sync.await();
-		}
-
-		verifier.verifyEnvelopeReceived(cid, 3);
-
-		// Now close again
-		controller.fireIdle();
-		verifier.waitForClose();
-
-		Mockito.verify(connectionManager, Mockito.times(1)).close(Mockito.any(RemoteReceiver.class));
-	}
-
-	/**
-	 * Verifies that envelopes are delegated back to the connection
-	 * manager after a close.
-	 */
-	public void testEnqueueAfterClose() throws Exception {
-		initTest(true);
-
-		// Immediately close the channel
-		controller.fireIdle();
-		verifier.waitForClose();
-
-		Assert.assertFalse(queue.enqueue(new Envelope(1, new JobID(), new ChannelID())));
-	}
-
-	/**
-	 * Verifies that multiple idle events are handled correctly.
-	 */
-	public void testMultipleIdleEvents() throws Exception {
-		initTest(true);
-
-		controller.fireIdle();
-		verifier.waitForClose();
-
-		controller.fireIdle();
-
-		// Second close should not cause an exception in the
-		// verification handler.
-		Assert.assertNull(exception);
-	}
-
-	/**
-	 * Verifies that unknown user events throw an exception.
-	 */
-	public void testUnknownUserEvent() throws Exception {
-		initTest(true);
-
-		Assert.assertNull(exception);
-
-		controller.context.fireUserEventTriggered("Unknown user event");
-
-		Assert.assertNotNull(exception);
-		Assert.assertTrue(exception instanceof IllegalStateException);
-	}
-
-	// ------------------------------------------------------------------------
-
-	public void testConcurrentEnqueueAndClose() throws Exception {
-		Integer[][] configs = new Integer[][]{
-				{1, 512, 0, 0},
-				{1, 512, 40, 80},
-				{2, 512, 40, 80},
-				{4, 512, 40, 80},
-				{8, 512, 40, 80},
-				{32, 512, 40, 80},
-				{128, 512, 40, 80},
-				{256, 512, 40, 80},
-				{512, 512, 40, 80}
-		};
-
-		for (Integer[] params : configs) {
-			System.out.println(String.format(
-					"Running %s with config: %d producers, %d envelopes to send per producer, " +
-							"%d ms min sleep time, %d ms max sleep time.", "testConcurrentEnqueueAndClose",
-					params[0], params[1], params[2], params[3]));
-
-			long start = System.currentTimeMillis();
-			doTestConcurrentEnqueueAndClose(params[0], params[1], params[2], params[3]);
-			long end = System.currentTimeMillis();
-
-			System.out.println(String.format("Runtime: %d ms.", (end - start)));
-		}
-	}
-
-	/**
-	 * Verifies that concurrent enqueue and close events are handled
-	 * correctly.
-	 */
-	private void doTestConcurrentEnqueueAndClose(
-			final int numProducers,
-			final int numEnvelopesPerProducer,
-			final int minSleepTimeMs,
-			final int maxSleepTimeMs) throws Exception {
-
-		final InetAddress bindHost = InetAddress.getLocalHost();
-		final int bindPort = 20000;
-
-		// Testing concurrent enqueue and close requires real TCP channels,
-		// because Netty's testing EmbeddedChannel does not implement the
-		// same threading model as the NioEventLoopGroup (for example there
-		// is no difference between being IN and OUTSIDE of the event loop
-		// thread).
-
-		final ServerBootstrap in = new ServerBootstrap();
-		in.group(new NioEventLoopGroup(1))
-				.channel(NioServerSocketChannel.class)
-				.localAddress(bindHost, bindPort)
-				.childHandler(new ChannelInitializer<SocketChannel>() {
-					@Override
-					public void initChannel(SocketChannel channel) throws Exception {
-						channel.pipeline()
-								.addLast(new ChannelInboundHandlerAdapter());
-					}
-				});
-
-		final Bootstrap out = new Bootstrap();
-		out.group(new NioEventLoopGroup(1))
-				.channel(NioSocketChannel.class)
-				.handler(new ChannelInitializer<SocketChannel>() {
-					@Override
-					public void initChannel(SocketChannel channel) throws Exception {
-						channel.pipeline()
-								.addLast(new ChannelOutboundHandlerAdapter());
-					}
-				})
-				.option(ChannelOption.TCP_NODELAY, false)
-				.option(ChannelOption.SO_KEEPALIVE, true);
-
-		in.bind().sync();
-
-		// --------------------------------------------------------------------
-
-		// The testing pipeline looks as follows:
-		// - Test Verification Handler [OUT]
-		// - Test Control Handler [IN]
-		// - Idle State Handler [IN/OUT] [added by OutboundConnectionQueue]
-		// - Outbound queue (SUT) [IN] [added by OutboundConnectionQueue]
-
-		channel = out.connect(bindHost, bindPort).sync().channel();
-
-		queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
-
-		controller = new TestControlHandler(true);
-		verifier = new TestVerificationHandler();
-
-		channel.pipeline().addFirst("Test Control Handler", controller);
-		channel.pipeline().addFirst("Test Verification Handler", verifier);
-
-		// --------------------------------------------------------------------
-
-		final Random rand = new Random(RANDOM_SEED);
-
-		// Every producer works on their local reference of the queue and only
-		// updates it to the new channel when enqueue returns false, which
-		// should only happen if the channel has been closed.
-		final ConcurrentMap<ChannelID, OutboundConnectionQueue> producerQueues =
-				new ConcurrentHashMap<ChannelID, OutboundConnectionQueue>();
-
-		final ChannelID[] ids = new ChannelID[numProducers];
-
-		for (int i = 0; i < numProducers; i++) {
-			ids[i] = new ChannelID();
-
-			producerQueues.put(ids[i], queue);
-		}
-
-		final CountDownLatch receivedAllEnvelopesLatch =
-				verifier.waitForEnvelopes(numEnvelopesPerProducer - 1, ids);
-
-		final List<Channel> closedChannels = new ArrayList<Channel>();
-
-		// --------------------------------------------------------------------
-
-		final Runnable closer = new Runnable() {
-			@Override
-			public void run() {
-				while (receivedAllEnvelopesLatch.getCount() != 0) {
-					try {
-						controller.fireIdle();
-
-						// Test two idle events arriving "closely"
-						// after each other
-						if (rand.nextBoolean()) {
-							controller.fireIdle();
-						}
-
-						Thread.sleep(minSleepTimeMs / 2);
-					} catch (InterruptedException e) {
-						e.printStackTrace();
-					}
-				}
-			}
-		};
-
-		final Runnable[] producers = new Runnable[numProducers];
-
-		for (int i = 0; i < numProducers; i++) {
-			final int index = i;
-
-			producers[i] = new Runnable() {
-				@Override
-				public void run() {
-					final JobID jid = new JobID();
-					final ChannelID cid = ids[index];
-
-					for (int j = 0; j < numEnvelopesPerProducer; j++) {
-						OutboundConnectionQueue localQueue = producerQueues.get(cid);
-
-						try {
-							// This code path is handled by the NetworkConnectionManager
-							// in production to enqueue the envelope either to the current
-							// channel or a new one if it was closed.
-							while (!localQueue.enqueue(new Envelope(j, jid, cid))) {
-								synchronized (lock) {
-									if (localQueue == queue) {
-										closedChannels.add(channel);
-
-										channel = out.connect(bindHost, bindPort).sync().channel();
-
-										queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
-
-										channel.pipeline().addFirst("Test Control Handler", controller);
-										channel.pipeline().addFirst("Test Verification Handler", verifier);
-									}
-								}
-
-								producerQueues.put(cid, queue);
-								localQueue = queue;
-							}
-
-							int sleepTime = rand.nextInt((maxSleepTimeMs - minSleepTimeMs) + 1) + minSleepTimeMs;
-							Thread.sleep(sleepTime);
-						} catch (InterruptedException e) {
-							throw new RuntimeException(e);
-						}
-					}
-				}
-			};
-		}
-
-		for (int i = 0; i < numProducers; i++) {
-			new Thread(producers[i], "Producer " + i).start();
-		}
-
-		new Thread(closer, "Closer").start();
-
-		// --------------------------------------------------------------------
-
-		while (receivedAllEnvelopesLatch.getCount() != 0) {
-			receivedAllEnvelopesLatch.await();
-		}
-
-		// Final close, if the last close didn't make it.
-		synchronized (lock) {
-			if (channel != null) {
-				controller.fireIdle();
-			}
-		}
-
-		verifier.waitForClose();
-
-		// If the producers do not sleep after each envelope, the close
-		// should not make it through and no channel should have been
-		// added to the list of closed channels
-		if (minSleepTimeMs == 0 && maxSleepTimeMs == 0) {
-			Assert.assertEquals(0, closedChannels.size());
-		}
-
-		for (Channel ch : closedChannels) {
-			Assert.assertFalse(ch.isOpen());
-		}
-
-		System.out.println(closedChannels.size() + " channels were closed during execution.");
-
-		out.group().shutdownGracefully().sync();
-		in.group().shutdownGracefully().sync();
-	}
-
-	// ------------------------------------------------------------------------
-
-	// Handler to control the flow of Netty's custom user events. Used
-	// to fire idle events and manually trigger write events.
-	@ChannelHandler.Sharable
-	private static class TestControlHandler extends ChannelInboundHandlerAdapter {
-
-		private final Queue<Object> events = new ArrayDeque<Object>();
-
-		private final boolean forwardUserEvents;
-
-		private ChannelHandlerContext context;
-
-		private TestControlHandler(boolean forwardUserEvents) {
-			this.forwardUserEvents = forwardUserEvents;
-		}
-
-		@Override
-		public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-			synchronized (this) {
-				context = ctx;
-			}
-		}
-
-		@Override
-		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-			if (forwardUserEvents) {
-				ctx.fireUserEventTriggered(evt);
-			}
-			else {
-				events.add(evt);
-			}
-		}
-
-		@Override
-		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-			super.exceptionCaught(ctx, cause);
-		}
-
-		public void triggerWrite() {
-			synchronized (this) {
-				if (!events.isEmpty()) {
-					context.fireUserEventTriggered(events.remove());
-				}
-			}
-		}
-
-		public void fireIdle() {
-			synchronized (this) {
-				context.fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT);
-			}
-		}
-	}
-
-	// Handler to verify writes and close events.
-	@ChannelHandler.Sharable
-	private static class TestVerificationHandler extends ChannelOutboundHandlerAdapter {
-
-		private final Map<ChannelID, Integer> receivedSequenceNums = new HashMap<ChannelID, Integer>();
-
-		private final Map<ChannelID, Integer> expectedSequenceNums = new HashMap<ChannelID, Integer>();
-
-		private final Map<ChannelID, CountDownLatch> envelopeLatches = new HashMap<ChannelID, CountDownLatch>();
-
-		private CountDownLatch closeLatch;
-
-		@Override
-		public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-			closeLatch = new CountDownLatch(1);
-
-			super.handlerAdded(ctx);
-		}
-
-		@Override
-		public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
-			if (closeLatch.getCount() == 0) {
-				throw new IllegalStateException("Write on closed channel.");
-			}
-
-			if (msg.getClass() == Envelope.class) {
-				Envelope env = (Envelope) msg;
-
-				final int currentSeqNum = env.getSequenceNumber();
-				final ChannelID source = env.getSource();
-
-				Integer previousSeqNum = receivedSequenceNums.put(source, currentSeqNum);
-
-				if (previousSeqNum != null) {
-					String errMsg = String.format("Received %s with unexpected sequence number.", env);
-					Assert.assertEquals(errMsg, previousSeqNum + 1, currentSeqNum);
-				}
-
-				promise.setSuccess();
-
-				Integer expectedSeqNum = expectedSequenceNums.get(source);
-				if (expectedSeqNum != null && expectedSeqNum.equals(currentSeqNum)) {
-					envelopeLatches.remove(source).countDown();
-				}
-			}
-		}
-
-		@Override
-		public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-			if (closeLatch.getCount() == 0) {
-				throw new IllegalStateException("Received multiple close events.");
-			}
-
-			super.close(ctx, promise);
-
-			closeLatch.countDown();
-		}
-
-		public void waitForClose() throws InterruptedException {
-			while (closeLatch.getCount() != 0) {
-				closeLatch.await();
-			}
-		}
-
-		/**
-		 * Verifies that envelope with expected sequence number has been
-		 * processed by the handler (or no envelope has been received by
-		 * the given source, if expected sequence number is null).
-		 */
-		public void verifyEnvelopeReceived(ChannelID source, Integer expectedSequenceNum) {
-			if (expectedSequenceNum == null && receivedSequenceNums.containsKey(source)) {
-				Assert.fail("Received unexpected envelope from channel " + source);
-			}
-			else if (receivedSequenceNums.containsKey(source)) {
-				Assert.assertEquals(expectedSequenceNum, receivedSequenceNums.get(source));
-			}
-			else {
-				Assert.fail("Did not receive any envelope from channel " + source);
-			}
-		}
-
-		public CountDownLatch waitForEnvelopes(Integer expectedSequenceNum, ChannelID... ids) {
-			CountDownLatch latch = new CountDownLatch(ids.length);
-
-			for (ChannelID id : ids) {
-				expectedSequenceNums.put(id, expectedSequenceNum);
-				envelopeLatches.put(id, latch);
-			}
-
-			return latch;
-		}
-	}
-
-	// --------------------------------------------------------------------
-
-	public void runAllTests() throws Exception {
-		testClose();
-		testCloseWithQueuedEnvelopes();
-		testEnqueueAfterClose();
-		testUnknownUserEvent();
-		testMultipleIdleEvents();
-		testConcurrentEnqueueAndClose();
-	}
-
-	public static void main(String[] args) throws Exception {
-		new OutboundConnectionQueueTest().runAllTests();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
index 953b5b7..7424de6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.runtime;
 
 import java.io.IOException;