You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:54 UTC

[19/30] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 1ea5b1b..5240fc8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -65,7 +65,6 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.instance.HardwareDescription;
 import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
 import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
 import eu.stratosphere.nephele.ipc.RPC;
 import eu.stratosphere.nephele.ipc.Server;
 import eu.stratosphere.nephele.jobgraph.JobID;
@@ -80,12 +79,11 @@ import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.InsufficientResourcesException;
-import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
 import eu.stratosphere.nephele.util.SerializableArrayList;
 import eu.stratosphere.pact.runtime.cache.FileCache;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.InsufficientResourcesException;
 import eu.stratosphere.util.StringUtils;
 
 /**
@@ -128,10 +126,10 @@ public class TaskManager implements TaskOperationProtocol {
 	private final InstanceConnectionInfo localInstanceConnectionInfo;
 
 	/**
-	 * The instance of the {@link ByteBufferedChannelManager} which is responsible for
+	 * The instance of the {@link ChannelManager} which is responsible for
 	 * setting up and cleaning up the byte buffered channels of the tasks.
 	 */
-	private final ByteBufferedChannelManager byteBufferedChannelManager;
+	private final ChannelManager channelManager;
 
 	/**
 	 * Instance of the task manager profile if profiling is enabled.
@@ -279,14 +277,24 @@ public class TaskManager implements TaskOperationProtocol {
 		final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
 			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
 
+		// Read the network buffer pool settings (number of buffers and buffer size)
+		int numBuffers = GlobalConfiguration.getInteger(
+				ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+				ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+		int bufferSize = GlobalConfiguration.getInteger(
+				ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+				ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+
 		// Initialize the byte buffered channel manager
+		ChannelManager channelManager = null;
 		try {
-			this.byteBufferedChannelManager = new ByteBufferedChannelManager(this.lookupService,
-				this.localInstanceConnectionInfo);
-		} catch (Exception e) {
-			LOG.fatal("Cannot create byte channel manager:" + e.getMessage(), e);
-			throw new Exception("Failed to instantiate Byte-buffered channel manager. " + e.getMessage(), e);
+			channelManager = new ChannelManager(this.lookupService, this.localInstanceConnectionInfo, numBuffers, bufferSize);
+		} catch (IOException ioe) {
+			LOG.error(StringUtils.stringifyException(ioe));
+			throw new Exception("Failed to instantiate Byte-buffered channel manager. " + ioe.getMessage(), ioe);
 		}
+		this.channelManager = channelManager;
 		
 		{
 			HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
@@ -598,7 +606,6 @@ public class TaskManager implements TaskOperationProtocol {
 		return new TaskCancelResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
 	}
 
-
 	@Override
 	public TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
 
@@ -661,13 +668,11 @@ public class TaskManager implements TaskOperationProtocol {
 			}
 
 			final Configuration jobConfiguration = tdd.getJobConfiguration();
-			final Set<ChannelID> activeOutputChannels = null; // TODO: Fix me
 
 			// Register the task
 			Task task;
 			try {
-				task = createAndRegisterTask(vertexID, jobConfiguration, re,
-					activeOutputChannels);
+				task = createAndRegisterTask(vertexID, jobConfiguration, re);
 			} catch (InsufficientResourcesException e) {
 				final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
 					AbstractTaskResult.ReturnCode.INSUFFICIENT_RESOURCES);
@@ -707,12 +712,10 @@ public class TaskManager implements TaskOperationProtocol {
 	 *        the job configuration that has been attached to the original job graph
 	 * @param environment
 	 *        the environment of the task to be registered
-	 * @param activeOutputChannels
-	 *        the set of initially active output channels
 	 * @return the task to be started or <code>null</code> if a task with the same ID was already running
 	 */
 	private Task createAndRegisterTask(final ExecutionVertexID id, final Configuration jobConfiguration,
-			final RuntimeEnvironment environment, final Set<ChannelID> activeOutputChannels)
+			final RuntimeEnvironment environment)
 					throws InsufficientResourcesException, IOException {
 
 		if (id == null) {
@@ -730,10 +733,10 @@ public class TaskManager implements TaskOperationProtocol {
 			final Task runningTask = this.runningTasks.get(id);
 			boolean registerTask = true;
 			if (runningTask == null) {
-				task = new RuntimeTask(id, environment, this);
+				task = new Task(id, environment, this);
 			} else {
 
-				if (runningTask instanceof RuntimeTask) {
+				if (runningTask instanceof Task) {
 					// Task is already running
 					return null;
 				} else {
@@ -746,7 +749,7 @@ public class TaskManager implements TaskOperationProtocol {
 
 			if (registerTask) {
 				// Register the task with the byte buffered channel manager
-				this.byteBufferedChannelManager.register(task, activeOutputChannels);
+				this.channelManager.register(task);
 
 				boolean enableProfiling = false;
 				if (this.profiler != null && jobConfiguration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
@@ -786,7 +789,7 @@ public class TaskManager implements TaskOperationProtocol {
 				this.fileCache.deleteTmpFile(e.getKey(), task.getJobID());
 			}
 			// Unregister task from the byte buffered channel manager
-			this.byteBufferedChannelManager.unregister(id, task);
+			this.channelManager.unregister(id, task);
 
 			// Unregister task from profiling
 			task.unregisterProfiler(this.profiler);
@@ -892,7 +895,7 @@ public class TaskManager implements TaskOperationProtocol {
 		}
 
 		// Shut down the network channel manager
-		this.byteBufferedChannelManager.shutdown();
+		this.channelManager.shutdown();
 
 		// Shut down the memory manager
 		if (this.ioManager != null) {
@@ -930,8 +933,9 @@ public class TaskManager implements TaskOperationProtocol {
 	}
 
 	@Override
-	public void logBufferUtilization() {
-		this.byteBufferedChannelManager.logBufferUtilization();
+	public void logBufferUtilization() throws IOException {
+
+		this.channelManager.logBufferUtilization();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java
deleted file mode 100644
index 49e774b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-/**
- * This interface must be implemented to receive a notification from a {@link BufferProvider} when an empty
- * {@link Buffer} has
- * become available again.
- * 
- */
-public interface BufferAvailabilityListener {
-
-	/**
-	 * Indicates that at least one {@link Buffer} has become available again.
-	 */
-	void bufferAvailable();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java
deleted file mode 100644
index 1b47b57..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.io.channels.Buffer;
-
-public interface BufferProvider {
-
-	/**
-	 * Requests an empty buffer with a minimum size of <code>minimumSizeOfBuffer</code>. The method returns
-	 * immediately, even if the request could not be fulfilled. Note that <code>minimumSizeOfBuffer</code> must not
-	 * exceed the value returned by the method <code>getMaximumBufferSize()</code>.
-	 * 
-	 * @param minimumSizeOfBuffer
-	 *        the minimum size of the requested read buffer in bytes
-	 * @return the buffer with at least the requested size or <code>null</code> if no such buffer is currently available
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while allocating the buffer
-	 */
-	Buffer requestEmptyBuffer(int minimumSizeOfBuffer) throws IOException;
-
-	/**
-	 * Requests an empty buffer with a minimum size of <code>minimumSizeOfBuffer</code>. The method blocks
-	 * until the request can be fulfilled. Note that <code>minimumSizeOfBuffer</code> must not
-	 * exceed the value returned by the method <code>getMaximumBufferSize()</code>.
-	 * 
-	 * @param minimumSizeOfBuffer
-	 *        the minimum size of the requested read buffer in bytes
-	 * @return the buffer with at least the requested size
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while allocating the buffer
-	 * @throws InterruptedException
-	 *         thrown if the thread waiting for the buffer is interrupted
-	 */
-	Buffer requestEmptyBufferBlocking(int minimumSizeOfBuffer) throws IOException,
-			InterruptedException;
-
-	/**
-	 * Returns the maximum buffer size in bytes available at this buffer provider.
-	 * 
-	 * @return the maximum buffer size in bytes available at this buffer provider
-	 */
-	int getMaximumBufferSize();
-
-	/**
-	 * Returns if this buffer provider is shared between different entities (for examples tasks).
-	 * 
-	 * @return <code>true</code> if this buffer provider is shared, <code>false</code> otherwise
-	 */
-	boolean isShared();
-
-	/**
-	 * Reports an asynchronous event. Calling this method interrupts each blocking method of the buffer provider and
-	 * allows the blocked thread to respond to the event.
-	 */
-	void reportAsynchronousEvent();
-
-	/**
-	 * Registers the given {@link BufferAvailabilityListener} with an empty buffer pool to receive a notification when
-	 * at least one buffer has become available again. After the notification, the listener is automatically
-	 * unregistered again.
-	 * 
-	 * @param bufferAvailabilityListener
-	 *        the listener to be registered
-	 * @return <code>true</code> if the registration has been successful or <code>false</code> if the registration
-	 *         failed because the buffer pool has not been empty or has already been destroyed
-	 */
-	boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java
deleted file mode 100644
index 6474386..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public interface BufferProviderBroker {
-
-	BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java
deleted file mode 100644
index 0fc25eb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.core.memory.MemorySegment;
-
-public final class GlobalBufferPool {
-
-	private final static Log LOG = LogFactory.getLog(GlobalBufferPool.class);
-	
-	/**
-	 * The singleton instance of the global buffer pool.
-	 */
-	private static GlobalBufferPool instance = null;
-
-	/**
-	 * The number of buffers created at startup.
-	 */
-	private final int numberOfBuffers;
-
-	/**
-	 * The size of read/write buffers in bytes.
-	 */
-	private final int bufferSizeInBytes;
-
-	private final Queue<MemorySegment> buffers;
-
-	/**
-	 * Returns the singleton instance of the global buffer pool. If the instance does not already exist, it is also
-	 * created by calling this method.
-	 * 
-	 * @return the singleton instance of the global buffer pool
-	 */
-	public static synchronized GlobalBufferPool getInstance() {
-
-		if (instance == null) {
-			instance = new GlobalBufferPool();
-		}
-
-		return instance;
-	}
-
-	/**
-	 * Constructs the global buffer pool.
-	 */
-	private GlobalBufferPool() {
-
-		this.numberOfBuffers = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
-		this.bufferSizeInBytes = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
-
-		this.buffers = new ArrayBlockingQueue<MemorySegment>(this.numberOfBuffers);
-
-		// Initialize buffers
-		for (int i = 0; i < this.numberOfBuffers; i++) {
-			// allocate byteBuffer
-			final byte[] segMemory = new byte[this.bufferSizeInBytes];
-			final MemorySegment readBuffer = new MemorySegment(segMemory);
-			this.buffers.add(readBuffer);
-		}
-
-		LOG.info("Initialized global buffer pool with " + this.numberOfBuffers + " buffers with a size "
-			+ this.bufferSizeInBytes + " bytes each");
-	}
-
-	/**
-	 * Returns the maximum size of a buffer available at this pool in bytes.
-	 * 
-	 * @return the maximum size of a buffer available at this pool in bytes
-	 */
-	public int getMaximumBufferSize() {
-
-		return this.bufferSizeInBytes;
-	}
-
-	/**
-	 * Locks a buffer from the global buffer pool and returns it to the caller of this method.
-	 * 
-	 * @return the locked buffer from the pool or <code>null</code> if currently no global buffer is available
-	 */
-	public MemorySegment lockGlobalBuffer() {
-
-		return this.buffers.poll();
-	}
-
-	/**
-	 * Releases a lock on a previously locked buffer and returns the buffer to the global pool.
-	 * 
-	 * @param releasedBuffer
-	 *        the previously locked buffer to be released
-	 */
-	public void releaseGlobalBuffer(final MemorySegment releasedBuffer) {
-		this.buffers.add(releasedBuffer);
-	}
-
-	/**
-	 * Returns the total number of buffers managed by this pool.
-	 * 
-	 * @return the total number of buffers managed by this pool
-	 */
-	public int getTotalNumberOfBuffers() {
-
-		return this.numberOfBuffers;
-	}
-
-	/**
-	 * Returns the number of buffers which are currently available at this pool.
-	 * 
-	 * @return the number of buffers which are currently available at this pool
-	 */
-	public int getCurrentNumberOfBuffers() {
-
-		return this.buffers.size();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java
deleted file mode 100644
index f296003..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.BufferFactory;
-import eu.stratosphere.nephele.io.channels.MemoryBufferPoolConnector;
-
-public final class LocalBufferPool implements BufferProvider {
-
-	private static final class LocalBufferPoolConnector implements MemoryBufferPoolConnector {
-
-		private final LocalBufferPool localBufferPool;
-
-		private LocalBufferPoolConnector(final LocalBufferPool localBufferPool) {
-			this.localBufferPool = localBufferPool;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void recycle(final MemorySegment byteBuffer) {
-
-			this.localBufferPool.recycleBuffer(byteBuffer);
-		}
-
-	}
-
-	private final static Log LOG = LogFactory.getLog(LocalBufferPool.class);
-
-	private final GlobalBufferPool globalBufferPool;
-
-	private final int maximumBufferSize;
-
-	private int designatedNumberOfBuffers;
-
-	private int requestedNumberOfBuffers = 0;
-
-	private final boolean isShared;
-
-	private boolean asynchronousEventOccurred = false;
-
-	private boolean isDestroyed = false;
-
-	private final Queue<MemorySegment> buffers = new ArrayDeque<MemorySegment>();
-
-	private final LocalBufferPoolConnector bufferPoolConnector;
-
-	private final Queue<BufferAvailabilityListener> bufferAvailabilityListenerQueue = new ArrayDeque<BufferAvailabilityListener>();
-
-	public LocalBufferPool(final int designatedNumberOfBuffers, final boolean isShared) {
-
-		this.globalBufferPool = GlobalBufferPool.getInstance();
-		this.maximumBufferSize = this.globalBufferPool.getMaximumBufferSize();
-		this.designatedNumberOfBuffers = designatedNumberOfBuffers;
-		this.isShared = isShared;
-		this.bufferPoolConnector = new LocalBufferPoolConnector(this);
-	}
-
-
-	@Override
-	public Buffer requestEmptyBuffer(final int minimumSizeOfBuffer) throws IOException {
-
-		try {
-			return requestBufferInternal(minimumSizeOfBuffer, false);
-		} catch (InterruptedException e) {
-			LOG.error("Caught unexpected InterruptedException");
-		}
-
-		return null;
-	}
-
-
-	@Override
-	public Buffer requestEmptyBufferBlocking(final int minimumSizeOfBuffer) throws IOException, InterruptedException {
-
-		return requestBufferInternal(minimumSizeOfBuffer, true);
-	}
-
-	private Buffer requestBufferInternal(final int minimumSizeOfBuffer, final boolean block) throws IOException,
-			InterruptedException {
-
-		if (minimumSizeOfBuffer > this.maximumBufferSize) {
-			throw new IllegalArgumentException("Buffer of " + minimumSizeOfBuffer
-				+ " bytes is requested, but maximum buffer size is " + this.maximumBufferSize);
-		}
-
-		while (true) {
-
-			boolean async = false;
-
-			synchronized (this.buffers) {
-
-				// Make sure we return excess buffers immediately
-				while (this.requestedNumberOfBuffers > this.designatedNumberOfBuffers) {
-
-					final MemorySegment seg = this.buffers.poll();
-					if (seg == null) {
-						break;
-					}
-
-					this.globalBufferPool.releaseGlobalBuffer(seg);
-					this.requestedNumberOfBuffers--;
-				}
-
-				while (this.buffers.isEmpty()) {
-
-					// Check if the number of cached buffers matches the number of designated buffers
-					if (this.requestedNumberOfBuffers < this.designatedNumberOfBuffers) {
-
-						final MemorySegment memSeg = this.globalBufferPool.lockGlobalBuffer();
-						if (memSeg != null) {
-							this.buffers.add(memSeg);
-							this.requestedNumberOfBuffers++;
-							continue;
-						}
-					}
-
-					if (this.asynchronousEventOccurred && block) {
-						this.asynchronousEventOccurred = false;
-						async = true;
-						break;
-					}
-
-					if (block) {
-						this.buffers.wait(100);
-					} else {
-						return null;
-					}
-				}
-
-				if (!async) {
-					final MemorySegment memSeg = this.buffers.poll();
-					return BufferFactory.createFromMemory(minimumSizeOfBuffer, memSeg, this.bufferPoolConnector);
-				}
-			}
-		}
-	}
-
-
-	@Override
-	public int getMaximumBufferSize() {
-
-		return this.maximumBufferSize;
-	}
-
-	/**
-	 * Sets the designated number of buffers for this local buffer cache.
-	 * 
-	 * @param designatedNumberOfBuffers
-	 *        the designated number of buffers for this local buffer cache
-	 */
-	public void setDesignatedNumberOfBuffers(final int designatedNumberOfBuffers) {
-
-		synchronized (this.buffers) {
-
-			this.designatedNumberOfBuffers = designatedNumberOfBuffers;
-
-			// Make sure we return excess buffers immediately
-			while (this.requestedNumberOfBuffers > this.designatedNumberOfBuffers) {
-
-				if (this.buffers.isEmpty()) {
-					break;
-				}
-
-				this.globalBufferPool.releaseGlobalBuffer(this.buffers.poll());
-				this.requestedNumberOfBuffers--;
-			}
-
-			this.buffers.notify();
-		}
-	}
-
-	public void destroy() {
-
-		synchronized (this.buffers) {
-
-			if (this.isDestroyed) {
-				LOG.error("destroy is called on LocalBufferPool multiple times");
-				return;
-			}
-
-			this.isDestroyed = true;
-
-			while (!this.buffers.isEmpty()) {
-				this.globalBufferPool.releaseGlobalBuffer(this.buffers.poll());
-			}
-
-			this.requestedNumberOfBuffers = 0;
-		}
-	}
-
-
-	@Override
-	public boolean isShared() {
-
-		return this.isShared;
-	}
-
-	public int getNumberOfAvailableBuffers() {
-
-		synchronized (this.buffers) {
-			return this.buffers.size();
-		}
-	}
-
-	public int getDesignatedNumberOfBuffers() {
-
-		synchronized (this.buffers) {
-			return this.designatedNumberOfBuffers;
-		}
-	}
-
-	public int getRequestedNumberOfBuffers() {
-
-		synchronized (this.buffers) {
-			return this.requestedNumberOfBuffers;
-		}
-	}
-
-	private void recycleBuffer(final MemorySegment memSeg) {
-
-		synchronized (this.buffers) {
-
-			if (this.isDestroyed) {
-				this.globalBufferPool.releaseGlobalBuffer(memSeg);
-				this.requestedNumberOfBuffers--;
-			} else {
-				this.buffers.add(memSeg);
-				this.buffers.notify();
-			}
-
-			while (!this.bufferAvailabilityListenerQueue.isEmpty()) {
-				this.bufferAvailabilityListenerQueue.poll().bufferAvailable();
-			}
-		}
-	}
-
-
-	@Override
-	public void reportAsynchronousEvent() {
-
-		synchronized (this.buffers) {
-			this.asynchronousEventOccurred = true;
-			this.buffers.notify();
-		}
-	}
-
-
-	@Override
-	public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-
-		synchronized (this.buffers) {
-			if (!this.buffers.isEmpty()) {
-				return false;
-			}
-
-			if (this.isDestroyed) {
-				return false;
-			}
-
-			this.bufferAvailabilityListenerQueue.add(bufferAvailabilityListener);
-		}
-
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java
deleted file mode 100644
index c56a73c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-/**
- * A local buffer pool owner is an object which initially retrieves its buffers from the {@link GlobalBufferPool} and
- * manages its fraction of the overall buffer pool locally by means of a {@link LocalBufferPool}.
- * 
- */
-public interface LocalBufferPoolOwner {
-
-	/**
-	 * Returns the number of byte-buffered channels that will retrieve their buffers from the local buffer pool.
-	 * 
-	 * @return the number of byte-buffered channels that will retrieve their buffers from the local buffer pool
-	 */
-	int getNumberOfChannels();
-
-	/**
-	 * Sets the designated number of buffers the local buffer pool owner is allowed to fetch from the global buffer pool
-	 * and manage locally by means of the {@link LocalBufferPool}.
-	 * 
-	 * @param numberOfBuffers
-	 *        the numberOfBuffers the local buffer pool owner is allowed to fetch from the global buffer pool
-	 */
-	void setDesignatedNumberOfBuffers(int numberOfBuffers);
-
-	/**
-	 * Clears the local buffer pool and returns all buffers to the global buffer pool.
-	 */
-	void clearLocalBufferPool();
-
-	/**
-	 * Logs the current status of the local buffer pool. This method is intended mainly for debugging purposes.
-	 */
-	void logBufferUtilization();
-
-	/**
-	 * Reports an asynchronous event. Calling this method interrupts each blocking method of the buffer pool owner and
-	 * allows the blocked thread to respond to the event.
-	 */
-	void reportAsynchronousEvent();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java
deleted file mode 100644
index 066c268..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.util.Iterator;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public abstract class AbstractOutputChannelContext implements OutputChannelContext {
-
-	/**
-	 * The forwarding chain used by this output channel context.
-	 */
-	private final OutputChannelForwardingChain forwardingChain;
-
-	public AbstractOutputChannelContext(final OutputChannelForwardingChain forwardingChain) {
-
-		this.forwardingChain = forwardingChain;
-	}
-
-
-	@Override
-	public void queueTransferEnvelope(final TransferEnvelope transferEnvelope) {
-
-		if (transferEnvelope.getBuffer() != null) {
-			throw new IllegalStateException("Transfer envelope for output channel has buffer attached");
-		}
-
-		final Iterator<AbstractEvent> it = transferEnvelope.getEventList().iterator();
-		while (it.hasNext()) {
-
-			final AbstractEvent event = it.next();
-			if (event instanceof AbstractTaskEvent) {
-				processEventAsynchronously(event);
-			} else {
-				processEventSynchronously(event);
-			}
-		}
-	}
-
-	/**
-	 * Processes an event received from the framework in a synchronous fashion, i.e. the event processing is done by the
-	 * thread the event is destined for (usually the task thread).
-	 * 
-	 * @param event
-	 *        the event to be processed
-	 */
-	protected void processEventSynchronously(final AbstractEvent event) {
-
-		this.forwardingChain.offerEvent(event);
-	}
-
-	/**
-	 * Processes an event received from the framework in an asynchronous fashion, i.e. the event processing is done by
-	 * the thread which delivers the event.
-	 * 
-	 * @param event
-	 *        the event to be processed
-	 */
-	protected void processEventAsynchronously(final AbstractEvent event) {
-
-		// The default implementation does nothing
-	}
-
-
-	@Override
-	public void destroy() {
-
-		this.forwardingChain.destroy();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java
deleted file mode 100644
index 4a59cdc..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * An output channel forwarder is a component which processes a {@link TransferEnvelope} after it has been produced by
- * an {@link AbstractByteBufferedOutputChannel}. The component can decide based on the transfer envelope whether to
- * forward the envelope, discard it, or to store it.
- * 
- */
-public abstract class AbstractOutputChannelForwarder {
-
-	private final AbstractOutputChannelForwarder next;
-
-	private volatile AbstractOutputChannelForwarder prev = null;
-
-	protected AbstractOutputChannelForwarder(final AbstractOutputChannelForwarder next) {
-		this.next = next;
-		if (this.next != null) {
-			this.next.prev = this;
-		}
-	}
-
-	/**
-	 * Called by the framework to push a produced transfer envelope towards its receiver. This method will always be
-	 * called by the task thread itself.
-	 * 
-	 * @param transferEnvelope
-	 *        the transfer envelope to be processed
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while processing the transfer envelope
-	 * @throws InterruptedException
-	 *         thrown if the task thread was interrupted while processing the transfer envelope
-	 */
-	public void push(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
-		if (this.next != null) {
-			this.next.push(transferEnvelope);
-		}
-	}
-
-	public TransferEnvelope pull() {
-
-		if (this.prev != null) {
-			return this.prev.pull();
-		}
-
-		return null;
-	}
-
-	public boolean hasDataLeft() throws IOException, InterruptedException {
-
-		if (this.next != null) {
-			this.next.hasDataLeft();
-		}
-
-		return false;
-	}
-
-	public void processEvent(final AbstractEvent event) {
-
-		if (this.next != null) {
-			this.next.processEvent(event);
-		}
-	}
-
-	public void destroy() {
-
-		if (this.next != null) {
-			this.next.destroy();
-		}
-	}
-
-	protected final void recycleTransferEnvelope(final TransferEnvelope transferEnvelope) {
-
-		final Buffer buffer = transferEnvelope.getBuffer();
-		if (buffer != null) {
-			buffer.recycleBuffer();
-
-		}
-	}
-
-	protected final AbstractOutputChannelForwarder getNext() {
-
-		return this.next;
-	}
-
-	protected final AbstractOutputChannelForwarder getPrev() {
-
-		return this.prev;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
deleted file mode 100644
index 3bf351d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
+++ /dev/null
@@ -1,816 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
-import eu.stratosphere.nephele.taskmanager.Task;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProviderBroker;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.GlobalBufferPool;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPool;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeReceiverList;
-
-public final class ByteBufferedChannelManager implements TransferEnvelopeDispatcher, BufferProviderBroker {
-
-	/**
-	 * The log object used to report problems and errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(ByteBufferedChannelManager.class);
-
-	private static final boolean DEFAULT_ALLOW_SENDER_SIDE_SPILLING = false;
-
-	private static final boolean DEFAULT_MERGE_SPILLED_BUFFERS = true;
-
-	// TODO: Make this configurable
-	private static final int NUMBER_OF_CHANNELS_FOR_MULTICAST = 10;
-
-	private final Map<ChannelID, ChannelContext> registeredChannels = new ConcurrentHashMap<ChannelID, ChannelContext>();
-
-	private final Map<AbstractID, LocalBufferPoolOwner> localBufferPoolOwner = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
-
-	private final NetworkConnectionManager networkConnectionManager;
-
-	private final ChannelLookupProtocol channelLookupService;
-
-	private final InstanceConnectionInfo localConnectionInfo;
-
-	private final LocalBufferPool transitBufferPool;
-
-	private final boolean allowSenderSideSpilling;
-
-	private final boolean mergeSpilledBuffers;
-
-	private final boolean multicastEnabled = true;
-
-	/**
-	 * This map caches transfer envelope receiver lists.
-	 */
-	private final Map<ChannelID, TransferEnvelopeReceiverList> receiverCache = new ConcurrentHashMap<ChannelID, TransferEnvelopeReceiverList>();
-
-	public ByteBufferedChannelManager(final ChannelLookupProtocol channelLookupService,
-			final InstanceConnectionInfo localInstanceConnectionInfo) throws IOException {
-
-		this.channelLookupService = channelLookupService;
-
-		this.localConnectionInfo = localInstanceConnectionInfo;
-
-		// Initialize the global buffer pool
-		GlobalBufferPool.getInstance();
-
-		// Initialize the transit buffer pool
-		this.transitBufferPool = new LocalBufferPool(128, true);
-
-		this.networkConnectionManager = new NetworkConnectionManager(this,
-			localInstanceConnectionInfo.getAddress(), localInstanceConnectionInfo.getDataPort());
-
-		this.allowSenderSideSpilling = GlobalConfiguration.getBoolean("channel.network.allowSenderSideSpilling",
-			DEFAULT_ALLOW_SENDER_SIDE_SPILLING);
-
-		this.mergeSpilledBuffers = GlobalConfiguration.getBoolean("channel.network.mergeSpilledBuffers",
-			DEFAULT_MERGE_SPILLED_BUFFERS);
-
-		LOG.info("Initialized byte buffered channel manager with sender-side spilling "
-			+ (this.allowSenderSideSpilling ? "enabled" : "disabled")
-			+ (this.mergeSpilledBuffers ? " and spilled buffer merging enabled" : ""));
-	}
-
-	/**
-	 * Registers the given task with the byte buffered channel manager.
-	 * 
-	 * @param task
-	 *        the task to be registered
-	 * @param the
-	 *        set of output channels which are initially active
-	 * @throws InsufficientResourcesException
-	 *         thrown if the channel manager does not have enough memory buffers to safely run this task
-	 */
-	public void register(final Task task, final Set<ChannelID> activeOutputChannels)
-			throws InsufficientResourcesException {
-
-		// Check if we can safely run this task with the given resources
-		checkBufferAvailability(task);
-
-		final Environment environment = task.getEnvironment();
-
-		final TaskContext taskContext = task.createTaskContext(this,
-			this.localBufferPoolOwner.remove(task.getVertexID()));
-
-		final Set<GateID> outputGateIDs = environment.getOutputGateIDs();
-		for (final Iterator<GateID> gateIt = outputGateIDs.iterator(); gateIt.hasNext();) {
-
-			final GateID gateID = gateIt.next();
-			final OutputGateContext outputGateContext = taskContext.createOutputGateContext(gateID);
-			final Set<ChannelID> outputChannelIDs = environment.getOutputChannelIDsOfGate(gateID);
-			for (final Iterator<ChannelID> channelIt = outputChannelIDs.iterator(); channelIt.hasNext();) {
-
-				final ChannelID channelID = channelIt.next();
-				final OutputChannelContext previousContext = (OutputChannelContext) this.registeredChannels
-					.get(channelID);
-
-				final boolean isActive = true;/* activeOutputChannels.contains(channelID); */
-
-				final OutputChannelContext outputChannelContext = outputGateContext.createOutputChannelContext(
-					channelID, previousContext, isActive, this.mergeSpilledBuffers);
-
-				// Add routing entry to receiver cache to reduce latency
-				if (outputChannelContext.getType() == ChannelType.INMEMORY) {
-					addReceiverListHint(outputChannelContext.getChannelID(),
-						outputChannelContext.getConnectedChannelID());
-				}
-
-				// Add routing entry to receiver cache to save lookup for data arriving at the output channel
-				if (outputChannelContext.getType() == ChannelType.NETWORK) {
-					addReceiverListHint(outputChannelContext.getConnectedChannelID(),
-						outputChannelContext.getChannelID());
-				}
-
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Registering byte buffered output channel " + outputChannelContext.getChannelID() + " ("
-							+ (isActive ? "active" : "inactive") + ")");
-				}
-
-				this.registeredChannels.put(outputChannelContext.getChannelID(), outputChannelContext);
-			}
-		}
-
-		final Set<GateID> inputGateIDs = environment.getInputGateIDs();
-		for (final Iterator<GateID> gateIt = inputGateIDs.iterator(); gateIt.hasNext();) {
-
-			final GateID gateID = gateIt.next();
-			final InputGateContext inputGateContext = taskContext.createInputGateContext(gateID);
-			final Set<ChannelID> inputChannelIDs = environment.getInputChannelIDsOfGate(gateID);
-			for (final Iterator<ChannelID> channelIt = inputChannelIDs.iterator(); channelIt.hasNext();) {
-
-				final ChannelID channelID = channelIt.next();
-				final InputChannelContext previousContext = (InputChannelContext) this.registeredChannels
-					.get(channelID);
-
-				final InputChannelContext inputChannelContext = inputGateContext.createInputChannelContext(
-					channelID, previousContext);
-
-				// Add routing entry to receiver cache to reduce latency
-				if (inputChannelContext.getType() == ChannelType.INMEMORY) {
-					addReceiverListHint(inputChannelContext.getChannelID(), inputChannelContext.getConnectedChannelID());
-				}
-
-				this.registeredChannels.put(inputChannelContext.getChannelID(), inputChannelContext);
-			}
-
-			// Add input gate context to set of local buffer pool owner
-			final LocalBufferPoolOwner bufferPoolOwner = inputGateContext.getLocalBufferPoolOwner();
-			if (bufferPoolOwner != null) {
-				this.localBufferPoolOwner.put(inputGateContext.getGateID(), bufferPoolOwner);
-			}
-
-		}
-
-		this.localBufferPoolOwner.put(task.getVertexID(), taskContext);
-
-		redistributeGlobalBuffers();
-	}
-
-	/**
-	 * Unregisters the given task from the byte buffered channel manager.
-	 * 
-	 * @param vertexID
-	 *        the ID of the task to be unregistered
-	 * @param task
-	 *        the task to be unregistered
-	 */
-	public void unregister(final ExecutionVertexID vertexID, final Task task) {
-
-		final Environment environment = task.getEnvironment();
-
-		Iterator<ChannelID> channelIterator = environment.getOutputChannelIDs().iterator();
-
-		while (channelIterator.hasNext()) {
-
-			final ChannelID outputChannelID = channelIterator.next();
-			final ChannelContext context = this.registeredChannels.remove(outputChannelID);
-			if (context != null) {
-				context.destroy();
-			}
-			this.receiverCache.remove(outputChannelID);
-		}
-
-		channelIterator = environment.getInputChannelIDs().iterator();
-
-		while (channelIterator.hasNext()) {
-
-			final ChannelID outputChannelID = channelIterator.next();
-			final ChannelContext context = this.registeredChannels.remove(outputChannelID);
-			if (context != null) {
-				context.destroy();
-			}
-			this.receiverCache.remove(outputChannelID);
-		}
-
-		final Iterator<GateID> inputGateIterator = environment.getInputGateIDs().iterator();
-
-		while (inputGateIterator.hasNext()) {
-
-			final GateID inputGateID = inputGateIterator.next();
-
-			final LocalBufferPoolOwner owner = this.localBufferPoolOwner.remove(inputGateID);
-			if (owner != null) {
-				owner.clearLocalBufferPool();
-			}
-		}
-
-		final LocalBufferPoolOwner owner = this.localBufferPoolOwner.remove(vertexID);
-		if (owner != null) {
-			owner.clearLocalBufferPool();
-		}
-
-		redistributeGlobalBuffers();
-	}
-
-	/**
-	 * Shuts down the byte buffered channel manager and stops all its internal processes.
-	 */
-	public void shutdown() {
-
-		this.networkConnectionManager.shutDown();
-	}
-
-	public NetworkConnectionManager getNetworkConnectionManager() {
-
-		return this.networkConnectionManager;
-	}
-
-	private void recycleBuffer(final TransferEnvelope envelope) {
-
-		final Buffer buffer = envelope.getBuffer();
-		if (buffer != null) {
-			buffer.recycleBuffer();
-		}
-	}
-
-	private void sendReceiverNotFoundEvent(final TransferEnvelope envelope, final ChannelID receiver)
-			throws IOException, InterruptedException {
-
-		if (ReceiverNotFoundEvent.isReceiverNotFoundEvent(envelope)) {
-
-			LOG.info("Dropping request to send ReceiverNotFoundEvent as response to ReceiverNotFoundEvent");
-			return;
-		}
-
-		final JobID jobID = envelope.getJobID();
-
-		final TransferEnvelope transferEnvelope = ReceiverNotFoundEvent.createEnvelopeWithEvent(jobID, receiver,
-			envelope.getSequenceNumber());
-
-		final TransferEnvelopeReceiverList receiverList = getReceiverList(jobID, receiver);
-		if (receiverList == null) {
-			return;
-		}
-
-		processEnvelopeEnvelopeWithoutBuffer(transferEnvelope, receiverList);
-	}
-
-	private void processEnvelope(final TransferEnvelope transferEnvelope, final boolean freeSourceBuffer)
-			throws IOException, InterruptedException {
-
-		TransferEnvelopeReceiverList receiverList = null;
-		try {
-			receiverList = getReceiverList(transferEnvelope.getJobID(),
-				transferEnvelope.getSource());
-		} catch (InterruptedException e) {
-			recycleBuffer(transferEnvelope);
-			throw e;
-		} catch (IOException e) {
-			recycleBuffer(transferEnvelope);
-			throw e;
-		}
-
-		if (receiverList == null) {
-			recycleBuffer(transferEnvelope);
-			return;
-		}
-
-		// This envelope is known to have either no buffer or an memory-based input buffer
-		if (transferEnvelope.getBuffer() == null) {
-			processEnvelopeEnvelopeWithoutBuffer(transferEnvelope, receiverList);
-		} else {
-			processEnvelopeWithBuffer(transferEnvelope, receiverList, freeSourceBuffer);
-		}
-	}
-
-	private void processEnvelopeWithBuffer(final TransferEnvelope transferEnvelope,
-			final TransferEnvelopeReceiverList receiverList, final boolean freeSourceBuffer)
-			throws IOException, InterruptedException {
-
-		// Handle the most common (unicast) case first
-		if (!freeSourceBuffer) {
-
-			final List<ChannelID> localReceivers = receiverList.getLocalReceivers();
-			if (localReceivers.size() != 1) {
-				LOG.error("Expected receiver list to have exactly one element");
-			}
-
-			final ChannelID localReceiver = localReceivers.get(0);
-
-			final ChannelContext cc = this.registeredChannels.get(localReceiver);
-			if (cc == null) {
-
-				try {
-					sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
-				} finally {
-					recycleBuffer(transferEnvelope);
-				}
-				return;
-			}
-
-			if (!cc.isInputChannel()) {
-				LOG.error("Local receiver " + localReceiver
-					+ " is not an input channel, but is supposed to accept a buffer");
-			}
-
-			cc.queueTransferEnvelope(transferEnvelope);
-
-			return;
-		}
-
-		// This is the in-memory or multicast case
-		final Buffer srcBuffer = transferEnvelope.getBuffer();
-
-		try {
-
-			if (receiverList.hasLocalReceivers()) {
-
-				final List<ChannelID> localReceivers = receiverList.getLocalReceivers();
-
-				for (final ChannelID localReceiver : localReceivers) {
-
-					final ChannelContext cc = this.registeredChannels.get(localReceiver);
-					if (cc == null) {
-
-						sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
-						continue;
-					}
-
-					if (!cc.isInputChannel()) {
-						LOG.error("Local receiver " + localReceiver
-							+ " is not an input channel, but is supposed to accept a buffer");
-						continue;
-					}
-
-					final InputChannelContext inputChannelContext = (InputChannelContext) cc;
-
-					Buffer destBuffer = null;
-					try {
-						destBuffer = inputChannelContext.requestEmptyBufferBlocking(srcBuffer.size());
-						srcBuffer.copyToBuffer(destBuffer);
-					} catch (IOException e) {
-						if (destBuffer != null) {
-							destBuffer.recycleBuffer();
-						}
-						throw e;
-					}
-					// TODO: See if we can save one duplicate step here
-					final TransferEnvelope dup = transferEnvelope.duplicateWithoutBuffer();
-					dup.setBuffer(destBuffer);
-					inputChannelContext.queueTransferEnvelope(dup);
-				}
-			}
-
-			if (receiverList.hasRemoteReceivers()) {
-
-				final List<RemoteReceiver> remoteReceivers = receiverList.getRemoteReceivers();
-
-				// Generate sender hint before sending the first envelope over the network
-				if (transferEnvelope.getSequenceNumber() == 0) {
-					generateSenderHint(transferEnvelope, remoteReceivers);
-				}
-
-				for (final RemoteReceiver remoteReceiver : remoteReceivers) {
-					TransferEnvelope dup = transferEnvelope.duplicate();
-					this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, dup);
-				}
-			}
-		} finally {
-			// Recycle the source buffer
-			srcBuffer.recycleBuffer();
-		}
-	}
-
-	private void processEnvelopeEnvelopeWithoutBuffer(final TransferEnvelope transferEnvelope,
-			final TransferEnvelopeReceiverList receiverList) throws IOException, InterruptedException {
-
-		// No need to copy anything
-		final Iterator<ChannelID> localIt = receiverList.getLocalReceivers().iterator();
-
-		while (localIt.hasNext()) {
-
-			final ChannelID localReceiver = localIt.next();
-
-			final ChannelContext channelContext = this.registeredChannels.get(localReceiver);
-			if (channelContext == null) {
-				sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
-				continue;
-			}
-			channelContext.queueTransferEnvelope(transferEnvelope);
-		}
-
-		if (!receiverList.hasRemoteReceivers()) {
-			return;
-		}
-
-		// Generate sender hint before sending the first envelope over the network
-		final List<RemoteReceiver> remoteReceivers = receiverList.getRemoteReceivers();
-		if (transferEnvelope.getSequenceNumber() == 0) {
-			generateSenderHint(transferEnvelope, remoteReceivers);
-		}
-
-		final Iterator<RemoteReceiver> remoteIt = remoteReceivers.iterator();
-
-		while (remoteIt.hasNext()) {
-
-			final RemoteReceiver remoteReceiver = remoteIt.next();
-			this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, transferEnvelope);
-		}
-	}
-
-	private void addReceiverListHint(final ChannelID source, final ChannelID localReceiver) {
-
-		final TransferEnvelopeReceiverList receiverList = new TransferEnvelopeReceiverList(localReceiver);
-
-		if (this.receiverCache.put(source, receiverList) != null) {
-			LOG.warn("Receiver cache already contained entry for " + source);
-		}
-	}
-
-	private void addReceiverListHint(final ChannelID source, final RemoteReceiver remoteReceiver) {
-
-		final TransferEnvelopeReceiverList receiverList = new TransferEnvelopeReceiverList(remoteReceiver);
-
-		if (this.receiverCache.put(source, receiverList) != null) {
-			LOG.warn("Receiver cache already contained entry for " + source);
-		}
-	}
-
-	private void generateSenderHint(final TransferEnvelope transferEnvelope, final List<RemoteReceiver> remoteReceivers) {
-
-		final ChannelContext channelContext = this.registeredChannels.get(transferEnvelope.getSource());
-		if (channelContext == null) {
-			LOG.error("Cannot find channel context for channel ID " + transferEnvelope.getSource());
-			return;
-		}
-
-		// Only generate sender hints for output channels
-		if (channelContext.isInputChannel()) {
-			return;
-		}
-
-		final ChannelID remoteSourceID = channelContext.getConnectedChannelID();
-		final int connectionIndex = remoteReceivers.get(0).getConnectionIndex();
-		final InetSocketAddress isa = new InetSocketAddress(this.localConnectionInfo.getAddress(),
-			this.localConnectionInfo.getDataPort());
-
-		final RemoteReceiver remoteReceiver = new RemoteReceiver(isa, connectionIndex);
-		final TransferEnvelope senderHint = SenderHintEvent.createEnvelopeWithEvent(transferEnvelope, remoteSourceID,
-			remoteReceiver);
-
-		final Iterator<RemoteReceiver> remoteIt = remoteReceivers.iterator();
-
-		while (remoteIt.hasNext()) {
-
-			final RemoteReceiver rr = remoteIt.next();
-			this.networkConnectionManager.queueEnvelopeForTransfer(rr, senderHint);
-		}
-	}
-
-	/**
-	 * Returns the list of receivers for transfer envelopes produced by the channel with the given source channel ID.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the given channel ID belongs to
-	 * @param sourceChannelID
-	 *        the source channel ID for which the receiver list shall be retrieved
-	 * @return the list of receivers or <code>null</code> if the receiver could not be determined
-	 * @throws IOException
-	 * @throws InterruptedExcption
-	 */
-	private TransferEnvelopeReceiverList getReceiverList(final JobID jobID, final ChannelID sourceChannelID)
-			throws IOException, InterruptedException {
-
-		TransferEnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
-
-		if (receiverList != null) {
-			return receiverList;
-		}
-
-		while (true) {
-
-			if (Thread.currentThread().isInterrupted()) {
-				break;
-			}
-
-			ConnectionInfoLookupResponse lookupResponse;
-			synchronized (this.channelLookupService) {
-				lookupResponse = this.channelLookupService.lookupConnectionInfo(
-					this.localConnectionInfo, jobID, sourceChannelID);
-			}
-
-			if (lookupResponse.isJobAborting()) {
-				break;
-			}
-
-			if (lookupResponse.receiverNotFound()) {
-				LOG.error("Cannot find task(s) waiting for data from source channel with ID " + sourceChannelID);
-				break;
-			}
-
-			if (lookupResponse.receiverNotReady()) {
-				Thread.sleep(500);
-				continue;
-			}
-
-			if (lookupResponse.receiverReady()) {
-				receiverList = new TransferEnvelopeReceiverList(lookupResponse);
-				break;
-			}
-
-		}
-
-		if (receiverList != null) {
-
-			this.receiverCache.put(sourceChannelID, receiverList);
-
-			if (LOG.isDebugEnabled()) {
-
-				final StringBuilder sb = new StringBuilder();
-				sb.append("Receiver list for source channel ID " + sourceChannelID + " at task manager "
-					+ this.localConnectionInfo + "\n");
-
-				if (receiverList.hasLocalReceivers()) {
-					sb.append("\tLocal receivers:\n");
-					final Iterator<ChannelID> it = receiverList.getLocalReceivers().iterator();
-					while (it.hasNext()) {
-						sb.append("\t\t" + it.next() + "\n");
-					}
-				}
-
-				if (receiverList.hasRemoteReceivers()) {
-					sb.append("Remote receivers:\n");
-					final Iterator<RemoteReceiver> it = receiverList.getRemoteReceivers().iterator();
-					while (it.hasNext()) {
-						sb.append("\t\t" + it.next() + "\n");
-					}
-				}
-
-				LOG.debug(sb.toString());
-			}
-		}
-
-		return receiverList;
-	}
-
-
-	@Override
-	public void processEnvelopeFromOutputChannel(final TransferEnvelope transferEnvelope) throws IOException,
-			InterruptedException {
-
-		processEnvelope(transferEnvelope, true);
-	}
-
-
-	@Override
-	public void processEnvelopeFromInputChannel(final TransferEnvelope transferEnvelope) throws IOException,
-			InterruptedException {
-
-		processEnvelope(transferEnvelope, false);
-	}
-
-
-	@Override
-	public void processEnvelopeFromNetwork(final TransferEnvelope transferEnvelope, boolean freeSourceBuffer)
-			throws IOException, InterruptedException {
-
-		// Check if the envelope is the special envelope with the sender hint event
-		if (SenderHintEvent.isSenderHintEvent(transferEnvelope)) {
-
-			// Check if this is the final destination of the sender hint event before adding it
-			final SenderHintEvent seh = (SenderHintEvent) transferEnvelope.getEventList().get(0);
-			if (this.registeredChannels.get(seh.getSource()) != null) {
-
-				addReceiverListHint(seh.getSource(), seh.getRemoteReceiver());
-				return;
-			}
-		}
-
-		processEnvelope(transferEnvelope, freeSourceBuffer);
-	}
-
-	/**
-	 * Triggers the byte buffer channel manager write the current utilization of its read and write buffers to the logs.
-	 * This method is primarily for debugging purposes.
-	 */
-	public void logBufferUtilization() {
-
-		System.out.println("Buffer utilization at " + System.currentTimeMillis());
-
-		System.out.println("\tUnused global buffers: " + GlobalBufferPool.getInstance().getCurrentNumberOfBuffers());
-
-		System.out.println("\tLocal buffer pool status:");
-
-		final Iterator<LocalBufferPoolOwner> it = this.localBufferPoolOwner.values().iterator();
-		while (it.hasNext()) {
-			it.next().logBufferUtilization();
-		}
-
-		this.networkConnectionManager.logBufferUtilization();
-
-		System.out.println("\tIncoming connections:");
-
-		final Iterator<Map.Entry<ChannelID, ChannelContext>> it2 = this.registeredChannels.entrySet()
-				.iterator();
-
-		while (it2.hasNext()) {
-
-			final Map.Entry<ChannelID, ChannelContext> entry = it2.next();
-			final ChannelContext context = entry.getValue();
-			if (context.isInputChannel()) {
-
-				final InputChannelContext inputChannelContext = (InputChannelContext) context;
-				inputChannelContext.logQueuedEnvelopes();
-			}
-		}
-	}
-
-
-	@Override
-	public BufferProvider getBufferProvider(final JobID jobID, final ChannelID sourceChannelID) throws IOException,
-			InterruptedException {
-
-		final TransferEnvelopeReceiverList receiverList = getReceiverList(jobID, sourceChannelID);
-
-		// Receiver could not be determined, use transit buffer pool to read data from channel
-		if (receiverList == null) {
-			return this.transitBufferPool;
-		}
-
-		if (receiverList.hasLocalReceivers() && !receiverList.hasRemoteReceivers()) {
-
-			final List<ChannelID> localReceivers = receiverList.getLocalReceivers();
-			if (localReceivers.size() == 1) {
-				// Unicast case, get final buffer provider
-
-				final ChannelID localReceiver = localReceivers.get(0);
-				final ChannelContext cc = this.registeredChannels.get(localReceiver);
-				if (cc == null) {
-
-					// Use the transit buffer for this purpose, data will be discarded in most cases anyway.
-					return this.transitBufferPool;
-				}
-
-				if (!cc.isInputChannel()) {
-					throw new IOException("Channel context for local receiver " + localReceiver
-							+ " is not an input channel context");
-				}
-
-				final InputChannelContext icc = (InputChannelContext) cc;
-
-				return icc;
-			}
-		}
-
-		return this.transitBufferPool;
-	}
-
-	/**
-	 * Checks if the byte buffered channel manager has enough resources available to safely execute the given task.
-	 * 
-	 * @param task
-	 *        the task to be executed
-	 * @throws InsufficientResourcesException
-	 *         thrown if the byte buffered manager currently does not have enough resources available to execute the
-	 *         task
-	 */
-	private void checkBufferAvailability(final Task task) throws InsufficientResourcesException {
-
-		final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
-		int numberOfAlreadyRegisteredChannels = this.registeredChannels.size();
-		if (this.multicastEnabled) {
-			numberOfAlreadyRegisteredChannels += NUMBER_OF_CHANNELS_FOR_MULTICAST;
-		}
-
-		final Environment env = task.getEnvironment();
-
-		final int numberOfNewChannels = env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
-		final int totalNumberOfChannels = numberOfAlreadyRegisteredChannels + numberOfNewChannels;
-
-		final double buffersPerChannel = (double) totalNumberOfBuffers
-			/ (double) totalNumberOfChannels;
-
-		if (buffersPerChannel < 1.0) {
-
-			// Construct error message
-			final StringBuilder sb = new StringBuilder(this.localConnectionInfo.getHostName());
-			sb.append(" has not enough buffers available to safely execute ");
-			sb.append(env.getTaskName());
-			sb.append(" (");
-			sb.append(totalNumberOfChannels - totalNumberOfBuffers);
-			sb.append(" buffers are currently missing)");
-
-			throw new InsufficientResourcesException(sb.toString());
-		}
-	}
-
-	/**
-	 * Redistributes the global buffers among the registered tasks.
-	 */
-	private void redistributeGlobalBuffers() {
-
-		final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
-		int totalNumberOfChannels = this.registeredChannels.size();
-		if (this.multicastEnabled) {
-			totalNumberOfChannels += NUMBER_OF_CHANNELS_FOR_MULTICAST;
-		}
-		final double buffersPerChannel = (double) totalNumberOfBuffers / (double) totalNumberOfChannels;
-		if (buffersPerChannel < 1.0) {
-			LOG.warn("System is low on memory buffers. This may result in reduced performance.");
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Total number of buffers is " + totalNumberOfBuffers);
-			LOG.debug("Total number of channels is " + totalNumberOfChannels);
-		}
-
-		if (this.localBufferPoolOwner.isEmpty()) {
-			return;
-		}
-
-		final Iterator<LocalBufferPoolOwner> it = this.localBufferPoolOwner.values().iterator();
-		while (it.hasNext()) {
-			final LocalBufferPoolOwner lbpo = it.next();
-			lbpo.setDesignatedNumberOfBuffers((int) Math.ceil(buffersPerChannel * lbpo.getNumberOfChannels()));
-		}
-
-		if (this.multicastEnabled) {
-			this.transitBufferPool.setDesignatedNumberOfBuffers((int) Math.ceil(buffersPerChannel
-				* NUMBER_OF_CHANNELS_FOR_MULTICAST));
-		}
-	}
-
-	/**
-	 * Invalidates the entries identified by the given channel IDs from the receiver lookup cache.
-	 * 
-	 * @param channelIDs
-	 *        the channel IDs identifying the cache entries to invalidate
-	 */
-	public void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) {
-
-		final Iterator<ChannelID> it = channelIDs.iterator();
-		while (it.hasNext()) {
-
-			this.receiverCache.remove(it.next());
-		}
-	}
-
-	public void reportAsynchronousEvent(final ExecutionVertexID vertexID) {
-
-		final LocalBufferPoolOwner lbpo = this.localBufferPoolOwner.get(vertexID);
-		if (lbpo == null) {
-			System.out.println("Cannot find local buffer pool owner for " + vertexID);
-			return;
-		}
-
-		lbpo.reportAsynchronousEvent();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java
deleted file mode 100644
index f1efdc7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-
-/**
- * This channel set stores the ID's of all channels whose tasks have been canceled recently. The set is cleaned up by
- * periodically calling the method <code>cleanup</code>.
- * <p>
- * This class is thread-safe.
- * 
- */
-public final class CanceledChannelSet implements Set<ChannelID> {
-
-	/**
-	 * The period of time the entries must at least remain in the map.
-	 */
-	private final static long CLEANUP_INTERVAL = 30000; // 30 sec.
-
-	/**
-	 * The map which stores the ID's of the channels whose tasks have been canceled.
-	 */
-	private final Map<ChannelID, Long> canceledChannels = new HashMap<ChannelID, Long>();
-
-
-	@Override
-	public boolean add(ChannelID arg0) {
-
-		final long now = System.currentTimeMillis();
-
-		synchronized (this.canceledChannels) {
-			if (this.canceledChannels.put(arg0, Long.valueOf(now)) == null) {
-				return true;
-			}
-		}
-
-		return false;
-	}
-
-
-	@Override
-	public boolean addAll(Collection<? extends ChannelID> arg0) {
-
-		final Long now = Long.valueOf(System.currentTimeMillis());
-		final Iterator<? extends ChannelID> it = arg0.iterator();
-		boolean retVal = false;
-
-		synchronized (this.canceledChannels) {
-
-			while (it.hasNext()) {
-
-				if (this.canceledChannels.put(it.next(), now) == null) {
-					retVal = true;
-				}
-			}
-		}
-
-		return retVal;
-	}
-
-
-	@Override
-	public void clear() {
-
-		synchronized (this.canceledChannels) {
-			this.canceledChannels.clear();
-		}
-
-	}
-
-
-	@Override
-	public boolean contains(Object arg0) {
-
-		synchronized (this.canceledChannels) {
-			return this.canceledChannels.containsKey(arg0);
-		}
-	}
-
-
-	@Override
-	public boolean containsAll(Collection<?> arg0) {
-
-		synchronized (this.canceledChannels) {
-			return this.canceledChannels.keySet().containsAll(arg0);
-		}
-	}
-
-
-	@Override
-	public boolean isEmpty() {
-
-		synchronized (this.canceledChannels) {
-			return this.canceledChannels.isEmpty();
-		}
-	}
-
-
-	@Override
-	public Iterator<ChannelID> iterator() {
-
-		synchronized (this.canceledChannels) {
-			return this.canceledChannels.keySet().iterator();
-		}
-	}
-
-
-	@Override
-	public boolean remove(Object arg0) {
-
-		synchronized (this.canceledChannels) {
-			if (this.canceledChannels.remove(arg0) == null) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public boolean removeAll(Collection<?> arg0) {
-
-		final Iterator<?> it = arg0.iterator();
-		boolean retVal = false;
-
-		synchronized (this.canceledChannels) {
-
-			while (it.hasNext()) {
-				if (this.canceledChannels.remove(it.next()) != null) {
-					retVal = true;
-				}
-			}
-		}
-
-		return retVal;
-	}
-
-
-	@Override
-	public boolean retainAll(Collection<?> arg0) {
-
-		throw new RuntimeException("Method not implemented");
-	}
-
-
-	@Override
-	public int size() {
-
-		synchronized (this.canceledChannels) {
-			return this.canceledChannels.size();
-		}
-	}
-
-
-	@Override
-	public Object[] toArray() {
-
-		synchronized (this.canceledChannels) {
-			return this.canceledChannels.keySet().toArray();
-		}
-	}
-
-
-	@Override
-	public <T> T[] toArray(T[] arg0) {
-
-		synchronized (this.canceledChannels) {
-			return this.canceledChannels.keySet().toArray(arg0);
-		}
-	}
-
-	/**
-	 * Removes all entries from the set which have been added longer than <code>CLEANUP_INTERVAL</code> milliseconds
-	 * ago.
-	 */
-	public void cleanup() {
-
-		final long now = System.currentTimeMillis();
-
-		synchronized (this.canceledChannels) {
-
-			final Iterator<Map.Entry<ChannelID, Long>> it = this.canceledChannels.entrySet().iterator();
-			while (it.hasNext()) {
-
-				final Map.Entry<ChannelID, Long> entry = it.next();
-				if ((entry.getValue().longValue() + CLEANUP_INTERVAL) < now) {
-					it.remove();
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java
deleted file mode 100644
index b3a9200..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public interface ChannelContext {
-
-	boolean isInputChannel();
-	
-	JobID getJobID();
-
-	ChannelID getChannelID();
-
-	ChannelID getConnectedChannelID();
-	
-	ChannelType getType();
-	
-	void queueTransferEnvelope(TransferEnvelope transferEnvelope);
-	
-	void destroy();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java
deleted file mode 100644
index 69daa0e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
-
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.util.EnumUtils;
-import eu.stratosphere.nephele.util.SerializableArrayList;
-
-public class ConnectionInfoLookupResponse implements IOReadableWritable {
-
-	private enum ReturnCode {
-		NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
-	};
-
-	// was request successful?
-	private ReturnCode returnCode;
-
-	/**
-	 * Contains next-hop instances, this instance must forward multicast transmissions to.
-	 */
-	private final SerializableArrayList<RemoteReceiver> remoteTargets = new SerializableArrayList<RemoteReceiver>();
-
-	/**
-	 * Contains local ChannelIDs, multicast packets must be forwarded to.
-	 */
-	private final SerializableArrayList<ChannelID> localTargets = new SerializableArrayList<ChannelID>();
-
-	public ConnectionInfoLookupResponse() {
-		this.returnCode = ReturnCode.NOT_FOUND;
-	}
-
-	public void addRemoteTarget(final RemoteReceiver remote) {
-		this.remoteTargets.add(remote);
-	}
-
-	public void addLocalTarget(ChannelID local) {
-		this.localTargets.add(local);
-	}
-
-	private void setReturnCode(ReturnCode code) {
-		this.returnCode = code;
-	}
-
-	public List<RemoteReceiver> getRemoteTargets() {
-		return this.remoteTargets;
-	}
-
-	public List<ChannelID> getLocalTargets() {
-		return this.localTargets;
-	}
-
-	@Override
-	public void read(DataInput in) throws IOException {
-
-		this.localTargets.read(in);
-		this.remoteTargets.read(in);
-
-		this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
-	}
-
-	@Override
-	public void write(DataOutput out) throws IOException {
-
-		this.localTargets.write(out);
-		this.remoteTargets.write(out);
-
-		EnumUtils.writeEnum(out, this.returnCode);
-
-	}
-
-	public boolean receiverNotFound() {
-
-		return (this.returnCode == ReturnCode.NOT_FOUND);
-	}
-
-	public boolean receiverNotReady() {
-
-		return (this.returnCode == ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
-	}
-
-	public boolean receiverReady() {
-
-		return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY);
-	}
-
-	public boolean isJobAborting() {
-
-		return (this.returnCode == ReturnCode.JOB_IS_ABORTING);
-	}
-
-	public static ConnectionInfoLookupResponse createReceiverFoundAndReady(final ChannelID targetChannelID) {
-
-		final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
-		response.setReturnCode(ReturnCode.FOUND_AND_RECEIVER_READY);
-		response.addLocalTarget(targetChannelID);
-
-		return response;
-	}
-
-	public static ConnectionInfoLookupResponse createReceiverFoundAndReady(final RemoteReceiver remoteReceiver) {
-
-		final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
-		response.setReturnCode(ReturnCode.FOUND_AND_RECEIVER_READY);
-		response.addRemoteTarget(remoteReceiver);
-
-		return response;
-	}
-
-	/**
-	 * Constructor used to generate a plain ConnectionInfoLookupResponse object to be filled with multicast targets.
-	 * 
-	 * @return
-	 */
-	public static ConnectionInfoLookupResponse createReceiverFoundAndReady() {
-
-		final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
-		response.setReturnCode(ReturnCode.FOUND_AND_RECEIVER_READY);
-
-		return response;
-	}
-
-	public static ConnectionInfoLookupResponse createReceiverNotFound() {
-		final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
-		response.setReturnCode(ReturnCode.NOT_FOUND);
-
-		return response;
-	}
-
-	public static ConnectionInfoLookupResponse createReceiverNotReady() {
-		final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
-		response.setReturnCode(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
-
-		return response;
-	}
-
-	public static ConnectionInfoLookupResponse createJobIsAborting() {
-		final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
-		response.setReturnCode(ReturnCode.JOB_IS_ABORTING);
-
-		return response;
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder returnstring = new StringBuilder();
-		returnstring.append("local targets (total: " + this.localTargets.size() + "):\n");
-		for (ChannelID i : this.localTargets) {
-			returnstring.append(i + "\n");
-		}
-		returnstring.append("remote targets: (total: " + this.remoteTargets.size() + "):\n");
-		for (final RemoteReceiver rr : this.remoteTargets) {
-			returnstring.append(rr + "\n");
-		}
-		return returnstring.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java
deleted file mode 100644
index 7b8ce36..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.GateID;
-
-public interface GateContext {
-
-	GateID getGateID();
-}