You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:25 UTC

[28/34] Replace custom Java NIO TCP/IP code with Netty 4 library

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
index 405d79e..7b01f4e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
@@ -31,15 +31,13 @@ import eu.stratosphere.nephele.AbstractID;
 import eu.stratosphere.runtime.io.Buffer;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.bufferprovider.DiscardBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.runtime.io.network.bufferprovider.SerialSingleBufferPool;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeDispatcher;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeReceiverList;
 import eu.stratosphere.runtime.io.gates.GateID;
 import eu.stratosphere.runtime.io.gates.InputGate;
 import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -52,7 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * The channel manager sets up the network buffers and dispatches data between channels.
  */
-public final class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
+public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
 
 	private static final Log LOG = LogFactory.getLog(ChannelManager.class);
 
@@ -68,20 +66,27 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 
 	private final GlobalBufferPool globalBufferPool;
 
-	private final NetworkConnectionManager networkConnectionManager;
+	private final NettyConnectionManager nettyConnectionManager;
 	
 	private final InetSocketAddress ourAddress;
 	
-	private final SerialSingleBufferPool discardingDataPool;
+	private final DiscardBufferPool discardBufferPool;
 
 	// -----------------------------------------------------------------------------------------------------------------
 
 	public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo,
-						  int numNetworkBuffers, int networkBufferSize) throws IOException {
+						int numNetworkBuffers, int networkBufferSize,
+						int numInThreads, int numOutThreads,
+						int lowWatermark, int highWaterMark) throws IOException {
+
 		this.channelLookupService = channelLookupService;
 		this.connectionInfo = connectionInfo;
+
 		this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
-		this.networkConnectionManager = new NetworkConnectionManager(this, connectionInfo.address(), connectionInfo.dataPort());
+
+		this.nettyConnectionManager = new NettyConnectionManager(
+				this, connectionInfo.address(), connectionInfo.dataPort(),
+				networkBufferSize, numInThreads, numOutThreads, lowWatermark, highWaterMark);
 
 		// management data structures
 		this.channels = new ConcurrentHashMap<ChannelID, Channel>();
@@ -91,11 +96,11 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 		this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
 		
 		// a special pool if the data is to be discarded
-		this.discardingDataPool = new SerialSingleBufferPool(networkBufferSize);
+		this.discardBufferPool = new DiscardBufferPool();
 	}
 
 	public void shutdown() {
-		this.networkConnectionManager.shutDown();
+		this.nettyConnectionManager.shutdown();
 		this.globalBufferPool.destroy();
 	}
 
@@ -301,7 +306,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 		}
 	}
 
-	private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) {
+	private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) throws IOException {
 		Channel channel = this.channels.get(envelope.getSource());
 		if (channel == null) {
 			LOG.error("Cannot find channel for channel ID " + envelope.getSource());
@@ -319,7 +324,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 		final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
 		final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
 
-		this.networkConnectionManager.queueEnvelopeForTransfer(receiver, senderHint);
+		this.nettyConnectionManager.enqueue(senderHint, receiver);
 	}
 
 	/**
@@ -331,7 +336,6 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 	 *        the source channel ID for which the receiver list shall be retrieved
 	 * @return the list of receivers or <code>null</code> if the receiver could not be determined
 	 * @throws IOException
-	 * @throws InterruptedException
 	 */
 	private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException {
 		EnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
@@ -383,8 +387,10 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 		this.receiverCache.put(sourceChannelID, receiverList);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Receivers for source channel ID " + sourceChannelID + " at task manager " + this.connectionInfo +
-				": " + receiverList);
+			LOG.debug(String.format("Receiver for %s: %s [%s])",
+					sourceChannelID,
+					receiverList.hasLocalReceiver() ? receiverList.getLocalReceiver() : receiverList.getRemoteReceiver(),
+					receiverList.hasLocalReceiver() ? "local" : "remote"));
 		}
 
 		return receiverList;
@@ -436,7 +442,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 					} catch (InterruptedException e) {
 						throw new IOException(e.getMessage());
 					}
-					
+
 					srcBuffer.copyToBuffer(destBuffer);
 					envelope.setBuffer(destBuffer);
 					srcBuffer.recycleBuffer();
@@ -453,7 +459,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 					generateSenderHint(envelope, remoteReceiver);
 				}
 
-				this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, envelope);
+				this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
 				success = true;
 			}
 		} finally {
@@ -501,7 +507,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 				generateSenderHint(envelope, remoteReceiver);
 			}
 
-			this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, envelope);
+			this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
 		}
 	}
 
@@ -597,7 +603,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 		
 		// check if the receiver is already gone
 		if (receiverList == null) {
-			return this.discardingDataPool;
+			return this.discardBufferPool;
 		}
 
 		if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
@@ -610,7 +616,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 		
 		if (channel == null) {
 			// receiver is already canceled
-			return this.discardingDataPool;
+			return this.discardBufferPool;
 		}
 
 		if (!channel.isInputChannel()) {
@@ -633,8 +639,6 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
 			bufferPool.logBufferUtilization();
 		}
 
-		this.networkConnectionManager.logBufferUtilization();
-
 		System.out.println("\tIncoming connections:");
 
 		for (Channel channel : this.channels.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java
new file mode 100644
index 0000000..0f8bae8
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java
@@ -0,0 +1,178 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
+import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
+import eu.stratosphere.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Transfer unit between channels: sequence number, job ID and source channel ID, plus optional buffer and events.
+ */
+public final class Envelope {
+
+	private final JobID jobID;
+
+	private final ChannelID source;
+
+	private final int sequenceNumber;
+
+	/** Serialized events; stays {@code null} until one of the two event setters is called. */
+	private ByteBuffer serializedEventList;
+
+	/** Payload buffer; {@code null} if none has been set. */
+	private Buffer buffer;
+
+	public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
+		this.sequenceNumber = sequenceNumber;
+		this.jobID = jobID;
+		this.source = source;
+	}
+
+	/** Copies only the header fields; buffer and serialized events of the copy are left {@code null}. */
+	private Envelope(Envelope toDuplicate) {
+		this(toDuplicate.sequenceNumber, toDuplicate.jobID, toDuplicate.source);
+	}
+
+	/** Returns a copy with the same header and a duplicate of the buffer, if any; events are not copied. */
+	public Envelope duplicate() {
+		Envelope duplicate = new Envelope(this);
+		if (hasBuffer()) {
+			duplicate.setBuffer(this.buffer.duplicate());
+		}
+		return duplicate;
+	}
+
+	/** Returns a copy with the same header that carries neither buffer nor events. */
+	public Envelope duplicateWithoutBuffer() {
+		return new Envelope(this);
+	}
+
+	public JobID getJobID() {
+		return this.jobID;
+	}
+
+	public ChannelID getSource() {
+		return this.source;
+	}
+
+	public int getSequenceNumber() {
+		return this.sequenceNumber;
+	}
+
+	/** Attaches an already serialized event list; throws IllegalStateException if one is already set. */
+	public void setEventsSerialized(ByteBuffer serializedEventList) {
+		if (this.serializedEventList != null) {
+			throw new IllegalStateException("Event list has already been set.");
+		}
+		this.serializedEventList = serializedEventList;
+	}
+
+	/** Serializes the given events and attaches them; throws IllegalStateException if a list is already set. */
+	public void serializeEventList(List<? extends AbstractEvent> eventList) {
+		if (this.serializedEventList != null) {
+			throw new IllegalStateException("Event list has already been set.");
+		}
+		this.serializedEventList = serializeEvents(eventList);
+	}
+
+	public ByteBuffer getEventsSerialized() {
+		return this.serializedEventList;
+	}
+
+	/** Deserializes the attached events, resolving event classes with this class's own class loader. */
+	public List<? extends AbstractEvent> deserializeEvents() {
+		return deserializeEvents(getClass().getClassLoader());
+	}
+
+	/**
+	 * Deserializes the attached events, resolving each event class through the given class loader.
+	 * @param classloader loader for the event classes; {@code null} falls back to this class's own loader
+	 * @return the events, or an empty list if no event list is attached
+	 * @throws RuntimeException if an event class cannot be loaded, is no {@link AbstractEvent}, or reading fails
+	 */
+	public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) {
+		if (this.serializedEventList == null) {
+			return Collections.emptyList();
+		}
+		final ClassLoader loader = classloader != null ? classloader : getClass().getClassLoader();
+		try {
+			DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList);
+			int numEvents = deserializer.readInt();
+			ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents);
+			for (int i = 0; i < numEvents; i++) {
+				String className = deserializer.readUTF();
+				Class<? extends AbstractEvent> clazz;
+				try {
+					clazz = Class.forName(className, true, loader).asSubclass(AbstractEvent.class);
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException("Could not load event class '" + className + "'.", e);
+				} catch (ClassCastException e) {
+					throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+				}
+				AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+				evt.read(deserializer);
+				events.add(evt);
+			}
+			return events;
+		} catch (IOException e) {
+			throw new RuntimeException("Error while deserializing the events.", e);
+		}
+	}
+
+	public void setBuffer(Buffer buffer) {
+		this.buffer = buffer;
+	}
+
+	public Buffer getBuffer() {
+		return this.buffer;
+	}
+
+	/** Writes the event count followed by each event's class name and payload. */
+	private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) {
+		try {
+			// 4 bytes cover the count alone; otherwise 32 bytes per event are requested up front
+			DataOutputSerializer serializer = new DataOutputSerializer(events.isEmpty() ? 4 : events.size() * 32);
+			serializer.writeInt(events.size());
+			for (AbstractEvent evt : events) {
+				serializer.writeUTF(evt.getClass().getName());
+				evt.write(serializer);
+			}
+			return serializer.wrapAsByteBuffer();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while serializing the task events.", e);
+		}
+	}
+
+	public boolean hasBuffer() {
+		return this.buffer != null;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("Envelope %d [source id: %s, buffer size: %d, events size: %d]",
+				this.sequenceNumber, this.getSource(), this.buffer == null ? -1 : this.buffer.size(),
+				this.serializedEventList == null ? -1 : this.serializedEventList.remaining());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java
new file mode 100644
index 0000000..372ff92
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java
@@ -0,0 +1,46 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.IOException;
+
+/**
+ * An envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations.
+ */
+public interface EnvelopeDispatcher {
+
+	/**
+	 * Dispatches an envelope from an output channel to the receiving input channels (forward flow).
+	 * Declares {@link IOException} and {@link InterruptedException}; when they are raised depends on the implementation.
+	 * @param envelope envelope to be sent
+	 */
+	void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException;
+
+	/**
+	 * Dispatches an envelope from an input channel to the receiving output channels (backwards flow).
+	 * Declares {@link IOException} and {@link InterruptedException}; when they are raised depends on the implementation.
+	 * @param envelope envelope to be sent
+	 */
+	void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException;
+
+	/**
+	 * Dispatches an envelope from an incoming TCP connection.
+	 * <p>
+	 * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the
+	 * receiving input channel.
+	 * Declares {@link IOException} and {@link InterruptedException}; when they are raised depends on the implementation.
+	 * @param envelope envelope to be sent
+	 */
+	void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java
new file mode 100644
index 0000000..d53d728
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java
@@ -0,0 +1,75 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.net.InetAddress;
+
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+
+/**
+ * Holds the receivers of an envelope: an optional local receiver, identified by its {@link ChannelID}, and an
+ * optional remote receiver, given as a {@link RemoteReceiver}. A missing receiver is stored as {@code null}.
+ * Both fields are final, so an instance never changes after construction.
+ */
+public class EnvelopeReceiverList {
+
+	private final ChannelID localReceiver;
+	private final RemoteReceiver remoteReceiver;
+
+	public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
+		this(cilr.getLocalTarget(), cilr.getRemoteTarget());
+	}
+
+	public EnvelopeReceiverList(ChannelID localReceiver) {
+		this(localReceiver, null);
+	}
+
+	public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
+		this(null, remoteReceiver);
+	}
+
+	private EnvelopeReceiverList(ChannelID local, RemoteReceiver remote) {
+		this.localReceiver = local;
+		this.remoteReceiver = remote;
+	}
+
+	public boolean hasLocalReceiver() {
+		return this.localReceiver != null;
+	}
+
+	public boolean hasRemoteReceiver() {
+		return this.remoteReceiver != null;
+	}
+
+	public int getTotalNumberOfReceivers() {
+		int count = (this.localReceiver != null) ? 1 : 0;
+		return (this.remoteReceiver != null) ? count + 1 : count;
+	}
+
+	public RemoteReceiver getRemoteReceiver() {
+		return this.remoteReceiver;
+	}
+
+	public ChannelID getLocalReceiver() {
+		return this.localReceiver;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder text = new StringBuilder("local receiver: ").append(this.localReceiver);
+		return text.append(", remote receiver: ").append(this.remoteReceiver).toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
deleted file mode 100644
index 44ec642..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.tcp.IncomingConnectionThread;
-import eu.stratosphere.runtime.io.network.tcp.OutgoingConnection;
-import eu.stratosphere.runtime.io.network.tcp.OutgoingConnectionThread;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * The network connection manager manages incoming and outgoing network connection from and to other hosts.
- * <p>
- * This class is thread-safe.
- * 
- */
-public final class NetworkConnectionManager {
-
-	/**
-	 * The default number of threads dealing with outgoing connections.
-	 */
-	private static final int DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS = 1;
-
-	/**
-	 * The default number of connection retries before giving up.
-	 */
-	private static final int DEFAULT_NUMBER_OF_CONNECTION_RETRIES = 10;
-
-	/**
-	 * List of active threads dealing with outgoing connections.
-	 */
-	private final List<OutgoingConnectionThread> outgoingConnectionThreads = new CopyOnWriteArrayList<OutgoingConnectionThread>();
-
-	/**
-	 * Thread dealing with incoming connections.
-	 */
-	private final IncomingConnectionThread incomingConnectionThread;
-
-	/**
-	 * Map containing currently active outgoing connections.
-	 */
-	private final ConcurrentMap<RemoteReceiver, OutgoingConnection> outgoingConnections = new ConcurrentHashMap<RemoteReceiver, OutgoingConnection>();
-
-	/**
-	 * The number of connection retries before giving up.
-	 */
-	private final int numberOfConnectionRetries;
-
-	/**
-	 * A buffer provider for read buffers
-	 */
-	private final ChannelManager channelManager;
-
-	public NetworkConnectionManager(final ChannelManager channelManager,
-			final InetAddress bindAddress, final int dataPort) throws IOException {
-
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-
-		this.channelManager = channelManager;
-
-		// Start the connection threads
-		final int numberOfOutgoingConnectionThreads = configuration.getInteger(
-			"channel.network.numberOfOutgoingConnectionThreads", DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS);
-
-		for (int i = 0; i < numberOfOutgoingConnectionThreads; i++) {
-			final OutgoingConnectionThread outgoingConnectionThread = new OutgoingConnectionThread();
-			outgoingConnectionThread.start();
-			this.outgoingConnectionThreads.add(outgoingConnectionThread);
-		}
-
-		this.incomingConnectionThread = new IncomingConnectionThread(
-			this.channelManager, true, new InetSocketAddress(bindAddress, dataPort));
-		this.incomingConnectionThread.start();
-
-		this.numberOfConnectionRetries = configuration.getInteger("channel.network.numberOfConnectionRetries",
-			DEFAULT_NUMBER_OF_CONNECTION_RETRIES);
-	}
-
-	/**
-	 * Randomly selects one of the active threads dealing with outgoing connections.
-	 * 
-	 * @return one of the active threads dealing with outgoing connections
-	 */
-	private OutgoingConnectionThread getOutgoingConnectionThread() {
-
-		return this.outgoingConnectionThreads.get((int) (this.outgoingConnectionThreads.size() * Math.random()));
-	}
-
-	/**
-	 * Queues an envelope for transfer to a particular target host.
-	 * 
-	 * @param remoteReceiver
-	 *        the address of the remote receiver
-	 * @param envelope
-	 *        the envelope to be transfered
-	 */
-	public void queueEnvelopeForTransfer(final RemoteReceiver remoteReceiver, final Envelope envelope) {
-
-		getOutgoingConnection(remoteReceiver).queueEnvelope(envelope);
-	}
-
-	/**
-	 * Returns (and possibly creates) the outgoing connection for the given target address.
-	 * 
-	 * @param targetAddress
-	 *        the address of the connection target
-	 * @return the outgoing connection object
-	 */
-	private OutgoingConnection getOutgoingConnection(final RemoteReceiver remoteReceiver) {
-
-		OutgoingConnection outgoingConnection = this.outgoingConnections.get(remoteReceiver);
-
-		if (outgoingConnection == null) {
-
-			outgoingConnection = new OutgoingConnection(remoteReceiver, getOutgoingConnectionThread(),
-				this.numberOfConnectionRetries);
-
-			final OutgoingConnection oldEntry = this.outgoingConnections
-				.putIfAbsent(remoteReceiver, outgoingConnection);
-
-			// We had a race, use the old value
-			if (oldEntry != null) {
-				outgoingConnection = oldEntry;
-			}
-		}
-
-		return outgoingConnection;
-	}
-
-	public void shutDown() {
-
-		// Interrupt the threads we started
-		this.incomingConnectionThread.interrupt();
-
-		final Iterator<OutgoingConnectionThread> it = this.outgoingConnectionThreads.iterator();
-		while (it.hasNext()) {
-			it.next().interrupt();
-		}
-	}
-
-	public void logBufferUtilization() {
-
-		System.out.println("\tOutgoing connections:");
-
-		final Iterator<Map.Entry<RemoteReceiver, OutgoingConnection>> it = this.outgoingConnections.entrySet()
-			.iterator();
-
-		while (it.hasNext()) {
-
-			final Map.Entry<RemoteReceiver, OutgoingConnection> entry = it.next();
-			System.out.println("\t\tOC " + entry.getKey() + ": " + entry.getValue().getNumberOfQueuedWriteBuffers());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
index 32be058..65287f7 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import eu.stratosphere.nephele.event.task.AbstractEvent;
 import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
 
 public final class SenderHintEvent extends AbstractEvent {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
index 1d23e93..af08aa8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
@@ -1,5 +1,5 @@
 /***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
@@ -13,16 +13,19 @@
 
 package eu.stratosphere.runtime.io.network.bufferprovider;
 
+import eu.stratosphere.runtime.io.Buffer;
+
 /**
- * This interface must be implemented to receive a notification from a {@link BufferProvider} when an empty
- * {@link eu.stratosphere.runtime.io.Buffer} has
- * become available again.
- * 
+ * This interface must be implemented to receive an asynchronous callback from
+ * a {@link BufferProvider} as soon as a buffer has become available again.
  */
 public interface BufferAvailabilityListener {
 
 	/**
-	 * Indicates that at least one {@link eu.stratosphere.runtime.io.Buffer} has become available again.
+	 * Returns a Buffer to the listener.
+	 * <p/>
+	 * Note: the listener has to adjust the size of the returned Buffer to the
+	 * requested size manually via {@link Buffer#limitSize(int)}.
 	 */
-	void bufferAvailable();
+	void bufferAvailable(Buffer buffer) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
index e3085ee..d82b427 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
@@ -65,5 +65,21 @@ public interface BufferProvider {
 	 * @return <code>true</code> if the registration has been successful; <code>false</code> if the registration
 	 *         failed, because the buffer pool was not empty or has already been destroyed
 	 */
-	boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener);
+	BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener);
+
+	public enum BufferAvailabilityRegistration { // outcome of registerBufferAvailabilityListener(...)
+		NOT_REGISTERED_BUFFER_AVAILABLE(false), // not registered: the pool still holds a buffer
+		NOT_REGISTERED_BUFFER_POOL_DESTROYED(false), // not registered: the pool has been destroyed
+		REGISTERED(true); // the listener has been added to the pool's listeners
+
+		private final boolean isSuccessful; // true only for REGISTERED
+
+		private BufferAvailabilityRegistration(boolean isSuccessful) {
+			this.isSuccessful = isSuccessful;
+		}
+
+		public boolean isSuccessful() { // true if the listener was registered
+			return isSuccessful;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
new file mode 100644
index 0000000..5daa509
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
@@ -0,0 +1,51 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+
+public final class DiscardBufferPool implements BufferProvider, BufferRecycler {
+	// Provider that never hands out a buffer: both request methods return null and the buffer size is 0.
+	@Override
+	public Buffer requestBuffer(int minBufferSize) { // minBufferSize is ignored
+		return null;
+	}
+
+	@Override
+	public Buffer requestBufferBlocking(int minBufferSize) { // returns immediately, never blocks
+		return null;
+	}
+
+	@Override
+	public int getBufferSize() { // this pool holds no buffers; the reported size is always 0
+		return 0;
+	}
+
+	@Override
+	public void reportAsynchronousEvent() { // not supported by this pool
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED; // listener is never stored or called
+	}
+
+	@Override
+	public void recycle(MemorySegment buffer) { // always rejected; this pool never hands out a buffer
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
index e8aeb11..3eb10c1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
@@ -184,20 +184,20 @@ public final class LocalBufferPool implements BufferProvider {
 	}
 
 	@Override
-	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
 		synchronized (this.buffers) {
 			if (!this.buffers.isEmpty()) {
-				return false;
+				return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE;
 			}
 
 			if (this.isDestroyed) {
-				return false;
+				return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
 			}
 
 			this.listeners.add(listener);
 		}
 
-		return true;
+		return BufferAvailabilityRegistration.REGISTERED;
 	}
 
 	/**
@@ -221,7 +221,7 @@ public final class LocalBufferPool implements BufferProvider {
 				}
 
 				this.globalBufferPool.returnBuffer(this.buffers.poll());
-				this.numRequestedBuffers --;
+				this.numRequestedBuffers--;
 			}
 
 			this.buffers.notify();
@@ -294,11 +294,17 @@ public final class LocalBufferPool implements BufferProvider {
 				this.globalBufferPool.returnBuffer(buffer);
 				this.numRequestedBuffers--;
 			} else {
-				this.buffers.add(buffer);
-				this.buffers.notify();
-
-				while (!this.listeners.isEmpty()) {
-					this.listeners.poll().bufferAvailable();
+				if (!this.listeners.isEmpty()) {
+					Buffer availableBuffer = new Buffer(buffer, buffer.size(), this.recycler);
+					try {
+						this.listeners.poll().bufferAvailable(availableBuffer);
+					} catch (Exception e) {
+						this.buffers.add(buffer);
+						this.buffers.notify();
+					}
+				} else {
+					this.buffers.add(buffer);
+					this.buffers.notify();
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
deleted file mode 100644
index 217a5ce..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.bufferprovider;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.BufferRecycler;
-
-/**
- * 
- */
-public final class SerialSingleBufferPool implements BufferProvider, BufferRecycler {
-	
-	private final Buffer buffer;
-
-	/** Size of the buffer in this pool */
-	private final int bufferSize;
-
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public SerialSingleBufferPool(int bufferSize) {
-		this.buffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, this);
-		this.bufferSize = bufferSize;
-	}
-	
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public Buffer requestBuffer(int minBufferSize) {
-		if (minBufferSize <= this.bufferSize) {
-			return this.buffer.duplicate();
-		}
-		else {
-			throw new IllegalArgumentException("Requesting buffer with size " + minBufferSize + ". Pool's buffer size is " + this.bufferSize);
-		}
-	}
-
-	@Override
-	public Buffer requestBufferBlocking(int minBufferSize) {
-		if (minBufferSize <= this.bufferSize) {
-			return this.buffer.duplicate();
-		}
-		else {
-			throw new IllegalArgumentException("Requesting buffer with size " + minBufferSize + ". Pool's buffer size is " + this.bufferSize);
-		}
-	}
-
-	@Override
-	public int getBufferSize() {
-		return this.bufferSize;
-	}
-
-	@Override
-	public void reportAsynchronousEvent() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void recycle(MemorySegment buffer) {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
deleted file mode 100644
index a692aec..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
-import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
-import eu.stratosphere.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public final class Envelope {
-
-	private final JobID jobID;
-
-	private final ChannelID source;
-
-	private final int sequenceNumber;
-
-	private ByteBuffer serializedEventList;
-
-	private Buffer buffer;
-
-	public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
-		this.sequenceNumber = sequenceNumber;
-		this.jobID = jobID;
-		this.source = source;
-	}
-
-	private Envelope(Envelope toDuplicate) {
-		this.jobID = toDuplicate.jobID;
-		this.source = toDuplicate.source;
-		this.sequenceNumber = toDuplicate.sequenceNumber;
-		this.serializedEventList = null;
-		this.buffer = null;
-	}
-
-	public Envelope duplicate() {
-		Envelope duplicate = new Envelope(this);
-		if (hasBuffer()) {
-			duplicate.setBuffer(this.buffer.duplicate());
-		}
-
-		return duplicate;
-	}
-
-	public Envelope duplicateWithoutBuffer() {
-		return new Envelope(this);
-	}
-
-	public JobID getJobID() {
-		return this.jobID;
-	}
-
-	public ChannelID getSource() {
-		return this.source;
-	}
-
-	public int getSequenceNumber() {
-		return this.sequenceNumber;
-	}
-
-	public void setEventsSerialized(ByteBuffer serializedEventList) {
-		if (this.serializedEventList != null)
-			throw new IllegalStateException("Event list has already been set.");
-
-		this.serializedEventList = serializedEventList;
-	}
-
-	public void serializeEventList(List<? extends AbstractEvent> eventList) {
-		if (this.serializedEventList != null)
-			throw new IllegalStateException("Event list has already been set.");
-
-		this.serializedEventList = serializeEvents(eventList);
-	}
-
-	public ByteBuffer getEventsSerialized() {
-		return this.serializedEventList;
-	}
-
-	public List<? extends AbstractEvent> deserializeEvents() {
-		return deserializeEvents(getClass().getClassLoader());
-	}
-
-	public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) {
-		if (this.serializedEventList == null) {
-			return Collections.emptyList();
-		}
-
-		try {
-			DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList);
-
-			int numEvents = deserializer.readInt();
-			ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents);
-
-			for (int i = 0; i < numEvents; i++) {
-				String className = deserializer.readUTF();
-				Class<? extends AbstractEvent> clazz;
-				try {
-					clazz = Class.forName(className).asSubclass(AbstractEvent.class);
-				} catch (ClassNotFoundException e) {
-					throw new RuntimeException("Could not load event class '" + className + "'.", e);
-				} catch (ClassCastException e) {
-					throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
-				}
-
-				AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
-				evt.read(deserializer);
-
-				events.add(evt);
-			}
-
-			return events;
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while deserializing the events.", e);
-		}
-	}
-
-	public void setBuffer(Buffer buffer) {
-		this.buffer = buffer;
-	}
-
-	public Buffer getBuffer() {
-		return this.buffer;
-	}
-
-	private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) {
-		try {
-			// create the serialized event list
-			DataOutputSerializer serializer = events.size() == 0
-				? new DataOutputSerializer(4)
-				: new DataOutputSerializer(events.size() * 32);
-			serializer.writeInt(events.size());
-
-			for (AbstractEvent evt : events) {
-				serializer.writeUTF(evt.getClass().getName());
-				evt.write(serializer);
-			}
-
-			return serializer.wrapAsByteBuffer();
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while serializing the task events.", e);
-		}
-	}
-
-	public boolean hasBuffer() {
-		return this.buffer != null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
deleted file mode 100644
index 2b69c0d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import java.io.IOException;
-
-/**
- * A envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations.
- */
-public interface EnvelopeDispatcher {
-
-	/**
-	 * Dispatches an envelope from an output channel to the receiving input channels (forward flow).
-	 *
-	 * @param envelope envelope to be sent
-	 */
-	void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException;
-
-	/**
-	 * Dispatches an envelope from an input channel to the receiving output channels (backwards flow).
-	 *
-	 * @param envelope envelope to be sent
-	 */
-	void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException;
-
-	/**
-	 * Dispatches an envelope from an incoming TCP connection.
-	 * <p>
-	 * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the
-	 * receiving input channel.
-	 *
-	 * @param envelope envelope to be sent
-	 */
-	void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
deleted file mode 100644
index 7b7e178..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.ReadableByteChannel;
-
-public class EnvelopeReader {
-
-	public enum DeserializationState {
-		COMPLETE,
-		PENDING,
-		NO_BUFFER_AVAILABLE;
-	}
-
-	private final BufferProviderBroker bufferProviderBroker;
-
-	private final ByteBuffer headerBuffer;
-
-	private ByteBuffer currentHeaderBuffer;
-
-	private ByteBuffer currentEventsList;
-
-	private ByteBuffer currentDataBuffer;
-
-	private int bufferRequestPendingWithSize;
-
-
-	private Envelope pendingEnvelope;
-
-	private Envelope constructedEnvelope;
-
-
-	public BufferProvider bufferProvider;
-
-	private JobID lastDeserializedJobID;
-
-	private ChannelID lastDeserializedSourceID;
-
-
-	public EnvelopeReader(BufferProviderBroker bufferProviderBroker) {
-		this.bufferProviderBroker = bufferProviderBroker;
-
-		this.headerBuffer = ByteBuffer.allocateDirect(EnvelopeWriter.HEADER_SIZE);
-		this.headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
-
-		this.currentHeaderBuffer = this.headerBuffer;
-	}
-
-	public DeserializationState readNextChunk(ReadableByteChannel channel) throws IOException {
-
-		// 1) check if the header is pending
-		if (this.currentHeaderBuffer != null) {
-			ByteBuffer header = this.currentHeaderBuffer;
-
-			channel.read(header);
-			if (header.hasRemaining()) {
-				// not finished with the header
-				return DeserializationState.PENDING;
-			} else {
-				// header done, construct the envelope
-				this.currentHeaderBuffer = null;
-
-				Envelope env = constructEnvelopeFromHeader(header);
-				this.pendingEnvelope = env;
-
-				// check for events and data
-				int eventsSize = getEventListSize(header);
-				int bufferSize = getBufferSize(header);
-
-				// make the events list the next buffer to be read
-				if (eventsSize > 0) {
-					this.currentEventsList = ByteBuffer.allocate(eventsSize);
-				}
-
-				// if we have a data buffer, we need memory segment for it
-				// we may not immediately get the memory segment, though, so we first record
-				// that we need it
-				if (bufferSize > 0) {
-					this.bufferRequestPendingWithSize = bufferSize;
-				}
-			}
-		}
-
-		// 2) read the eventList, if it should have one
-		if (this.currentEventsList != null) {
-			channel.read(this.currentEventsList);
-			if (this.currentEventsList.hasRemaining()) {
-				// events list still incomplete
-				return DeserializationState.PENDING;
-			} else {
-				this.currentEventsList.flip();
-				this.pendingEnvelope.setEventsSerialized(this.currentEventsList);
-				this.currentEventsList = null;
-			}
-		}
-
-		// 3) check if we need to get a buffer
-		if (this.bufferRequestPendingWithSize > 0) {
-			Buffer b = getBufferForTarget(this.pendingEnvelope.getJobID(), this.pendingEnvelope.getSource(), this.bufferRequestPendingWithSize);
-			if (b == null) {
-				// no buffer available at this time. come back later
-				return DeserializationState.NO_BUFFER_AVAILABLE;
-			} else {
-				// buffer is available. set the field so the buffer will be filled
-				this.pendingEnvelope.setBuffer(b);
-				this.currentDataBuffer = b.getMemorySegment().wrap(0, this.bufferRequestPendingWithSize);
-				this.bufferRequestPendingWithSize = 0;
-			}
-		}
-
-		// 4) fill the buffer
-		if (this.currentDataBuffer != null) {
-			channel.read(this.currentDataBuffer);
-			if (this.currentDataBuffer.hasRemaining()) {
-				// data buffer incomplete
-				return DeserializationState.PENDING;
-			} else {
-				this.currentDataBuffer = null;
-			}
-		}
-
-		// if we get here, we completed our job, or did nothing, if the deserializer was not
-		// reset after the previous envelope
-		if (this.pendingEnvelope != null) {
-			this.constructedEnvelope = this.pendingEnvelope;
-			this.pendingEnvelope = null;
-			return DeserializationState.COMPLETE;
-		} else {
-			throw new IllegalStateException("Error: read() was called before reserializer was reset after the last envelope.");
-		}
-	}
-
-	private Envelope constructEnvelopeFromHeader(ByteBuffer header) throws IOException {
-		int magicNumber = header.getInt(EnvelopeWriter.MAGIC_NUMBER_OFFSET);
-
-		if (magicNumber != EnvelopeWriter.MAGIC_NUMBER) {
-			throw new IOException("Network stream corrupted: invalid magic number in envelope header.");
-		}
-
-		int seqNum = header.getInt(EnvelopeWriter.SEQUENCE_NUMBER_OFFSET);
-		JobID jid = JobID.fromByteBuffer(header, EnvelopeWriter.JOB_ID_OFFSET);
-		ChannelID cid = ChannelID.fromByteBuffer(header, EnvelopeWriter.CHANNEL_ID_OFFSET);
-		return new Envelope(seqNum, jid, cid);
-	}
-
-	private int getBufferSize(ByteBuffer header) {
-		return header.getInt(EnvelopeWriter.BUFFER_SIZE_OFFSET);
-	}
-
-	private int getEventListSize(ByteBuffer header) {
-		return header.getInt(EnvelopeWriter.EVENTS_SIZE_OFFSET);
-	}
-
-	private Buffer getBufferForTarget(JobID jid, ChannelID cid, int size) throws IOException {
-		if (!(jid.equals(this.lastDeserializedJobID) && cid.equals(this.lastDeserializedSourceID))) {
-			this.bufferProvider = this.bufferProviderBroker.getBufferProvider(jid, cid);
-			this.lastDeserializedJobID = jid;
-			this.lastDeserializedSourceID = cid;
-		}
-
-		return this.bufferProvider.requestBuffer(size);
-	}
-
-
-	public Envelope getFullyDeserializedTransferEnvelope() {
-		Envelope t = this.constructedEnvelope;
-		if (t == null) {
-			throw new IllegalStateException("Envelope has not yet been fully constructed.");
-		}
-
-		this.constructedEnvelope = null;
-		return t;
-	}
-
-	public void reset() {
-		this.headerBuffer.clear();
-		this.currentHeaderBuffer = this.headerBuffer;
-		this.constructedEnvelope = null;
-	}
-
-	public boolean hasUnfinishedData() {
-		return this.pendingEnvelope != null || this.currentHeaderBuffer != null;
-	}
-
-	public BufferProvider getBufferProvider() {
-		return bufferProvider;
-	}
-
-	public Envelope getPendingEnvelope() {
-		return pendingEnvelope;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
deleted file mode 100644
index f99e1f2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import java.net.InetAddress;
-
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
-import eu.stratosphere.runtime.io.network.RemoteReceiver;
-
-/**
- * A transfer envelope receiver list contains all recipients of a transfer envelope. Their are three d ifferent types of
- * receivers: Local receivers identified by {@link ChannelID} objects, remote receivers identified by
- * {@link InetAddress} objects and finally checkpoints which are identified by
- * <p>
- * This class is thread-safe.
- * 
- */
-public class EnvelopeReceiverList {
-
-	private final ChannelID localReceiver;
-
-	private final RemoteReceiver remoteReceiver;
-
-	public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
-		this.localReceiver = cilr.getLocalTarget();
-		this.remoteReceiver = cilr.getRemoteTarget();
-	}
-
-	public EnvelopeReceiverList(ChannelID localReceiver) {
-		this.localReceiver = localReceiver;
-		this.remoteReceiver = null;
-	}
-
-	public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
-		this.localReceiver = null;
-		this.remoteReceiver = remoteReceiver;
-	}
-
-	public boolean hasLocalReceiver() {
-		return this.localReceiver != null;
-	}
-
-	public boolean hasRemoteReceiver() {
-		return this.remoteReceiver != null;
-	}
-
-	public int getTotalNumberOfReceivers() {
-		return (this.localReceiver == null ? 0 : 1) + (this.remoteReceiver == null ? 0 : 1);
-	}
-
-	public RemoteReceiver getRemoteReceiver() {
-		return this.remoteReceiver;
-	}
-
-	public ChannelID getLocalReceiver() {
-		return this.localReceiver;
-	}
-	
-	@Override
-	public String toString() {
-		return "local receiver: " + this.localReceiver + ", remote receiver: " + this.remoteReceiver;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
deleted file mode 100644
index c00e61b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.nephele.AbstractID;
-import eu.stratosphere.runtime.io.Buffer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.WritableByteChannel;
-
-public class EnvelopeWriter {
-
-	protected static final int MAGIC_NUMBER = 0xBADC0FFE;
-
-	/**
-	 * Size of the envelope header: 48 bytes = 4 bytes magic number, 4 bytes sequence number, 16 bytes job id,
-	 * 16 bytes sender id, 4 bytes bufferSize, 4 bytes event list length
-	 */
-	public static final int HEADER_SIZE = 4 + 4 + 2 * AbstractID.SIZE + 4 + 4;
-
-	public static final int MAGIC_NUMBER_OFFSET = 0;
-
-	public static final int SEQUENCE_NUMBER_OFFSET = 4;
-
-	public static final int JOB_ID_OFFSET = 8;
-
-	public static final int CHANNEL_ID_OFFSET = 24;
-
-	public static final int BUFFER_SIZE_OFFSET = 40;
-
-	public static final int EVENTS_SIZE_OFFSET = 44;
-
-	private ByteBuffer currentHeader;
-
-	private ByteBuffer currentEvents;
-
-	private ByteBuffer currentDataBuffer;
-
-	private final ByteBuffer headerBuffer;
-
-	public EnvelopeWriter() {
-		this.headerBuffer = ByteBuffer.allocateDirect(HEADER_SIZE);
-		this.headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
-	}
-
-	/**
-	 * @param channel
-	 * @return True, if the writer has more pending data for the current envelope, false if not.
-	 *
-	 * @throws java.io.IOException
-	 */
-	public boolean writeNextChunk(WritableByteChannel channel) throws IOException {
-		// 1) check if the the header is still pending
-		if (this.currentHeader != null) {
-			channel.write(this.currentHeader);
-
-			if (this.currentHeader.hasRemaining()) {
-				// header was not fully written, so we can leave this method
-				return true;
-			} else {
-				this.currentHeader = null;
-			}
-		}
-
-		// 2) check if there are events pending
-		if (this.currentEvents != null) {
-			channel.write(this.currentEvents);
-			if (this.currentEvents.hasRemaining()) {
-				// events were not fully written, so leave this method
-				return true;
-			} else {
-				this.currentEvents = null;
-			}
-		}
-
-		// 3) write the data buffer
-		if (this.currentDataBuffer != null) {
-			channel.write(this.currentDataBuffer);
-			if (this.currentDataBuffer.hasRemaining()) {
-				return true;
-			} else {
-				this.currentDataBuffer = null;
-			}
-		}
-
-		return false;
-	}
-
-	public void setEnvelopeForWriting(Envelope env) {
-		// header
-		constructHeader(env);
-		this.currentHeader = this.headerBuffer;
-
-		// events (possibly null)
-		this.currentEvents = env.getEventsSerialized();
-
-		// data buffer (possibly null)
-		Buffer buf = env.getBuffer();
-		if (buf != null && buf.size() > 0) {
-			this.currentDataBuffer = buf.getMemorySegment().wrap(0, buf.size());
-		}
-	}
-
-	private void constructHeader(Envelope env) {
-		final ByteBuffer buf = this.headerBuffer;
-
-		buf.clear();							// reset
-		buf.putInt(MAGIC_NUMBER);
-		buf.putInt(env.getSequenceNumber());	// sequence number (4 bytes)
-		env.getJobID().write(buf);				// job Id (16 bytes)
-		env.getSource().write(buf);				// producerId (16 bytes)
-
-		// buffer size
-		buf.putInt(env.getBuffer() == null ? 0 : env.getBuffer().size());
-
-		// size of event list
-		buf.putInt(env.getEventsSerialized() == null ? 0 : env.getEventsSerialized().remaining());
-
-		buf.flip();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
deleted file mode 100644
index f7d49bf..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-
-/**
- * This exception is thrown to indicate that the deserialization process of a {@link Envelope} could not be
- * continued because a {@link Buffer} to store the envelope's content is currently not available.
- * 
- */
-public final class NoBufferAvailableException extends Exception {
-
-	/**
-	 * Generated serial UID.
-	 */
-	private static final long serialVersionUID = -9164212953646457026L;
-
-	/**
-	 * The buffer provider which could not deliver a buffer.
-	 */
-	private final BufferProvider bufferProvider;
-
-	/**
-	 * Constructs a new exception.
-	 * 
-	 * @param bufferProvider
-	 *        the buffer provider which could not deliver a buffer
-	 */
-	public NoBufferAvailableException(final BufferProvider bufferProvider) {
-		this.bufferProvider = bufferProvider;
-	}
-
-	/**
-	 * Returns the buffer provider which could not deliver a buffer.
-	 * 
-	 * @return the buffer provider which could not deliver a buffer
-	 */
-	public BufferProvider getBufferProvider() {
-		return this.bufferProvider;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
new file mode 100644
index 0000000..1ab1871
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
@@ -0,0 +1,344 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter implements BufferAvailabilityListener {
+
+	private final BufferProviderBroker bufferProviderBroker;
+
+	private final BufferAvailabilityChangedTask bufferAvailabilityChangedTask = new BufferAvailabilityChangedTask();
+
+	private final ConcurrentLinkedQueue<Buffer> bufferBroker = new ConcurrentLinkedQueue<Buffer>();
+
+	private final ByteBuffer headerBuffer;
+
+	private Envelope currentEnvelope;
+
+	private ByteBuffer currentEventsBuffer;
+
+	private ByteBuffer currentDataBuffer;
+
+	private int currentBufferRequestSize;
+
+	private BufferProvider currentBufferProvider;
+
+	private JobID lastJobId;
+
+	private ChannelID lastSourceId;
+
+	private ByteBuf stagedBuffer;
+
+	private ChannelHandlerContext channelHandlerContext;
+
+	private int bytesToSkip;
+
+	private enum DecoderState {
+		COMPLETE,
+		PENDING,
+		NO_BUFFER_AVAILABLE
+	}
+
+	public InboundEnvelopeDecoder(BufferProviderBroker bufferProviderBroker) {
+		this.bufferProviderBroker = bufferProviderBroker;
+		this.headerBuffer = ByteBuffer.allocateDirect(OutboundEnvelopeEncoder.HEADER_SIZE);
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		// keep the first context seen; bufferAvailable() uses it to schedule work on the channel's event loop
+		if (this.channelHandlerContext == null) {
+			this.channelHandlerContext = ctx;
+		}
+		super.channelActive(ctx);
+	}
+
+	/** Decodes an inbound Netty buffer, after first dropping bytes that are still marked to be skipped. */
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		if (this.stagedBuffer != null) {
+			throw new IllegalStateException("No channel read event should be fired as long as a buffer is staged.");
+		}
+
+		ByteBuf in = (ByteBuf) msg;
+
+		if (this.bytesToSkip > 0) {
+			this.bytesToSkip = skipBytes(in, this.bytesToSkip);
+
+			// the whole buffer was skipped and further bytes still have to be skipped
+			if (this.bytesToSkip > 0) {
+				in.release();
+				return;
+			}
+		}
+
+		decodeBuffer(in, ctx);
+	}
+
+	/**
+	 * Decodes all Envelopes contained in a Netty ByteBuf and forwards them in the pipeline.
+	 * Returns true and releases the buffer, if it was fully consumed. Otherwise, returns false and retains the buffer.
+	 * <p>
+	 * In case of no buffer availability (returns false), a buffer availability listener is registered and the input
+	 * buffer is staged for later consumption.
+	 *
+	 * @return <code>true</code>, if buffer fully consumed, <code>false</code> otherwise
+	 * @throws IOException if decoding an envelope from the buffer fails
+	 */
+	private boolean decodeBuffer(ByteBuf in, ChannelHandlerContext ctx) throws IOException {
+
+		DecoderState decoderState;
+		while ((decoderState = decodeEnvelope(in)) != DecoderState.PENDING) {
+			if (decoderState == DecoderState.COMPLETE) {
+				ctx.fireChannelRead(this.currentEnvelope);
+				this.currentEnvelope = null;
+			}
+			else if (decoderState == DecoderState.NO_BUFFER_AVAILABLE) {
+				switch (this.currentBufferProvider.registerBufferAvailabilityListener(this)) {
+					case REGISTERED:
+						if (ctx.channel().config().isAutoRead()) {
+							ctx.channel().config().setAutoRead(false);
+						}
+						// keep the input alive until a buffer is available and decoding can be resumed
+						this.stagedBuffer = in;
+						this.stagedBuffer.retain();
+						return false;
+
+					case NOT_REGISTERED_BUFFER_AVAILABLE:
+						continue;
+
+					case NOT_REGISTERED_BUFFER_POOL_DESTROYED:
+						this.bytesToSkip = skipBytes(in, this.currentBufferRequestSize);
+						// the payload is skipped, so discard the state of the partially decoded envelope
+						this.currentBufferRequestSize = 0;
+						this.currentEventsBuffer = null;
+						this.currentEnvelope = null;
+				}
+			}
+		}
+
+		if (in.isReadable()) {
+			throw new IllegalStateException("Every buffer should have been fully " +
+					"consumed after *successfully* decoding it (if it was not successful, " +
+					"the buffer will be staged for later consumption).");
+		}
+
+		in.release();
+		return true;
+	}
+
+	/**
+	 * Notifies the IO thread that a Buffer has become available again.
+	 * <p>
+	 * This method will be called from outside the Netty IO thread. The caller will be the buffer pool from which the
+	 * available buffer comes (i.e. the InputGate). To hand the buffer over safely, it is put into a concurrent queue
+	 * and a task that consumes it is submitted to the channel's event loop.
+	 * @param buffer the buffer that has become available
+	 */
+	@Override
+	public void bufferAvailable(Buffer buffer) throws Exception {
+		this.bufferBroker.offer(buffer);
+		this.channelHandlerContext.channel().eventLoop().execute(this.bufferAvailabilityChangedTask);
+	}
+
+	/**
+	 * Continues the decoding of a staged buffer after a buffer has become available again.
+	 * <p>
+	 * This task should be executed by the IO thread to ensure safe access to the staged buffer.
+	 */
+	private class BufferAvailabilityChangedTask implements Runnable {
+		@Override
+		public void run() {
+			Buffer availableBuffer = bufferBroker.poll();
+			if (availableBuffer == null) {
+				throw new IllegalStateException("The BufferAvailabilityChangedTask " +
+						"should only be executed when a Buffer has been offered " +
+						"to the Buffer broker (after becoming available).");
+			}
+
+			// This alters the state of the last `decodeEnvelope(ByteBuf)`
+			// call to set the buffer, which has become available again
+			availableBuffer.limitSize(currentBufferRequestSize);
+			currentEnvelope.setBuffer(availableBuffer);
+			currentDataBuffer = availableBuffer.getMemorySegment().wrap(0, InboundEnvelopeDecoder.this.currentBufferRequestSize);
+			currentBufferRequestSize = 0;
+			// drop the reference taken when staging; decodeBuffer() either releases the buffer or retains it again
+			stagedBuffer.release();
+
+			try {
+				if (decodeBuffer(stagedBuffer, channelHandlerContext)) {
+					stagedBuffer = null;
+					channelHandlerContext.channel().config().setAutoRead(true);
+				}
+			} catch (IOException e) {
+				availableBuffer.recycleBuffer(); // NOTE(review): the exception itself is dropped here
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------
+
+	/** Decodes the next envelope from the input, resuming a partially decoded one where the last call stopped. */
+	private DecoderState decodeEnvelope(ByteBuf in) throws IOException {
+		// --------------------------------------------------------------------
+		// (1) header (OutboundEnvelopeEncoder.HEADER_SIZE bytes)
+		// --------------------------------------------------------------------
+		if (this.currentEnvelope == null) {
+			copy(in, this.headerBuffer);
+
+			if (this.headerBuffer.hasRemaining()) {
+				return DecoderState.PENDING;
+			}
+			else {
+				this.headerBuffer.flip();
+
+				int magicNum = this.headerBuffer.getInt();
+				if (magicNum != OutboundEnvelopeEncoder.MAGIC_NUMBER) {
+					throw new IOException("Network stream corrupted: invalid magic number in current envelope header.");
+				}
+
+				int seqNum = this.headerBuffer.getInt();
+				JobID jobId = JobID.fromByteBuffer(this.headerBuffer);
+				ChannelID sourceId = ChannelID.fromByteBuffer(this.headerBuffer);
+
+				this.currentEnvelope = new Envelope(seqNum, jobId, sourceId);
+
+				int eventsSize = this.headerBuffer.getInt();
+				int bufferSize = this.headerBuffer.getInt();
+
+				this.currentEventsBuffer = eventsSize > 0 ? ByteBuffer.allocate(eventsSize) : null;
+				this.currentBufferRequestSize = bufferSize > 0 ? bufferSize : 0;
+
+				this.headerBuffer.clear();
+			}
+		}
+
+		// --------------------------------------------------------------------
+		// (2) events (var length)
+		// --------------------------------------------------------------------
+		if (this.currentEventsBuffer != null) {
+			copy(in, this.currentEventsBuffer);
+
+			if (this.currentEventsBuffer.hasRemaining()) {
+				return DecoderState.PENDING;
+			}
+			else {
+				this.currentEventsBuffer.flip();
+				this.currentEnvelope.setEventsSerialized(this.currentEventsBuffer);
+				this.currentEventsBuffer = null;
+			}
+		}
+
+		// --------------------------------------------------------------------
+		// (3) buffer (var length)
+		// --------------------------------------------------------------------
+		// (a) request a buffer from OUR pool
+		if (this.currentBufferRequestSize > 0) {
+			JobID jobId = this.currentEnvelope.getJobID();
+			ChannelID sourceId = this.currentEnvelope.getSource();
+			Buffer buffer = requestBufferForTarget(jobId, sourceId, this.currentBufferRequestSize);
+
+			if (buffer == null) {
+				return DecoderState.NO_BUFFER_AVAILABLE;
+			}
+			else {
+				this.currentEnvelope.setBuffer(buffer);
+				this.currentDataBuffer = buffer.getMemorySegment().wrap(0, this.currentBufferRequestSize);
+				this.currentBufferRequestSize = 0;
+			}
+		}
+
+		// (b) copy data to OUR buffer
+		if (this.currentDataBuffer != null) {
+			copy(in, this.currentDataBuffer);
+
+			if (this.currentDataBuffer.hasRemaining()) {
+				return DecoderState.PENDING;
+			}
+			else {
+				this.currentDataBuffer = null;
+			}
+		}
+
+		// if we made it to this point, we completed the envelope;
+		// in the other cases we return early with PENDING or NO_BUFFER_AVAILABLE
+		return DecoderState.COMPLETE;
+	}
+
+	private Buffer requestBufferForTarget(JobID jobId, ChannelID sourceId, int size) throws IOException {
+		// The provider found by the last lookup is reused as long as job and source channel stay the same;
+		// the broker is only asked again when one of the two differs from the previous request.
+		final boolean targetChanged = !jobId.equals(this.lastJobId) || !sourceId.equals(this.lastSourceId);
+		if (targetChanged) {
+			this.lastJobId = jobId;
+			this.lastSourceId = sourceId;
+			this.currentBufferProvider = this.bufferProviderBroker.getBufferProvider(jobId, sourceId);
+		}
+
+		return this.currentBufferProvider.requestBuffer(size);
+	}
+
+	/**
+	 * Copies min(src.readableBytes(), dst.remaining()) bytes from Netty's ByteBuf to the Java NIO ByteBuffer.
+	 */
+	private void copy(ByteBuf src, ByteBuffer dst) {
+		// This branch is necessary, because an Exception is thrown if the
+		// destination buffer has more remaining (writable) bytes than
+		// currently readable from the Netty ByteBuf source.
+		if (src.isReadable()) {
+			if (src.readableBytes() < dst.remaining()) {
+				int oldLimit = dst.limit();
+
+				dst.limit(dst.position() + src.readableBytes());
+				src.readBytes(dst);
+				dst.limit(oldLimit);
+			}
+			else {
+				src.readBytes(dst);
+			}
+		}
+	}
+
+	/**
+	 * Skips over min(in.readableBytes(), toSkip) bytes in the Netty ByteBuf and returns how many bytes remain to be
+	 * skipped. Only the reader index of the buffer is moved.
+	 *
+	 * @return remaining bytes to be skipped
+	 */
+	private int skipBytes(ByteBuf in, int toSkip) {
+		if (toSkip <= in.readableBytes()) {
+			// not readBytes(int): that returns a newly created buffer, which would never be released here
+			in.skipBytes(toSkip);
+			return 0;
+		}
+
+		int remainingToSkip = toSkip - in.readableBytes();
+		in.skipBytes(in.readableBytes());
+		return remainingToSkip;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
new file mode 100644
index 0000000..d0270b6
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
@@ -0,0 +1,41 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.EnvelopeDispatcher;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Inbound handler that casts each message to an {@link Envelope} and passes it to the {@link EnvelopeDispatcher}. */
+public class InboundEnvelopeDispatcherHandler extends ChannelInboundHandlerAdapter {
+	private static final Log LOG = LogFactory.getLog(InboundEnvelopeDispatcherHandler.class);
+
+	private final EnvelopeDispatcher channelManager;
+
+	public InboundEnvelopeDispatcherHandler(EnvelopeDispatcher channelManager) {
+		this.channelManager = channelManager;
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		Envelope envelope = (Envelope) msg;
+//		LOG.debug(String.format("Decoded envelope with seq num %d from source channel %s",
+//				envelope.getSequenceNumber(),
+//				envelope.getSource()));
+		this.channelManager.dispatchFromNetwork(envelope);
+	}
+}