You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/12/12 14:17:21 UTC

incubator-flink git commit: [FLINK-998] Close TCP connections after destroying logical channels

Repository: incubator-flink
Updated Branches:
  refs/heads/master 669159956 -> 454e2bc16


[FLINK-998] Close TCP connections after destroying logical channels

- This commit introduces reference counting to keep track of the
  InputChannel and OutputChannel instances that share the same
  physical TCP connection.

- When the last logical channel sharing a connection is released, the
  reference count reaches 0 and the TCP connection can safely be closed.

- Furthermore, the debug thread in the NettyConnectionManager is now
  interrupted on shutdown, and the OutboundConnectionQueue frees all
  queued buffers when the channel is closed or an Exception occurs.

This closes #255.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/454e2bc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/454e2bc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/454e2bc1

Branch: refs/heads/master
Commit: 454e2bc1660a1f72450bb477e8cc012b0609c566
Parents: 6691599
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Dec 8 23:55:13 2014 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Dec 12 14:16:43 2014 +0100

----------------------------------------------------------------------
 .../runtime/io/network/ChannelManager.java      |  34 +++---
 .../io/network/LocalConnectionManager.java      |  13 ++-
 .../io/network/NetworkConnectionManager.java    |   6 +-
 .../network/netty/NettyConnectionManager.java   |  62 ++++++++++-
 .../network/netty/OutboundConnectionQueue.java  |  40 +++++++
 .../util/AtomicDisposableReferenceCounter.java  |  71 +++++++++++++
 .../netty/NettyConnectionManagerTest.java       |   6 +-
 .../AtomicDisposableReferenceCounterTest.java   | 105 +++++++++++++++++++
 .../flink/test/util/AbstractTestBase.java       |   9 +-
 .../exampleJavaPrograms/WordCountITCase.java    |   2 +-
 10 files changed, 322 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 0419231..769075a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -45,6 +42,8 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -114,6 +113,10 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 	public GlobalBufferPool getGlobalBufferPool() {
 		return globalBufferPool;
 	}
+
+	public NetworkConnectionManager getNetworkConnectionManager() {
+		return networkConnectionManager;
+	}
 	
 	// -----------------------------------------------------------------------------------------------------------------
 	//                                               Task registration
@@ -205,8 +208,10 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		for (ChannelID id : environment.getOutputChannelIDs()) {
 			Channel channel = this.channels.remove(id);
 			if (channel != null) {
+
 				channel.destroy();
-				this.receiverCache.remove(channel);
+
+				removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
 			}
 		}
 
@@ -215,7 +220,8 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			Channel channel = this.channels.remove(id);
 			if (channel != null) {
 				channel.destroy();
-				this.receiverCache.remove(channel);
+
+				removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
 			}
 		}
 
@@ -238,6 +244,14 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		redistributeBuffers();
 	}
 
+	private void removeFromReceiverCacheAndMaybeCloseTcpConnection(Channel channel) {
+		EnvelopeReceiverList receiver = this.receiverCache.remove(channel.getID());
+
+		if (receiver != null && receiver.hasRemoteReceiver()) {
+			networkConnectionManager.close(receiver.getRemoteReceiver());
+		}
+	}
+
 	/**
 	 * Ensures that the channel manager has enough buffers to execute the given task.
 	 * <p>
@@ -336,7 +350,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
 		final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
 
-		this.networkConnectionManager.enqueue(senderHint, receiver);
+		this.networkConnectionManager.enqueue(senderHint, receiver, true);
 	}
 
 	/**
@@ -471,7 +485,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 					generateSenderHint(envelope, remoteReceiver);
 				}
 
-				this.networkConnectionManager.enqueue(envelope, remoteReceiver);
+				this.networkConnectionManager.enqueue(envelope, remoteReceiver, false);
 				success = true;
 			}
 		} finally {
@@ -515,11 +529,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
 
 			// Generate sender hint before sending the first envelope over the network
-			if (envelope.getSequenceNumber() == 0) {
-				generateSenderHint(envelope, remoteReceiver);
-			}
-
-			this.networkConnectionManager.enqueue(envelope, remoteReceiver);
+			this.networkConnectionManager.enqueue(envelope, remoteReceiver, envelope.getSequenceNumber() == 0);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 89365d6..264fde6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network;
 
 import java.io.IOException;
@@ -28,7 +27,17 @@ public class LocalConnectionManager implements NetworkConnectionManager {
 	}
 
 	@Override
-	public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
+	public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException {
+	}
+
+	@Override
+	public void close(RemoteReceiver receiver) {
+
+	}
+
+	@Override
+	public int getNumberOfActiveConnections() {
+		return 0;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
index ca1b3eb..309c92d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
@@ -25,7 +25,11 @@ public interface NetworkConnectionManager {
 
 	public void start(ChannelManager channelManager) throws IOException;
 
-	public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException;
+	public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException;
+
+	public void close(RemoteReceiver receiver);
+
+	public int getNumberOfActiveConnections();
 
 	public void shutdown() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 3b148e0..941c448 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -72,6 +72,8 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 
 	private Bootstrap out;
 
+	private Thread debugThread;
+
 	public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads,
 								int numOutThreads, int lowWaterMark, int highWaterMark) {
 
@@ -157,12 +159,16 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			new Thread(new Runnable() {
+			debugThread = new Thread(new Runnable() {
 				@Override
 				public void run() {
 					Date date = new Date();
 
 					while (true) {
+						if (Thread.interrupted()) {
+							break;
+						}
+
 						try {
 							Thread.sleep(DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS);
 
@@ -171,16 +177,17 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 							System.out.println(date);
 							System.out.println(getNonZeroNumQueuedEnvelopes());
 						} catch (InterruptedException e) {
-							e.printStackTrace();
+							break;
 						}
 					}
 				}
-			}).start();
+			});
+			debugThread.start();
 		}
 	}
 
 	@Override
-	public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
+	public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException {
 		// Get the channel. The channel may be
 		// 1) a channel that already exists (usual case) -> just send the data
 		// 2) a channel that is in buildup (sometimes) -> attach to the future and wait for the actual channel
@@ -225,11 +232,56 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 			}
 		}
 
-		channel.enqueue(envelope);
+		// The first envelope of a logical channel increments the reference counter of the
+		// connection to indicate that it is holding the resource. When unregistering the task,
+		// every logical channel decrements this counter again and the last one to decrement it
+		// to zero releases the connection.
+		if (isFirstEnvelope) {
+			if (channel.incrementReferenceCounter()) {
+				channel.enqueue(envelope);
+			}
+			else {
+				// There was a race with a close, try again.
+				outConnections.remove(receiver, channel);
+
+				enqueue(envelope, receiver, isFirstEnvelope);
+			}
+		}
+		else {
+			channel.enqueue(envelope);
+		}
+	}
+
+	@Override
+	public void close(RemoteReceiver receiver) {
+		Object entry = outConnections.get(receiver);
+
+		if (entry instanceof OutboundConnectionQueue) {
+			OutboundConnectionQueue channel = (OutboundConnectionQueue) entry;
+
+			// It is possible that we decrement without ever having incremented the counter, which
+			// is fine.
+			try {
+				if (channel.decrementReferenceCounter()) {
+					channel.close();
+					outConnections.remove(receiver, channel);
+				}
+			} catch (Exception ignored) {
+			}
+		}
+	}
+
+	@Override
+	public int getNumberOfActiveConnections() {
+		return outConnections.size();
 	}
 
 	@Override
 	public void shutdown() throws IOException {
+		if (debugThread != null) {
+			debugThread.interrupt();
+		}
+
 		if (!in.group().isShuttingDown()) {
 			LOG.info("Shutting down incoming connections.");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
index 1f10e2b..850f38b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -23,7 +23,9 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.runtime.io.network.Buffer;
 import org.apache.flink.runtime.io.network.Envelope;
+import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,12 +42,26 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 
 	private final AtomicInteger numQueuedEnvelopes = new AtomicInteger(0);
 
+	private final AtomicDisposableReferenceCounter closeReferenceCounter = new AtomicDisposableReferenceCounter();
+
 	public OutboundConnectionQueue(Channel channel) {
 		this.channel = channel;
 
 		channel.pipeline().addFirst(this);
 	}
 
+	boolean incrementReferenceCounter() {
+		return closeReferenceCounter.incrementReferenceCounter();
+	}
+
+	boolean decrementReferenceCounter() {
+		return closeReferenceCounter.decrementReferenceCounter();
+	}
+
+	void close() {
+		channel.close();
+	}
+
 	/**
 	 * Enqueues an envelope to be sent later.
 	 * <p/>
@@ -59,6 +75,20 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 	}
 
 	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		releaseAllEnvelopes();
+
+		super.channelInactive(ctx);
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		releaseAllEnvelopes();
+
+		super.exceptionCaught(ctx, cause);
+	}
+
+	@Override
 	public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnqueue) throws Exception {
 		boolean triggerWrite = this.queuedEnvelopes.isEmpty();
 
@@ -110,4 +140,14 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 		LOG.error("An exception occurred in Channel {}: {}", channel, t.getMessage());
 		throw new Exception(t);
 	}
+
+	private void releaseAllEnvelopes() {
+		Envelope envelope;
+		while ((envelope = queuedEnvelopes.poll()) != null) {
+			Buffer buffer = envelope.getBuffer();
+			if (buffer != null) {
+				buffer.recycleBuffer();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
new file mode 100644
index 0000000..b0f1089
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+/**
+ * Atomic reference counter, which enters a "disposed" state after the reference
+ * count reaches 0.
+ */
+public class AtomicDisposableReferenceCounter {
+
+	private final Object lock = new Object();
+
+	private int referenceCounter;
+
+	private boolean isDisposed;
+
+	/**
+	 * Increments the reference count and returns whether it was successful.
+	 * <p>
+	 * If the method returns <code>false</code>, the counter has already been
+	 * disposed. Otherwise it returns <code>true</code>.
+	 */
+	public boolean incrementReferenceCounter() {
+		synchronized (lock) {
+			if (isDisposed) {
+				return false;
+			}
+
+			referenceCounter++;
+			return true;
+		}
+	}
+
+	/**
+	 * Decrements the reference count.
+	 * <p>
+	 * If the method returns <code>true</code>, the decrement operation disposed
+	 * the counter. Otherwise it returns <code>false</code>.
+	 */
+	public boolean decrementReferenceCounter() {
+		synchronized (lock) {
+			if (isDisposed) {
+				return false;
+			}
+
+			referenceCounter--;
+
+			if (referenceCounter == 0) {
+				isDisposed = true;
+			}
+
+			return isDisposed;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 34d2367..c3e728b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -126,8 +126,10 @@ public class NettyConnectionManagerTest {
 					// enqueue envelopes with ascending seq numbers
 					while (seqNum.get() < numToSendPerSubtask) {
 						try {
-							Envelope env = new Envelope(seqNum.getAndIncrement(), jobId, channelId);
-							senderConnManager.enqueue(env, receiver);
+							int sequenceNumber = seqNum.getAndIncrement();
+
+							Envelope env = new Envelope(sequenceNumber, jobId, channelId);
+							senderConnManager.enqueue(env, receiver, sequenceNumber == 0);
 						} catch (IOException e) {
 							throw new RuntimeException("Unexpected exception while enqueuing envelope.");
 						}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
new file mode 100644
index 0000000..6bd45ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AtomicDisposableReferenceCounterTest {
+
+	@Test
+	public void testSerialIncrementAndDecrement() {
+		AtomicDisposableReferenceCounter counter = new AtomicDisposableReferenceCounter();
+
+		assertTrue(counter.incrementReferenceCounter());
+
+		assertTrue(counter.decrementReferenceCounter());
+
+		assertFalse(counter.incrementReferenceCounter());
+
+		assertFalse(counter.decrementReferenceCounter());
+	}
+
+	@Test
+	public void testConcurrentIncrementAndDecrement() throws InterruptedException, ExecutionException, TimeoutException {
+		final Random random = new Random();
+
+		final ExecutorService executor = Executors.newCachedThreadPool();
+
+		final MockIncrementer incrementer = new MockIncrementer();
+
+		final MockDecrementer decrementer = new MockDecrementer();
+
+		// Repeat this to provoke races
+		for (int i = 0; i < 256; i++) {
+			final AtomicDisposableReferenceCounter counter = new AtomicDisposableReferenceCounter();
+			incrementer.setCounter(counter);
+			decrementer.setCounter(counter);
+
+			counter.incrementReferenceCounter();
+
+			// Randomly decide which one should be first as the first task usually will win the race
+			boolean incrementFirst = random.nextBoolean();
+
+			Future<Boolean> success1 = executor.submit(incrementFirst ? incrementer : decrementer);
+			Future<Boolean> success2 = executor.submit(incrementFirst ? decrementer : incrementer);
+
+			// Only one of the two should win the race and return true
+			assertTrue(success1.get() ^ success2.get());
+		}
+	}
+
+	private static class MockIncrementer implements Callable<Boolean> {
+
+		private AtomicDisposableReferenceCounter counter;
+
+		void setCounter(AtomicDisposableReferenceCounter counter) {
+			this.counter = counter;
+		}
+
+		@Override
+		public Boolean call() throws Exception {
+			return counter.incrementReferenceCounter();
+		}
+	}
+
+	private static class MockDecrementer implements Callable<Boolean> {
+
+		private AtomicDisposableReferenceCounter counter;
+
+		void setCounter(AtomicDisposableReferenceCounter counter) {
+			this.counter = counter;
+		}
+
+		@Override
+		public Boolean call() throws Exception {
+			return counter.decrementReferenceCounter();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 1fd0e6e..7fd970b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.util;
 
 import java.io.BufferedInputStream;
@@ -96,11 +95,16 @@ public abstract class AbstractTestBase {
 		try {
 			
 			int numUnreleasedBCVars = 0;
+
+			int numActiveConnections = 0;
+
 			{
 				TaskManager[] tms = executor.getTaskManagers();
+
 				if (tms != null) {
 					for (TaskManager tm : tms) {
 						numUnreleasedBCVars += tm.getBroadcastVariableManager().getNumberOfVariablesWithReferences();
+						numActiveConnections += tm.getChannelManager().getNetworkConnectionManager().getNumberOfActiveConnections();
 					}
 				}
 			}
@@ -113,12 +117,11 @@ public abstract class AbstractTestBase {
 			}
 			
 			Assert.assertEquals("Not all broadcast variables were released.", 0, numUnreleasedBCVars);
+			Assert.assertEquals("Not all network connections were released.", 0, numActiveConnections);
 		}
 		finally {
 			deleteAllTempFiles();
 		}
-		
-		
 	}
 
 	//------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/454e2bc1/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
index fc7a95f..db72f65 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
@@ -29,7 +29,7 @@ public class WordCountITCase extends JavaProgramTestBase {
 
 	public WordCountITCase(){
 //		setDegreeOfParallelism(4);
-//		setNumTaskTracker(2);
+//		setNumTaskManager(2);
 //		setTaskManagerNumSlots(2);
 	}