You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:42 UTC

[07/30] git commit: Replace custom Java NIO TCP/IP code with Netty 4 library

Replace custom Java NIO TCP/IP code with Netty 4 library


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4cd4a134
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4cd4a134
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4cd4a134

Branch: refs/heads/master
Commit: 4cd4a13415d609a2979c8fa3cf4b797c990ee8c2
Parents: 2db78a8
Author: uce <u....@fu-berlin.de>
Authored: Fri May 9 13:39:15 2014 +0200
Committer: StephanEwen <st...@tu-berlin.de>
Committed: Sat Jun 7 09:41:21 2014 +0200

----------------------------------------------------------------------
 .../configuration/ConfigConstants.java          |  58 +-
 stratosphere-runtime/pom.xml                    |   8 +-
 .../eu/stratosphere/nephele/AbstractID.java     |   6 +
 .../nephele/execution/CancelTaskException.java  |   3 -
 .../nephele/execution/Environment.java          |  76 +-
 .../execution/ExecutionStateTransition.java     |   2 +
 .../nephele/execution/RuntimeEnvironment.java   | 170 ++--
 .../nephele/executiongraph/ExecutionVertex.java |  36 +
 .../nephele/instance/AbstractInstance.java      |  16 +
 .../eu/stratosphere/nephele/jobgraph/JobID.java |  11 +-
 .../nephele/jobmanager/JobManager.java          |  34 +
 .../jobmanager/scheduler/RecoveryLogic.java     |   5 +-
 .../protocols/ExtendedManagementProtocol.java   |  13 +
 .../protocols/TaskOperationProtocol.java        |  12 +
 .../stratosphere/nephele/taskmanager/Task.java  |  93 +-
 .../nephele/taskmanager/TaskKillResult.java     |  44 +
 .../nephele/taskmanager/TaskManager.java        |  32 +-
 .../runtime/ExecutorThreadFactory.java          |  35 +
 .../task/AbstractIterativePactTask.java         |   9 +-
 .../iterative/task/IterationHeadPactTask.java   |   3 +-
 .../pact/runtime/shipping/OutputEmitter.java    |   1 -
 .../runtime/shipping/RecordOutputEmitter.java   |   3 -
 .../pact/runtime/task/DataSinkTask.java         |   3 +-
 .../pact/runtime/task/RegularPactTask.java      |   8 +-
 .../java/eu/stratosphere/runtime/io/Buffer.java |   5 +-
 .../runtime/io/channels/Channel.java            |   4 +-
 .../runtime/io/channels/ChannelID.java          |  10 +-
 .../runtime/io/channels/ChannelType.java        |   5 +
 .../runtime/io/channels/InputChannel.java       |  17 +-
 .../runtime/io/channels/OutputChannel.java      |   5 +-
 .../runtime/io/gates/InputGate.java             |   7 +-
 .../runtime/io/gates/OutputGate.java            |   1 -
 .../runtime/io/network/ChannelManager.java      |  50 +-
 .../runtime/io/network/Envelope.java            | 178 ++++
 .../runtime/io/network/EnvelopeDispatcher.java  |  46 +
 .../io/network/EnvelopeReceiverList.java        |  75 ++
 .../io/network/NetworkConnectionManager.java    | 176 ----
 .../runtime/io/network/SenderHintEvent.java     |   1 -
 .../BufferAvailabilityListener.java             |  17 +-
 .../network/bufferprovider/BufferProvider.java  |  18 +-
 .../bufferprovider/DiscardBufferPool.java       |  51 ++
 .../network/bufferprovider/LocalBufferPool.java |  26 +-
 .../bufferprovider/SerialSingleBufferPool.java  |  77 --
 .../runtime/io/network/envelope/Envelope.java   | 169 ----
 .../io/network/envelope/EnvelopeDispatcher.java |  46 -
 .../io/network/envelope/EnvelopeReader.java     | 212 -----
 .../network/envelope/EnvelopeReceiverList.java  |  75 --
 .../io/network/envelope/EnvelopeWriter.java     | 134 ---
 .../envelope/NoBufferAvailableException.java    |  53 --
 .../network/netty/InboundEnvelopeDecoder.java   | 344 ++++++++
 .../netty/InboundEnvelopeDispatcherHandler.java |  41 +
 .../network/netty/NettyConnectionManager.java   | 251 ++++++
 .../network/netty/OutboundConnectionQueue.java  |  94 ++
 .../network/netty/OutboundEnvelopeEncoder.java  |  65 ++
 .../io/network/tcp/IncomingConnection.java      | 115 ---
 .../network/tcp/IncomingConnectionThread.java   | 226 -----
 .../io/network/tcp/OutgoingConnection.java      | 529 ------------
 .../network/tcp/OutgoingConnectionThread.java   | 276 ------
 .../AdaptiveSpanningRecordDeserializer.java     |  32 +-
 .../io/serialization/DataInputDeserializer.java |  25 +-
 .../io/serialization/DataOutputSerializer.java  |   7 +-
 .../serialization/SpanningRecordSerializer.java |   7 +-
 .../nephele/util/TestBufferProvider.java        |   2 +-
 .../runtime/task/util/OutputEmitterTest.java    |   1 -
 .../task/util/RecordOutputEmitterTest.java      |   2 -
 .../pact/runtime/test/util/MockEnvironment.java |  11 +-
 .../envelope/EnvelopeReaderWriterTest.java      | 394 ---------
 .../netty/InboundEnvelopeDecoderTest.java       | 857 +++++++++++++++++++
 .../netty/NettyConnectionManagerTest.java       | 196 +++++
 .../netty/OutboundEnvelopeEncoderTest.java      |  97 +++
 .../KMeansIterativeNepheleITCase.java           |   3 +-
 .../clients/examples/LocalExecutorITCase.java   |   3 +-
 .../exampleJavaPrograms/WordCountITCase.java    |   5 +-
 .../ConnectedComponentsNepheleITCase.java       |  32 +-
 .../IterationWithChainingNepheleITCase.java     |  12 +-
 .../WordCountUnionReduceITCase.java             |   4 -
 .../test/runtime/NetworkStackNepheleITCase.java | 286 +++++++
 77 files changed, 3236 insertions(+), 2820 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
index 3b9ba3d..96c8965 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
@@ -34,7 +34,6 @@ public final class ConfigConstants {
 	 * The config parameter defining the maximal intra-node parallelism for jobs.
 	 */
 	public static final String PARALLELIZATION_MAX_INTRA_NODE_DEGREE_KEY = "parallelization.intra-node.default";
-
 	
 	// -------------------------------- Runtime -------------------------------
 
@@ -98,6 +97,26 @@ public final class ConfigConstants {
 	 * The config parameter defining the size of the buffers used in the network stack.
 	 */
 	public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
+
+	/**
+	 * The number of incoming connection threads used in NettyConnectionManager for the ServerBootstrap.
+	 */
+	public static final String TASK_MANAGER_NETTY_NUM_IN_THREADS_KEY = "taskmanager.netty.numInThreads";
+
+	/**
+	 * The number of outgoing connection threads used in NettyConnectionManager for the Bootstrap.
+	 */
+	public static final String TASK_MANAGER_NETTY_NUM_OUT_THREADS_KEY = "taskmanager.netty.numOutThreads";
+
+	/**
+	 * The low water mark used in NettyConnectionManager for the Bootstrap.
+	 */
+	public static final String TASK_MANAGER_NETTY_LOW_WATER_MARK = "taskmanager.netty.lowWaterMark";
+
+	/**
+	 * The high water mark used in NettyConnectionManager for the Bootstrap.
+	 */
+	public static final String TASK_MANAGER_NETTY_HIGH_WATER_MARK = "taskmanager.netty.highWaterMark";
 	
 	/**
 	 * Parameter for the interval in which the RaskManager sends the periodic heart beat messages
@@ -134,10 +153,9 @@ public final class ConfigConstants {
 	 * The parameter defining the polling interval (in seconds) for the JobClient.
 	 */
 	public static final String JOBCLIENT_POLLING_INTERVAL_KEY = "jobclient.polling.interval";
-	
-	
+
 	// ------------------------ Hadoop Configuration ------------------------
-	
+
 	/**
 	 * Path to hdfs-defaul.xml file
 	 */
@@ -153,7 +171,6 @@ public final class ConfigConstants {
 	 */
 	public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
 	
-	
 	// ------------------------ File System Bahavior ------------------------
 
 	/**
@@ -251,11 +268,7 @@ public final class ConfigConstants {
 	public static final String STRATOSPHERE_BASE_DIR_PATH_KEY = "stratosphere.base.dir.path";
 	
 	public static final String STRATOSPHERE_JVM_OPTIONS = "env.java.opts";
-	
-	
 
-	
-	
 	// ------------------------------------------------------------------------
 	//                            Default Values
 	// ------------------------------------------------------------------------
@@ -318,7 +331,31 @@ public final class ConfigConstants {
 	 * Default size of network stack buffers.
 	 */
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;
-	
+
+	/**
+	 * Default number of incoming connection threads used in NettyConnectionManager for the ServerBootstrap. If set
+	 * to -1, NettyConnectionManager will pick a reasonable default depending on the number of cores of the machine.
+	 */
+	public static final int DEFAULT_TASK_MANAGER_NETTY_NUM_IN_THREADS = -1;
+
+	/**
+	 * Default number of outgoing connection threads used in NettyConnectionManager for the Bootstrap. If set
+	 * to -1, NettyConnectionManager will pick a reasonable default depending on the number of cores of the machine.
+	 */
+	public static final int DEFAULT_TASK_MANAGER_NETTY_NUM_OUT_THREADS = -1;
+
+	/**
+	 * Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+	 * will use half of the network buffer size as the low water mark.
+	 */
+	public static final int DEFAULT_TASK_MANAGER_NETTY_LOW_WATER_MARK = -1;
+
+	/**
+	 * Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+	 * will use the network buffer size as the high water mark.
+	 */
+	public static final int DEFAULT_TASK_MANAGER_NETTY_HIGH_WATER_MARK = -1;
+
 	/**
 	 * The default interval for TaskManager heart beats (2000 msecs).
 	 */
@@ -452,7 +489,6 @@ public final class ConfigConstants {
 	 */
 	public static final int DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX = 1;
 
-	
 	// ------------------------------------------------------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/pom.xml b/stratosphere-runtime/pom.xml
index bb5ca2a..074725b 100644
--- a/stratosphere-runtime/pom.xml
+++ b/stratosphere-runtime/pom.xml
@@ -52,7 +52,13 @@
 			<artifactId>aws-java-sdk</artifactId>
 			<version>1.2.1</version>
 		</dependency>
-		
+
+		<dependency>
+			<groupId>io.netty</groupId>
+			<artifactId>netty-all</artifactId>
+			<version>4.0.19.Final</version>
+		</dependency>
+
 		<dependency>
 			<groupId>eu.stratosphere</groupId>
 			<artifactId>stratosphere-java</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
index 476e22a..648c8dc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.util.StringUtils;
+import io.netty.buffer.ByteBuf;
 
 /**
  * A statistically unique identification number.
@@ -167,6 +168,11 @@ public class AbstractID implements IOReadableWritable {
 		buffer.putLong(this.upperPart);
 	}
 
+	public void writeTo(ByteBuf buf) {
+		buf.writeLong(this.lowerPart);
+		buf.writeLong(this.upperPart);
+	}
+
 	@Override
 	public String toString() {
 		final byte[] ba = new byte[SIZE];

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
index 6a09e89..aa196da 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
@@ -14,9 +14,6 @@
  **********************************************************************************************************************/
 package eu.stratosphere.nephele.execution;
 
-
-import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
-
 /**
  * Thrown to trigger a canceling of the executing task. Intended to cause a cancelled status, rather than a failed status.
  */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
index e0bcc70..05a420c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
@@ -16,17 +16,21 @@ package eu.stratosphere.nephele.execution;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.gates.GateID;
-import eu.stratosphere.runtime.io.gates.InputGate;
-import eu.stratosphere.runtime.io.gates.OutputGate;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
 import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
 
 /**
  * The user code of every Nephele task runs inside an <code>Environment</code> object. The environment provides
@@ -38,114 +42,112 @@ public interface Environment {
 	 * Returns the ID of the job from the original job graph. It is used by the library cache manager to find the
 	 * required
 	 * libraries for executing the assigned Nephele task.
-	 * 
+	 *
 	 * @return the ID of the job from the original job graph
 	 */
 	JobID getJobID();
 
 	/**
 	 * Returns the task configuration object which was attached to the original JobVertex.
-	 * 
+	 *
 	 * @return the task configuration object which was attached to the original JobVertex.
 	 */
 	Configuration getTaskConfiguration();
 
 	/**
 	 * Returns the job configuration object which was attached to the original {@link JobGraph}.
-	 * 
+	 *
 	 * @return the job configuration object which was attached to the original {@link JobGraph}
 	 */
 	Configuration getJobConfiguration();
 
 	/**
 	 * Returns the current number of subtasks the respective task is split into.
-	 * 
+	 *
 	 * @return the current number of subtasks the respective task is split into
 	 */
 	int getCurrentNumberOfSubtasks();
 
 	/**
 	 * Returns the index of this subtask in the subtask group.
-	 * 
+	 *
 	 * @return the index of this subtask in the subtask group
 	 */
 	int getIndexInSubtaskGroup();
 
 	/**
 	 * Sends a notification that objects that a new user thread has been started to the execution observer.
-	 * 
-	 * @param userThread
-	 *        the user thread which has been started
+	 *
+	 * @param userThread the user thread which has been started
 	 */
 	void userThreadStarted(Thread userThread);
 
 	/**
 	 * Sends a notification that a user thread has finished to the execution observer.
-	 * 
-	 * @param userThread
-	 *        the user thread which has finished
+	 *
+	 * @param userThread the user thread which has finished
 	 */
 	void userThreadFinished(Thread userThread);
 
 	/**
 	 * Returns the input split provider assigned to this environment.
-	 * 
+	 *
 	 * @return the input split provider or <code>null</code> if no such provider has been assigned to this environment.
 	 */
 	InputSplitProvider getInputSplitProvider();
 
 	/**
 	 * Returns the current {@link IOManager}.
-	 * 
+	 *
 	 * @return the current {@link IOManager}.
 	 */
 	IOManager getIOManager();
 
 	/**
 	 * Returns the current {@link MemoryManager}.
-	 * 
+	 *
 	 * @return the current {@link MemoryManager}.
 	 */
 	MemoryManager getMemoryManager();
 
 	/**
 	 * Returns the name of the task running in this environment.
-	 * 
+	 *
 	 * @return the name of the task running in this environment
 	 */
 	String getTaskName();
 
 	/**
 	 * Returns the next unbound input gate ID or <code>null</code> if no such ID exists
-	 * 
+	 *
 	 * @return the next unbound input gate ID or <code>null</code> if no such ID exists
 	 */
 	GateID getNextUnboundInputGateID();
 
 	/**
 	 * Returns the number of output gates registered with this environment.
-	 * 
+	 *
 	 * @return the number of output gates registered with this environment
 	 */
 	int getNumberOfOutputGates();
 
 	/**
 	 * Returns the number of input gates registered with this environment.
-	 * 
+	 *
 	 * @return the number of input gates registered with this environment
 	 */
 	int getNumberOfInputGates();
 
 	/**
 	 * Returns the number of output channels attached to this environment.
-	 * 
+	 *
 	 * @return the number of output channels attached to this environment
 	 */
 	int getNumberOfOutputChannels();
 
 	/**
 	 * Returns the number of input channels attached to this environment.
-	 * 
+	 *
 	 * @return the number of input channels attached to this environment
 	 */
 	int getNumberOfInputChannels();
@@ -164,50 +166,48 @@ public interface Environment {
 
 	/**
 	 * Returns the IDs of all output channels connected to this environment.
-	 * 
+	 *
 	 * @return the IDs of all output channels connected to this environment
 	 */
 	Set<ChannelID> getOutputChannelIDs();
 
 	/**
 	 * Returns the IDs of all input channels connected to this environment.
-	 * 
+	 *
 	 * @return the IDs of all input channels connected to this environment
 	 */
 	Set<ChannelID> getInputChannelIDs();
 
 	/**
 	 * Returns the IDs of all output gates connected to this environment.
-	 * 
+	 *
 	 * @return the IDs of all output gates connected to this environment
 	 */
 	Set<GateID> getOutputGateIDs();
 
 	/**
 	 * Returns the IDs of all input gates connected to this environment.
-	 * 
+	 *
 	 * @return the IDs of all input gates connected to this environment
 	 */
 	Set<GateID> getInputGateIDs();
 
 	/**
 	 * Returns the IDs of all the output channels connected to the gate with the given ID.
-	 * 
-	 * @param gateID
-	 *        the gate ID
+	 *
+	 * @param gateID the gate ID
 	 * @return the IDs of all the output channels connected to the gate with the given ID
 	 */
 	Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID);
 
 	/**
 	 * Returns the IDs of all the input channels connected to the gate with the given ID.
-	 * 
-	 * @param gateID
-	 *        the gate ID
+	 *
+	 * @param gateID the gate ID
 	 * @return the IDs of all the input channels connected to the gate with the given ID
 	 */
 	Set<ChannelID> getInputChannelIDsOfGate(GateID gateID);
-	
+
 	/**
 	 * Returns the proxy object for the accumulator protocol.
 	 */
@@ -215,11 +215,13 @@ public interface Environment {
 
 	/**
 	 * Returns the buffer provider for this environment.
-	 * <p>
+	 * <p/>
 	 * The returned buffer provider is used by the output side of the network stack.
 	 *
 	 * @return Buffer provider for the output side of the network stack
 	 * @see eu.stratosphere.runtime.io.api.RecordWriter
 	 */
 	BufferProvider getOutputBufferProvider();
+
+	Map<String, FutureTask<Path>> getCopyTask();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
index 55f036a..87674c8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
@@ -14,6 +14,8 @@
 package eu.stratosphere.nephele.execution;
 
 import static eu.stratosphere.nephele.execution.ExecutionState.FAILED;
+import static eu.stratosphere.nephele.execution.ExecutionState.CANCELED;
+import static eu.stratosphere.nephele.execution.ExecutionState.CANCELING;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index 59787d2..29d6853 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -13,30 +13,11 @@
 
 package eu.stratosphere.nephele.execution;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.FutureTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
-import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
 import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
@@ -45,7 +26,6 @@ import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.nephele.template.InputSplitProvider;
 import eu.stratosphere.runtime.io.Buffer;
 import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.runtime.io.channels.OutputChannel;
 import eu.stratosphere.runtime.io.gates.GateID;
 import eu.stratosphere.runtime.io.gates.InputGate;
@@ -56,12 +36,27 @@ import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
 import eu.stratosphere.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.FutureTask;
 
 /**
  * The user code of every Nephele task runs inside a <code>RuntimeEnvironment</code> object. The environment provides
  * important services to the task. It keeps track of setting up the communication channels and provides access to input
  * splits, memory manager, etc.
- * <p>
+ * <p/>
  * This class is thread-safe.
  */
 public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
@@ -141,7 +136,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	 * The observer object for the task's execution.
 	 */
 	private volatile ExecutionObserver executionObserver = null;
-	
+
 	/**
 	 * The RPC proxy to report accumulators to JobManager
 	 */
@@ -164,26 +159,22 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	private LocalBufferPool outputBufferPool;
 
+	private Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+
 	/**
 	 * Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
 	 * task.
-	 * 
-	 * @param jobID
-	 *        the ID of the original Nephele job
-	 * @param taskName
-	 *        the name of task running in this environment
-	 * @param invokableClass
-	 *        invokableClass the class that should be instantiated as a Nephele task
-	 * @param taskConfiguration
-	 *        the configuration object which was attached to the original JobVertex
-	 * @param jobConfiguration
-	 *        the configuration object which was attached to the original {@link JobGraph}
-	 * @throws Exception
-	 *         thrown if an error occurs while instantiating the invokable class
+	 *
+	 * @param jobID             the ID of the original Nephele job
+	 * @param taskName          the name of task running in this environment
+	 * @param invokableClass    invokableClass the class that should be instantiated as a Nephele task
+	 * @param taskConfiguration the configuration object which was attached to the original JobVertex
+	 * @param jobConfiguration  the configuration object which was attached to the original JobGraph
+	 * @throws Exception thrown if an error occurs while instantiating the invokable class
 	 */
 	public RuntimeEnvironment(final JobID jobID, final String taskName,
-			final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
-			final Configuration jobConfiguration) throws Exception {
+							final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
+							final Configuration jobConfiguration) throws Exception {
 
 		this.jobID = jobID;
 		this.taskName = taskName;
@@ -203,23 +194,18 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Constructs a runtime environment from a task deployment description.
-	 * 
-	 * @param tdd
-	 *        the task deployment description
-	 * @param memoryManager
-	 *        the task manager's memory manager component
-	 * @param ioManager
-	 *        the task manager's I/O manager component
-	 * @param inputSplitProvider
-	 *        the input split provider for this environment
-	 * @throws Exception
-	 *         thrown if an error occurs while instantiating the invokable class
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
+	 *
+	 * @param tdd                the task deployment description
+	 * @param memoryManager      the task manager's memory manager component
+	 * @param ioManager          the task manager's I/O manager component
+	 * @param inputSplitProvider the input split provider for this environment
+	 * @throws Exception thrown if an error occurs while instantiating the invokable class
+	 */
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	public RuntimeEnvironment(final TaskDeploymentDescriptor tdd,
-			final MemoryManager memoryManager, final IOManager ioManager,
-			final InputSplitProvider inputSplitProvider,
-			AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks) throws Exception {
+							final MemoryManager memoryManager, final IOManager ioManager,
+							final InputSplitProvider inputSplitProvider,
+							AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks) throws Exception {
 
 		this.jobID = tdd.getJobID();
 		this.taskName = tdd.getTaskName();
@@ -246,14 +232,14 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 		int numInputGates = tdd.getNumberOfInputGateDescriptors();
 
-		for(int i = 0; i < numInputGates; i++){
+		for (int i = 0; i < numInputGates; i++) {
 			this.inputGates.get(i).initializeChannels(tdd.getInputGateDescriptor(i));
 		}
 	}
 
 	/**
 	 * Returns the invokable object that represents the Nephele task.
-	 * 
+	 *
 	 * @return the invokable object that represents the Nephele task
 	 */
 	public AbstractInvokable getInvokable() {
@@ -272,7 +258,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	@Override
 	public OutputGate createAndRegisterOutputGate() {
-		OutputGate gate = new OutputGate(getJobID(), new GateID(),  getNumberOfOutputGates());
+		OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
 		this.outputGates.add(gate);
 
 		return gate;
@@ -318,7 +304,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 			if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
 				changeExecutionState(ExecutionState.CANCELED, null);
-			} else {
+			}
+			else {
 				changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
 			}
 
@@ -347,7 +334,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 			if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
 				changeExecutionState(ExecutionState.CANCELED, null);
-			} else {
+			}
+			else {
 				changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
 			}
 
@@ -400,9 +388,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Returns the registered input gate with index <code>pos</code>.
-	 * 
-	 * @param pos
-	 *        the index of the input gate to return
+	 *
+	 * @param pos the index of the input gate to return
 	 * @return the input gate at index <code>pos</code> or <code>null</code> if no such index exists
 	 */
 	public InputGate<? extends IOReadableWritable> getInputGate(final int pos) {
@@ -415,9 +402,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Returns the registered output gate with index <code>pos</code>.
-	 * 
-	 * @param index
-	 *        the index of the output gate to return
+	 *
+	 * @param index the index of the output gate to return
 	 * @return the output gate at index <code>pos</code> or <code>null</code> if no such index exists
 	 */
 	public OutputGate getOutputGate(int index) {
@@ -430,7 +416,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Returns the thread which is assigned to execute the user code.
-	 * 
+	 *
 	 * @return the thread which is assigned to execute the user code
 	 */
 	public Thread getExecutingThread() {
@@ -439,7 +425,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			if (this.executingThread == null) {
 				if (this.taskName == null) {
 					this.executingThread = new Thread(this);
-				} else {
+				}
+				else {
 					this.executingThread = new Thread(this, getTaskNameWithIndex());
 				}
 			}
@@ -450,11 +437,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Blocks until all output channels are closed.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurred while closing the output channels
-	 * @throws InterruptedException
-	 *         thrown if the thread waiting for the channels to be closed is interrupted
+	 *
+	 * @throws IOException          thrown if an error occurred while closing the output channels
+	 * @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
 	 */
 	private void waitForOutputChannelsToBeClosed() throws InterruptedException {
 		// Make sure, we leave this method with an InterruptedException when the task has been canceled
@@ -469,11 +454,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Blocks until all input channels are closed.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurred while closing the input channels
-	 * @throws InterruptedException
-	 *         thrown if the thread waiting for the channels to be closed is interrupted
+	 *
+	 * @throws IOException          thrown if an error occurred while closing the input channels
+	 * @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
 	 */
 	private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
 		// Wait for disconnection of all output gates
@@ -494,7 +477,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 			if (allClosed) {
 				break;
-			} else {
+			}
+			else {
 				Thread.sleep(SLEEPINTERVAL);
 			}
 		}
@@ -564,7 +548,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Returns the name of the task with its index in the subtask group and the total number of subtasks.
-	 * 
+	 *
 	 * @return the name of the task with its index in the subtask group and the total number of subtasks
 	 */
 	public String getTaskNameWithIndex() {
@@ -573,9 +557,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	/**
 	 * Sets the execution observer for this environment.
-	 * 
-	 * @param executionObserver
-	 *        the execution observer for this environment
+	 *
+	 * @param executionObserver the execution observer for this environment
 	 */
 	public void setExecutionObserver(final ExecutionObserver executionObserver) {
 		this.executionObserver = executionObserver;
@@ -616,7 +599,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	@Override
 	public Set<ChannelID> getOutputChannelIDs() {
-		Set<ChannelID> ids= new HashSet<ChannelID>();
+		Set<ChannelID> ids = new HashSet<ChannelID>();
 
 		for (OutputGate gate : this.outputGates) {
 			for (OutputChannel channel : gate.channels()) {
@@ -726,38 +709,47 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	public List<InputGate<? extends IOReadableWritable>> inputGates() {
 		return this.inputGates;
 	}
-	
+
 	@Override
 	public AccumulatorProtocol getAccumulatorProtocolProxy() {
 		return accumulatorProtocolProxy;
 	}
 
+	public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
+		this.cacheCopyTasks.put(name, copyTask);
+	}
+
+	@Override
+	public Map<String, FutureTask<Path>> getCopyTask() {
+		return this.cacheCopyTasks;
+	}
+
 	@Override
 	public BufferProvider getOutputBufferProvider() {
 		return this;
 	}
-	
+
 	// -----------------------------------------------------------------------------------------------------------------
 	//                                            BufferProvider methods
 	// -----------------------------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public Buffer requestBuffer(int minBufferSize) throws IOException {
 		return this.outputBufferPool.requestBuffer(minBufferSize);
 	}
-	
+
 	@Override
 	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
 		return this.outputBufferPool.requestBufferBlocking(minBufferSize);
 	}
-	
+
 	@Override
 	public int getBufferSize() {
 		return this.outputBufferPool.getBufferSize();
 	}
-	
+
 	@Override
-	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
 		return this.outputBufferPool.registerBufferAvailabilityListener(listener);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
index 57ff073..8e9395a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -691,6 +692,41 @@ public final class ExecutionVertex {
 	}
 
 	/**
+	 * Kills and removes the task represented by this vertex from the instance it is currently running on. If the
+	 * corresponding task is not in the state <code>RUNNING</code>, this call will be ignored. If the call has been
+	 * executed
+	 * successfully, the task will change the state <code>FAILED</code>.
+	 *
+	 * @return the result of the task kill attempt
+	 */
+	public TaskKillResult killTask() {
+
+		final ExecutionState state = this.executionState.get();
+
+		if (state != ExecutionState.RUNNING) {
+			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
+			result.setDescription("Vertex " + this.toString() + " is in state " + state);
+			return result;
+		}
+
+		final AllocatedResource ar = this.allocatedResource.get();
+
+		if (ar == null) {
+			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.NO_INSTANCE);
+			result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
+			return result;
+		}
+
+		try {
+			return ar.getInstance().killTask(this.vertexID);
+		} catch (IOException e) {
+			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.IPC_ERROR);
+			result.setDescription(StringUtils.stringifyException(e));
+			return result;
+		}
+	}
+
+	/**
 	 * Cancels and removes the task represented by this vertex
 	 * from the instance it is currently running on. If the task
 	 * is not currently running, its execution state is simply

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
index 50e0e7f..56b4eae 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
@@ -24,6 +24,7 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.ipc.RPC;
 import eu.stratosphere.nephele.jobgraph.JobID;
@@ -205,6 +206,21 @@ public abstract class AbstractInstance extends NetworkNode {
 		return getTaskManagerProxy().cancelTask(id);
 	}
 
+	/**
+	 * Kills the task identified by the given ID at the instance's
+	 * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+	 *
+	 * @param id
+	 *        the ID identifying the task to be killed
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the request or receiving the response
+	 * @return the result of the kill attempt
+	 */
+	public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
+
+		return getTaskManagerProxy().killTask(id);
+	}
+
 	@Override
 	public boolean equals(final Object obj) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
index 2ec2ed6..e32df61 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
@@ -13,11 +13,10 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
-import java.nio.ByteBuffer;
+import eu.stratosphere.nephele.AbstractID;
 
 import javax.xml.bind.DatatypeConverter;
-
-import eu.stratosphere.nephele.AbstractID;
+import java.nio.ByteBuffer;
 
 public final class JobID extends AbstractID {
 
@@ -44,6 +43,12 @@ public final class JobID extends AbstractID {
 		return new JobID(bytes);
 	}
 
+	public static JobID fromByteBuffer(ByteBuffer buf) {
+		long lower = buf.getLong();
+		long upper = buf.getLong();
+		return new JobID(lower, upper);
+	}
+
 	public static JobID fromByteBuffer(ByteBuffer buf, int offset) {
 		long lower = buf.getLong(offset);
 		long upper = buf.getLong(offset + 8);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 846ca2e..3ae9f3b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -31,6 +31,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -893,6 +895,38 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	}
 
 	@Override
+	public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
+
+		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
+		if (eg == null) {
+			LOG.error("Cannot find execution graph for job " + jobID);
+			return;
+		}
+
+		final ExecutionVertex vertex = eg.getVertexByID(ExecutionVertexID.fromManagementVertexID(id));
+		if (vertex == null) {
+			LOG.error("Cannot find execution vertex with ID " + id);
+			return;
+		}
+
+		LOG.info("Killing task " + vertex + " of job " + jobID);
+
+		final Runnable runnable = new Runnable() {
+
+			@Override
+			public void run() {
+
+				final TaskKillResult result = vertex.killTask();
+				if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
+					LOG.error(result.getDescription());
+				}
+			}
+		};
+
+		eg.executeCommand(runnable);
+	}
+
+	@Override
 	public void killInstance(final StringRecord instanceName) throws IOException {
 
 		final AbstractInstance instance = this.instanceManager.getInstanceByName(instanceName.toString());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
index 762b494..e369613 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
+import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -87,8 +88,8 @@ public final class RecoveryLogic {
 				verticesToBeRestarted.put(vertex.getID(), vertex);
 				final TaskCancelResult cancelResult = vertex.cancelTask();
 
-				if (cancelResult.getReturnCode() != ReturnCode.SUCCESS
-						&& cancelResult.getReturnCode() != ReturnCode.TASK_NOT_FOUND) {
+				if (cancelResult.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS
+						&& cancelResult.getReturnCode() != AbstractTaskResult.ReturnCode.TASK_NOT_FOUND) {
 
 					verticesToBeRestarted.remove(vertex.getID());
 					LOG.error("Unable to cancel vertex" + cancelResult.getDescription());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
index 461c797..59ec15d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
@@ -24,6 +24,7 @@ import eu.stratosphere.nephele.instance.InstanceType;
 import eu.stratosphere.nephele.instance.InstanceTypeDescription;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.managementgraph.ManagementGraph;
+import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
 import eu.stratosphere.nephele.topology.NetworkTopology;
 
 /**
@@ -81,6 +82,18 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
 	List<AbstractEvent> getEvents(JobID jobID) throws IOException;
 
 	/**
+	 * Kills the task with the given vertex ID.
+	 *
+	 * @param jobID
+	 *        the ID of the job the vertex to be killed belongs to
+	 * @param id
+	 *        the vertex ID which identified the task be killed
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the kill request
+	 */
+	void killTask(JobID jobID, ManagementVertexID id) throws IOException;
+
+	/**
 	 * Kills the instance with the given name (i.e. shuts down its task manager).
 	 * 
 	 * @param instanceName

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
index 19522db..93d8cdf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
@@ -23,6 +23,7 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
 import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
@@ -58,6 +59,17 @@ public interface TaskOperationProtocol extends VersionedProtocol {
 	TaskCancelResult cancelTask(ExecutionVertexID id) throws IOException;
 
 	/**
+	 * Advises the task manager to kill the task with the given ID.
+	 *
+	 * @param id
+	 *        the ID of the task to kill
+	 * @return the result of the task kill attempt
+	 * @throws IOException
+	 *         thrown if an error occurs during this remote procedure call
+	 */
+	TaskKillResult killTask(ExecutionVertexID id) throws IOException;
+
+	/**
 	 * Queries the task manager about the cache status of the libraries stated in the {@link LibraryCacheProfileRequest}
 	 * object.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
index 06eec0c..825eae1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
@@ -25,6 +25,7 @@ import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.util.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -57,9 +58,7 @@ public final class Task implements ExecutionObserver {
 
 	private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();
 
-	public Task(final ExecutionVertexID vertexID, final RuntimeEnvironment environment,
-					   final TaskManager taskManager) {
-
+	public Task(ExecutionVertexID vertexID, final RuntimeEnvironment environment, TaskManager taskManager) {
 		this.vertexID = vertexID;
 		this.environment = environment;
 		this.taskManager = taskManager;
@@ -102,58 +101,51 @@ public final class Task implements ExecutionObserver {
 		executionStateChanged(ExecutionState.FAILED, "Execution thread died unexpectedly");
 	}
 
-	/**
-	 * Checks if the state of the thread which is associated with this task is <code>TERMINATED</code>.
-	 * 
-	 * @return <code>true</code> if the state of this thread which is associated with this task is
-	 *         <code>TERMINATED</code>, <code>false</code> otherwise
-	 */
-	public boolean isTerminated() {
-		final Thread executingThread = this.environment.getExecutingThread();
-		if (executingThread.getState() == Thread.State.TERMINATED) {
-			return true;
-		}
-
-		return false;
+	public void cancelExecution() {
+		cancelOrKillExecution(true);
 	}
 
-	/**
-	 * Starts the execution of this task.
-	 */
-	public void startExecution() {
-
-		final Thread thread = this.environment.getExecutingThread();
-		thread.start();
+	public void killExecution() {
+		cancelOrKillExecution(false);
 	}
 
 	/**
-	 * Cancels the execution of the task (i.e. interrupts the execution thread).
+	 * Cancels or kills the task.
+	 *
+	 * @param cancel <code>true/code> if the task shall be canceled, <code>false</code> if it shall be killed
 	 */
-	public void cancelExecution() {
+	private void cancelOrKillExecution(boolean cancel) {
 		final Thread executingThread = this.environment.getExecutingThread();
 
 		if (executingThread == null) {
 			return;
 		}
 
-		LOG.info("Canceling " + this.environment.getTaskNameWithIndex());
+		if (this.executionState != ExecutionState.RUNNING && this.executionState != ExecutionState.FINISHING) {
+			return;
+		}
+
+		LOG.info((cancel ? "Canceling " : "Killing ") + this.environment.getTaskNameWithIndex());
 
-		this.isCanceled = true;
-		// Change state
-		executionStateChanged(ExecutionState.CANCELING, null);
+		if (cancel) {
+			this.isCanceled = true;
+			// Change state
+			executionStateChanged(ExecutionState.CANCELING, null);
 
-		// Request user code to shut down
-		try {
-			final AbstractInvokable invokable = this.environment.getInvokable();
-			if (invokable != null) {
-				invokable.cancel();
+			// Request user code to shut down
+			try {
+				final AbstractInvokable invokable = this.environment.getInvokable();
+				if (invokable != null) {
+					invokable.cancel();
+				}
+			} catch (Throwable e) {
+				LOG.error(StringUtils.stringifyException(e));
 			}
-		} catch (Throwable e) {
-			LOG.error("Error while canceling task", e);
 		}
 
 		// Continuously interrupt the user thread until it changed to state CANCELED
 		while (true) {
+
 			executingThread.interrupt();
 
 			if (!executingThread.isAlive()) {
@@ -168,12 +160,36 @@ public final class Task implements ExecutionObserver {
 				break;
 			}
 
-			if (LOG.isDebugEnabled())
-				LOG.debug("Sending repeated canceling  signal to " +
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Sending repeated " + (cancel == true ? "canceling" : "killing") + " signal to " +
 						this.environment.getTaskName() + " with state " + this.executionState);
+			}
 		}
 	}
 
+	/**
+	 * Checks if the state of the thread which is associated with this task is <code>TERMINATED</code>.
+	 * 
+	 * @return <code>true</code> if the state of this thread which is associated with this task is
+	 *         <code>TERMINATED</code>, <code>false</code> otherwise
+	 */
+	public boolean isTerminated() {
+		final Thread executingThread = this.environment.getExecutingThread();
+		if (executingThread.getState() == Thread.State.TERMINATED) {
+			return true;
+		}
+
+		return false;
+	}
+
+	/**
+	 * Starts the execution of this task.
+	 */
+	public void startExecution() {
+
+		final Thread thread = this.environment.getExecutingThread();
+		thread.start();
+	}
 
 	/**
 	 * Registers the task manager profiler with the task.
@@ -324,5 +340,4 @@ public final class Task implements ExecutionObserver {
 
 		return this.environment;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
new file mode 100644
index 0000000..3c0002c
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.taskmanager;
+
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+
+/**
+ * A <code>TaskKillResult</code> is used to report the results
+ * of a task kill attempt. It contains the ID of the task to be killed, a return code and
+ * a description. In case of an error during the kill operation the description includes an error message.
+ * 
+ */
+public class TaskKillResult extends AbstractTaskResult {
+
+	/**
+	 * Constructs a new task kill result.
+	 * 
+	 * @param vertexID
+	 *        the task ID this result belongs to
+	 * @param returnCode
+	 *        the return code of the kill
+	 */
+	public TaskKillResult(final ExecutionVertexID vertexID, final ReturnCode returnCode) {
+		super(vertexID, returnCode);
+	}
+
+	/**
+	 * Constructs an empty task kill result.
+	 */
+	public TaskKillResult() {
+		super();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 5240fc8..3b478cf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -126,7 +126,7 @@ public class TaskManager implements TaskOperationProtocol {
 	private final InstanceConnectionInfo localInstanceConnectionInfo;
 
 	/**
-	 * The instance of the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} which is responsible for
+	 * The instance of the {@link ChannelManager} which is responsible for
 	 * setting up and cleaning up the byte buffered channels of the tasks.
 	 */
 	private final ChannelManager channelManager;
@@ -286,16 +286,32 @@ public class TaskManager implements TaskOperationProtocol {
 				ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
 				ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
 
-		// Initialize the byte buffered channel manager
-		ChannelManager channelManager = null;
+		int numInThreads = GlobalConfiguration.getInteger(
+				ConfigConstants.TASK_MANAGER_NETTY_NUM_IN_THREADS_KEY,
+				ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_NUM_IN_THREADS);
+
+		int numOutThreads = GlobalConfiguration.getInteger(
+				ConfigConstants.TASK_MANAGER_NETTY_NUM_OUT_THREADS_KEY,
+				ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_NUM_OUT_THREADS);
+
+		int lowWaterMark = GlobalConfiguration.getInteger(
+				ConfigConstants.TASK_MANAGER_NETTY_LOW_WATER_MARK,
+				ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_LOW_WATER_MARK);
+
+		int highWaterMark = GlobalConfiguration.getInteger(
+				ConfigConstants.TASK_MANAGER_NETTY_HIGH_WATER_MARK,
+				ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_HIGH_WATER_MARK);
+
+		// Initialize the channel manager
 		try {
-			channelManager = new ChannelManager(this.lookupService, this.localInstanceConnectionInfo, numBuffers, bufferSize);
+			this.channelManager = new ChannelManager(
+					this.lookupService, this.localInstanceConnectionInfo,
+					numBuffers, bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
 		} catch (IOException ioe) {
 			LOG.error(StringUtils.stringifyException(ioe));
 			throw new Exception("Failed to instantiate Byte-buffered channel manager. " + ioe.getMessage(), ioe);
 		}
-		this.channelManager = channelManager;
-		
+
 		{
 			HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
 
@@ -933,7 +949,7 @@ public class TaskManager implements TaskOperationProtocol {
 	}
 
 	@Override
-	public void logBufferUtilization() throws IOException {
+	public void logBufferUtilization() {
 
 		this.channelManager.logBufferUtilization();
 	}
@@ -956,7 +972,7 @@ public class TaskManager implements TaskOperationProtocol {
 
 	@Override
 	public void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
-		this.byteBufferedChannelManager.invalidateLookupCacheEntries(channelIDs);
+		this.channelManager.invalidateLookupCacheEntries(channelIDs);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
new file mode 100644
index 0000000..ed845e8
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
@@ -0,0 +1,35 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.taskmanager.runtime;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExecutorThreadFactory implements ThreadFactory {
+	
+	public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
+
+	private static final String THREAD_NAME = "Nephele Executor Thread ";
+	
+	private final AtomicInteger threadNumber = new AtomicInteger(1);
+	
+	
+	private ExecutorThreadFactory() {}
+	
+	
+	public Thread newThread(Runnable target) {
+		Thread t = new Thread(target, THREAD_NAME + threadNumber.getAndIncrement());
+		t.setDaemon(true);
+		return t;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
index 674f270..79b9c83 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
@@ -21,7 +21,6 @@ import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.io.MutableReader;
 import eu.stratosphere.pact.runtime.hash.CompactingHashTable;
 import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;
 import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -36,6 +35,7 @@ import eu.stratosphere.pact.runtime.task.RegularPactTask;
 import eu.stratosphere.pact.runtime.task.ResettablePactDriver;
 import eu.stratosphere.pact.runtime.task.util.TaskConfig;
 import eu.stratosphere.pact.runtime.udf.RuntimeUDFContext;
+import eu.stratosphere.runtime.io.api.MutableReader;
 import eu.stratosphere.types.Value;
 import eu.stratosphere.util.Collector;
 import eu.stratosphere.util.InstantiationUtil;
@@ -322,14 +322,15 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 	 * <p/>
 	 * This collector is used by {@link IterationIntermediatePactTask} or {@link IterationTailPactTask} to update the
 	 * solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
-	 * {@link SolutionSetFastUpdateOutputCollector} or normal (re-probing) {@link SolutionSetUpdateOutputCollector}
-	 * is created.
+	 * {@link eu.stratosphere.pact.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
+	 * {@link SolutionSetUpdateOutputCollector} is created.
 	 * <p/>
 	 * If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
 	 * collect(T) of the delegate.
 	 *
 	 * @param delegate null -OR- a delegate collector to be called by the newly created collector
-	 * @return a new {@link SolutionSetFastUpdateOutputCollector} or {@link SolutionSetUpdateOutputCollector}
+	 * @return a new {@link eu.stratosphere.pact.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or
+	 * {@link SolutionSetUpdateOutputCollector}
 	 */
 	protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
 		Broker<CompactingHashTable<?>> solutionSetBroker = SolutionSetBroker.instance();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
index c39e3ef..89571c4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
@@ -17,6 +17,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import eu.stratosphere.runtime.io.api.BufferWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -27,8 +28,6 @@ import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.core.memory.DataInputView;
 import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
-import eu.stratosphere.nephele.io.RecordWriter;
 import eu.stratosphere.pact.runtime.hash.CompactingHashTable;
 import eu.stratosphere.pact.runtime.io.InputViewIterator;
 import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
index 46e3249..6491749 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
@@ -18,7 +18,6 @@ import eu.stratosphere.api.common.typeutils.TypeComparator;
 import eu.stratosphere.runtime.io.api.ChannelSelector;
 import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
 
-
 public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
 	
 	private final ShipStrategyType strategy;		// the shipping strategy used by this output emitter

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
index ba352eb..7fe35b4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
@@ -15,13 +15,10 @@ package eu.stratosphere.pact.runtime.shipping;
 
 import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.typeutils.TypeComparator;
-import eu.stratosphere.nephele.io.ChannelSelector;
 import eu.stratosphere.runtime.io.api.ChannelSelector;
-import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator;
 import eu.stratosphere.types.Key;
 import eu.stratosphere.types.Record;
 
-
 public class RecordOutputEmitter implements ChannelSelector<Record> {
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
index cb3e782..638a7aa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
@@ -193,8 +193,9 @@ public class DataSinkTask<IT> extends AbstractOutputTask {
 			}
 			// drop, if the task was canceled
 			else if (!this.taskCanceled) {
-				if (LOG.isErrorEnabled())
+				if (LOG.isErrorEnabled()) {
 					LOG.error(getLogString("Error in user code: " + ex.getMessage()), ex);
+				}
 				throw ex;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
index b01799a..92c4648 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
@@ -22,9 +22,7 @@ import eu.stratosphere.api.common.typeutils.TypeComparator;
 import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
-import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.nephele.execution.CancelTaskException;
 import eu.stratosphere.nephele.execution.Environment;
@@ -280,8 +278,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	@Override
 	public void invoke() throws Exception {
 
-		if (LOG.isDebugEnabled())
+		if (LOG.isDebugEnabled()) {
 			LOG.debug(formatLogString("Start task code."));
+		}
 
 		// whatever happens in this scope, make sure that the local strategies are cleaned up!
 		// note that the initialization of the local strategies is in the try-finally block as well,
@@ -390,8 +389,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	public void cancel() throws Exception {
 		this.running = false;
 
-		if (LOG.isDebugEnabled())
+		if (LOG.isDebugEnabled()) {
 			LOG.debug(formatLogString("Cancelling task code"));
+		}
 
 		try {
 			if (this.driver != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
index c192cb9..f3f51f1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
@@ -40,8 +40,6 @@ public class Buffer {
 	}
 
 	/**
-	 * NOTE: Requires that the reference counter was increased prior to the constructor call!
-	 *
 	 * @param toDuplicate Buffer instance to duplicate
 	 */
 	private Buffer(Buffer toDuplicate) {
@@ -74,7 +72,8 @@ public class Buffer {
 	}
 
 	public void recycleBuffer() {
-		if (this.referenceCounter.decrementAndGet() == 0) {
+		int refCount = this.referenceCounter.decrementAndGet();
+		if (refCount == 0) {
 			this.recycler.recycle(this.memorySegment);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
index dfa5d5e..17fff02 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
@@ -17,8 +17,8 @@ import java.io.IOException;
 
 import eu.stratosphere.nephele.event.task.AbstractEvent;
 import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeDispatcher;
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.EnvelopeDispatcher;
 
 /**
  * The base class for channel objects.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
index 66be7de..ba78c01 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
@@ -13,10 +13,10 @@
 
 package eu.stratosphere.runtime.io.channels;
 
-import java.nio.ByteBuffer;
-
 import eu.stratosphere.nephele.AbstractID;
 
+import java.nio.ByteBuffer;
+
 public class ChannelID extends AbstractID {
 
 	public ChannelID() {
@@ -31,6 +31,12 @@ public class ChannelID extends AbstractID {
 		super(bytes);
 	}
 
+	public static ChannelID fromByteBuffer(ByteBuffer buf) {
+		long lower = buf.getLong();
+		long upper = buf.getLong();
+		return new ChannelID(lower, upper);
+	}
+
 	public static ChannelID fromByteBuffer(ByteBuffer buf, int offset) {
 		long lower = buf.getLong(offset);
 		long upper = buf.getLong(offset + 8);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
index 5d5b53d..3007489 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
@@ -24,3 +24,8 @@ public enum ChannelType {
 	/** In-memory channels */
 	IN_MEMORY
 }
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
index 860141d..6122c36 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
@@ -21,7 +21,7 @@ import eu.stratosphere.runtime.io.Buffer;
 import eu.stratosphere.runtime.io.gates.InputChannelResult;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.Envelope;
 import eu.stratosphere.runtime.io.gates.InputGate;
 import eu.stratosphere.runtime.io.serialization.AdaptiveSpanningRecordDeserializer;
 import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
@@ -30,7 +30,6 @@ import eu.stratosphere.runtime.io.serialization.RecordDeserializer.Deserializati
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
@@ -83,6 +82,8 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 
 	private int lastReceivedEnvelope = -1;
 
+	private ChannelID lastSourceID = null;
+
 	private boolean destroyCalled = false;
 
 	// ----------------------
@@ -104,7 +105,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 	 *        the ID of the channel this channel is connected to
 	 */
 	public InputChannel(final InputGate<T> inputGate, final int channelIndex, final ChannelID channelID,
-						   final ChannelID connectedChannelID, ChannelType type) {
+						final ChannelID connectedChannelID, ChannelType type) {
 		super(channelIndex, channelID, connectedChannelID, type);
 		this.inputGate = inputGate;
 		this.deserializer = new AdaptiveSpanningRecordDeserializer<T>();
@@ -158,6 +159,9 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 
 //	public abstract AbstractTaskEvent getCurrentEvent();
 
+	private DeserializationResult lastDeserializationResult;
+
+
 	public InputChannelResult readRecord(T target) throws IOException {
 		if (this.dataBuffer == null) {
 			if (isClosed()) {
@@ -176,7 +180,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 			{
 				// sanity check: an event may only come after a complete record.
 				if (this.deserializer.hasUnfinishedData()) {
-					throw new IOException("Channel received an event before completing the current partial record.");
+					throw new IllegalStateException("Channel received an event before completing the current partial record.");
 				}
 
 				AbstractEvent evt = boe.getEvent();
@@ -202,8 +206,8 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 			}
 		}
 
-
 		DeserializationResult deserializationResult = this.deserializer.getNextRecord(target);
+		this.lastDeserializationResult = deserializationResult;
 
 		if (deserializationResult.isBufferConsumed()) {
 			releasedConsumedReadBuffer(this.dataBuffer);
@@ -348,6 +352,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 
 				this.queuedEnvelopes.add(envelope);
 				this.lastReceivedEnvelope = sequenceNumber;
+				this.lastSourceID = envelope.getSource();
 
 				// Notify the channel about the new data. notify as much as there is (buffer plus once per event)
 				if (envelope.getBuffer() != null) {
@@ -432,7 +437,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 	}
 
 	@Override
-	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
 		return this.inputGate.registerBufferAvailabilityListener(listener);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
index 7ca916c..f4fed65 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
@@ -17,7 +17,7 @@ import eu.stratosphere.nephele.event.task.AbstractEvent;
 import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.Envelope;
 import eu.stratosphere.runtime.io.gates.OutputGate;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -107,7 +107,6 @@ public class OutputChannel extends Channel {
 					this.receiverCloseRequested = true;
 					this.closeLock.notifyAll();
 				}
-				LOG.debug("OutputChannel received close event from target.");
 			} 
 			else if (event instanceof AbstractTaskEvent) {
 				if (LOG.isDebugEnabled()) {
@@ -165,7 +164,7 @@ public class OutputChannel extends Channel {
 	
 	private void checkStatus() throws IOException {
 		if (this.senderCloseRequested) {
-			throw new IllegalStateException(String.format("Channel %s already requested to be closed.", getID()));
+			throw new IllegalStateException(String.format("Channel %s already requested to be closed", getID()));
 		}
 		if (this.receiverCloseRequested) {
 			throw new ReceiverAlreadyClosedException();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
index bdac7a2..c623220 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
@@ -14,9 +14,6 @@
 package eu.stratosphere.runtime.io.gates;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
@@ -24,7 +21,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
 import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
 import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
 import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
 import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -38,7 +34,6 @@ import eu.stratosphere.nephele.event.task.AbstractEvent;
 import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
 import eu.stratosphere.nephele.execution.Environment;
 import eu.stratosphere.runtime.io.channels.InputChannel;
-import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.jobgraph.JobID;
 
 /**
@@ -378,7 +373,7 @@ public class InputGate<T extends IOReadableWritable> extends Gate<T> implements
 	}
 
 	@Override
-	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
 		return this.bufferPool.registerBufferAvailabilityListener(listener);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
index d3eaea1..bff9180 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
@@ -19,7 +19,6 @@ import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
 import eu.stratosphere.nephele.event.task.AbstractEvent;
 import eu.stratosphere.runtime.io.Buffer;
 import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.runtime.io.channels.OutputChannel;
 import eu.stratosphere.nephele.jobgraph.JobID;