You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:49 UTC

[14/30] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java
new file mode 100644
index 0000000..c9db615
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java
@@ -0,0 +1,174 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.event.task.EventListener;
+import eu.stratosphere.nephele.event.task.EventNotificationManager;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+import java.io.IOException;
+
+/**
+ * In Nephele a gate represents the connection between a user program and the processing framework. A gate
+ * must be connected to exactly one record reader/writer and to at least one channel. The <code>Gate</code> class itself
+ * is abstract. A gate is automatically created for every record reader/writer in the user program. A gate can only be
+ * used to transport one specific type of records.
+ * <p>
+ * This class in general is not thread-safe.
+ *
+ * @param <T>
+ *        the record type to be transported from this gate
+ *
+ *  TODO refactor with changes to input side
+ */
+public abstract class Gate<T extends IOReadableWritable> {
+
+	/**
+	 * The ID of the job this gate belongs to.
+	 */
+	private final JobID jobID;
+
+	/**
+	 * The ID of this gate.
+	 */
+	private final GateID gateID;
+
+	/**
+	 * The index of the gate in the list of available input/output gates.
+	 */
+	private final int index;
+
+	/**
+	 * The event notification manager used to dispatch events.
+	 */
+	private final EventNotificationManager eventNotificationManager = new EventNotificationManager();
+
+	/**
+	 * The type of input/output channels connected to this gate.
+	 */
+	private ChannelType channelType = ChannelType.NETWORK;
+
+	/**
+	 * Constructs a new abstract gate
+	 *
+	 * @param jobID
+	 *        the ID of the job this gate belongs to
+	 * @param gateID
+	 *        the ID of this gate
+	 * @param index
+	 *        the index of the gate in the list of available input/output gates.
+	 */
+	protected Gate(final JobID jobID, final GateID gateID, final int index) {
+		this.jobID = jobID;
+		this.gateID = gateID;
+		this.index = index;
+	}
+
+	/** Returns the index of this gate in the list of available input/output gates. */
+	public final int getIndex() {
+		return this.index;
+	}
+
+	/**
+	 * Returns the event notification manager used to dispatch events.
+	 *
+	 * @return the event notification manager used to dispatch events
+	 */
+	protected final EventNotificationManager getEventNotificationManager() {
+		return this.eventNotificationManager;
+	}
+
+	@Override
+	public String toString() {
+		return "Gate " + this.index;
+	}
+
+	/** Registers the listener for task events of the given type with this gate's event notification manager. */
+	public final void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
+		this.eventNotificationManager.subscribeToEvent(eventListener, eventType);
+	}
+
+	/** Removes the listener's subscription for the given event type from this gate's event notification manager. */
+	public final void unsubscribeFromEvent(final EventListener eventListener,
+			final Class<? extends AbstractTaskEvent> eventType) {
+		this.eventNotificationManager.unsubscribeFromEvent(eventListener, eventType);
+	}
+
+	/** Hands the given task event to this gate's event notification manager for delivery. */
+	public final void deliverEvent(final AbstractTaskEvent event) {
+		this.eventNotificationManager.deliverEvent(event);
+	}
+
+	/** Sets the type of the input/output channels connected to this gate (initially <code>NETWORK</code>). */
+	public final void setChannelType(final ChannelType channelType) {
+		this.channelType = channelType;
+	}
+
+	/** Returns the type of the input/output channels connected to this gate. */
+	public final ChannelType getChannelType() {
+		return this.channelType;
+	}
+
+	/** Returns the ID of the job this gate belongs to. */
+	public JobID getJobID() {
+		return this.jobID;
+	}
+
+	/** Returns the ID of this gate. */
+	public GateID getGateID() {
+		return this.gateID;
+	}
+
+	// FROM GATE INTERFACE
+	/**
+	 * Publishes an event.
+	 *
+	 * @param event
+	 *        the event to be published
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the event
+	 * @throws InterruptedException
+	 *         thrown if the thread is interrupted while waiting for the event to be published
+	 */
+	abstract public void publishEvent(AbstractEvent event) throws IOException, InterruptedException;
+
+	/**
+	 * Releases the allocated resources (particularly buffer) of all channels attached to this gate. This method
+	 * should only be called after the respective task has stopped running.
+	 */
+	abstract public void releaseAllChannelResources();
+
+	/**
+	 * Checks if the gate is closed. The gate is closed if all its associated channels are closed.
+	 *
+	 * @return <code>true</code> if the gate is closed, <code>false</code> otherwise
+	 * @throws IOException
+	 *         thrown if any error occurred while closing the gate
+	 * @throws InterruptedException
+	 *         thrown if the gate is interrupted while waiting for this operation to complete
+	 */
+	abstract public boolean isClosed() throws IOException, InterruptedException;
+
+	/**
+	 * Checks if the considered gate is an input gate.
+	 *
+	 * @return <code>true</code> if the considered gate is an input gate, <code>false</code> if it is an output gate
+	 */
+	abstract public boolean isInputGate();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java
new file mode 100644
index 0000000..8375c88
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java
@@ -0,0 +1,24 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.nephele.AbstractID;
+
+/**
+ * Identifier type for gates. It declares no state or behavior of its own; everything comes from {@link AbstractID}.
+ * The IDs are described as statistically unique (NOTE(review): presumably generated randomly by the superclass).
+ */
+public final class GateID extends AbstractID {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java
new file mode 100644
index 0000000..e2083e5
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java
@@ -0,0 +1,23 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.gates;
+
+/** Outcome of one <code>readRecord</code> call; the notes paraphrase the case comments in InputGate.readRecord. */
+public enum InputChannelResult {
+	NONE, // internal event or an incomplete record that needs further chunks; the current unit is exhausted
+	INTERMEDIATE_RECORD_FROM_BUFFER, // full record; the reader can stay on the same channel
+	LAST_RECORD_FROM_BUFFER, // full record; the reader must switch the channel afterwards
+	END_OF_SUPERSTEP, // NOTE(review): presumably marks the end of an iteration superstep -- confirm
+	TASK_EVENT, // a task event was read; InputGate keeps it for retrieval via getCurrentEvent()
+	END_OF_STREAM; // the channel is done; InputGate returns it only once all of its channels are closed
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
new file mode 100644
index 0000000..bdac7a2
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
@@ -0,0 +1,384 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
+import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.runtime.io.channels.InputChannel;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * In Nephele input gates are a specialization of general gates and connect input channels and record readers. As
+ * channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to
+ * output gates input gates can be associated with a {@link eu.stratosphere.runtime.io.serialization.io.DistributionPattern} object which dictates the concrete
+ * wiring between two groups of vertices.
+ *
+ * @param <T> The type of record that can be transported through this gate.
+ */
+public class InputGate<T extends IOReadableWritable> extends Gate<T> implements BufferProvider, LocalBufferPoolOwner {
+
+	/**
+	 * The log object used for debugging.
+	 */
+	private static final Log LOG = LogFactory.getLog(InputGate.class);
+
+	/**
+	 * The array of input channels attached to this input gate.
+	 */
+	private InputChannel<T>[] channels;
+
+	/**
+	 * Queue with indices of channels that store at least one available record.
+	 */
+	private final BlockingQueue<Integer> availableChannels = new LinkedBlockingQueue<Integer>();
+
+	/**
+	 * The listener object to be notified when a channel has at least one record available.
+	 */
+	private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener = new AtomicReference<RecordAvailabilityListener<T>>(null);
+
+	/** The task event stored by the last read that returned TASK_EVENT; reset by {@link #getCurrentEvent()}. */
+	private AbstractTaskEvent currentEvent;
+
+	/**
+	 * If the value of this variable is set to <code>true</code>, the input gate is closed.
+	 */
+	private boolean isClosed = false;
+
+	/**
+	 * The channel to read from next.
+	 */
+	private int channelToReadFrom = -1;
+
+	/** The local buffer pool backing the buffer requests; <code>null</code> until a global buffer pool is registered. */
+	private LocalBufferPool bufferPool;
+
+	/**
+	 * Constructs a new runtime input gate.
+	 *
+	 * @param jobID
+	 *        the ID of the job this input gate belongs to
+	 * @param gateID
+	 *        the ID of the gate
+	 * @param index
+	 *        the index assigned to this input gate at the {@link Environment} object
+	 */
+	public InputGate(final JobID jobID, final GateID gateID, final int index) {
+		super(jobID, gateID, index);
+	}
+
+	/** Adopts the descriptor's channel type and creates one input channel per channel descriptor it contains. */
+	@SuppressWarnings("unchecked") // generic array creation; only InputChannel<T> instances are stored below
+	public void initializeChannels(GateDeploymentDescriptor inputGateDescriptor) {
+		final int numChannels = inputGateDescriptor.getNumberOfChannelDescriptors();
+		this.channels = new InputChannel[numChannels];
+		setChannelType(inputGateDescriptor.getChannelType());
+
+		for (int i = 0; i < numChannels; i++) {
+			final ChannelDeploymentDescriptor cdd = inputGateDescriptor.getChannelDescriptor(i);
+			this.channels[i] = new InputChannel<T>(this, i, cdd.getInputChannelID(),
+					cdd.getOutputChannelID(), getChannelType());
+		}
+	}
+
+	@Override
+	public boolean isInputGate() {
+		return true;
+	}
+
+	/**
+	 * Returns the number of input channels associated with this input gate.
+	 *
+	 * @return the number of input channels associated with this input gate
+	 */
+	public int getNumberOfInputChannels() {
+		return this.channels.length;
+	}
+
+	/**
+	 * Returns the input channel from position <code>pos</code> of the gate's internal channel list.
+	 *
+	 * @param pos
+	 *        the position to retrieve the channel from
+	 * @return the channel from the given position or <code>null</code> if such position does not exist.
+	 */
+	public InputChannel<T> getInputChannel(int pos) {
+		return (pos >= 0 && pos < this.channels.length) ? this.channels[pos] : null;
+	}
+
+	/** Returns the gate's internal channel array itself (not a copy). */
+	public InputChannel<T>[] channels() {
+		return this.channels;
+	}
+
+	/**
+	 * Reads a record from one of the associated input channels. Channels are read such that one buffer from a channel is
+	 * consecutively consumed. The buffers in turn are consumed in the order in which they arrive.
+	 * Note that this method is not guaranteed to return a record, because the currently available channel data may not always
+	 * constitute an entire record, when events or partial records are part of the data.
+	 *
+	 * When called even though no data is available, this call will block until data is available, so this method should be called
+	 * when waiting is desired (such as when synchronously consuming a single gate) or only when it is known that data is available
+	 * (such as when reading a union of multiple input gates).
+	 *
+	 * @param target The record object into which to construct the complete record.
+	 * @return The result indicating whether a complete record is available, an event is available, only incomplete data
+	 *         is available (NONE), or the gate is exhausted.
+	 * @throws IOException Thrown when an error occurred in the network stack relating to this channel.
+	 * @throws InterruptedException Thrown, when the thread working on this channel is interrupted.
+	 */
+	public InputChannelResult readRecord(T target) throws IOException, InterruptedException {
+
+		if (this.channelToReadFrom == -1) {
+			if (this.isClosed()) {
+				return InputChannelResult.END_OF_STREAM;
+			}
+
+			if (Thread.interrupted()) {
+				throw new InterruptedException();
+			}
+
+			this.channelToReadFrom = waitForAnyChannelToBecomeAvailable();
+		}
+
+		InputChannelResult result = this.getInputChannel(this.channelToReadFrom).readRecord(target);
+		switch (result) {
+			case INTERMEDIATE_RECORD_FROM_BUFFER: // full record and we can stay on the same channel
+				return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+
+			case LAST_RECORD_FROM_BUFFER: // full record, but we must switch the channel afterwards
+				this.channelToReadFrom = -1;
+				return InputChannelResult.LAST_RECORD_FROM_BUFFER;
+
+			case END_OF_SUPERSTEP:
+				this.channelToReadFrom = -1;
+				return InputChannelResult.END_OF_SUPERSTEP;
+
+			case TASK_EVENT: // task event
+				this.currentEvent = this.getInputChannel(this.channelToReadFrom).getCurrentEvent();
+				this.channelToReadFrom = -1;	// event always marks a unit as consumed
+				return InputChannelResult.TASK_EVENT;
+
+			case NONE: // internal event or an incomplete record that needs further chunks
+				// the current unit is exhausted
+				this.channelToReadFrom = -1;
+				return InputChannelResult.NONE;
+
+			case END_OF_STREAM: // channel is done
+				this.channelToReadFrom = -1;
+				return isClosed() ? InputChannelResult.END_OF_STREAM : InputChannelResult.NONE;
+
+			default:   // silence the compiler
+				throw new RuntimeException("Unexpected input channel result: " + result);
+		}
+	}
+
+	/**
+	 * Returns and clears the task event stored by the last read that yielded TASK_EVENT; <code>null</code> if none.
+	 */
+	public AbstractTaskEvent getCurrentEvent() {
+		AbstractTaskEvent e = this.currentEvent;
+		this.currentEvent = null;
+		return e;
+	}
+
+	/**
+	 * Notify the gate that the channel with the given index has
+	 * at least one record available.
+	 *
+	 * @param channelIndex
+	 *        the index of the channel which has at least one record available
+	 */
+	public void notifyRecordIsAvailable(int channelIndex) {
+		this.availableChannels.add(Integer.valueOf(channelIndex));
+
+		RecordAvailabilityListener<T> listener = this.recordAvailabilityListener.get();
+		if (listener != null) {
+			listener.reportRecordAvailability(this);
+		}
+	}
+
+	/**
+	 * This method returns the index of a channel which has at least
+	 * one record available. The method may block until at least one
+	 * channel has become ready.
+	 *
+	 * @return the index of the channel which has at least one record available
+	 */
+	public int waitForAnyChannelToBecomeAvailable() throws InterruptedException {
+		return this.availableChannels.take().intValue();
+	}
+
+	@Override
+	public boolean isClosed() throws IOException, InterruptedException {
+
+		if (this.isClosed) {
+			return true;
+		}
+
+		for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
+			final InputChannel<T> inputChannel = this.channels[i];
+			if (!inputChannel.isClosed()) {
+				return false;
+			}
+		}
+
+		this.isClosed = true;
+
+		return true;
+	}
+
+	/**
+	 * Immediately closes the input gate and all its input channels. The corresponding
+	 * output channels are notified. Any remaining records in any buffers or queue is considered
+	 * irrelevant and is discarded.
+	 *
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while closing the gate
+	 * @throws InterruptedException
+	 *         thrown if the thread is interrupted while waiting for the gate to be closed
+	 */
+	public void close() throws IOException, InterruptedException {
+
+		for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
+			final InputChannel<T> inputChannel = this.channels[i];
+			inputChannel.close();
+		}
+
+	}
+
+	@Override
+	public String toString() {
+		return "Input " + super.toString();
+	}
+
+	@Override
+	public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
+
+		// Copy event to all connected channels
+		for(int i=0; i< getNumberOfChannels(); i++){
+			channels[i].transferEvent(event);
+		}
+	}
+
+	@Override
+	public void releaseAllChannelResources() {
+
+		for(int i=0; i< getNumberOfChannels(); i++){
+			channels[i].releaseAllResources();
+		}
+	}
+
+	/**
+	 * Registers a {@link RecordAvailabilityListener} with this input gate.
+	 *
+	 * @param listener
+	 *        the listener object to be registered
+	 */
+	public void registerRecordAvailabilityListener(final RecordAvailabilityListener<T> listener) {
+		if (!this.recordAvailabilityListener.compareAndSet(null, listener)) {
+			throw new IllegalStateException(this.recordAvailabilityListener
+				+ " is already registered as a record availability listener");
+		}
+	}
+
+	/**
+	 * Notify the gate that it has consumed a data unit from the channel with the given index
+	 *
+	 * @param channelIndex
+	 *        the index of the channel from which a data unit has been consumed
+	 */
+	public void notifyDataUnitConsumed(int channelIndex) {
+		this.channelToReadFrom = -1;
+	}
+
+	// BufferProvider and LocalBufferPoolOwner methods
+
+	@Override
+	public Buffer requestBuffer(int minBufferSize) throws IOException {
+		return this.bufferPool.requestBuffer(minBufferSize);
+	}
+
+	@Override
+	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+		return this.bufferPool.requestBufferBlocking(minBufferSize);
+	}
+
+	@Override
+	public int getBufferSize() {
+		return this.bufferPool.getBufferSize();
+	}
+
+	@Override
+	public int getNumberOfChannels() {
+		return getNumberOfInputChannels();
+	}
+
+	@Override
+	public void setDesignatedNumberOfBuffers(int numBuffers) {
+		this.bufferPool.setNumDesignatedBuffers(numBuffers);
+	}
+
+	@Override
+	public void clearLocalBufferPool() {
+		this.bufferPool.destroy();
+	}
+
+	@Override
+	public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
+		this.bufferPool = new LocalBufferPool(globalBufferPool, 1);
+	}
+
+	@Override
+	public void logBufferUtilization() {
+		LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
+				this,
+				this.bufferPool.numAvailableBuffers(),
+				this.bufferPool.numRequestedBuffers(),
+				this.bufferPool.numDesignatedBuffers()));
+	}
+
+	@Override
+	public void reportAsynchronousEvent() {
+		this.bufferPool.reportAsynchronousEvent();
+	}
+
+	@Override
+	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		return this.bufferPool.registerBufferAvailabilityListener(listener);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
new file mode 100644
index 0000000..d3eaea1
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
@@ -0,0 +1,165 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
+import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.OutputChannel;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+import java.io.IOException;
+
+/**
+ * Output side of a {@link Gate}: forwards buffers and events to its {@link OutputChannel}s, one per channel descriptor.
+ */
+public class OutputGate extends Gate<IOReadableWritable> {
+
+	private OutputChannel[] channels;
+
+	private boolean closed;
+
+	/**
+	 * Constructs a new output gate.
+	 *
+	 * @param jobId the ID of the job this output gate belongs to
+	 * @param gateId the ID of the gate
+	 * @param index the index assigned to this output gate at the Environment object
+	 */
+	public OutputGate(JobID jobId, GateID gateId, int index) {
+		super(jobId, gateId, index);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                             Data processing
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
+		this.channels[targetChannel].sendBuffer(buffer);
+	}
+
+	public void sendEvent(AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
+		this.channels[targetChannel].sendEvent(event);
+	}
+
+	public void sendBufferAndEvent(Buffer buffer, AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
+		this.channels[targetChannel].sendBufferAndEvent(buffer, event);
+	}
+
+	/**
+	 * Sends a duplicate of the buffer to channels 1..n-1 and the buffer itself to channel 0; needs at least one channel.
+	 */
+	public void broadcastBuffer(Buffer buffer) throws IOException, InterruptedException {
+		for (int i = 1; i < this.channels.length; i++) {
+			channels[i].sendBuffer(buffer.duplicate());
+		}
+		channels[0].sendBuffer(buffer);
+	}
+
+	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
+		for (OutputChannel channel : this.channels) {
+			channel.sendEvent(event);
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                              Channels
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public void initializeChannels(GateDeploymentDescriptor descriptor) {
+		int numChannels = descriptor.getNumberOfChannelDescriptors();
+		this.channels = new OutputChannel[numChannels];
+		setChannelType(descriptor.getChannelType());
+
+		for (int i = 0; i < numChannels; i++) {
+			ChannelDeploymentDescriptor channelDescriptor = descriptor.getChannelDescriptor(i);
+			ChannelID id = channelDescriptor.getOutputChannelID();
+			ChannelID connectedId = channelDescriptor.getInputChannelID();
+			this.channels[i] = new OutputChannel(this, i, id, connectedId, getChannelType());
+		}
+	}
+
+	public OutputChannel[] channels() {
+		return this.channels;
+	}
+
+	/** Returns the channel at the given index, or <code>null</code> if the index is negative or too large. */
+	public OutputChannel getChannel(int index) {
+		return (index >= 0 && index < this.channels.length) ? this.channels[index] : null;
+	}
+
+	public int getNumChannels() {
+		return this.channels.length;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                              Shutdown
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public void requestClose() throws IOException, InterruptedException {
+		for (OutputChannel channel : this.channels) {
+			channel.requestClose();
+		}
+	}
+
+	@Override
+	public boolean isClosed() {
+		if (this.closed) {
+			return true;
+		}
+		for (OutputChannel channel : this.channels) {
+			if (!channel.isClosed()) {
+				return false;
+			}
+		}
+		this.closed = true;
+		return true;
+	}
+
+	public void waitForGateToBeClosed() throws InterruptedException {
+		if (this.closed) {
+			return;
+		}
+		for (OutputChannel channel : this.channels) {
+			channel.waitForChannelToBeClosed();
+		}
+		this.closed = true;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean isInputGate() {
+		return false;
+	}
+
+	@Override
+	public String toString() {
+		return "Output " + super.toString();
+	}
+
+	@Override
+	public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
+		// replaced by broadcastEvent(AbstractEvent) => TODO will be removed with changes to input side
+	}
+
+	@Override
+	public void releaseAllChannelResources() {
+		// nothing to do for buffer oriented runtime => TODO will be removed with changes to input side
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java
new file mode 100644
index 0000000..ea1d865
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java
@@ -0,0 +1,36 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.runtime.io.gates.InputGate;
+
+/**
+ * This interface can be implemented by a class which shall be notified by an input gate when one of its connected
+ * input channels has at least one record available for reading.
+ * 
+ * @param <T>
+ *        the type of record transported through the corresponding input gate
+ */
+public interface RecordAvailabilityListener<T extends IOReadableWritable> {
+
+	/**
+	 * This method is called by an input gate when one of its connected input channels has at least one record available
+	 * for reading.
+	 * 
+	 * @param inputGate
+	 *        the input gate which has at least one record available
+	 */
+	void reportRecordAvailability(InputGate<T> inputGate);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
new file mode 100644
index 0000000..405d79e
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
@@ -0,0 +1,646 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.execution.CancelTaskException;
+import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
+import eu.stratosphere.runtime.io.channels.Channel;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.InputChannel;
+import eu.stratosphere.runtime.io.channels.OutputChannel;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
+import eu.stratosphere.nephele.taskmanager.Task;
+import eu.stratosphere.nephele.AbstractID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
+import eu.stratosphere.runtime.io.network.bufferprovider.SerialSingleBufferPool;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeDispatcher;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeReceiverList;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The channel manager sets up the network buffers and dispatches data between channels.
+ */
+public final class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
+
+	private static final Log LOG = LogFactory.getLog(ChannelManager.class);
+
+	private final ChannelLookupProtocol channelLookupService;
+
+	private final InstanceConnectionInfo connectionInfo;
+
+	private final Map<ChannelID, Channel> channels;
+
+	private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools;
+
+	private final Map<ChannelID, EnvelopeReceiverList> receiverCache;
+
+	private final GlobalBufferPool globalBufferPool;
+
+	private final NetworkConnectionManager networkConnectionManager;
+	
+	private final InetSocketAddress ourAddress;
+	
+	private final SerialSingleBufferPool discardingDataPool;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates the channel manager together with its global buffer pool, the pool used for discarded data and
+	 * the network connection manager.
+	 *
+	 * @param channelLookupService the service used to look up the receivers of a channel
+	 * @param connectionInfo the connection information of this instance; its address and data port are used for
+	 *        the network connection manager and as the sender address of this manager
+	 * @param numNetworkBuffers the number of buffers of the global buffer pool
+	 * @param networkBufferSize the size of a network buffer
+	 * @throws IOException declared for the creation of the components set up here
+	 */
+	public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo,
+						  int numNetworkBuffers, int networkBufferSize) throws IOException {
+		this.channelLookupService = channelLookupService;
+		this.connectionInfo = connectionInfo;
+		this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
+
+		// management data structures
+		this.channels = new ConcurrentHashMap<ChannelID, Channel>();
+		this.receiverCache = new ConcurrentHashMap<ChannelID, EnvelopeReceiverList>();
+		this.localBuffersPools = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
+
+		this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
+
+		// a special pool if the data is to be discarded
+		this.discardingDataPool = new SerialSingleBufferPool(networkBufferSize);
+
+		// Created last on purpose: the connection manager receives a reference to this manager, so all
+		// fields it may reach through that reference have to be assigned before it is handed out.
+		this.networkConnectionManager = new NetworkConnectionManager(this, connectionInfo.address(), connectionInfo.dataPort());
+	}
+
+	/**
+	 * Shuts down the network connection manager and afterwards destroys the global buffer pool.
+	 */
+	public void shutdown() {
+		this.networkConnectionManager.shutDown();
+		this.globalBufferPool.destroy();
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                               Task registration
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Registers the given task with the channel manager.
+	 * <p>
+	 * Every output and input channel of the task gets this manager as its envelope dispatcher and is added to the
+	 * table of registered channels. The task's environment (output side) and each of its input gates are
+	 * registered as local buffer pool owners. Finally the buffers are redistributed among all registered owners.
+	 *
+	 * @param task the task to be registered
+	 * @throws InsufficientResourcesException thrown if not enough buffers available to safely run this task
+	 * @throws IllegalStateException thrown if a buffer pool owner is already registered for the vertex ID of the task
+	 */
+	public void register(Task task) throws InsufficientResourcesException {
+		// Check if we can safely run this task with the given buffers
+		ensureBufferAvailability(task);
+
+		// refuse a duplicate registration before any state of the task or of this manager is modified
+		if (this.localBuffersPools.containsKey(task.getVertexID())) {
+			throw new IllegalStateException("Vertex " + task.getVertexID() + " has a previous buffer pool owner");
+		}
+
+		RuntimeEnvironment environment = task.getRuntimeEnvironment();
+
+		// -------------------------------------------------------------------------------------------------------------
+		//                                       Register output channels
+		// -------------------------------------------------------------------------------------------------------------
+
+		environment.registerGlobalBufferPool(this.globalBufferPool);
+
+		for (OutputGate gate : environment.outputGates()) {
+			// add receiver list hints
+			for (OutputChannel channel : gate.channels()) {
+				// register envelope dispatcher with the channel
+				channel.registerEnvelopeDispatcher(this);
+
+				switch (channel.getChannelType()) {
+					case IN_MEMORY:
+						addReceiverListHint(channel.getID(), channel.getConnectedId());
+						break;
+					case NETWORK:
+						addReceiverListHint(channel.getConnectedId(), channel.getID());
+						break;
+					default:
+						// no receiver hint is added for any other channel type
+						break;
+				}
+
+				this.channels.put(channel.getID(), channel);
+			}
+		}
+
+		this.localBuffersPools.put(task.getVertexID(), environment);
+
+		// -------------------------------------------------------------------------------------------------------------
+		//                                       Register input channels
+		// -------------------------------------------------------------------------------------------------------------
+
+		// register global
+		for (InputGate<?> gate : environment.inputGates()) {
+			gate.registerGlobalBufferPool(this.globalBufferPool);
+
+			for (int i = 0; i < gate.getNumberOfInputChannels(); i++) {
+				InputChannel<? extends IOReadableWritable> channel = gate.getInputChannel(i);
+				channel.registerEnvelopeDispatcher(this);
+
+				if (channel.getChannelType() == ChannelType.IN_MEMORY) {
+					addReceiverListHint(channel.getID(), channel.getConnectedId());
+				}
+
+				this.channels.put(channel.getID(), channel);
+			}
+
+			this.localBuffersPools.put(gate.getGateID(), gate);
+		}
+
+		// the number of channels per buffers has changed after registering the task
+		// => redistribute the number of designated buffers of the registered local buffer pools
+		redistributeBuffers();
+	}
+
+	/**
+	 * Unregisters the given task from the channel manager.
+	 * <p>
+	 * All output and input channels of the task are removed from the table of registered channels and destroyed,
+	 * and their entries are removed from the receiver cache. The local buffer pools of the task's input gates and
+	 * of the task itself are removed and cleared. Finally the buffers are redistributed among the remaining owners.
+	 *
+	 * @param vertexId the ID of the task to be unregistered
+	 * @param task the task to be unregistered
+	 */
+	public void unregister(ExecutionVertexID vertexId, Task task) {
+		final Environment environment = task.getEnvironment();
+
+		// destroy and remove OUTPUT channels from registered channels and cache
+		for (ChannelID id : environment.getOutputChannelIDs()) {
+			Channel channel = this.channels.remove(id);
+			if (channel != null) {
+				channel.destroy();
+			}
+
+			// the cache is keyed by channel ID, so the entry has to be removed by ID, not by channel object
+			this.receiverCache.remove(id);
+		}
+
+		// destroy and remove INPUT channels from registered channels and cache
+		for (ChannelID id : environment.getInputChannelIDs()) {
+			Channel channel = this.channels.remove(id);
+			if (channel != null) {
+				channel.destroy();
+			}
+
+			// the cache is keyed by channel ID, so the entry has to be removed by ID, not by channel object
+			this.receiverCache.remove(id);
+		}
+
+		// clear and remove INPUT side buffer pools
+		for (GateID id : environment.getInputGateIDs()) {
+			LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(id);
+			if (bufferPool != null) {
+				bufferPool.clearLocalBufferPool();
+			}
+		}
+
+		// clear and remove OUTPUT side buffer pool
+		LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(vertexId);
+		if (bufferPool != null) {
+			bufferPool.clearLocalBufferPool();
+		}
+
+		// the number of channels per buffers has changed after unregistering the task
+		// => redistribute the number of designated buffers of the registered local buffer pools
+		redistributeBuffers();
+	}
+
+	/**
+	 * Ensures that the channel manager has enough buffers to execute the given task.
+	 * <p>
+	 * If there is less than one buffer per channel available, an InsufficientResourcesException will be thrown,
+	 * because of possible deadlocks. With at least one buffer per channel, the task is accepted. A task without
+	 * any channels is always accepted when no other channels are registered.
+	 *
+	 * @param task task to be executed
+	 * @throws InsufficientResourcesException thrown if not enough buffers available to execute the task
+	 */
+	private void ensureBufferAvailability(Task task) throws InsufficientResourcesException {
+		Environment env = task.getEnvironment();
+
+		int numBuffers = this.globalBufferPool.numBuffers();
+		// existing channels + channels of the task
+		int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
+
+		// need at least one buffer per channel; compared directly instead of dividing, because the number of
+		// channels may be zero
+		if (numBuffers < numChannels) {
+			String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)",
+					this.connectionInfo.hostname(), env.getTaskName(), numChannels - numBuffers);
+
+			throw new InsufficientResourcesException(msg);
+		}
+	}
+
+	/**
+	 * Recomputes the designated number of buffers of every registered local buffer pool. Called after each task
+	 * registration and unregistration.
+	 * <p>
+	 * Each pool is assigned its number of channels multiplied by the global buffers-per-channel ratio, rounded up.
+	 * Nothing happens while no pool or no channel is registered.
+	 *
+	 * @throws RuntimeException thrown if there is less than one buffer per registered channel
+	 */
+	private void redistributeBuffers() {
+		if (this.localBuffersPools.isEmpty() || this.channels.isEmpty()) {
+			return;
+		}
+
+		final int totalBuffers = this.globalBufferPool.numBuffers();
+		final int totalChannels = this.channels.size();
+		final double ratio = totalBuffers / (double) totalChannels;
+
+		if (ratio < 1.0) {
+			throw new RuntimeException("System has not enough buffers to execute tasks.");
+		}
+
+		// redistribute number of designated buffers per buffer pool
+		for (LocalBufferPoolOwner owner : this.localBuffersPools.values()) {
+			owner.setDesignatedNumberOfBuffers((int) Math.ceil(ratio * owner.getNumberOfChannels()));
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                           Envelope processing
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Recycles the buffer attached to the given envelope, if there is one.
+	 *
+	 * @param envelope the envelope whose buffer is to be recycled
+	 */
+	private void releaseEnvelope(Envelope envelope) {
+		final Buffer attached = envelope.getBuffer();
+		if (attached == null) {
+			return;
+		}
+
+		attached.recycleBuffer();
+	}
+
+	/**
+	 * Stores a local receiver for the given source channel in the receiver cache. An already existing entry is
+	 * overwritten and reported with a warning.
+	 *
+	 * @param source the ID of the source channel
+	 * @param localReceiver the ID of the local receiver channel
+	 */
+	private void addReceiverListHint(ChannelID source, ChannelID localReceiver) {
+		final EnvelopeReceiverList hint = new EnvelopeReceiverList(localReceiver);
+		final EnvelopeReceiverList previous = this.receiverCache.put(source, hint);
+
+		if (previous != null) {
+			LOG.warn("Receiver cache already contained entry for " + source);
+		}
+	}
+
+	/**
+	 * Stores a remote receiver for the given source channel in the receiver cache. An already existing entry is
+	 * overwritten and reported with a warning.
+	 *
+	 * @param source the ID of the source channel
+	 * @param remoteReceiver the remote receiver of the source channel
+	 */
+	private void addReceiverListHint(ChannelID source, RemoteReceiver remoteReceiver) {
+		final EnvelopeReceiverList hint = new EnvelopeReceiverList(remoteReceiver);
+		final EnvelopeReceiverList previous = this.receiverCache.put(source, hint);
+
+		if (previous != null) {
+			LOG.warn("Receiver cache already contained entry for " + source);
+		}
+	}
+
+	/**
+	 * Queues a sender hint envelope for the given remote receiver. The hint carries the address of this manager
+	 * combined with the connection index of the receiver.
+	 * <p>
+	 * Nothing is sent if the source channel of the envelope is not registered (logged as an error) or if it is an
+	 * input channel.
+	 *
+	 * @param envelope the envelope about to be sent to the receiver
+	 * @param receiver the remote receiver of the envelope
+	 */
+	private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) {
+		final Channel sourceChannel = this.channels.get(envelope.getSource());
+		if (sourceChannel == null) {
+			LOG.error("Cannot find channel for channel ID " + envelope.getSource());
+			return;
+		}
+
+		// Only generate sender hints for output channels
+		if (!sourceChannel.isInputChannel()) {
+			final ChannelID connectedId = sourceChannel.getConnectedId();
+			final RemoteReceiver self = new RemoteReceiver(this.ourAddress, receiver.getConnectionIndex());
+			final Envelope hint = SenderHintEvent.createEnvelopeWithEvent(envelope, connectedId, self);
+
+			this.networkConnectionManager.queueEnvelopeForTransfer(receiver, hint);
+		}
+	}
+
+	/**
+	 * Returns the list of receivers for transfer envelopes produced by the channel with the given source channel ID.
+	 * <p>
+	 * The receiver cache is consulted first. On a cache miss the channel lookup service is asked; while it reports
+	 * the receiver as not ready, the lookup is repeated every 500 milliseconds. A successful lookup result is
+	 * stored in the receiver cache.
+	 *
+	 * @param jobID
+	 *        the ID of the job the given channel ID belongs to
+	 * @param sourceChannelID
+	 *        the source channel ID for which the receiver list shall be retrieved
+	 * @param reportException
+	 *        <code>true</code> to report a failed lookup (interruption, aborting job, unknown receiver) with an
+	 *        exception, <code>false</code> to return <code>null</code> in these cases
+	 * @return the list of receivers or <code>null</code> if the receiver could not be determined
+	 * @throws IOException
+	 *         thrown if <code>reportException</code> is set and the lookup was interrupted or the receiver was not
+	 *         found
+	 * @throws CancelTaskException
+	 *         thrown if <code>reportException</code> is set and the job is aborting
+	 * @throws IllegalStateException
+	 *         thrown if the lookup response is of an unknown kind
+	 */
+	private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException {
+		EnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
+
+		if (receiverList != null) {
+			return receiverList;
+		}
+
+		while (true) {
+			ConnectionInfoLookupResponse lookupResponse;
+			synchronized (this.channelLookupService) {
+				lookupResponse = this.channelLookupService.lookupConnectionInfo(this.connectionInfo, jobID, sourceChannelID);
+			}
+
+			if (lookupResponse.receiverReady()) {
+				receiverList = new EnvelopeReceiverList(lookupResponse);
+				break;
+			}
+			else if (lookupResponse.receiverNotReady()) {
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e) {
+					// catching the exception clears the interrupt status; set it again so that the
+					// interruption stays visible to the code further up the call stack
+					Thread.currentThread().interrupt();
+
+					if (reportException) {
+						throw new IOException("Lookup was interrupted.", e);
+					} else {
+						return null;
+					}
+				}
+			}
+			else if (lookupResponse.isJobAborting()) {
+				if (reportException) {
+					throw new CancelTaskException();
+				} else {
+					return null;
+				}
+			}
+			else if (lookupResponse.receiverNotFound()) {
+				if (reportException) {
+					throw new IOException("Could not find the receiver for Job " + jobID + ", channel with source id " + sourceChannelID);
+				} else {
+					return null;
+				}
+			}
+			else {
+				throw new IllegalStateException("Unrecognized response to channel lookup.");
+			}
+		}
+
+		this.receiverCache.put(sourceChannelID, receiverList);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Receivers for source channel ID " + sourceChannelID + " at task manager " + this.connectionInfo +
+				": " + receiverList);
+		}
+
+		return receiverList;
+	}
+
+	/**
+	 * Removes the receiver cache entries of the given channel IDs, so that their receivers are looked up again
+	 * the next time they are needed.
+	 *
+	 * @param channelIDs channel IDs for entries to invalidate
+	 */
+	public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) {
+		for (ChannelID staleId : channelIDs) {
+			this.receiverCache.remove(staleId);
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                       EnvelopeDispatcher methods
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Dispatches an envelope produced by an output channel to its receiver.
+	 * <p>
+	 * For a local receiver, the data of the envelope's buffer (if any) is copied into a buffer requested from the
+	 * receiving input channel, the source buffer is recycled and the envelope is queued at the input channel. For a
+	 * remote receiver, the envelope is queued for network transfer; the first envelope of a channel (sequence
+	 * number 0) is preceded by a sender hint. If the receiver list names neither kind of receiver, the envelope is
+	 * not delivered and its buffer is recycled.
+	 * <p>
+	 * If dispatching fails, the buffers still held by this method are recycled.
+	 *
+	 * @param envelope the envelope to dispatch
+	 * @throws IOException thrown if the receiver cannot be determined, if the local receiver is not an input
+	 *         channel, or if waiting for a buffer of the receiver is interrupted
+	 * @throws LocalReceiverCancelledException thrown if the local receiver is no longer registered
+	 */
+	@Override
+	public void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException {
+		EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
+
+		Buffer srcBuffer = envelope.getBuffer();
+		Buffer destBuffer = null;
+
+		boolean success = false;
+
+		try {
+			if (receiverList.hasLocalReceiver()) {
+				ChannelID receiver = receiverList.getLocalReceiver();
+				Channel channel = this.channels.get(receiver);
+
+				if (channel == null) {
+					throw new LocalReceiverCancelledException(receiver);
+				}
+
+				if (!channel.isInputChannel()) {
+					throw new IOException("Local receiver " + receiver + " is not an input channel.");
+				}
+
+				InputChannel<?> inputChannel = (InputChannel<?>) channel;
+
+				// copy the buffer into the memory space of the receiver
+				if (srcBuffer != null) {
+					try {
+						destBuffer = inputChannel.requestBufferBlocking(srcBuffer.size());
+					} catch (InterruptedException e) {
+						// same exception type and message as before, but the cause is kept for diagnosis
+						throw new IOException(e.getMessage(), e);
+					}
+
+					srcBuffer.copyToBuffer(destBuffer);
+					envelope.setBuffer(destBuffer);
+					srcBuffer.recycleBuffer();
+
+					// the source buffer has been given back; forget it so that the clean-up below cannot
+					// recycle it a second time if queueing the envelope fails
+					srcBuffer = null;
+				}
+
+				inputChannel.queueEnvelope(envelope);
+				success = true;
+			}
+			else if (receiverList.hasRemoteReceiver()) {
+				RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
+
+				// Generate sender hint before sending the first envelope over the network
+				if (envelope.getSequenceNumber() == 0) {
+					generateSenderHint(envelope, remoteReceiver);
+				}
+
+				this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, envelope);
+				success = true;
+			}
+		} finally {
+			if (!success) {
+				if (srcBuffer != null) {
+					srcBuffer.recycleBuffer();
+				}
+				if (destBuffer != null) {
+					destBuffer.recycleBuffer();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Dispatches an event envelope from an input channel back to the connected output channel.
+	 * <p>
+	 * A local receiver must be a registered output channel, at which the envelope is queued. For a remote
+	 * receiver the envelope is queued for network transfer; the first envelope (sequence number 0) is preceded
+	 * by a sender hint.
+	 *
+	 * @param envelope the envelope to dispatch; it must not carry a buffer
+	 * @throws RuntimeException thrown if the envelope carries a buffer
+	 * @throws IOException thrown if the receiver cannot be determined or the local receiver is an input channel
+	 * @throws LocalReceiverCancelledException thrown if the local receiver is no longer registered
+	 */
+	@Override
+	public void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException {
+		// this method sends only events back from input channels to output channels
+		// sanity check that we have no buffer
+		if (envelope.getBuffer() != null) {
+			throw new RuntimeException("Error: This method can only process envelopes without buffers.");
+		}
+
+		final EnvelopeReceiverList receivers = getReceiverListForEnvelope(envelope, true);
+
+		if (receivers.hasLocalReceiver()) {
+			final ChannelID localId = receivers.getLocalReceiver();
+			final Channel target = this.channels.get(localId);
+
+			if (target == null) {
+				throw new LocalReceiverCancelledException(localId);
+			}
+
+			if (target.isInputChannel()) {
+				throw new IOException("Local receiver " + localId + " of backward event is not an output channel.");
+			}
+
+			((OutputChannel) target).queueEnvelope(envelope);
+			return;
+		}
+
+		if (receivers.hasRemoteReceiver()) {
+			final RemoteReceiver remote = receivers.getRemoteReceiver();
+
+			// Generate sender hint before sending the first envelope over the network
+			if (envelope.getSequenceNumber() == 0) {
+				generateSenderHint(envelope, remote);
+			}
+
+			this.networkConnectionManager.queueEnvelopeForTransfer(remote, envelope);
+		}
+	}
+
+	/**
+	 * Dispatches an envelope that arrived over the network to its local receiver channel.
+	 * <p>
+	 * A sender hint event whose source channel is registered at this manager is consumed here and only updates
+	 * the receiver cache. Envelopes whose receiver cannot be determined or is no longer registered are released
+	 * and dropped.
+	 *
+	 * @param envelope the envelope received from the network
+	 * @throws IOException thrown if the receivers of the envelope are not exactly one local receiver
+	 */
+	@Override
+	public void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException {
+		// ========================================================================================
+		//  IMPORTANT
+		//  
+		//  This method is called by the network I/O thread that reads the incoming TCP 
+		//  connections. This method must have minimal overhead and not throw exception if
+		//  something is wrong with a job or individual transmission, but only when something
+		//  is fundamentally broken in the system.
+		// ========================================================================================
+
+		// the sender hint event is to let the receiver know where exactly the envelope came from.
+		// the receiver will cache the sender id and its connection info in its local lookup table
+		// that allows the receiver to send envelopes to the sender without first pinging the job manager
+		// for the sender's connection info
+
+		// Check if the envelope is the special envelope with the sender hint event
+		if (SenderHintEvent.isSenderHintEvent(envelope)) {
+			// Check if this is the final destination of the sender hint event before adding it
+			final SenderHintEvent hint = (SenderHintEvent) envelope.deserializeEvents().get(0);
+			if (this.channels.containsKey(hint.getSource())) {
+				addReceiverListHint(hint.getSource(), hint.getRemoteReceiver());
+				return;
+			}
+		}
+
+		// try and get the receiver list. if we cannot get it anymore, the task has been cleared
+		// the code frees the envelope on exception, so we need not do anything
+		final EnvelopeReceiverList receivers = getReceiverListForEnvelope(envelope, false);
+		if (receivers == null) {
+			// receiver is cancelled and cleaned away
+			releaseEnvelope(envelope);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Dropping envelope for cleaned up receiver.");
+			}
+
+			return;
+		}
+
+		if (!receivers.hasLocalReceiver() || receivers.hasRemoteReceiver()) {
+			throw new IOException("Bug in network stack: Envelope dispatched from the incoming network pipe has no local receiver or has a remote receiver");
+		}
+
+		final ChannelID localId = receivers.getLocalReceiver();
+		final Channel target = this.channels.get(localId);
+
+		if (target != null) {
+			target.queueEnvelope(envelope);
+			return;
+		}
+
+		// if the channel is null, it means that receiver has been cleared already (cancelled or failed).
+		// release the buffer immediately
+		releaseEnvelope(envelope);
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Dropping envelope for cancelled receiver " + localId);
+		}
+	}
+
+	/**
+	 * Looks up the receiver list for the job and source channel of the given envelope.
+	 * <p>
+	 * Upon an {@link IOException} or a {@link CancelTaskException}, this method frees the envelope (recycles its
+	 * buffer) before passing the exception on.
+	 * 
+	 * @param envelope
+	 *        the envelope whose receivers are to be determined
+	 * @param reportException
+	 *        passed on to the receiver lookup: <code>true</code> to report a failed lookup with an exception,
+	 *        <code>false</code> to get <code>null</code> instead
+	 * @return the receiver list, or <code>null</code> if the lookup returned none
+	 * @throws IOException
+	 *         passed on from the receiver lookup
+	 */
+	private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException {
+		try {
+			return getReceiverList(envelope.getJobID(), envelope.getSource(), reportException);
+		} catch (IOException e) {
+			releaseEnvelope(envelope);
+			throw e;
+		} catch (CancelTaskException e) {
+			releaseEnvelope(envelope);
+			throw e;
+		}
+	}
+	
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                       BufferProviderBroker methods
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the buffer provider for data sent by the given source channel: the local input channel that
+	 * receives this data, or the pool for discarded data if the receiver is unknown or no longer registered.
+	 *
+	 * @param jobID the ID of the job the source channel belongs to
+	 * @param sourceChannelID the ID of the source channel
+	 * @return the receiving input channel, or the discarding pool if the receiver is gone
+	 * @throws IOException thrown if the receiver is not a single local endpoint or not an input channel
+	 */
+	@Override
+	public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException {
+		final EnvelopeReceiverList receivers = getReceiverList(jobID, sourceChannelID, false);
+
+		// check if the receiver is already gone
+		if (receivers == null) {
+			return this.discardingDataPool;
+		}
+
+		final boolean singleLocalEndpoint = receivers.hasLocalReceiver() && !receivers.hasRemoteReceiver();
+		if (!singleLocalEndpoint) {
+			throw new IOException("The destination to be looked up is not a single local endpoint.");
+		}
+
+		final ChannelID localId = receivers.getLocalReceiver();
+		final Channel target = this.channels.get(localId);
+
+		if (target == null) {
+			// receiver is already canceled
+			return this.discardingDataPool;
+		}
+
+		if (target.isInputChannel()) {
+			return (InputChannel<?>) target;
+		}
+
+		throw new IOException("Channel context for local receiver " + localId + " is not an input channel context");
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Prints the current buffer utilization to standard out: the number of available global buffers, the state
+	 * of every local buffer pool, the state of the network connection manager and the queued envelopes of all
+	 * registered input channels.
+	 */
+	public void logBufferUtilization() {
+		System.out.println("Buffer utilization at " + System.currentTimeMillis());
+
+		System.out.println("\tUnused global buffers: " + this.globalBufferPool.numAvailableBuffers());
+
+		System.out.println("\tLocal buffer pool status:");
+
+		for (LocalBufferPoolOwner owner : this.localBuffersPools.values()) {
+			owner.logBufferUtilization();
+		}
+
+		this.networkConnectionManager.logBufferUtilization();
+
+		System.out.println("\tIncoming connections:");
+
+		for (Channel registered : this.channels.values()) {
+			if (!registered.isInputChannel()) {
+				continue;
+			}
+
+			((InputChannel<?>) registered).logQueuedEnvelopes();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java
new file mode 100644
index 0000000..aeb5025
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java
@@ -0,0 +1,143 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+
+/**
+ * The response to a connection info lookup for a channel. It consists of a return code and, depending on the
+ * outcome, either the ID of a local target channel or a remote receiver.
+ */
+public class ConnectionInfoLookupResponse implements IOReadableWritable {
+
+	/**
+	 * The possible outcomes of a lookup. The ordinal of the constant is what gets serialized, so the order of the
+	 * constants must not be changed.
+	 */
+	private enum ReturnCode {
+		NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
+	}
+
+	// was request successful?
+	private ReturnCode returnCode;
+
+	// the remote receiver, or null if there is none
+	private RemoteReceiver remoteTarget;
+
+	// the ID of the local target channel, or null if there is none
+	private ChannelID localTarget;
+
+
+	/**
+	 * Creates an empty response whose fields are filled by {@link #read(DataInput)}.
+	 */
+	public ConnectionInfoLookupResponse() {}
+
+	public ConnectionInfoLookupResponse(ReturnCode code) {
+		this.returnCode = code;
+		this.remoteTarget = null;
+		this.localTarget = null;
+	}
+
+	public ConnectionInfoLookupResponse(ReturnCode code, ChannelID localTarget) {
+		this.returnCode = code;
+		this.remoteTarget = null;
+		this.localTarget = localTarget;
+	}
+
+	public ConnectionInfoLookupResponse(ReturnCode code, RemoteReceiver receiver) {
+		this.returnCode = code;
+		this.remoteTarget = receiver;
+		this.localTarget = null;
+	}
+
+	/**
+	 * @return the remote receiver, or <code>null</code> if the response carries none
+	 */
+	public RemoteReceiver getRemoteTarget() {
+		return this.remoteTarget;
+	}
+
+	/**
+	 * @return the ID of the local target channel, or <code>null</code> if the response carries none
+	 */
+	public ChannelID getLocalTarget() {
+		return this.localTarget;
+	}
+
+	/**
+	 * Reads the response in the format produced by {@link #write(DataOutput)}: the ordinal of the return code,
+	 * followed by a presence flag and, if set, the data for the remote target and then the same for the local
+	 * target. A target that is not present in the input is reset to <code>null</code>, so an instance can be
+	 * read into more than once without keeping targets of an earlier read.
+	 *
+	 * @throws IOException
+	 *         thrown if the input cannot be read or contains an unknown return code
+	 */
+	@Override
+	public void read(DataInput in) throws IOException {
+		final int ordinal = in.readInt();
+		final ReturnCode[] codes = ReturnCode.values();
+		if (ordinal < 0 || ordinal >= codes.length) {
+			throw new IOException("Unknown return code " + ordinal + " in connection info lookup response.");
+		}
+		this.returnCode = codes[ordinal];
+
+		if (in.readBoolean()) {
+			this.remoteTarget = new RemoteReceiver();
+			this.remoteTarget.read(in);
+		} else {
+			this.remoteTarget = null;
+		}
+
+		if (in.readBoolean()) {
+			this.localTarget = new ChannelID();
+			this.localTarget.read(in);
+		} else {
+			this.localTarget = null;
+		}
+	}
+
+	/**
+	 * Writes the ordinal of the return code, followed by a presence flag and, if present, the data for the
+	 * remote target and then the same for the local target.
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(this.returnCode.ordinal());
+
+		if (this.remoteTarget != null) {
+			out.writeBoolean(true);
+			this.remoteTarget.write(out);
+		} else {
+			out.writeBoolean(false);
+		}
+
+		if (this.localTarget != null) {
+			out.writeBoolean(true);
+			this.localTarget.write(out);
+		} else {
+			out.writeBoolean(false);
+		}
+	}
+
+	public boolean receiverNotFound() {
+		return (this.returnCode == ReturnCode.NOT_FOUND);
+	}
+
+	public boolean receiverNotReady() {
+		return (this.returnCode == ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
+	}
+
+	public boolean receiverReady() {
+		return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY);
+	}
+
+	public boolean isJobAborting() {
+		return (this.returnCode == ReturnCode.JOB_IS_ABORTING);
+	}
+
+
+	public static ConnectionInfoLookupResponse createReceiverFoundAndReady(ChannelID targetChannelID) {
+		return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, targetChannelID);
+	}
+
+	public static ConnectionInfoLookupResponse createReceiverFoundAndReady(RemoteReceiver remoteReceiver) {
+		return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, remoteReceiver);
+	}
+
+	public static ConnectionInfoLookupResponse createReceiverNotFound() {
+		return new ConnectionInfoLookupResponse(ReturnCode.NOT_FOUND);
+	}
+
+	public static ConnectionInfoLookupResponse createReceiverNotReady() {
+		return new ConnectionInfoLookupResponse(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
+	}
+
+	public static ConnectionInfoLookupResponse createJobIsAborting() {
+		return new ConnectionInfoLookupResponse(ReturnCode.JOB_IS_ABORTING);
+	}
+
+
+	@Override
+	public String toString() {
+		// plain concatenation prints the same name as name() but does not fail on an instance that was never read
+		return this.returnCode + ", local target: " + this.localTarget + ", remoteTarget: " + this.remoteTarget;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java
new file mode 100644
index 0000000..0d2fcd4
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java
@@ -0,0 +1,37 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+/**
+ * This exception is thrown by the {@link ChannelManager} to indicate that a task cannot be accepted because
+ * there are not enough resources available to safely execute it.
+ * 
+ */
+public final class InsufficientResourcesException extends Exception {
+
+	/** Serial version UID used when this exception is serialized. */
+	private static final long serialVersionUID = -8977049569413215169L;
+
+	/**
+	 * Creates the exception with a description of the resource shortage.
+	 * <p>
+	 * The constructor is package-private, so only classes of this package can create instances.
+	 *
+	 * @param msg
+	 *        the detail message, later available through {@link #getMessage()}
+	 */
+	InsufficientResourcesException(String msg) {
+		super(msg);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java
new file mode 100644
index 0000000..769ada5
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java
@@ -0,0 +1,37 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.nephele.execution.CancelTaskException;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+
+
+/**
+ * A {@link CancelTaskException} that additionally carries the {@link ChannelID} of a receiver.
+ * <p>
+ * NOTE(review): judging by the class name, it signals that a local receiver has been cancelled -- confirm against
+ * the code that throws it.
+ */
+public class LocalReceiverCancelledException extends CancelTaskException {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The channel ID handed to the constructor. */
+	private final ChannelID receiver;
+
+	/**
+	 * Creates the exception for the given receiver.
+	 *
+	 * @param receiver
+	 *        the channel ID of the receiver; stored as is, without a <code>null</code> check
+	 */
+	public LocalReceiverCancelledException(final ChannelID receiver) {
+		super();
+		this.receiver = receiver;
+	}
+
+	/**
+	 * Returns the channel ID this exception was created with.
+	 *
+	 * @return the channel ID of the receiver
+	 */
+	public ChannelID getReceiver() {
+		return this.receiver;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
new file mode 100644
index 0000000..44ec642
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
@@ -0,0 +1,176 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.tcp.IncomingConnectionThread;
+import eu.stratosphere.runtime.io.network.tcp.OutgoingConnection;
+import eu.stratosphere.runtime.io.network.tcp.OutgoingConnectionThread;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * The network connection manager manages incoming and outgoing network connections from and to other hosts.
+ * <p>
+ * This class is thread-safe.
+ * 
+ */
+public final class NetworkConnectionManager {
+
+	/**
+	 * The default number of threads dealing with outgoing connections.
+	 */
+	private static final int DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS = 1;
+
+	/**
+	 * The default number of connection retries before giving up.
+	 */
+	private static final int DEFAULT_NUMBER_OF_CONNECTION_RETRIES = 10;
+
+	/**
+	 * List of active threads dealing with outgoing connections.
+	 */
+	private final List<OutgoingConnectionThread> outgoingConnectionThreads = new CopyOnWriteArrayList<OutgoingConnectionThread>();
+
+	/**
+	 * Thread dealing with incoming connections.
+	 */
+	private final IncomingConnectionThread incomingConnectionThread;
+
+	/**
+	 * Map containing currently active outgoing connections.
+	 */
+	private final ConcurrentMap<RemoteReceiver, OutgoingConnection> outgoingConnections = new ConcurrentHashMap<RemoteReceiver, OutgoingConnection>();
+
+	/**
+	 * The number of connection retries before giving up.
+	 */
+	private final int numberOfConnectionRetries;
+
+	/**
+	 * A buffer provider for read buffers
+	 */
+	private final ChannelManager channelManager;
+
+	public NetworkConnectionManager(final ChannelManager channelManager,
+			final InetAddress bindAddress, final int dataPort) throws IOException {
+
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+
+		this.channelManager = channelManager;
+
+		// Start the connection threads
+		final int numberOfOutgoingConnectionThreads = configuration.getInteger(
+			"channel.network.numberOfOutgoingConnectionThreads", DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS);
+
+		for (int i = 0; i < numberOfOutgoingConnectionThreads; i++) {
+			final OutgoingConnectionThread outgoingConnectionThread = new OutgoingConnectionThread();
+			outgoingConnectionThread.start();
+			this.outgoingConnectionThreads.add(outgoingConnectionThread);
+		}
+
+		this.incomingConnectionThread = new IncomingConnectionThread(
+			this.channelManager, true, new InetSocketAddress(bindAddress, dataPort));
+		this.incomingConnectionThread.start();
+
+		this.numberOfConnectionRetries = configuration.getInteger("channel.network.numberOfConnectionRetries",
+			DEFAULT_NUMBER_OF_CONNECTION_RETRIES);
+	}
+
+	/**
+	 * Randomly selects one of the active threads dealing with outgoing connections.
+	 * 
+	 * @return one of the active threads dealing with outgoing connections
+	 */
+	private OutgoingConnectionThread getOutgoingConnectionThread() {
+
+		return this.outgoingConnectionThreads.get((int) (this.outgoingConnectionThreads.size() * Math.random()));
+	}
+
+	/**
+	 * Queues an envelope for transfer to a particular target host.
+	 * 
+	 * @param remoteReceiver
+	 *        the address of the remote receiver
+	 * @param envelope
+	 *        the envelope to be transfered
+	 */
+	public void queueEnvelopeForTransfer(final RemoteReceiver remoteReceiver, final Envelope envelope) {
+
+		getOutgoingConnection(remoteReceiver).queueEnvelope(envelope);
+	}
+
+	/**
+	 * Returns (and possibly creates) the outgoing connection for the given target address.
+	 * 
+	 * @param targetAddress
+	 *        the address of the connection target
+	 * @return the outgoing connection object
+	 */
+	private OutgoingConnection getOutgoingConnection(final RemoteReceiver remoteReceiver) {
+
+		OutgoingConnection outgoingConnection = this.outgoingConnections.get(remoteReceiver);
+
+		if (outgoingConnection == null) {
+
+			outgoingConnection = new OutgoingConnection(remoteReceiver, getOutgoingConnectionThread(),
+				this.numberOfConnectionRetries);
+
+			final OutgoingConnection oldEntry = this.outgoingConnections
+				.putIfAbsent(remoteReceiver, outgoingConnection);
+
+			// We had a race, use the old value
+			if (oldEntry != null) {
+				outgoingConnection = oldEntry;
+			}
+		}
+
+		return outgoingConnection;
+	}
+
+	public void shutDown() {
+
+		// Interrupt the threads we started
+		this.incomingConnectionThread.interrupt();
+
+		final Iterator<OutgoingConnectionThread> it = this.outgoingConnectionThreads.iterator();
+		while (it.hasNext()) {
+			it.next().interrupt();
+		}
+	}
+
+	public void logBufferUtilization() {
+
+		System.out.println("\tOutgoing connections:");
+
+		final Iterator<Map.Entry<RemoteReceiver, OutgoingConnection>> it = this.outgoingConnections.entrySet()
+			.iterator();
+
+		while (it.hasNext()) {
+
+			final Map.Entry<RemoteReceiver, OutgoingConnection> entry = it.next();
+			System.out.println("\t\tOC " + entry.getKey() + ": " + entry.getValue().getNumberOfQueuedWriteBuffers());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
new file mode 100644
index 0000000..da36ad0
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
@@ -0,0 +1,157 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.util.StringUtils;
+
+/**
+ * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
+ * 
+ */
+public final class RemoteReceiver implements IOReadableWritable {
+
+	/**
+	 * The address of the connection to the remote {@link TaskManager}.
+	 */
+	private InetSocketAddress connectionAddress;
+
+	/**
+	 * The index of the connection to the remote {@link TaskManager}.
+	 */
+	private int connectionIndex;
+
+	/**
+	 * Constructs a new remote receiver object.
+	 * 
+	 * @param connectionAddress
+	 *        the address of the connection to the remote {@link TaskManager}
+	 * @param connectionIndex
+	 *        the index of the connection to the remote {@link TaskManager}
+	 */
+	public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
+
+		if (connectionAddress == null) {
+			throw new IllegalArgumentException("Argument connectionAddress must not be null");
+		}
+
+		if (connectionIndex < 0) {
+			throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number");
+		}
+
+		this.connectionAddress = connectionAddress;
+		this.connectionIndex = connectionIndex;
+	}
+
+	/**
+	 * Default constructor for serialization/deserialization.
+	 */
+	public RemoteReceiver() {
+		this.connectionAddress = null;
+		this.connectionIndex = -1;
+	}
+
+	/**
+	 * Returns the address of the connection to the remote {@link TaskManager}.
+	 * 
+	 * @return the address of the connection to the remote {@link TaskManager}
+	 */
+	public InetSocketAddress getConnectionAddress() {
+
+		return this.connectionAddress;
+	}
+
+	/**
+	 * Returns the index of the connection to the remote {@link TaskManager}.
+	 * 
+	 * @return the index of the connection to the remote {@link TaskManager}
+	 */
+	public int getConnectionIndex() {
+
+		return this.connectionIndex;
+	}
+
+
+	@Override
+	public int hashCode() {
+
+		return this.connectionAddress.hashCode() + (31 * this.connectionIndex);
+	}
+
+
+	@Override
+	public boolean equals(final Object obj) {
+
+		if (!(obj instanceof RemoteReceiver)) {
+			return false;
+		}
+
+		final RemoteReceiver rr = (RemoteReceiver) obj;
+		if (!this.connectionAddress.equals(rr.connectionAddress)) {
+			return false;
+		}
+
+		if (this.connectionIndex != rr.connectionIndex) {
+			return false;
+		}
+
+		return true;
+	}
+
+
+	@Override
+	public void write(final DataOutput out) throws IOException {
+
+		final InetAddress ia = this.connectionAddress.getAddress();
+		out.writeInt(ia.getAddress().length);
+		out.write(ia.getAddress());
+		out.writeInt(this.connectionAddress.getPort());
+
+		out.writeInt(this.connectionIndex);
+	}
+
+
+	@Override
+	public void read(final DataInput in) throws IOException {
+
+		final int addr_length = in.readInt();
+		final byte[] address = new byte[addr_length];
+		in.readFully(address);
+
+		InetAddress ia = null;
+		try {
+			ia = InetAddress.getByAddress(address);
+		} catch (UnknownHostException uhe) {
+			throw new IOException(StringUtils.stringifyException(uhe));
+		}
+		final int port = in.readInt();
+		this.connectionAddress = new InetSocketAddress(ia, port);
+
+		this.connectionIndex = in.readInt();
+	}
+
+
+	@Override
+	public String toString() {
+
+		return this.connectionAddress + " (" + this.connectionIndex + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
new file mode 100644
index 0000000..32be058
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
@@ -0,0 +1,117 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+
+/**
+ * An event that carries the {@link ChannelID} of a source together with a {@link RemoteReceiver}.
+ * <p>
+ * Envelopes transporting this event are created by
+ * {@link #createEnvelopeWithEvent(Envelope, ChannelID, RemoteReceiver)} and can be recognized with
+ * {@link #isSenderHintEvent(Envelope)}.
+ */
+public final class SenderHintEvent extends AbstractEvent {
+
+	/**
+	 * The sequence number that will be set for transfer envelopes which contain the sender hint event.
+	 */
+	private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;
+
+	/** The channel ID of the source. */
+	private final ChannelID source;
+
+	/** The remote receiver this hint refers to. */
+	private final RemoteReceiver remoteReceiver;
+
+	/**
+	 * Creates a sender hint event from the given source and remote receiver.
+	 *
+	 * @param source
+	 *        the channel ID of the source, must not be <code>null</code>
+	 * @param remoteReceiver
+	 *        the remote receiver, must not be <code>null</code>
+	 * @throws IllegalArgumentException
+	 *         thrown if one of the arguments is <code>null</code>
+	 */
+	SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) {
+
+		if (source == null) {
+			throw new IllegalArgumentException("Argument source must not be null");
+		}
+		if (remoteReceiver == null) {
+			throw new IllegalArgumentException("Argument remoteReceiver must not be null");
+		}
+
+		this.source = source;
+		this.remoteReceiver = remoteReceiver;
+	}
+
+	/**
+	 * Creates an event with a fresh channel ID and an empty remote receiver; the actual values are filled in by
+	 * {@link #read(DataInput)}.
+	 */
+	public SenderHintEvent() {
+
+		this.source = new ChannelID();
+		this.remoteReceiver = new RemoteReceiver();
+	}
+
+	/**
+	 * Returns the channel ID of the source.
+	 *
+	 * @return the channel ID of the source
+	 */
+	public ChannelID getSource() {
+
+		return this.source;
+	}
+
+	/**
+	 * Returns the remote receiver of this event.
+	 *
+	 * @return the remote receiver
+	 */
+	public RemoteReceiver getRemoteReceiver() {
+
+		return this.remoteReceiver;
+	}
+
+
+	/**
+	 * Writes the source channel ID followed by the remote receiver.
+	 */
+	@Override
+	public void write(final DataOutput out) throws IOException {
+
+		this.source.write(out);
+		this.remoteReceiver.write(out);
+	}
+
+
+	/**
+	 * Reads the source channel ID followed by the remote receiver, i.e. in the order used by
+	 * {@link #write(DataOutput)}.
+	 */
+	@Override
+	public void read(final DataInput in) throws IOException {
+
+		this.source.read(in);
+		this.remoteReceiver.read(in);
+	}
+
+	/**
+	 * Creates an envelope which transports a single sender hint event.
+	 * <p>
+	 * The new envelope takes its job ID and its source from the original envelope and uses the reserved sender
+	 * hint sequence number. Note that the <code>source</code> argument only ends up in the event, not in the
+	 * envelope header.
+	 *
+	 * @param originalEnvelope
+	 *        the envelope providing job ID and source of the new envelope
+	 * @param source
+	 *        the channel ID stored in the sender hint event
+	 * @param remoteReceiver
+	 *        the remote receiver stored in the sender hint event
+	 * @return the envelope containing the serialized sender hint event
+	 */
+	public static Envelope createEnvelopeWithEvent(final Envelope originalEnvelope,
+			final ChannelID source, final RemoteReceiver remoteReceiver) {
+
+		final Envelope hintEnvelope = new Envelope(SENDER_HINT_SEQUENCE_NUMBER,
+			originalEnvelope.getJobID(), originalEnvelope.getSource());
+
+		hintEnvelope.serializeEventList(Arrays.asList(new SenderHintEvent(source, remoteReceiver)));
+
+		return hintEnvelope;
+	}
+
+	/**
+	 * Checks whether the given envelope transports exactly one sender hint event and nothing else.
+	 * <p>
+	 * The events of the envelope are only deserialized if the envelope has the sender hint sequence number and
+	 * does not carry a buffer.
+	 *
+	 * @param envelope
+	 *        the envelope to inspect
+	 * @return <code>true</code> if the envelope is a sender hint envelope, <code>false</code> otherwise
+	 */
+	static boolean isSenderHintEvent(final Envelope envelope) {
+
+		if (envelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER || envelope.getBuffer() != null) {
+			return false;
+		}
+
+		final List<? extends AbstractEvent> containedEvents = envelope.deserializeEvents();
+
+		return containedEvents.size() == 1 && containedEvents.get(0) instanceof SenderHintEvent;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
new file mode 100644
index 0000000..1d23e93
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
@@ -0,0 +1,28 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+/**
+ * This interface must be implemented to receive a notification from a {@link BufferProvider} when an empty
+ * {@link eu.stratosphere.runtime.io.Buffer} has
+ * become available again.
+ * 
+ */
+public interface BufferAvailabilityListener {
+
+	/**
+	 * Indicates that at least one {@link eu.stratosphere.runtime.io.Buffer} has become available again.
+	 * <p>
+	 * According to the documentation of
+	 * {@link BufferProvider#registerBufferAvailabilityListener(BufferAvailabilityListener)}, a listener is
+	 * unregistered after it has been notified, so it has to register again to receive further notifications.
+	 * <p>
+	 * NOTE(review): the thread which invokes this callback depends on the {@link BufferProvider}
+	 * implementation -- confirm before doing blocking work here.
+	 */
+	void bufferAvailable();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
new file mode 100644
index 0000000..e3085ee
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
@@ -0,0 +1,69 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import eu.stratosphere.runtime.io.Buffer;
+
+import java.io.IOException;
+
+/**
+ * A source of {@link Buffer} objects.
+ * <p>
+ * Buffers can be requested either without blocking ({@link #requestBuffer(int)}) or blocking
+ * ({@link #requestBufferBlocking(int)}). A {@link BufferAvailabilityListener} can be registered to be notified
+ * once a buffer has become available again.
+ */
+public interface BufferProvider {
+
+	/**
+	 * Requests a buffer with a minimum size of <code>minBufferSize</code>. The method returns immediately, even if the
+	 * request could not be fulfilled.
+	 *
+	 * @param minBufferSize minimum size of the requested buffer (in bytes)
+	 * @return buffer with at least the requested size or <code>null</code> if no such buffer is currently available
+	 * @throws IOException if the request fails; the exact conditions are defined by the implementation
+	 */
+	Buffer requestBuffer(int minBufferSize) throws IOException;
+
+	/**
+	 * Requests a buffer with a minimum size of <code>minBufferSize</code>. The method blocks until the request has
+	 * been fulfilled or {@link #reportAsynchronousEvent()} has been called.
+	 * <p>
+	 * NOTE(review): this contract does not state what is returned when the call is released by
+	 * {@link #reportAsynchronousEvent()} instead of a fulfilled request -- confirm against the implementations.
+	 *
+	 * @param minBufferSize minimum size of the requested buffer (in bytes)
+	 * @return buffer with at least the requested size
+	 * @throws IOException if the request fails; the exact conditions are defined by the implementation
+	 * @throws InterruptedException presumably if the calling thread is interrupted while waiting -- TODO confirm
+	 */
+	Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException;
+
+	/**
+	 * Returns the size of buffers (in bytes) available at this buffer provider.
+	 * 
+	 * @return size of buffers (in bytes) available at this buffer provider
+	 */
+	int getBufferSize();
+
+	/**
+	 * Reports an asynchronous event and interrupts each blocking method of this buffer provider in order to allow the
+	 * blocked thread to respond to the event.
+	 */
+	void reportAsynchronousEvent();
+
+	/**
+	 * Registers the given {@link BufferAvailabilityListener} with an empty buffer pool.
+	 * <p>
+	 * The registration only succeeds, if the buffer pool is empty and has not been destroyed yet.
+	 * <p>
+	 * The registered listener will receive a notification when at least one buffer has become available again. After
+	 * the notification, the listener will be unregistered.
+	 *
+	 * @param listener the listener to be registered
+	 * @return <code>true</code> if the registration has been successful; <code>false</code> if the registration
+	 *         failed, because the buffer pool was not empty or has already been destroyed
+	 */
+	boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener);
+}