You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:40 UTC

[05/30] Replace custom Java NIO TCP/IP code with Netty 4 library

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
new file mode 100644
index 0000000..ae67f42
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
@@ -0,0 +1,251 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class NettyConnectionManager {
+
+	private static final Log LOG = LogFactory.getLog(NettyConnectionManager.class);
+
+	private final ChannelManager channelManager;
+
+	private final ServerBootstrap in;
+
+	private final Bootstrap out;
+
+	private final ConcurrentMap<RemoteReceiver, Object> outConnections;
+
+	/**
+	 * Creates the connection manager, configures the server (incoming) and client (outgoing) bootstraps and
+	 * binds the server socket. The constructor blocks until the bind has completed.
+	 *
+	 * @param channelManager channel manager handed to the inbound decoder and dispatcher handlers
+	 * @param bindAddress local address the server socket is bound to
+	 * @param bindPort local port the server socket is bound to
+	 * @param bufferSize size passed to the fixed receive buffer allocator; also the basis of the default water marks
+	 * @param numInThreads number of threads for incoming connections, or -1 for max(availableProcessors / 4, 1)
+	 * @param numOutThreads number of threads for outgoing connections, or -1 for max(availableProcessors / 4, 1)
+	 * @param lowWaterMark write buffer low water mark in bytes, or -1 for bufferSize / 2
+	 * @param highWaterMark write buffer high water mark in bytes, or -1 for bufferSize
+	 * @throws RuntimeException if the thread is interrupted while waiting for the server socket to be bound
+	 */
+	public NettyConnectionManager(ChannelManager channelManager, InetAddress bindAddress, int bindPort,
+								int bufferSize, int numInThreads, int numOutThreads,
+								int lowWaterMark, int highWaterMark) {
+		this.outConnections = new ConcurrentHashMap<RemoteReceiver, Object>();
+		this.channelManager = channelManager;
+
+		// --------------------------------------------------------------------
+
+		// -1 selects the defaults for thread counts and water marks
+		int defaultNumThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1);
+		numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
+		numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
+		LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
+
+		lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
+		highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
+		LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark));
+
+		// --------------------------------------------------------------------
+		// server bootstrap (incoming connections)
+		// --------------------------------------------------------------------
+		// NOTE(review): RCVBUF_ALLOCATOR is set via option(), i.e. on the server channel; if it is meant
+		// for the accepted connections it presumably has to be a childOption() -- confirm.
+		this.in = new ServerBootstrap();
+		this.in.group(new NioEventLoopGroup(numInThreads))
+				.channel(NioServerSocketChannel.class)
+				.localAddress(bindAddress, bindPort)
+				.childHandler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					public void initChannel(SocketChannel channel) throws Exception {
+						channel.pipeline()
+								.addLast(new InboundEnvelopeDecoder(NettyConnectionManager.this.channelManager))
+								.addLast(new InboundEnvelopeDispatcherHandler(NettyConnectionManager.this.channelManager));
+					}
+				})
+				.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize))
+				.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+		// --------------------------------------------------------------------
+		// client bootstrap (outgoing connections)
+		// --------------------------------------------------------------------
+		this.out = new Bootstrap();
+		this.out.group(new NioEventLoopGroup(numOutThreads))
+				.channel(NioSocketChannel.class)
+				.handler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					public void initChannel(SocketChannel channel) throws Exception {
+						channel.pipeline()
+								.addLast(new OutboundEnvelopeEncoder());
+					}
+				})
+				.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, lowWaterMark)
+				.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark)
+				.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+				.option(ChannelOption.TCP_NODELAY, false)
+				.option(ChannelOption.SO_KEEPALIVE, true);
+
+		boolean bound = false;
+		try {
+			this.in.bind().sync();
+			bound = true;
+		} catch (InterruptedException e) {
+			// keep the interrupt visible to the caller and preserve the cause
+			Thread.currentThread().interrupt();
+			throw new RuntimeException("Could not bind server socket for incoming connections.", e);
+		} finally {
+			if (!bound) {
+				// the instance is never handed out in this case, so nobody could call shutdown();
+				// release both event loop groups here instead of leaving them running
+				this.in.group().shutdownGracefully();
+				this.out.group().shutdownGracefully();
+			}
+		}
+	}
+
+	/**
+	 * Gracefully shuts down the event loop groups of both bootstraps and blocks until both have terminated.
+	 *
+	 * @throws RuntimeException if the calling thread is interrupted while waiting for the shutdown
+	 */
+	public void shutdown() {
+		// trigger both shutdowns first so that they proceed concurrently, then wait for each
+		Future<?> inShutdownFuture = this.in.group().shutdownGracefully();
+		Future<?> outShutdownFuture = this.out.group().shutdownGracefully();
+
+		try {
+			inShutdownFuture.sync();
+			outShutdownFuture.sync();
+		} catch (InterruptedException e) {
+			// keep the interrupt visible to the caller and preserve the cause
+			Thread.currentThread().interrupt();
+			throw new RuntimeException("Could not properly shutdown connections.", e);
+		}
+	}
+
+	/**
+	 * Hands the envelope to the outbound queue of the connection to the given receiver, establishing the
+	 * connection first if none exists yet. The call blocks while a connection is being built up.
+	 *
+	 * @param envelope the envelope to send
+	 * @param receiver the remote receiver the envelope is addressed to
+	 * @throws IOException if the connection could not be established, or if the map entry changed
+	 *         unexpectedly while the connection was built up
+	 */
+	public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
+		// Get the channel. The channel may be
+		// 1) a channel that already exists (usual case) -> just send the data
+		// 2) a channel that is in buildup (sometimes) -> attach to the future and wait for the actual channel
+		// 3) not yet existing -> establish the channel
+
+		final Object entry = this.outConnections.get(receiver);
+		final OutboundConnectionQueue channel;
+
+		if (entry != null) {
+			// existing channel or channel in buildup
+			if (entry instanceof OutboundConnectionQueue) {
+				channel = (OutboundConnectionQueue) entry;
+			}
+			else {
+				ChannelInBuildup future = (ChannelInBuildup) entry;
+				channel = future.waitForChannel();
+			}
+		}
+		else {
+			// No channel yet. Create one, but watch out for a race.
+			// We create a "buildup future" and atomically add it to the map.
+			// Only the thread that really added it establishes the channel.
+			// The others need to wait on that original establisher's future.
+			ChannelInBuildup inBuildup = new ChannelInBuildup(this.out, receiver);
+			Object old = this.outConnections.putIfAbsent(receiver, inBuildup);
+
+			if (old == null) {
+				this.out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
+
+				try {
+					channel = inBuildup.waitForChannel();
+				} catch (IOException e) {
+					// Drop the failed buildup entry. Otherwise it stays in the map and every
+					// later enqueue for this receiver fails with the same stale error.
+					this.outConnections.remove(receiver, inBuildup);
+					throw e;
+				}
+
+				// swap the buildup future for the real channel only if it is still our entry
+				if (!this.outConnections.replace(receiver, inBuildup, channel)) {
+					throw new IOException("Race condition during channel build up.");
+				}
+			}
+			else if (old instanceof ChannelInBuildup) {
+				channel = ((ChannelInBuildup) old).waitForChannel();
+			}
+			else {
+				channel = (OutboundConnectionQueue) old;
+			}
+		}
+
+		channel.enqueue(envelope);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class ChannelInBuildup implements ChannelFutureListener {
+
+		private Object lock = new Object();
+
+		private volatile OutboundConnectionQueue channel;
+
+		private volatile Throwable error;
+
+		private int numRetries = 2;
+
+		private final Bootstrap out;
+
+		private final RemoteReceiver receiver;
+
+		private ChannelInBuildup(Bootstrap out, RemoteReceiver receiver) {
+			this.out = out;
+			this.receiver = receiver;
+		}
+
+		private void handInChannel(OutboundConnectionQueue c) {
+			synchronized (this.lock) {
+				this.channel = c;
+				this.lock.notifyAll();
+			}
+		}
+
+		private void notifyOfError(Throwable error) {
+			synchronized (this.lock) {
+				this.error = error;
+				this.lock.notifyAll();
+			}
+		}
+
+		private OutboundConnectionQueue waitForChannel() throws IOException {
+			synchronized (this.lock) {
+				while (this.error == null && this.channel == null) {
+					try {
+						this.lock.wait(2000);
+					} catch (InterruptedException e) {
+						throw new RuntimeException("Channel buildup interrupted.");
+					}
+				}
+			}
+
+			if (this.error != null) {
+				throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
+			}
+
+			return this.channel;
+		}
+
+		@Override
+		public void operationComplete(ChannelFuture future) throws Exception {
+			if (future.isSuccess()) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Channel %s connected", future.channel()));
+				}
+
+				handInChannel(new OutboundConnectionQueue(future.channel()));
+			}
+			else if (this.numRetries > 0) {
+				LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", this.numRetries));
+
+				this.out.connect(this.receiver.getConnectionAddress()).addListener(this);
+				this.numRetries--;
+			}
+			else {
+				if (future.getClass() != null) {
+					notifyOfError(future.cause());
+				}
+				else {
+					notifyOfError(new Exception("Connection could not be established."));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
new file mode 100644
index 0000000..c687408
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -0,0 +1,94 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayDeque;
+
+/**
+ * Per-connection queue of outgoing envelopes. The queue installs itself as the first handler of the
+ * channel's pipeline, receives envelopes as user events and writes them to the channel one at a time,
+ * continuing whenever a write has completed or the channel's writability has changed.
+ */
+public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
+
+	private static final Log LOG = LogFactory.getLog(OutboundConnectionQueue.class);
+
+	private final Channel channel;
+
+	/** Envelopes waiting to be written; only touched from the handler and listener callbacks. */
+	private final ArrayDeque<Envelope> queuedEnvelopes;
+
+	public OutboundConnectionQueue(Channel channel) {
+		this.channel = channel;
+		this.queuedEnvelopes = new ArrayDeque<Envelope>();
+
+		channel.pipeline().addFirst(this);
+	}
+
+	/**
+	 * Enqueues an envelope to be sent later.
+	 * <p/>
+	 * This method is always invoked by the task thread that wants the envelope sent.
+	 *
+	 * @param env The envelope to be sent.
+	 */
+	public void enqueue(Envelope env) {
+		// the user event trigger ensure thread-safe hand-over of the envelope
+		this.channel.pipeline().fireUserEventTriggered(env);
+	}
+
+	/**
+	 * Takes over envelopes fired as user events and starts writing if the queue was idle. Any other
+	 * user event is not meant for this queue and is passed on along the pipeline.
+	 */
+	@Override
+	public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnqueue) throws Exception {
+		if (!(envelopeToEnqueue instanceof Envelope)) {
+			// not ours: forward instead of failing with a ClassCastException
+			ctx.fireUserEventTriggered(envelopeToEnqueue);
+			return;
+		}
+
+		// an empty queue means no completion callback is pending that would pick up the new envelope
+		boolean triggerWrite = this.queuedEnvelopes.isEmpty();
+
+		this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
+
+		if (triggerWrite) {
+			writeAndFlushNextEnvelopeIfPossible();
+		}
+	}
+
+	@Override
+	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+		writeAndFlushNextEnvelopeIfPossible();
+	}
+
+	/**
+	 * Invoked when a write issued by this queue has completed: continues with the next envelope on
+	 * success, reports the failure otherwise.
+	 */
+	@Override
+	public void operationComplete(ChannelFuture future) throws Exception {
+		if (future.isSuccess()) {
+			writeAndFlushNextEnvelopeIfPossible();
+		}
+		else if (future.cause() != null) {
+			exceptionOccurred(future.cause());
+		}
+		else {
+			exceptionOccurred(new Exception("Envelope send aborted."));
+		}
+	}
+
+	/** Writes the head of the queue if the channel is writable and an envelope is queued. */
+	private void writeAndFlushNextEnvelopeIfPossible() {
+		if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
+			Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
+			this.channel.writeAndFlush(nextEnvelope).addListener(this);
+		}
+	}
+
+	/** Logs the failure, including its stack trace, and rethrows it wrapped in an exception. */
+	private void exceptionOccurred(Throwable t) throws Exception {
+		// pass the throwable to the logger so the stack trace is not lost
+		LOG.error(String.format("An exception occurred in Channel %s: %s", this.channel, t.getMessage()), t);
+		throw new Exception(t);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
new file mode 100644
index 0000000..424f2c0
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
@@ -0,0 +1,65 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ * Encodes an {@link Envelope} into its wire format: a fixed-size header, followed by the serialized
+ * events (if any) and the buffer contents (if any). The envelope's buffer is recycled once its
+ * contents have been copied into the output.
+ */
+@ChannelHandler.Sharable
+public class OutboundEnvelopeEncoder extends MessageToByteEncoder<Envelope> {
+
+	/** Header size in bytes: magic number, sequence number, job ID, source ID, events length, buffer length. */
+	public static final int HEADER_SIZE = 48;
+
+	/** Marker written at the start of every encoded envelope. */
+	public static final int MAGIC_NUMBER = 0xBADC0FFE;
+
+	@Override
+	protected void encode(ChannelHandlerContext ctx, Envelope env, ByteBuf out) throws Exception {
+		final Buffer buffer = env.getBuffer();
+
+		// --------------------------------------------------------------------
+		// (1) header (48 bytes)
+		// --------------------------------------------------------------------
+		out.writeInt(MAGIC_NUMBER); // 4 bytes
+		out.writeInt(env.getSequenceNumber()); // 4 bytes
+		env.getJobID().writeTo(out); // 16 bytes
+		env.getSource().writeTo(out); // 16 bytes
+		out.writeInt(env.getEventsSerialized() != null ? env.getEventsSerialized().remaining() : 0); // 4 bytes
+		out.writeInt(buffer != null ? buffer.size() : 0); // 4 bytes
+		// --------------------------------------------------------------------
+		// (2) events (var length)
+		// --------------------------------------------------------------------
+		if (env.getEventsSerialized() != null) {
+			out.writeBytes(env.getEventsSerialized());
+		}
+
+		// --------------------------------------------------------------------
+		// (3) buffer (var length)
+		// --------------------------------------------------------------------
+		if (buffer != null) {
+			out.writeBytes(buffer.getMemorySegment().wrap(0, buffer.size()));
+
+			// Recycle the buffer from OUR buffer pool after everything has been
+			// copied to Nettys buffer space.
+			buffer.recycleBuffer();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
deleted file mode 100644
index f22e6f7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.network.ChannelManager;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeReader;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeReader.DeserializationState;
-import eu.stratosphere.runtime.io.network.envelope.NoBufferAvailableException;
-import eu.stratosphere.util.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectionKey;
-
-/**
- * An incoming TCP connection through which data is read and transformed into {@link Envelope} objects.
- */
-public class IncomingConnection {
-
-	private static final Log LOG = LogFactory.getLog(IncomingConnection.class);
-
-	/** Readable byte channel (TCP socket) to read data from */
-	private final ReadableByteChannel channel;
-
-	/** Channel manager to dispatch complete envelopes */
-	private final ChannelManager channelManager;
-
-	/** Envelope reader to turn the channel data into envelopes */
-	private final EnvelopeReader reader;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public IncomingConnection(ReadableByteChannel channel, ChannelManager channelManager) {
-		this.channel = channel;
-		this.channelManager = channelManager;
-		this.reader = new EnvelopeReader(channelManager);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void read() throws IOException, InterruptedException, NoBufferAvailableException {
-		DeserializationState deserializationState = this.reader.readNextChunk(this.channel);
-
-		switch (deserializationState) {
-			case COMPLETE:
-				Envelope envelope = this.reader.getFullyDeserializedTransferEnvelope();
-				this.channelManager.dispatchFromNetwork(envelope);
-				this.reader.reset();
-				break;
-
-			case NO_BUFFER_AVAILABLE:
-				throw new NoBufferAvailableException(this.reader.getBufferProvider());
-
-			case PENDING:
-				break;
-		}
-	}
-
-	public void reportTransmissionProblem(SelectionKey key, IOException ioe) {
-		LOG.error(StringUtils.stringifyException(ioe));
-
-		try {
-			this.channel.close();
-		} catch (IOException e) {
-			LOG.debug("An error occurred while closing the byte channel");
-		}
-
-		if (key != null) {
-			key.cancel();
-		}
-
-		Envelope pendingEnvelope = this.reader.getPendingEnvelope();
-		if (pendingEnvelope != null) {
-			if (pendingEnvelope.hasBuffer()) {
-				Buffer buffer = pendingEnvelope.getBuffer();
-				if (buffer != null) {
-					buffer.recycleBuffer();
-				}
-			}
-		}
-
-		this.reader.reset();
-	}
-
-	public boolean isCloseUnexpected() {
-		return this.reader.hasUnfinishedData();
-	}
-
-	public void closeConnection(SelectionKey key) {
-		try {
-			this.channel.close();
-		} catch (IOException ioe) {
-			LOG.error("An IOException occurred while closing the socket: + " + StringUtils.stringifyException(ioe));
-		}
-
-		if (key != null) {
-			key.cancel();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
deleted file mode 100644
index 774ad4e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import eu.stratosphere.runtime.io.network.ChannelManager;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.runtime.io.network.envelope.NoBufferAvailableException;
-import eu.stratosphere.util.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-public class IncomingConnectionThread extends Thread {
-
-	private static final Log LOG = LogFactory.getLog(IncomingConnectionThread.class);
-
-	private final ChannelManager channelManager;
-
-	private final Selector selector;
-
-	private final Queue<SelectionKey> pendingReadEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
-	private final ServerSocketChannel listeningSocket;
-
-	private static final class IncomingConnectionBufferAvailListener implements BufferAvailabilityListener {
-
-		private final Queue<SelectionKey> pendingReadEventSubscribeRequests;
-
-		private final SelectionKey key;
-
-		private IncomingConnectionBufferAvailListener(final Queue<SelectionKey> pendingReadEventSubscribeRequests,
-				final SelectionKey key) {
-
-			this.pendingReadEventSubscribeRequests = pendingReadEventSubscribeRequests;
-			this.key = key;
-		}
-
-		@Override
-		public void bufferAvailable() {
-
-			synchronized (this.pendingReadEventSubscribeRequests) {
-				this.pendingReadEventSubscribeRequests.add(this.key);
-			}
-		}
-	}
-
-	public IncomingConnectionThread(ChannelManager channelManager,
-			boolean isListeningThread, InetSocketAddress listeningAddress) throws IOException {
-		super("Incoming Connection Thread");
-
-		this.selector = Selector.open();
-		this.channelManager = channelManager;
-
-		if (isListeningThread) {
-			this.listeningSocket = ServerSocketChannel.open();
-			this.listeningSocket.configureBlocking(false);
-			listeningSocket.register(this.selector, SelectionKey.OP_ACCEPT);
-			this.listeningSocket.socket().bind(listeningAddress);
-			LOG.debug("Listening on " + this.listeningSocket.socket().getLocalSocketAddress());
-		} else {
-			this.listeningSocket = null;
-		}
-	}
-
-	@Override
-	public void run() {
-		try {
-			while (!this.isInterrupted()) {
-	
-				synchronized (this.pendingReadEventSubscribeRequests) {
-					while (!this.pendingReadEventSubscribeRequests.isEmpty()) {
-						final SelectionKey key = this.pendingReadEventSubscribeRequests.poll();
-						final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
-						final SocketChannel socketChannel = (SocketChannel) key.channel();
-	
-						try {
-							final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
-							newKey.attach(incomingConnection);
-						} catch (ClosedChannelException e) {
-							incomingConnection.reportTransmissionProblem(key, e);
-						}
-					}
-				}
-	
-				try {
-					this.selector.select(500);
-				} catch (IOException e) {
-					LOG.error(e);
-				}
-	
-				final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-	
-				while (iter.hasNext()) {
-					final SelectionKey key = iter.next();
-	
-					iter.remove();
-					if (key.isValid()) {
-						if (key.isReadable()) {
-							doRead(key);
-						} else if (key.isAcceptable()) {
-							doAccept(key);
-						} else {
-							LOG.error("Unknown key: " + key);
-						}
-					} else {
-						LOG.error("Received invalid key: " + key);
-					}
-				}
-			}
-	
-			// Do cleanup, if necessary
-			if (this.listeningSocket != null) {
-				try {
-					this.listeningSocket.close();
-				} catch (IOException ioe) {
-					// Actually, we can ignore this exception
-					LOG.debug(ioe);
-				}
-			}
-	
-			// Finally, close the selector
-			try {
-				this.selector.close();
-			} catch (IOException ioe) {
-				LOG.debug(StringUtils.stringifyException(ioe));
-			}
-		}
-		catch (Throwable t) {
-			// this is a disaster, this task manager cannot go on!
-			LOG.fatal("Incoming connection thread died with an exception: " + t.getMessage(), t);
-			System.exit(1);
-		}
-	}
-
-	private void doAccept(SelectionKey key) {
-
-		SocketChannel clientSocket = null;
-
-		try {
-			clientSocket = this.listeningSocket.accept();
-			if (clientSocket == null) {
-				LOG.error("Client socket is null");
-				return;
-			}
-		} catch (IOException ioe) {
-			LOG.error(ioe);
-			return;
-		}
-
-		final IncomingConnection incomingConnection = new IncomingConnection(
-			clientSocket, this.channelManager);
-		SelectionKey clientKey = null;
-		try {
-			clientSocket.configureBlocking(false);
-			clientKey = clientSocket.register(this.selector, SelectionKey.OP_READ);
-			clientKey.attach(incomingConnection);
-		} catch (IOException ioe) {
-			incomingConnection.reportTransmissionProblem(clientKey, ioe);
-		}
-	}
-
-	private void doRead(SelectionKey key) {
-
-		final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
-		try {
-			incomingConnection.read();
-		} catch (EOFException eof) {
-			if (incomingConnection.isCloseUnexpected()) {
-				final SocketChannel socketChannel = (SocketChannel) key.channel();
-				LOG.error("Connection from " + socketChannel.socket().getRemoteSocketAddress()
-					+ " was closed unexpectedly");
-				incomingConnection.reportTransmissionProblem(key, eof);
-			} else {
-				incomingConnection.closeConnection(key);
-			}
-		} catch (IOException ioe) {
-			incomingConnection.reportTransmissionProblem(key, ioe);
-		} catch (InterruptedException e) {
-			// Nothing to do here
-		} catch (NoBufferAvailableException e) {
-			// There are no buffers available, unsubscribe from read event
-			final SocketChannel socketChannel = (SocketChannel) key.channel();
-			try {
-				final SelectionKey newKey = socketChannel.register(this.selector, 0);
-				newKey.attach(incomingConnection);
-			} catch (ClosedChannelException e1) {
-				incomingConnection.reportTransmissionProblem(key, e1);
-			}
-
-			final BufferAvailabilityListener bal = new IncomingConnectionBufferAvailListener(
-				this.pendingReadEventSubscribeRequests, key);
-			if (!e.getBufferProvider().registerBufferAvailabilityListener(bal)) {
-				// In the meantime, a buffer has become available again, subscribe to read event again
-
-				try {
-					final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
-					newKey.attach(incomingConnection);
-				} catch (ClosedChannelException e1) {
-					incomingConnection.reportTransmissionProblem(key, e1);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
deleted file mode 100644
index 7df1901..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.network.RemoteReceiver;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeWriter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-/**
- * This class represents an outgoing TCP connection through which {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects can be sent.
- * {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects are received from the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} and added to a queue. An
- * additional network thread then takes the envelopes from the queue and transmits them to the respective destination
- * host.
- * 
- */
-public class OutgoingConnection {
-
-	/**
-	 * The log object used to report debug information and possible errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(OutgoingConnection.class);
-
-	/**
-	 * The address this outgoing connection is connected to.
-	 */
-	private final RemoteReceiver remoteReceiver;
-
-	/**
-	 * The outgoing connection thread which actually transmits the queued transfer envelopes.
-	 */
-	private final OutgoingConnectionThread connectionThread;
-
-	/**
-	 * The queue of transfer envelopes to be transmitted.
-	 */
-	private final Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
-
-	/**
-	 * The {@link eu.stratosphere.runtime.io.network.envelope.Envelope} that is currently processed.
-	 */
-	private Envelope currentEnvelope = null;
-
-	/**
-	 * Stores whether the underlying TCP connection is established. As this variable is accessed by the byte buffered
-	 * channel manager and the outgoing connection thread, it must be protected by a monitor.
-	 */
-	private boolean isConnected = false;
-
-	/**
-	 * Stores whether is underlying TCP connection is subscribed to the NIO write event. As this variable is accessed by
-	 * the byte buffered channel and the outgoing connection thread, it must be protected by a monitor.
-	 */
-	private boolean isSubscribedToWriteEvent = false;
-
-	/**
-	 * The overall number of connection retries which shall be performed before a connection error is reported.
-	 */
-	private final int numberOfConnectionRetries;
-
-	/**
-	 * The number of connection retries left before an I/O error is reported.
-	 */
-	private int retriesLeft = 0;
-
-	/**
-	 * The timestamp of the last connection retry.
-	 */
-	private long timstampOfLastRetry = 0;
-
-	/**
-	 * The current selection key representing the interest set of the underlying TCP NIO connection. This variable may
-	 * only be accessed the the outgoing connection thread.
-	 */
-	private SelectionKey selectionKey = null;
-
-	/**
-	 * The period of time in milliseconds that shall be waited before a connection attempt is considered to be failed.
-	 */
-	private static long RETRYINTERVAL = 1000L; // 1 second
-
-	private EnvelopeWriter writer;
-
-	/**
-	 * Constructs a new outgoing connection object.
-	 * 
-	 * @param remoteReceiver
-	 *        the address of the destination host this outgoing connection object is supposed to connect to
-	 * @param connectionThread
-	 *        the connection thread which actually handles the network transfer
-	 * @param numberOfConnectionRetries
-	 *        the number of connection retries allowed before an I/O error is reported
-	 */
-	public OutgoingConnection(RemoteReceiver remoteReceiver, OutgoingConnectionThread connectionThread,
-			int numberOfConnectionRetries) {
-
-		this.remoteReceiver = remoteReceiver;
-		this.connectionThread = connectionThread;
-		this.numberOfConnectionRetries = numberOfConnectionRetries;
-		this.writer = new EnvelopeWriter();
-	}
-
-	/**
-	 * Adds a new {@link eu.stratosphere.runtime.io.network.envelope.Envelope} to the queue of envelopes to be transmitted to the destination host of this
-	 * connection.
-	 * <p>
-	 * This method should only be called by the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} object.
-	 * 
-	 * @param envelope
-	 *        the envelope to be added to the transfer queue
-	 */
-	public void queueEnvelope(Envelope envelope) {
-
-		synchronized (this.queuedEnvelopes) {
-
-			checkConnection();
-			this.queuedEnvelopes.add(envelope);
-		}
-	}
-
-	private void checkConnection() {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (!this.isConnected) {
-
-				this.retriesLeft = this.numberOfConnectionRetries;
-				this.timstampOfLastRetry = System.currentTimeMillis();
-				this.connectionThread.triggerConnect(this);
-				this.isConnected = true;
-				this.isSubscribedToWriteEvent = true;
-			} else {
-
-				if (!this.isSubscribedToWriteEvent) {
-					this.connectionThread.subscribeToWriteEvent(this.selectionKey);
-					this.isSubscribedToWriteEvent = true;
-				}
-			}
-
-		}
-	}
-
-	/**
-	 * Returns the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
-	 * connected to.
-	 * <p>
-	 * This method should be called by the {@link OutgoingConnectionThread} object only.
-	 * 
-	 * @return the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
-	 *         connected to
-	 */
-	public InetSocketAddress getConnectionAddress() {
-
-		return this.remoteReceiver.getConnectionAddress();
-	}
-
-	/**
-	 * Reports a problem which occurred while establishing the underlying TCP connection to this outgoing connection
-	 * object. Depending on the number of connection retries left, this method will either try to reestablish the TCP
-	 * connection or report an I/O error to all tasks which have queued envelopes for this connection. In the latter
-	 * case all queued envelopes will be dropped and all included buffers will be freed.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @param ioe
-	 *        thrown if an error occurs while reseting the underlying TCP connection
-	 */
-	public void reportConnectionProblem(IOException ioe) {
-
-		// First, write exception to log
-		final long currentTime = System.currentTimeMillis();
-		if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
-			LOG.error("Cannot connect to " + this.remoteReceiver + ", " + this.retriesLeft + " retries left");
-		}
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.selectionKey != null) {
-
-				final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-				if (socketChannel != null) {
-					try {
-						socketChannel.close();
-					} catch (IOException e) {
-						LOG.debug("Error while trying to close the socket channel to " + this.remoteReceiver);
-					}
-				}
-
-				this.selectionKey.cancel();
-				this.selectionKey = null;
-				this.isConnected = false;
-				this.isSubscribedToWriteEvent = false;
-			}
-
-			if (hasRetriesLeft(currentTime)) {
-				this.connectionThread.triggerConnect(this);
-				this.isConnected = true;
-				this.isSubscribedToWriteEvent = true;
-				return;
-			}
-
-			// Error is fatal
-			LOG.error(ioe);
-
-			// Notify source of current envelope and release buffer
-			if (this.currentEnvelope != null) {
-				if (this.currentEnvelope.getBuffer() != null) {
-					this.currentEnvelope.getBuffer().recycleBuffer();
-					this.currentEnvelope = null;
-				}
-			}
-
-			// Notify all other tasks which are waiting for data to be transmitted
-			final Iterator<Envelope> iter = this.queuedEnvelopes.iterator();
-			while (iter.hasNext()) {
-				final Envelope envelope = iter.next();
-				iter.remove();
-				// Recycle the buffer inside the envelope
-				if (envelope.getBuffer() != null) {
-					envelope.getBuffer().recycleBuffer();
-				}
-			}
-
-			this.queuedEnvelopes.clear();
-		}
-	}
-
-	/**
-	 * Reports an I/O error which occurred while writing data to the TCP connection. As a result of the I/O error the
-	 * connection is closed and the interest keys are canceled. Moreover, the task which queued the currently
-	 * transmitted transfer envelope is notified about the error and the current envelope is dropped. If the current
-	 * envelope contains a buffer, the buffer is freed.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @param ioe
-	 *        thrown if an error occurs while reseting the connection
-	 */
-	public void reportTransmissionProblem(IOException ioe) {
-
-		final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-
-		// First, write exception to log
-		if (this.currentEnvelope != null) {
-			LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
-				+ socketChannel.socket().getRemoteSocketAddress()
-				+ " experienced an IOException for transfer envelope " + this.currentEnvelope.getSequenceNumber());
-		} else {
-			LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
-				+ socketChannel.socket().getRemoteSocketAddress() + " experienced an IOException");
-		}
-
-		// Close the connection and cancel the interest key
-		synchronized (this.queuedEnvelopes) {
-			try {
-				LOG.debug("Closing connection to " + socketChannel.socket().getRemoteSocketAddress());
-				socketChannel.close();
-			} catch (IOException e) {
-				LOG.debug("An error occurred while responding to an IOException");
-				LOG.debug(e);
-			}
-
-			this.selectionKey.cancel();
-
-			// Error is fatal
-			LOG.error(ioe);
-
-			// Trigger new connection if there are more envelopes to be transmitted
-			if (this.queuedEnvelopes.isEmpty()) {
-				this.isConnected = false;
-				this.isSubscribedToWriteEvent = false;
-			} else {
-				this.connectionThread.triggerConnect(this);
-				this.isConnected = true;
-				this.isSubscribedToWriteEvent = true;
-			}
-
-			// We must assume the current envelope is corrupted so we notify the task which created it.
-			if (this.currentEnvelope != null) {
-				if (this.currentEnvelope.getBuffer() != null) {
-					this.currentEnvelope.getBuffer().recycleBuffer();
-					this.currentEnvelope = null;
-				}
-			}
-		}
-	}
-
-	/**
-	 * Checks whether further retries are left for establishing the underlying TCP connection.
-	 * 
-	 * @param currentTime
-	 *        the current system time in milliseconds since January 1st, 1970
-	 * @return <code>true</code> if there are retries left, <code>false</code> otherwise
-	 */
-	private boolean hasRetriesLeft(long currentTime) {
-
-		if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
-			this.retriesLeft--;
-			this.timstampOfLastRetry = currentTime;
-			if (this.retriesLeft == 0) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Writes the content of the current {@link eu.stratosphere.runtime.io.network.envelope.Envelope} object to the underlying TCP connection.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @return <code>true</code> if there is more data from this/other queued envelopes to be written to this channel
-	 * @throws IOException
-	 *         thrown if an error occurs while writing the data to the channel
-	 */
-	public boolean write() throws IOException {
-
-		final WritableByteChannel writableByteChannel = (WritableByteChannel) this.selectionKey.channel();
-
-		if (this.currentEnvelope == null) {
-			synchronized (this.queuedEnvelopes) {
-				if (this.queuedEnvelopes.isEmpty()) {
-					return false;
-				} else {
-					this.currentEnvelope = this.queuedEnvelopes.peek();
-
-					this.writer.setEnvelopeForWriting(this.currentEnvelope);
-				}
-			}
-		}
-
-		if (!this.writer.writeNextChunk(writableByteChannel)) {
-			// Make sure we recycle the attached memory or file buffers correctly
-			if (this.currentEnvelope.getBuffer() != null) {
-				this.currentEnvelope.getBuffer().recycleBuffer();
-			}
-
-			synchronized (this.queuedEnvelopes) {
-				this.queuedEnvelopes.poll();
-				this.currentEnvelope = null;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Requests to close the underlying TCP connection. The request is ignored if at least one {@link eu.stratosphere.runtime.io.network.envelope.Envelope}
-	 * is queued.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurs while closing the TCP connection
-	 */
-	public void requestClose() throws IOException {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.queuedEnvelopes.isEmpty()) {
-
-				if (this.isSubscribedToWriteEvent) {
-
-					this.connectionThread.unsubscribeFromWriteEvent(this.selectionKey);
-					this.isSubscribedToWriteEvent = false;
-				}
-			}
-		}
-	}
-
-	/**
-	 * Closes the underlying TCP connection if no more {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects are in the transmission queue.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @throws IOException
-	 */
-	public void closeConnection() throws IOException {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (!this.queuedEnvelopes.isEmpty()) {
-				return;
-			}
-
-			if (this.selectionKey != null) {
-
-				final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-				socketChannel.close();
-				this.selectionKey.cancel();
-				this.selectionKey = null;
-			}
-
-			this.isConnected = false;
-			this.isSubscribedToWriteEvent = false;
-		}
-	}
-
-	/**
-	 * Returns the number of queued {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects with the given source channel ID.
-	 * 
-	 * @param sourceChannelID
-	 *        the source channel ID to count the queued envelopes for
-	 * @return the number of queued transfer envelopes with the given source channel ID
-	 */
-	public int getNumberOfQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
-		synchronized (this.queuedEnvelopes) {
-
-			int number = 0;
-
-			final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-				final Envelope te = it.next();
-				if (sourceChannelID.equals(te.getSource())) {
-					number++;
-				}
-			}
-
-			return number;
-		}
-	}
-
-	/**
-	 * Removes all queued {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects from the transmission which match the given source channel
-	 * ID.
-	 * 
-	 * @param sourceChannelID
-	 *        the source channel ID of the transfered transfer envelopes to be dropped
-	 */
-	public void dropAllQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
-		synchronized (this.queuedEnvelopes) {
-
-			final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-				final Envelope te = it.next();
-				if (sourceChannelID.equals(te.getSource())) {
-					it.remove();
-					if (te.getBuffer() != null) {
-						te.getBuffer().recycleBuffer();
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Checks whether this outgoing connection object manages an active connection or can be removed by the
-	 * {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} object.
-	 * <p>
-	 * This method should only be called by the byte buffered channel manager.
-	 * 
-	 * @return <code>true</code> if this object is no longer manages an active connection and can be removed,
-	 *         <code>false</code> otherwise.
-	 */
-	public boolean canBeRemoved() {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.isConnected) {
-				return false;
-			}
-
-			if (this.currentEnvelope != null) {
-				return false;
-			}
-
-			return this.queuedEnvelopes.isEmpty();
-		}
-	}
-
-	/**
-	 * Sets the selection key representing the interest set of the underlying TCP NIO connection.
-	 * 
-	 * @param selectionKey
-	 *        the selection of the underlying TCP connection
-	 */
-	public void setSelectionKey(SelectionKey selectionKey) {
-		this.selectionKey = selectionKey;
-	}
-
-	/**
-	 * Returns the number of currently queued envelopes which contain a write buffer.
-	 * 
-	 * @return the number of currently queued envelopes which contain a write buffer
-	 */
-	public int getNumberOfQueuedWriteBuffers() {
-
-		int retVal = 0;
-
-		synchronized (this.queuedEnvelopes) {
-
-			final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-
-				final Envelope envelope = it.next();
-				if (envelope.getBuffer() != null) {
-					++retVal;
-				}
-			}
-		}
-
-		return retVal;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
deleted file mode 100644
index dde26e3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.util.StringUtils;
-
-public class OutgoingConnectionThread extends Thread {
-
-	/**
-	 * The minimum time a TCP connection must be idle it is closed.
-	 */
-	private static final long MIN_IDLE_TIME_BEFORE_CLOSE = 80000L; // 80 seconds
-
-	private static final Log LOG = LogFactory.getLog(OutgoingConnectionThread.class);
-
-	private final Selector selector;
-
-	private final Queue<OutgoingConnection> pendingConnectionRequests = new ArrayDeque<OutgoingConnection>();
-
-	private final Queue<SelectionKey> pendingWriteEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
-	private final Map<OutgoingConnection, Long> connectionsToClose = new HashMap<OutgoingConnection, Long>();
-
-	public OutgoingConnectionThread() throws IOException {
-		super("Outgoing Connection Thread");
-
-		this.selector = Selector.open();
-	}
-
-
-	@Override
-	public void run() {
-		try {
-			while (!isInterrupted()) {
-	
-				synchronized (this.pendingConnectionRequests) {
-	
-					if (!this.pendingConnectionRequests.isEmpty()) {
-	
-						final OutgoingConnection outgoingConnection = this.pendingConnectionRequests.poll();
-						try {
-							final SocketChannel socketChannel = SocketChannel.open();
-							socketChannel.configureBlocking(false);
-							final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
-							socketChannel.connect(outgoingConnection.getConnectionAddress());
-							key.attach(outgoingConnection);
-						} catch (final IOException ioe) {
-							// IOException is reported by separate thread to avoid deadlocks
-							final Runnable reporterThread = new Runnable() {
-	
-								@Override
-								public void run() {
-									outgoingConnection.reportConnectionProblem(ioe);
-								}
-							};
-							new Thread(reporterThread).start();
-						}
-					}
-				}
-	
-				synchronized (this.pendingWriteEventSubscribeRequests) {
-	
-					if (!this.pendingWriteEventSubscribeRequests.isEmpty()) {
-						final SelectionKey oldSelectionKey = this.pendingWriteEventSubscribeRequests.poll();
-						final OutgoingConnection outgoingConnection = (OutgoingConnection) oldSelectionKey.attachment();
-						final SocketChannel socketChannel = (SocketChannel) oldSelectionKey.channel();
-	
-						try {
-							final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ
-								| SelectionKey.OP_WRITE);
-							newSelectionKey.attach(outgoingConnection);
-							outgoingConnection.setSelectionKey(newSelectionKey);
-						} catch (final IOException ioe) {
-							// IOException is reported by separate thread to avoid deadlocks
-							final Runnable reporterThread = new Runnable() {
-	
-								@Override
-								public void run() {
-									outgoingConnection.reportTransmissionProblem(ioe);
-								}
-							};
-							new Thread(reporterThread).start();
-						}
-					}
-				}
-	
-				synchronized (this.connectionsToClose) {
-	
-					final Iterator<Map.Entry<OutgoingConnection, Long>> closeIt = this.connectionsToClose.entrySet()
-						.iterator();
-					final long now = System.currentTimeMillis();
-					while (closeIt.hasNext()) {
-	
-						final Map.Entry<OutgoingConnection, Long> entry = closeIt.next();
-						if ((entry.getValue().longValue() + MIN_IDLE_TIME_BEFORE_CLOSE) < now) {
-							final OutgoingConnection outgoingConnection = entry.getKey();
-							closeIt.remove();
-							// Create new thread to close connection to avoid deadlocks
-							final Runnable closeThread = new Runnable() {
-	
-								@Override
-								public void run() {
-									try {
-										outgoingConnection.closeConnection();
-									} catch (IOException ioe) {
-										outgoingConnection.reportTransmissionProblem(ioe);
-									}
-								}
-							};
-	
-							new Thread(closeThread).start();
-						}
-	
-					}
-				}
-	
-				try {
-					this.selector.select(10);
-				} catch (IOException e) {
-					LOG.error(e);
-				}
-	
-				final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-	
-				while (iter.hasNext()) {
-					final SelectionKey key = iter.next();
-	
-					iter.remove();
-					if (key.isValid()) {
-						if (key.isConnectable()) {
-							doConnect(key);
-						} else {
-							if (key.isReadable()) {
-								doRead(key);
-								// A read will always result in an exception, so the write key will not be valid anymore
-								continue;
-							}
-							if (key.isWritable()) {
-								doWrite(key);
-							}
-						}
-					} else {
-						LOG.error("Received invalid key: " + key);
-					}
-				}
-			}
-	
-			// Finally, try to close the selector
-			try {
-				this.selector.close();
-			} catch (IOException ioe) {
-				LOG.debug(StringUtils.stringifyException(ioe));
-			}
-		}
-		catch (Throwable t) {
-			// this is a disaster, this task manager cannot go on!
-			LOG.fatal("Outgoing connection thread died with an exception: " + t.getMessage(), t);
-			System.exit(1);
-		}
-	}
-
-	private void doConnect(SelectionKey key) {
-
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-		final SocketChannel socketChannel = (SocketChannel) key.channel();
-		try {
-			while (!socketChannel.finishConnect()) {
-				try {
-					Thread.sleep(100);
-				} catch (InterruptedException e1) {
-					LOG.error(e1);
-				}
-			}
-
-			final SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_WRITE
-				| SelectionKey.OP_READ);
-			outgoingConnection.setSelectionKey(channelKey);
-			channelKey.attach(outgoingConnection);
-
-		} catch (IOException ioe) {
-			outgoingConnection.reportConnectionProblem(ioe);
-		}
-	}
-
-	private void doWrite(SelectionKey key) {
-
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-
-		try {
-
-			if (!outgoingConnection.write()) {
-				// Try to close the connection
-				outgoingConnection.requestClose();
-			}
-
-		} catch (IOException ioe) {
-			outgoingConnection.reportTransmissionProblem(ioe);
-		}
-	}
-
-	private void doRead(SelectionKey key) {
-
-		final SocketChannel socketChannel = (SocketChannel) key.channel();
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-		final ByteBuffer buf = ByteBuffer.allocate(8);
-
-		try {
-
-			if (socketChannel.read(buf) == -1) {
-				outgoingConnection.reportTransmissionProblem(new IOException(
-					"Read unexpected EOF from channel"));
-			} else {
-				LOG.error("Outgoing connection read real data from channel");
-			}
-		} catch (IOException ioe) {
-			outgoingConnection.reportTransmissionProblem(ioe);
-		}
-	}
-
-	public void triggerConnect(OutgoingConnection outgoingConnection) {
-
-		synchronized (this.pendingConnectionRequests) {
-			this.pendingConnectionRequests.add(outgoingConnection);
-		}
-	}
-
-	public void unsubscribeFromWriteEvent(SelectionKey selectionKey) throws IOException {
-
-		final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
-
-		final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
-		newSelectionKey.attach(outgoingConnection);
-		outgoingConnection.setSelectionKey(newSelectionKey);
-
-		synchronized (this.connectionsToClose) {
-			this.connectionsToClose.put(outgoingConnection, Long.valueOf(System.currentTimeMillis()));
-		}
-	}
-
-	public void subscribeToWriteEvent(SelectionKey selectionKey) {
-
-		synchronized (this.pendingWriteEventSubscribeRequests) {
-			this.pendingWriteEventSubscribeRequests.add(selectionKey);
-		}
-		synchronized (this.connectionsToClose) {
-			this.connectionsToClose.remove((OutgoingConnection) selectionKey.attachment());
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
index f4c8aec..696915f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -15,9 +15,6 @@ package eu.stratosphere.runtime.io.serialization;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
-import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
-import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
 
 import java.io.DataInput;
 import java.io.EOFException;
@@ -62,6 +59,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		// check if we can get a full length;
 		if (nonSpanningRemaining >= 4) {
 			int len = this.nonSpanningWrapper.readInt();
+
 			if (len <= nonSpanningRemaining - 4) {
 				// we can get a full record from here
 				target.read(this.nonSpanningWrapper);
@@ -156,8 +154,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
 		@Override
 		public final void readFully(byte[] b, int off, int len) throws IOException {
-			if (off < 0 || len < 0 || off + len > b.length)
+			if (off < 0 || len < 0 || off + len > b.length) {
 				throw new IndexOutOfBoundsException();
+			}
 			
 			this.segment.get(this.position, b, off, len);
 			this.position += len;
@@ -230,14 +229,16 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 			try {
 				int b;
 				while ((b = readUnsignedByte()) != '\n') {
-					if (b != '\r')
+					if (b != '\r') {
 						bld.append((char) b);
+					}
 				}
 			}
 			catch (EOFException eofex) {}
 
-			if (bld.length() == 0)
+			if (bld.length() == 0) {
 				return null;
+			}
 			
 			// trim a trailing carriage return
 			int len = bld.length();
@@ -275,8 +276,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
 			while (count < utflen) {
 				c = (int) bytearr[count] & 0xff;
-				if (c > 127)
+				if (c > 127) {
 					break;
+				}
 				count++;
 				chararr[chararr_count++] = (char) c;
 			}
@@ -298,21 +300,25 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 				case 12:
 				case 13:
 					count += 2;
-					if (count > utflen)
+					if (count > utflen) {
 						throw new UTFDataFormatException("malformed input: partial character at end");
+					}
 					char2 = (int) bytearr[count - 1];
-					if ((char2 & 0xC0) != 0x80)
+					if ((char2 & 0xC0) != 0x80) {
 						throw new UTFDataFormatException("malformed input around byte " + count);
+					}
 					chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
 					break;
 				case 14:
 					count += 3;
-					if (count > utflen)
+					if (count > utflen) {
 						throw new UTFDataFormatException("malformed input: partial character at end");
+					}
 					char2 = (int) bytearr[count - 2];
 					char3 = (int) bytearr[count - 1];
-					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+					}
 					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
 					break;
 				default:
@@ -325,8 +331,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		
 		@Override
 		public final int skipBytes(int n) throws IOException {
-			if (n < 0)
+			if (n < 0) {
 				throw new IllegalArgumentException();
+			}
 			
 			int toSkip = Math.min(n, remaining());
 			this.position += toSkip;
@@ -390,6 +397,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 					return;
 				} else {
 					this.recordLength = this.lengthBuffer.getInt(0);
+
 					this.lengthBuffer.clear();
 					segmentPosition = toPut;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
index e6479fe..a8a53fe 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
@@ -63,11 +63,13 @@ public class DataInputDeserializer implements DataInput {
 	}
 
 	public void setBuffer(byte[] buffer, int start, int len) {
-		if (buffer == null)
+		if (buffer == null) {
 			throw new NullPointerException();
+		}
 
-		if (start < 0 || len < 0 || start + len >= buffer.length)
+		if (start < 0 || len < 0 || start + len >= buffer.length) {
 			throw new IllegalArgumentException();
+		}
 
 		this.buffer = buffer;
 		this.position = start;
@@ -144,7 +146,7 @@ public class DataInputDeserializer implements DataInput {
 			@SuppressWarnings("restriction")
 			int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
 			if (LITTLE_ENDIAN) {
-				 value = Integer.reverseBytes(value);
+				value = Integer.reverseBytes(value);
 			}
 			
 			this.position += 4;
@@ -183,7 +185,7 @@ public class DataInputDeserializer implements DataInput {
 			@SuppressWarnings("restriction")
 			long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
 			if (LITTLE_ENDIAN) {
-				 value = Long.reverseBytes(value);
+				value = Long.reverseBytes(value);
 			}
 			this.position += 8;
 			return value;
@@ -215,8 +217,9 @@ public class DataInputDeserializer implements DataInput {
 
 		while (count < utflen) {
 			c = (int) bytearr[count] & 0xff;
-			if (c > 127)
+			if (c > 127) {
 				break;
+			}
 			count++;
 			chararr[chararr_count++] = (char) c;
 		}
@@ -240,22 +243,26 @@ public class DataInputDeserializer implements DataInput {
 			case 13:
 				/* 110x xxxx 10xx xxxx */
 				count += 2;
-				if (count > utflen)
+				if (count > utflen) {
 					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
 				char2 = (int) bytearr[count - 1];
-				if ((char2 & 0xC0) != 0x80)
+				if ((char2 & 0xC0) != 0x80) {
 					throw new UTFDataFormatException("malformed input around byte " + count);
+				}
 				chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
 				break;
 			case 14:
 				/* 1110 xxxx 10xx xxxx 10xx xxxx */
 				count += 3;
-				if (count > utflen)
+				if (count > utflen) {
 					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
 				char2 = (int) bytearr[count - 2];
 				char3 = (int) bytearr[count - 1];
-				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+				}
 				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
 				break;
 			default:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
index b5171b9..ce088f0 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
@@ -196,9 +196,9 @@ public class DataOutputSerializer implements DataOutput {
 			}
 		}
 
-		if (utflen > 65535)
+		if (utflen > 65535) {
 			throw new UTFDataFormatException("Encoded string is too long: " + utflen);
-		
+		}
 		else if (this.position > this.buffer.length - utflen - 2) {
 			resize(utflen + 2);
 		}
@@ -212,8 +212,9 @@ public class DataOutputSerializer implements DataOutput {
 		int i = 0;
 		for (i = 0; i < strlen; i++) {
 			c = str.charAt(i);
-			if (!((c >= 0x0001) && (c <= 0x007F)))
+			if (!((c >= 0x0001) && (c <= 0x007F))) {
 				break;
+			}
 			bytearr[count++] = (byte) c;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
index 443f8d8..b54496a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
@@ -68,6 +68,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 		// write data and length
 		record.write(this.serializationBuffer);
+
 		this.lengthBuffer.putInt(0, this.serializationBuffer.length());
 
 		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
@@ -103,8 +104,9 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	 * @param source the {@link ByteBuffer} to copy data from
 	 */
 	private void copyToTargetBufferFrom(ByteBuffer source) {
-		if (this.targetBuffer == null)
+		if (this.targetBuffer == null) {
 			return;
+		}
 
 		int needed = source.remaining();
 		int available = this.limit - this.position;
@@ -127,8 +129,9 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 	@Override
 	public Buffer getCurrentBuffer() {
-		if (targetBuffer == null)
+		if (targetBuffer == null) {
 			return null;
+		}
 
 		this.targetBuffer.limitSize(this.position);
 		return this.targetBuffer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
index 09b244f..3b2ad69 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
@@ -70,7 +70,7 @@ public class TestBufferProvider implements BufferProvider {
 	}
 
 	@Override
-	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
+	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
index c66d821..13fbfbc 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
@@ -33,7 +33,6 @@ import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory
 import eu.stratosphere.core.memory.DataInputView;
 import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.ChannelSelector;
 import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
 import eu.stratosphere.pact.runtime.shipping.OutputEmitter;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
index 0b968d8..2bd5d98 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
@@ -28,8 +28,6 @@ import org.junit.Test;
 import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.distributions.UniformIntegerDistribution;
 import eu.stratosphere.runtime.io.api.ChannelSelector;
-import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator;
-import eu.stratosphere.nephele.io.ChannelSelector;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
 import eu.stratosphere.pact.runtime.shipping.RecordOutputEmitter;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
index a397312..b715a4e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
@@ -44,7 +44,9 @@ import eu.stratosphere.util.MutableObjectIterator;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.FutureTask;
 
 public class MockEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner {
 	
@@ -124,8 +126,8 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	}
 
 	@Override
-	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		return false;
+	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
 	}
 
 	@Override
@@ -338,4 +340,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	public BufferProvider getOutputBufferProvider() {
 		return this;
 	}
+
+	@Override
+	public Map<String, FutureTask<Path>> getCopyTask() {
+		return null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
deleted file mode 100644
index b8914a8..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.util.DiscardingRecycler;
-import eu.stratosphere.nephele.util.TestBufferProvider;
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
-import eu.stratosphere.runtime.io.BufferRecycler;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-public class EnvelopeReaderWriterTest {
-	
-	private final long RANDOM_SEED = 520346508276087l;
-
-	private static final int BUFFER_SIZE = 32768;
-	
-	private static final byte BUFFER_CONTENT = 13;
-	
-	private final int[] BUFFER_SIZES = { 0, 2, BUFFER_SIZE, 3782, 88, 0, 23};
-	
-	private final AbstractEvent[][] EVENT_LISTS = {
-		{},
-		{},
-		{},
-		{ new TestEvent1(34872527) },
-		{ new TestEvent1(8749653), new TestEvent1(365345) },
-		{ new TestEvent2(34563456), new TestEvent1(598432), new TestEvent2(976293845) },
-		{}
-	};
-
-	@Test
-	public void testWriteAndRead() {
-		
-		Assert.assertTrue("Test broken.", BUFFER_SIZES.length == EVENT_LISTS.length);
-
-		File testFile = null;
-		RandomAccessFile raf = null;
-		try {
-			testFile = File.createTempFile("envelopes", ".tmp");
-			raf = new RandomAccessFile(testFile, "rw");
-			
-			// write
-			FileChannel c = raf.getChannel();
-			writeEnvelopes(c);
-			
-			// read
-			c.position(0);
-			readEnvelopes(c, -1.0f);
-			c.close();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-		finally {
-			if (raf != null)
-				try { raf.close(); } catch (Throwable t) {}
-			
-			if (testFile != null)
-				testFile.delete();
-		}
-	}
-	
-	@Test
-	public void testWriteAndReadChunked() {
-		
-		Assert.assertTrue("Test broken.", BUFFER_SIZES.length == EVENT_LISTS.length);
-
-		File testFile = null;
-		RandomAccessFile raf = null;
-		try {
-			testFile = File.createTempFile("envelopes", ".tmp");
-			raf = new RandomAccessFile(testFile, "rw");
-			
-			// write
-			FileChannel c = raf.getChannel();
-			writeEnvelopes(new ChunkedWriteableChannel(c));
-			
-			// read
-			c.position(0);
-			readEnvelopes(new ChunkedReadableChannel(c), 0.75f);
-			c.close();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-		finally {
-			if (raf != null)
-				try { raf.close(); } catch (Throwable t) {}
-			
-			if (testFile != null)
-				testFile.delete();
-		}
-	}
-
-	private void writeEnvelopes(WritableByteChannel channel) throws IOException {
-
-		final BufferRecycler recycler = new DiscardingRecycler();
-		final Random rand = new Random(RANDOM_SEED);
-		
-		final EnvelopeWriter serializer = new EnvelopeWriter();
-		
-		final int NUM_ENVS = BUFFER_SIZES.length;
-		
-		for (int i = 0; i < NUM_ENVS; i++) {
-			int seqNum = Math.abs(rand.nextInt());
-			JobID jid = new JobID(rand.nextLong(), rand.nextLong());
-			ChannelID sid = new ChannelID(rand.nextLong(), rand.nextLong());
-			
-			Envelope env = new Envelope(seqNum, jid, sid);
-			if (EVENT_LISTS[i].length > 0) {
-				env.serializeEventList(Arrays.asList(EVENT_LISTS[i]));
-			}
-			
-			int bufferSize = BUFFER_SIZES[i];
-			if (bufferSize > 0) {
-				MemorySegment ms = new MemorySegment(new byte[BUFFER_SIZE]);
-				for (int x = 0; x < bufferSize; x++) {
-					ms.put(x, BUFFER_CONTENT);
-				}
-				
-				Buffer mb = new Buffer(ms, bufferSize, recycler);
-				env.setBuffer(mb);
-			}
-			
-			serializer.setEnvelopeForWriting(env);
-			
-			while (serializer.writeNextChunk(channel));
-		}
-	}
-	
-	private void readEnvelopes(ReadableByteChannel channel, float probabilityForNoBufferCurrently) throws IOException {
-		
-		final Random rand = new Random(RANDOM_SEED);
-		
-		final EnvelopeReader reader = new EnvelopeReader(new OneForAllBroker(BUFFER_SIZE, probabilityForNoBufferCurrently));
-		
-		final int NUM_ENVS = BUFFER_SIZES.length;
-		
-		for (int i = 0; i < NUM_ENVS; i++) {
-			int expectedSeqNum = Math.abs(rand.nextInt());
-			JobID expectedJid = new JobID(rand.nextLong(), rand.nextLong());
-			ChannelID expectedSid = new ChannelID(rand.nextLong(), rand.nextLong());
-			
-			// read the next envelope
-			while (reader.readNextChunk(channel) != EnvelopeReader.DeserializationState.COMPLETE);
-			Envelope env = reader.getFullyDeserializedTransferEnvelope();
-			
-			// check the basic fields from the header
-			Assert.assertEquals(expectedSeqNum, env.getSequenceNumber());
-			Assert.assertEquals(expectedJid, env.getJobID());
-			Assert.assertEquals(expectedSid, env.getSource());
-			
-			// check the events
-			List<? extends AbstractEvent> events = env.deserializeEvents();
-			Assert.assertEquals(EVENT_LISTS[i].length, events.size());
-			
-			for (int n = 0; n < EVENT_LISTS[i].length; n++) {
-				AbstractEvent expectedEvent = EVENT_LISTS[i][n];
-				AbstractEvent actualEvent = events.get(n);
-				
-				Assert.assertEquals(expectedEvent.getClass(), actualEvent.getClass());
-				Assert.assertEquals(expectedEvent, actualEvent);
-			}
-			
-			// check the buffer
-			Buffer buf = env.getBuffer();
-			if (buf == null) {
-				Assert.assertTrue(BUFFER_SIZES[i] == 0);
-			} else {
-				Assert.assertEquals(BUFFER_SIZES[i], buf.size());
-				for (int k = 0; k < BUFFER_SIZES[i]; k++) {
-					Assert.assertEquals(BUFFER_CONTENT, buf.getMemorySegment().get(k));
-				}
-			}
-			
-			reader.reset();
-		}
-		
-	}
-	
-	
-	public  static final class TestEvent1 extends AbstractEvent {
-
-		private long id;
-		
-		public TestEvent1() {}
-		
-		public TestEvent1(long id) {
-			this.id = id;
-		}
-		
-		@Override
-		public void write(DataOutput out) throws IOException {
-			out.writeLong(id);
-		}
-
-		@Override
-		public void read(DataInput in) throws IOException {
-			id = in.readLong();
-		}
-		
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == TestEvent1.class) {
-				return ((TestEvent1) obj).id == this.id;
-			} else {
-				return false;
-			}
-		}
-		
-		@Override
-		public int hashCode() {
-			return ((int) id) ^ ((int) (id >>> 32));
-		}
-		
-		@Override
-		public String toString() {
-			return "TestEvent1 (" + id + ")";
-		}
-	}
-	
-	public static final class TestEvent2 extends AbstractEvent {
-
-		private long id;
-		
-		public TestEvent2() {}
-		
-		public TestEvent2(long id) {
-			this.id = id;
-		}
-		
-		@Override
-		public void write(DataOutput out) throws IOException {
-			out.writeLong(id);
-		}
-
-		@Override
-		public void read(DataInput in) throws IOException {
-			id = in.readLong();
-		}
-		
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == TestEvent2.class) {
-				return ((TestEvent2) obj).id == this.id;
-			} else {
-				return false;
-			}
-		}
-		
-		@Override
-		public int hashCode() {
-			return ((int) id) ^ ((int) (id >>> 32));
-		}
-		
-		@Override
-		public String toString() {
-			return "TestEvent2 (" + id + ")";
-		}
-	}
-	
-	private static final class ChunkedWriteableChannel implements WritableByteChannel {
-		
-		private final WritableByteChannel delegate;
-		
-		private final Random rnd;
-		
-		private ChunkedWriteableChannel(WritableByteChannel delegate) {
-			this.delegate = delegate;
-			this.rnd = new Random();
-		}
-
-		@Override
-		public boolean isOpen() {
-			return this.delegate.isOpen();
-		}
-
-		@Override
-		public void close() throws IOException {
-			this.delegate.close();
-		}
-
-		@Override
-		public int write(ByteBuffer src) throws IOException {
-			final int available = src.remaining();
-			final int oldLimit = src.limit();
-			
-			int toWrite = rnd.nextInt(available) + 1;
-			toWrite = Math.min(Math.max(toWrite, 8), available);
-			
-			src.limit(src.position() + toWrite);
-			
-			int written = this.delegate.write(src);
-			
-			src.limit(oldLimit);
-			
-			return written;
-		}
-	}
-	
-	private static final class ChunkedReadableChannel implements ReadableByteChannel {
-		
-		private final ReadableByteChannel delegate;
-		
-		private final Random rnd;
-		
-		private ChunkedReadableChannel(ReadableByteChannel delegate) {
-			this.delegate = delegate;
-			this.rnd = new Random();
-		}
-
-		@Override
-		public boolean isOpen() {
-			return this.delegate.isOpen();
-		}
-
-		@Override
-		public void close() throws IOException {
-			this.delegate.close();
-		}
-
-		@Override
-		public int read(ByteBuffer dst) throws IOException {
-			final int available = dst.remaining();
-			final int oldLimit = dst.limit();
-			
-			int toRead = rnd.nextInt(available) + 1;
-			toRead = Math.min(Math.max(toRead, 8), available);
-			
-			dst.limit(dst.position() + toRead);
-			
-			int read = this.delegate.read(dst);
-			
-			dst.limit(oldLimit);
-			
-			return read;
-		}
-	}
-	
-	private static final class OneForAllBroker implements BufferProviderBroker {
-
-		private final TestBufferProvider provider;
-
-		private OneForAllBroker(int sizeOfMemorySegments) {
-			this.provider = new TestBufferProvider(sizeOfMemorySegments);
-		}
-		
-		private OneForAllBroker(int sizeOfMemorySegments, float probabilityForNoBufferCurrently) {
-			this.provider = new TestBufferProvider(sizeOfMemorySegments, probabilityForNoBufferCurrently);
-		}
-		
-		@Override
-		public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) {
-			return this.provider;
-		}
-	}
-}