You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/11 22:19:39 UTC

[5/8] flink git commit: [FLINK-1672] [runtime] Unify Task and RuntimeEnvironment into one class.

[FLINK-1672] [runtime] Unify Task and RuntimeEnvironment into one class.

 - This simplifies and hardens the failure handling during task startup
 - Guarantees that no actor system threads are blocked by task bootstrap or task cancellation
 - Corrects some previously erroneous corner-case state transitions
 - Adds simple and robust tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e613014
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e613014
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e613014

Branch: refs/heads/master
Commit: 8e61301452218e6d279b013beb7bbd02a7c2e3f9
Parents: 1d368a4
Author: Stephan Ewen <se...@apache.org>
Authored: Sun May 3 04:41:03 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon May 11 21:13:41 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/execution/Environment.java    |   3 +
 .../flink/runtime/execution/ExecutionState.java |   6 +-
 .../runtime/execution/RuntimeEnvironment.java   | 458 ---------
 .../runtime/io/network/NetworkEnvironment.java  |  11 +-
 .../io/network/partition/ResultPartition.java   |  14 +-
 .../partition/consumer/SingleInputGate.java     |  15 +-
 .../runtime/operators/RegularPactTask.java      |   3 +-
 .../runtime/taskmanager/RuntimeEnvironment.java | 230 +++++
 .../apache/flink/runtime/taskmanager/Task.java  | 990 ++++++++++++++-----
 .../runtime/messages/TaskManagerMessages.scala  |  12 +-
 .../flink/runtime/messages/TaskMessages.scala   |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 321 ++----
 .../consumer/LocalInputChannelTest.java         |   7 +-
 .../partition/consumer/SingleInputGateTest.java |   6 +-
 .../partition/consumer/TestSingleInputGate.java |   4 +-
 .../partition/consumer/UnionInputGateTest.java  |   9 +-
 .../operators/testutils/MockEnvironment.java    |   6 +
 .../runtime/taskmanager/ForwardingActor.java    |  41 +
 .../taskmanager/TaskExecutionStateTest.java     |  33 +
 .../runtime/taskmanager/TaskManagerTest.java    | 152 ++-
 .../flink/runtime/taskmanager/TaskTest.java     | 874 ++++++++++++----
 .../testingUtils/TestingTaskManager.scala       |   6 +-
 22 files changed, 1972 insertions(+), 1235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 7ab3bc9..081e3ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.execution;
 
+import akka.actor.ActorRef;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -159,4 +160,6 @@ public interface Environment {
 
 	InputGate[] getAllInputGates();
 
+	// this should go away
+	ActorRef getJobManager();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index 2fcaea1..9f4a5a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -35,10 +35,10 @@ package org.apache.flink.runtime.execution;
  *                                               ... -> FAILED
  * </pre>
  *
- * It is possible to enter the {@code FAILED} state from any other state.
+ * <p>It is possible to enter the {@code FAILED} state from any other state.</p>
  *
- * The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
- * considered terminal states.
+ * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
+ * considered terminal states.</p>
  */
 public enum ExecutionState {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
deleted file mode 100644
index 081d4bc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-import akka.actor.ActorRef;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.accumulators.AccumulatorEvent;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class RuntimeEnvironment implements Environment, Runnable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
-
-	private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads");
-
-	/** The ActorRef to the job manager */
-	private final ActorRef jobManager;
-
-	/** The task that owns this environment */
-	private final Task owner;
-
-	/** The job configuration encapsulated in the environment object. */
-	private final Configuration jobConfiguration;
-
-	/** The task configuration encapsulated in the environment object. */
-	private final Configuration taskConfiguration;
-
-	/** ClassLoader for all user code classes */
-	private final ClassLoader userCodeClassLoader;
-
-	/** Instance of the class to be run in this environment. */
-	private final AbstractInvokable invokable;
-
-	/** The memory manager of the current environment (currently the one associated with the executing TaskManager). */
-	private final MemoryManager memoryManager;
-
-	/** The I/O manager of the current environment (currently the one associated with the executing TaskManager). */
-	private final IOManager ioManager;
-
-	/** The input split provider that can be queried for new input splits. */
-	private final InputSplitProvider inputSplitProvider;
-
-	/** The thread executing the task in the environment. */
-	private Thread executingThread;
-
-	private final BroadcastVariableManager broadcastVariableManager;
-
-	private final Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
-
-	private final AtomicBoolean canceled = new AtomicBoolean();
-
-	private final ResultPartition[] producedPartitions;
-	private final ResultPartitionWriter[] writers;
-
-	private final SingleInputGate[] inputGates;
-
-	private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
-
-	public RuntimeEnvironment(
-			ActorRef jobManager, Task owner, TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader,
-			MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider,
-			BroadcastVariableManager broadcastVariableManager, NetworkEnvironment networkEnvironment) throws Exception {
-
-		this.owner = checkNotNull(owner);
-
-		this.memoryManager = checkNotNull(memoryManager);
-		this.ioManager = checkNotNull(ioManager);
-		this.inputSplitProvider = checkNotNull(inputSplitProvider);
-		this.jobManager = checkNotNull(jobManager);
-
-		this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
-
-		try {
-			// Produced intermediate result partitions
-			final List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
-
-			this.producedPartitions = new ResultPartition[partitions.size()];
-			this.writers = new ResultPartitionWriter[partitions.size()];
-
-			for (int i = 0; i < this.producedPartitions.length; i++) {
-				ResultPartitionDeploymentDescriptor desc = partitions.get(i);
-				ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());
-
-				this.producedPartitions[i] = new ResultPartition(
-						this,
-						owner.getJobID(),
-						partitionId,
-						desc.getPartitionType(),
-						desc.getNumberOfSubpartitions(),
-						networkEnvironment.getPartitionManager(),
-						networkEnvironment.getPartitionConsumableNotifier(),
-						ioManager,
-						networkEnvironment.getDefaultIOMode());
-
-				writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
-			}
-
-			// Consumed intermediate result partitions
-			final List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();
-
-			this.inputGates = new SingleInputGate[consumedPartitions.size()];
-
-			for (int i = 0; i < inputGates.length; i++) {
-				inputGates[i] = SingleInputGate.create(
-						this, consumedPartitions.get(i), networkEnvironment);
-
-				// The input gates are organized by key for task updates/channel updates at runtime
-				inputGatesById.put(inputGates[i].getConsumedResultId(), inputGates[i]);
-			}
-
-			this.jobConfiguration = tdd.getJobConfiguration();
-			this.taskConfiguration = tdd.getTaskConfiguration();
-
-			// ----------------------------------------------------------------
-			// Invokable setup
-			// ----------------------------------------------------------------
-			// Note: This has to be done *after* the readers and writers have
-			// been setup, because the invokable relies on them for I/O.
-			// ----------------------------------------------------------------
-
-			// Load and instantiate the invokable class
-			this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
-			// Class of the task to run in this environment
-			Class<? extends AbstractInvokable> invokableClass;
-			try {
-				final String className = tdd.getInvokableClassName();
-				invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
-			}
-			catch (Throwable t) {
-				throw new Exception("Could not load invokable class.", t);
-			}
-
-			try {
-				this.invokable = invokableClass.newInstance();
-			}
-			catch (Throwable t) {
-				throw new Exception("Could not instantiate the invokable class.", t);
-			}
-
-			this.invokable.setEnvironment(this);
-			this.invokable.registerInputOutput();
-		}
-		catch (Throwable t) {
-			throw new Exception("Error setting up runtime environment: " + t.getMessage(), t);
-		}
-	}
-
-	/**
-	 * Returns the task invokable instance.
-	 */
-	public AbstractInvokable getInvokable() {
-		return this.invokable;
-	}
-
-	@Override
-	public JobID getJobID() {
-		return this.owner.getJobID();
-	}
-
-	@Override
-	public JobVertexID getJobVertexId() {
-		return this.owner.getVertexID();
-	}
-
-	@Override
-	public void run() {
-		// quick fail in case the task was cancelled while the thread was started
-		if (owner.isCanceledOrFailed()) {
-			owner.cancelingDone();
-			return;
-		}
-
-		try {
-			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
-			invokable.invoke();
-
-			// Make sure, we enter the catch block when the task has been canceled
-			if (owner.isCanceledOrFailed()) {
-				throw new CancelTaskException("Task has been canceled or failed");
-			}
-
-			// Finish the produced partitions
-			if (producedPartitions != null) {
-				for (ResultPartition partition : producedPartitions) {
-					if (partition != null) {
-						partition.finish();
-					}
-				}
-			}
-
-			if (owner.isCanceledOrFailed()) {
-				throw new CancelTaskException();
-			}
-
-			// Finally, switch execution state to FINISHED and report to job manager
-			if (!owner.markAsFinished()) {
-				throw new Exception("Could *not* notify job manager that the task is finished.");
-			}
-		}
-		catch (Throwable t) {
-			if (!owner.isCanceledOrFailed()) {
-				// Perform clean up when the task failed and has been not canceled by the user
-				try {
-					invokable.cancel();
-				}
-				catch (Throwable t2) {
-					LOG.error("Error while canceling the task", t2);
-				}
-			}
-
-			// if we are already set as cancelled or failed (when failure is triggered externally),
-			// mark that the thread is done.
-			if (owner.isCanceledOrFailed() || t instanceof CancelTaskException) {
-				owner.cancelingDone();
-			}
-			else {
-				// failure from inside the task thread. notify the task of the failure
-				owner.markFailed(t);
-			}
-		}
-	}
-
-	/**
-	 * Returns the thread, which is assigned to execute the user code.
-	 */
-	public Thread getExecutingThread() {
-		synchronized (this) {
-			if (executingThread == null) {
-				String name = owner.getTaskNameWithSubtasks();
-
-				if (LOG.isDebugEnabled()) {
-					name = name + " (" + owner.getExecutionId() + ")";
-				}
-
-				executingThread = new Thread(TASK_THREADS, this, name);
-			}
-
-			return executingThread;
-		}
-	}
-
-	public void cancelExecution() {
-		if (!canceled.compareAndSet(false, true)) {
-			return;
-		}
-
-		LOG.info("Canceling {} ({}).", owner.getTaskNameWithSubtasks(), owner.getExecutionId());
-
-		// Request user code to shut down
-		if (invokable != null) {
-			try {
-				invokable.cancel();
-			}
-			catch (Throwable e) {
-				LOG.error("Error while canceling the task.", e);
-			}
-		}
-
-		final Thread executingThread = this.executingThread;
-		if (executingThread != null) {
-			// interrupt the running thread and wait for it to die
-			executingThread.interrupt();
-			try {
-				executingThread.join(5000);
-			}
-			catch (InterruptedException e) {
-			}
-			if (!executingThread.isAlive()) {
-				return;
-			}
-			// Continuously interrupt the user thread until it changed to state CANCELED
-			while (executingThread != null && executingThread.isAlive()) {
-				LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
-				if (LOG.isDebugEnabled()) {
-					StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
-					StackTraceElement[] stack = executingThread.getStackTrace();
-					for (StackTraceElement e : stack) {
-						bld.append(e).append('\n');
-					}
-					LOG.debug(bld.toString());
-				}
-				executingThread.interrupt();
-				try {
-					executingThread.join(1000);
-				}
-				catch (InterruptedException e) {
-				}
-			}
-		}
-	}
-
-	@Override
-	public ActorRef getJobManager() {
-		return jobManager;
-	}
-
-	@Override
-	public IOManager getIOManager() {
-		return ioManager;
-	}
-
-	@Override
-	public MemoryManager getMemoryManager() {
-		return memoryManager;
-	}
-
-	@Override
-	public BroadcastVariableManager getBroadcastVariableManager() {
-		return broadcastVariableManager;
-	}
-
-	@Override
-	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-		AccumulatorEvent evt;
-		try {
-			evt = new AccumulatorEvent(getJobID(), accumulators);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
-		}
-
-		ReportAccumulatorResult accResult = new ReportAccumulatorResult(getJobID(), owner.getExecutionId(), evt);
-		jobManager.tell(accResult, ActorRef.noSender());
-	}
-
-	@Override
-	public ResultPartitionWriter getWriter(int index) {
-		checkElementIndex(index, writers.length, "Illegal environment writer request.");
-
-		return writers[checkElementIndex(index, writers.length)];
-	}
-
-	@Override
-	public ResultPartitionWriter[] getAllWriters() {
-		return writers;
-	}
-
-	@Override
-	public InputGate getInputGate(int index) {
-		checkElementIndex(index, inputGates.length);
-
-		return inputGates[index];
-	}
-
-	@Override
-	public SingleInputGate[] getAllInputGates() {
-		return inputGates;
-	}
-
-	public ResultPartition[] getProducedPartitions() {
-		return producedPartitions;
-	}
-
-	public SingleInputGate getInputGateById(IntermediateDataSetID id) {
-		return inputGatesById.get(id);
-	}
-
-	@Override
-	public Configuration getTaskConfiguration() {
-		return taskConfiguration;
-	}
-
-	@Override
-	public Configuration getJobConfiguration() {
-		return jobConfiguration;
-	}
-
-	@Override
-	public int getNumberOfSubtasks() {
-		return owner.getNumberOfSubtasks();
-	}
-
-	@Override
-	public int getIndexInSubtaskGroup() {
-		return owner.getSubtaskIndex();
-	}
-
-	@Override
-	public String getTaskName() {
-		return owner.getTaskName();
-	}
-
-	@Override
-	public InputSplitProvider getInputSplitProvider() {
-		return inputSplitProvider;
-	}
-
-	@Override
-	public String getTaskNameWithSubtasks() {
-		return owner.getTaskNameWithSubtasks();
-	}
-
-	@Override
-	public ClassLoader getUserClassLoader() {
-		return userCodeClassLoader;
-	}
-
-	public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> copyTasks) {
-		cacheCopyTasks.putAll(copyTasks);
-	}
-
-	public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
-		cacheCopyTasks.put(name, copyTask);
-	}
-
-	@Override
-	public Map<String, FutureTask<Path>> getCopyTask() {
-		return cacheCopyTasks;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index af55ebf..259ea55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -243,7 +243,7 @@ public class NetworkEnvironment {
 
 	public void registerTask(Task task) throws IOException {
 		final ResultPartition[] producedPartitions = task.getProducedPartitions();
-		final ResultPartitionWriter[] writers = task.getWriters();
+		final ResultPartitionWriter[] writers = task.getAllWriters();
 
 		if (writers.length != producedPartitions.length) {
 			throw new IllegalStateException("Unequal number of writers and partitions.");
@@ -288,7 +288,7 @@ public class NetworkEnvironment {
 			}
 
 			// Setup the buffer pool for each buffer reader
-			final SingleInputGate[] inputGates = task.getInputGates();
+			final SingleInputGate[] inputGates = task.getAllInputGates();
 
 			for (SingleInputGate gate : inputGates) {
 				BufferPool bufferPool = null;
@@ -329,10 +329,9 @@ public class NetworkEnvironment {
 				partitionManager.releasePartitionsProducedBy(executionId);
 			}
 
-			ResultPartitionWriter[] writers = task.getWriters();
-
+			ResultPartitionWriter[] writers = task.getAllWriters();
 			if (writers != null) {
-				for (ResultPartitionWriter writer : task.getWriters()) {
+				for (ResultPartitionWriter writer : writers) {
 					taskEventDispatcher.unregisterWriter(writer);
 				}
 			}
@@ -344,7 +343,7 @@ public class NetworkEnvironment {
 				}
 			}
 
-			final SingleInputGate[] inputGates = task.getInputGates();
+			final SingleInputGate[] inputGates = task.getAllInputGates();
 
 			if (inputGates != null) {
 				for (SingleInputGate gate : inputGates) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index f06c8fb..df1f254 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -76,9 +75,8 @@ import static com.google.common.base.Preconditions.checkState;
 public class ResultPartition implements BufferPoolOwner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
-
-	/** The owning environment. Mainly for debug purposes. */
-	private final Environment owner;
+	
+	private final String owningTaskName;
 
 	private final JobID jobId;
 
@@ -120,7 +118,7 @@ public class ResultPartition implements BufferPoolOwner {
 	private long totalNumberOfBytes;
 
 	public ResultPartition(
-			Environment owner,
+			String owningTaskName,
 			JobID jobId,
 			ResultPartitionID partitionId,
 			ResultPartitionType partitionType,
@@ -130,7 +128,7 @@ public class ResultPartition implements BufferPoolOwner {
 			IOManager ioManager,
 			IOMode defaultIoMode) {
 
-		this.owner = checkNotNull(owner);
+		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
@@ -162,7 +160,7 @@ public class ResultPartition implements BufferPoolOwner {
 		// Initially, partitions should be consumed once before release.
 		pin();
 
-		LOG.debug("{}: Initialized {}", owner.getTaskNameWithSubtasks(), this);
+		LOG.debug("{}: Initialized {}", owningTaskName, this);
 	}
 
 	/**
@@ -281,7 +279,7 @@ public class ResultPartition implements BufferPoolOwner {
 	 */
 	public void release() {
 		if (isReleased.compareAndSet(false, true)) {
-			LOG.debug("{}: Releasing {}.", owner.getTaskNameWithSubtasks(), this);
+			LOG.debug("{}: Releasing {}.", owningTaskName, this);
 
 			// Release all subpartitions
 			for (ResultSubpartition subpartition : subpartitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b0d138a..acda1d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -101,8 +100,8 @@ public class SingleInputGate implements InputGate {
 	/** Lock object to guard partition requests and runtime channel updates. */
 	private final Object requestLock = new Object();
 
-	/** The owning environment. Mainly for debug purposes. */
-	private final Environment owner;
+	/** The name of the owning task, for logging purposes. */
+	private final String owningTaskName;
 
 	/**
 	 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
@@ -153,12 +152,12 @@ public class SingleInputGate implements InputGate {
 	private int numberOfUninitializedChannels;
 
 	public SingleInputGate(
-			Environment owner,
+			String owningTaskName,
 			IntermediateDataSetID consumedResultId,
 			int consumedSubpartitionIndex,
 			int numberOfInputChannels) {
 
-		this.owner = checkNotNull(owner);
+		this.owningTaskName = checkNotNull(owningTaskName);
 		this.consumedResultId = checkNotNull(consumedResultId);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
@@ -265,7 +264,7 @@ public class SingleInputGate implements InputGate {
 		synchronized (requestLock) {
 			if (!isReleased) {
 				try {
-					LOG.debug("{}: Releasing {}.", owner.getTaskNameWithSubtasks(), this);
+					LOG.debug("{}: Releasing {}.", owningTaskName, this);
 
 					for (InputChannel inputChannel : inputChannels.values()) {
 						try {
@@ -410,7 +409,7 @@ public class SingleInputGate implements InputGate {
 	 * Creates an input gate and all of its input channels.
 	 */
 	public static SingleInputGate create(
-			Environment owner,
+			String owningTaskName,
 			InputGateDeploymentDescriptor igdd,
 			NetworkEnvironment networkEnvironment) {
 
@@ -422,7 +421,7 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-				owner, consumedResultId, consumedSubpartitionIndex, icdd.length);
+				owningTaskName, consumedResultId, consumedSubpartitionIndex, icdd.length);
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index b528f75..2bee094 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1067,7 +1067,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
 		return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
-				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), env.getCopyTask());
+				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
+				env.getDistributedCacheEntries());
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
new file mode 100644
index 0000000..1321336
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An implementation of the {@link Environment} that the {@link Task} creates for the
+ * invokable it runs.
+ *
+ * <p>This class is a plain holder: every service and piece of metadata is passed to the
+ * constructor, stored in a final field, and returned as-is by the accessors. In particular,
+ * {@link #getAllWriters()} and {@link #getAllInputGates()} return the internal arrays,
+ * not copies.</p>
+ */
+public class RuntimeEnvironment implements Environment {
+
+	private final JobID jobId;
+	private final JobVertexID jobVertexId;
+	private final ExecutionAttemptID executionId;
+
+	private final String taskName;
+	private final String taskNameWithSubtasks;
+	private final int subtaskIndex;
+	private final int parallelism;
+
+	private final Configuration jobConfiguration;
+	private final Configuration taskConfiguration;
+
+	private final ClassLoader userCodeClassLoader;
+
+	private final MemoryManager memManager;
+	private final IOManager ioManager;
+	private final BroadcastVariableManager bcVarManager;
+	private final InputSplitProvider splitProvider;
+
+	private final Map<String, Future<Path>> distCacheEntries;
+
+	private final ResultPartitionWriter[] writers;
+	private final InputGate[] inputGates;
+
+	private final ActorRef jobManagerActor;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new runtime environment. All reference arguments must be non-null.
+	 *
+	 * @throws IllegalArgumentException if {@code parallelism} is not positive, or if
+	 *                                  {@code subtaskIndex} is not in [0, parallelism).
+	 * @throws NullPointerException if any reference argument is null.
+	 */
+	public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId,
+								String taskName, String taskNameWithSubtasks,
+								int subtaskIndex, int parallelism,
+								Configuration jobConfiguration, Configuration taskConfiguration,
+								ClassLoader userCodeClassLoader,
+								MemoryManager memManager, IOManager ioManager,
+								BroadcastVariableManager bcVarManager,
+								InputSplitProvider splitProvider,
+								Map<String, Future<Path>> distCacheEntries,
+								ResultPartitionWriter[] writers,
+								InputGate[] inputGates,
+								ActorRef jobManagerActor) {
+
+		checkArgument(parallelism > 0, "parallelism must be positive, but was %s", parallelism);
+		checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism,
+				"subtaskIndex must be in [0, %s), but was %s", parallelism, subtaskIndex);
+
+		this.jobId = checkNotNull(jobId, "jobId");
+		this.jobVertexId = checkNotNull(jobVertexId, "jobVertexId");
+		this.executionId = checkNotNull(executionId, "executionId");
+		this.taskName = checkNotNull(taskName, "taskName");
+		this.taskNameWithSubtasks = checkNotNull(taskNameWithSubtasks, "taskNameWithSubtasks");
+		this.subtaskIndex = subtaskIndex;
+		this.parallelism = parallelism;
+		this.jobConfiguration = checkNotNull(jobConfiguration, "jobConfiguration");
+		this.taskConfiguration = checkNotNull(taskConfiguration, "taskConfiguration");
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader, "userCodeClassLoader");
+		this.memManager = checkNotNull(memManager, "memManager");
+		this.ioManager = checkNotNull(ioManager, "ioManager");
+		this.bcVarManager = checkNotNull(bcVarManager, "bcVarManager");
+		this.splitProvider = checkNotNull(splitProvider, "splitProvider");
+		this.distCacheEntries = checkNotNull(distCacheEntries, "distCacheEntries");
+		this.writers = checkNotNull(writers, "writers");
+		this.inputGates = checkNotNull(inputGates, "inputGates");
+		this.jobManagerActor = checkNotNull(jobManagerActor, "jobManagerActor");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Accessors
+	// ------------------------------------------------------------------------
+
+	@Override
+	public JobID getJobID() {
+		return jobId;
+	}
+
+	@Override
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	@Override
+	public ExecutionAttemptID getExecutionId() {
+		return executionId;
+	}
+
+	@Override
+	public String getTaskName() {
+		return taskName;
+	}
+
+	@Override
+	public String getTaskNameWithSubtasks() {
+		return taskNameWithSubtasks;
+	}
+
+	@Override
+	public int getNumberOfSubtasks() {
+		return parallelism;
+	}
+
+	@Override
+	public int getIndexInSubtaskGroup() {
+		return subtaskIndex;
+	}
+
+	@Override
+	public Configuration getJobConfiguration() {
+		return jobConfiguration;
+	}
+
+	@Override
+	public Configuration getTaskConfiguration() {
+		return taskConfiguration;
+	}
+
+	@Override
+	public ClassLoader getUserClassLoader() {
+		return userCodeClassLoader;
+	}
+
+	@Override
+	public MemoryManager getMemoryManager() {
+		return memManager;
+	}
+
+	@Override
+	public IOManager getIOManager() {
+		return ioManager;
+	}
+
+	@Override
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return bcVarManager;
+	}
+
+	@Override
+	public InputSplitProvider getInputSplitProvider() {
+		return splitProvider;
+	}
+
+	@Override
+	public Map<String, Future<Path>> getDistributedCacheEntries() {
+		return distCacheEntries;
+	}
+
+	@Override
+	public ResultPartitionWriter getWriter(int index) {
+		return writers[index];
+	}
+
+	@Override
+	public ResultPartitionWriter[] getAllWriters() {
+		return writers;
+	}
+
+	@Override
+	public InputGate getInputGate(int index) {
+		return inputGates[index];
+	}
+
+	@Override
+	public InputGate[] getAllInputGates() {
+		return inputGates;
+	}
+
+	/**
+	 * Wraps the given accumulators in an {@link AccumulatorEvent} and sends them to the
+	 * JobManager actor as a {@link ReportAccumulatorResult} message. The message is sent with
+	 * {@code tell} and no sender, so no reply is awaited.
+	 *
+	 * @throws RuntimeException if the {@link AccumulatorEvent} cannot be created
+	 *                          (i.e. its constructor throws an {@link IOException}).
+	 */
+	@Override
+	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
+		final AccumulatorEvent evt;
+		try {
+			evt = new AccumulatorEvent(jobId, accumulators);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
+		}
+
+		ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
+		jobManagerActor.tell(accResult, ActorRef.noSender());
+	}
+
+	@Override
+	public ActorRef getJobManager() {
+		return jobManagerActor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e6eee5b..f12344b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -19,382 +19,755 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
+import akka.util.Timeout;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask;
-import org.apache.flink.runtime.profiling.TaskManagerProfiler;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
+import org.apache.flink.runtime.state.StateHandle;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public class Task {
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Task represents one execution of a parallel subtask on a TaskManager.
+ * A Task wraps a Flink operator (which may be a user function) and
+ * runs it, providing all services necessary, for example, to consume input data,
+ * produce its results (intermediate result partitions) and communicate
+ * with the JobManager.
+ *
+ * <p>The Flink operators (implemented as subclasses of
+ * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable}) have only data
+ * readers, -writers, and certain event callbacks. The task connects those to the
+ * network stack and actor messages, and tracks the state of the execution and
+ * handles exceptions.</p>
+ *
+ * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
+ * are the first attempt to execute the task, or a repeated attempt. All of that
+ * is only known to the JobManager. All the task knows are its own runnable code,
+ * the task's configuration, and the IDs of the intermediate results to consume and
+ * produce (if any).</p>
+ *
+ * <p>Each Task is run by one dedicated thread.</p>
+ */
+public class Task implements Runnable {
 
+	/** The class logger. */
+	private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+	
+	/** The thread group that contains all task threads */
+	private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
+	
 	/** For atomic state updates */
 	private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
 
-	/** The log object used for debugging. */
-	private static final Logger LOG = LoggerFactory.getLogger(Task.class);
-
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Constant fields that are part of the initial Task construction
+	// ------------------------------------------------------------------------
 
+	/** The job that the task belongs to */
 	private final JobID jobId;
 
+	/** The vertex in the JobGraph whose code the task executes */
 	private final JobVertexID vertexId;
 
+	/** The execution attempt of the parallel subtask */
+	private final ExecutionAttemptID executionId;
+
+	/** The index of the parallel subtask, in [0, numberOfSubtasks) */
 	private final int subtaskIndex;
 
-	private final int numberOfSubtasks;
-
-	private final ExecutionAttemptID executionId;
+	/** The number of parallel subtasks for the JobVertex/ExecutionJobVertex that this task belongs to */
+	private final int parallelism;
 
+	/** The name of the task */
 	private final String taskName;
 
+	/** The name of the task, including the subtask index and the parallelism */
+	private final String taskNameWithSubtask;
+
+	/** The job-wide configuration object */
+	private final Configuration jobConfiguration;
+
+	/** The task-specific configuration */
+	private final Configuration taskConfiguration;
+
+	/** The jar files used by this task */
+	private final List<BlobKey> requiredJarFiles;
+
+	/** The name of the class that holds the invokable code */
+	private final String nameOfInvokableClass;
+
+	/** The handle to the state that the operator was initialized with */
+	private final StateHandle operatorState;
+
+	/** The memory manager to be used by this task */
+	private final MemoryManager memoryManager;
+
+	/** The I/O manager to be used by this task */
+	private final IOManager ioManager;
+
+	/** The BroadcastVariableManager to be used by this task */
+	private final BroadcastVariableManager broadcastVariableManager;
+
+	private final ResultPartition[] producedPartitions;
+
+	private final ResultPartitionWriter[] writers;
+
+	private final SingleInputGate[] inputGates;
+
+	private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
+
+	/** The TaskManager actor that spawned this task */
 	private final ActorRef taskManager;
 
-	private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
+	/** The JobManager actor */
+	private final ActorRef jobManager;
+
+	/** All actors that want to be notified about changes in the task's execution state */
+	private final List<ActorRef> executionListenerActors;
+
+	/** The timeout for all ask operations on actors */
+	private final Timeout actorAskTimeout;
+
+	private final LibraryCacheManager libraryCache;
+	
+	private final FileCache fileCache;
+	
+	private final NetworkEnvironment network;
 
-	/** The environment (with the invokable) executed by this task */
-	private volatile RuntimeEnvironment environment;
+	/** The thread that executes the task */
+	private final Thread executingThread;
+	
 
+	// ------------------------------------------------------------------------
+	//  Fields that control the task execution
+	// ------------------------------------------------------------------------
+
+	private final AtomicBoolean invokableHasBeenCanceled = new AtomicBoolean(false);
+	
+	/** The invokable of this task, if initialized */
+	private volatile AbstractInvokable invokable;
+	
 	/** The current execution state of the task */
-	private volatile ExecutionState executionState = ExecutionState.DEPLOYING;
+	private volatile ExecutionState executionState = ExecutionState.CREATED;
 
+	/** The observed exception, in case the task execution failed */
 	private volatile Throwable failureCause;
 
-	// --------------------------------------------------------------------------------------------	
+	
+	/**
+	 * <p><b>IMPORTANT:</b> This constructor must not start any work that would need to
+	 * be undone in the case of a failing task deployment.</p>
+	 */
+	public Task(TaskDeploymentDescriptor tdd,
+				MemoryManager memManager,
+				IOManager ioManager,
+				NetworkEnvironment networkEnvironment,
+				BroadcastVariableManager bcVarManager,
+				ActorRef taskManagerActor,
+				ActorRef jobManagerActor,
+				FiniteDuration actorAskTimeout,
+				LibraryCacheManager libraryCache,
+				FileCache fileCache)
+	{
+		checkArgument(tdd.getNumberOfSubtasks() > 0);
+		checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
+		checkArgument(tdd.getIndexInSubtaskGroup() < tdd.getNumberOfSubtasks());
+
+		this.jobId = checkNotNull(tdd.getJobID());
+		this.vertexId = checkNotNull(tdd.getVertexID());
+		this.executionId  = checkNotNull(tdd.getExecutionId());
+		this.subtaskIndex = tdd.getIndexInSubtaskGroup();
+		this.parallelism = tdd.getNumberOfSubtasks();
+		this.taskName = checkNotNull(tdd.getTaskName());
+		this.taskNameWithSubtask = getTaskNameWithSubtask(taskName, subtaskIndex, parallelism);
+		this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
+		this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
+		this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
+		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
+		this.operatorState = tdd.getOperatorStates();
+
+		this.memoryManager = checkNotNull(memManager);
+		this.ioManager = checkNotNull(ioManager);
+		this.broadcastVariableManager =checkNotNull(bcVarManager);
+
+		this.jobManager = checkNotNull(jobManagerActor);
+		this.taskManager = checkNotNull(taskManagerActor);
+		this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
+		
+		this.libraryCache = checkNotNull(libraryCache);
+		this.fileCache = checkNotNull(fileCache);
+		this.network = checkNotNull(networkEnvironment);
+
+		this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
+
+		// create the reader and writer structures
+
+		final String taskNameWithSubtasksAndId =
+				Task.getTaskNameWithSubtaskAndID(taskName, subtaskIndex, parallelism, executionId);
+
+		List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
+		List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();
+
+		// Produced intermediate result partitions
+		this.producedPartitions = new ResultPartition[partitions.size()];
+		this.writers = new ResultPartitionWriter[partitions.size()];
+
+		for (int i = 0; i < this.producedPartitions.length; i++) {
+			ResultPartitionDeploymentDescriptor desc = partitions.get(i);
+			ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
+
+			this.producedPartitions[i] = new ResultPartition(
+					taskNameWithSubtasksAndId,
+					jobId,
+					partitionId,
+					desc.getPartitionType(),
+					desc.getNumberOfSubpartitions(),
+					networkEnvironment.getPartitionManager(),
+					networkEnvironment.getPartitionConsumableNotifier(),
+					ioManager,
+					networkEnvironment.getDefaultIOMode());
+
+			this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
+		}
+
+		// Consumed intermediate result partitions
+		this.inputGates = new SingleInputGate[consumedPartitions.size()];
+		this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
 
-	public Task(JobID jobId, JobVertexID vertexId, int taskIndex, int parallelism,
-			ExecutionAttemptID executionId, String taskName, ActorRef taskManager) {
+		for (int i = 0; i < this.inputGates.length; i++) {
+			SingleInputGate gate = SingleInputGate.create(
+					taskNameWithSubtasksAndId, consumedPartitions.get(i), networkEnvironment);
 
-		this.jobId = jobId;
-		this.vertexId = vertexId;
-		this.subtaskIndex = taskIndex;
-		this.numberOfSubtasks = parallelism;
-		this.executionId = executionId;
-		this.taskName = taskName;
-		this.taskManager = taskManager;
+			this.inputGates[i] = gate;
+			inputGatesById.put(gate.getConsumedResultId(), gate);
+		}
+		
+		// finally, create the executing thread, but do not start it
+		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); 
 	}
 
-	/**
-	 * Returns the ID of the job this task belongs to.
-	 */
+	// ------------------------------------------------------------------------
+	//  Accessors
+	// ------------------------------------------------------------------------
+
 	public JobID getJobID() {
-		return this.jobId;
+		return jobId;
 	}
 
-	/**
-	 * Returns the ID of this task vertex.
-	 */
-	public JobVertexID getVertexID() {
-		return this.vertexId;
+	public JobVertexID getJobVertexId() {
+		return vertexId;
 	}
 
-	/**
-	 * Gets the index of the parallel subtask [0, parallelism).
-	 */
-	public int getSubtaskIndex() {
+	public ExecutionAttemptID getExecutionId() {
+		return executionId;
+	}
+
+	public int getIndexInSubtaskGroup() {
 		return subtaskIndex;
 	}
 
-	/**
-	 * Gets the total number of subtasks of the task that this subtask belongs to.
-	 */
 	public int getNumberOfSubtasks() {
-		return numberOfSubtasks;
+		return parallelism;
 	}
 
-	/**
-	 * Gets the ID of the execution attempt.
-	 */
-	public ExecutionAttemptID getExecutionId() {
-		return executionId;
+	public String getTaskName() {
+		return taskName;
+	}
+
+	public String getTaskNameWithSubtasks() {
+		return taskNameWithSubtask;
+	}
+
+	public Configuration getJobConfiguration() {
+		return jobConfiguration;
+	}
+	
+	public Configuration getTaskConfiguration() {
+		return this.taskConfiguration;
+	}
+	
+	public ResultPartitionWriter[] getAllWriters() {
+		return writers;
+	}
+	
+	public SingleInputGate[] getAllInputGates() {
+		return inputGates;
+	}
+
+	public ResultPartition[] getProducedPartitions() {
+		return producedPartitions;
+	}
+
+	public SingleInputGate getInputGateById(IntermediateDataSetID id) {
+		return inputGatesById.get(id);
+	}
+
+	public Thread getExecutingThread() {
+		return executingThread;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Task Execution
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Returns the current execution state of the task.
+	 * @return The current execution state of the task.
 	 */
 	public ExecutionState getExecutionState() {
 		return this.executionState;
 	}
 
-	public void setEnvironment(RuntimeEnvironment environment) {
-		this.environment = environment;
-	}
-
-	public RuntimeEnvironment getEnvironment() {
-		return environment;
-	}
-
+	/**
+	 * Checks whether the task has failed, is canceled, or is being canceled at the moment.
+	 * @return True if the task is in state FAILED, CANCELING, or CANCELED, false otherwise.
+	 */
 	public boolean isCanceledOrFailed() {
 		return executionState == ExecutionState.CANCELING ||
 				executionState == ExecutionState.CANCELED ||
 				executionState == ExecutionState.FAILED;
 	}
 
-	public String getTaskName() {
-		if (LOG.isDebugEnabled()) {
-			return taskName + " (" + executionId + ")";
-		} else {
-			return taskName;
-		}
-	}
-
-	public String getTaskNameWithSubtasks() {
-		if (LOG.isDebugEnabled()) {
-			return this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks +
-					") (" + executionId + ")";
-		} else {
-			return this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks + ")";
-		}
-	}
-
+	/**
+	 * If the task has failed, this method gets the exception that caused this task to fail.
+	 * Otherwise this method returns null.
+	 *
+	 * @return The exception that caused the task to fail, or null, if the task has not failed.
+	 */
 	public Throwable getFailureCause() {
 		return failureCause;
 	}
 
-	// ----------------------------------------------------------------------------------------------------------------
-	//  States and Transitions
-	// ----------------------------------------------------------------------------------------------------------------
-
 	/**
-	 * Marks the task as finished. This succeeds, if the task was previously in the state
-	 * "RUNNING", otherwise it fails. Failure indicates that the task was either
-	 * canceled, or set to failed.
-	 *
-	 * @return True, if the task correctly enters the state FINISHED.
+	 * Starts the task's thread.
 	 */
-	public boolean markAsFinished() {
-		if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
-			notifyObservers(ExecutionState.FINISHED, null);
-			unregisterTask();
-			return true;
-		}
-		else {
-			return false;
-		}
+	public void startTaskThread() {
+		executingThread.start();
 	}
 
-	public void markFailed(Throwable error) {
+	/**
+	 * The core work method that bootstraps the task and executes its code.
+	 */
+	public void run() {
+
+		// ----------------------------
+		//  Initial State transition
+		// ----------------------------
 		while (true) {
 			ExecutionState current = this.executionState;
-
-			// if canceled, fine. we are done, and the jobmanager has been told
-			if (current == ExecutionState.CANCELED) {
+			if (current == ExecutionState.CREATED) {
+				if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
+					// success, we can start our work
+					break;
+				}
+			}
+			else if (current == ExecutionState.FAILED) {
+				// we were immediately failed. tell the TaskManager that we reached our final state
+				notifyFinalState();
 				return;
 			}
+			else if (current == ExecutionState.CANCELING) {
+				if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+					// we were immediately canceled. tell the TaskManager that we reached our final state
+					notifyFinalState();
+					return;
+				}
+			}
+			else {
+				throw new IllegalStateException("Invalid state for beginning of task operation");
+			}
+		}
 
-			// if canceling, we are done, but we cannot be sure that the jobmanager has been told.
-			// after all, we may have recognized our failure state before the cancelling and never sent a canceled
-			// message back
-			else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-				this.failureCause = error;
+		// all resource acquisitions and registrations from here on
+		// need to be undone in the end
 
-				notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error));
-				unregisterTask();
+		Map<String, Future<Path>> distributedCacheEntries = new HashMap<String, Future<Path>>();
 
-				return;
-			}
-		}
-	}
+		AbstractInvokable invokable = null;
 
-	public void cancelExecution() {
-		while (true) {
-			ExecutionState current = this.executionState;
+		try {
+			// ----------------------------
+			//  Task Bootstrap - We periodically 
+			//  check for canceling as a shortcut
+			// ----------------------------
 
-			// if the task is already canceled (or canceling) or finished or failed,
-			// then we need not do anything
-			if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED ||
-					current == ExecutionState.CANCELING || current == ExecutionState.FAILED) {
-				return;
-			}
+			// first of all, get a user-code classloader
+			// this may involve downloading the job's JAR files and/or classes
+			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
+			final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache);
 
-			if (current == ExecutionState.DEPLOYING) {
-				// directly set to canceled
-				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+			// now load the task's invokable code
+			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
 
-					notifyObservers(ExecutionState.CANCELED, null);
-					unregisterTask();
-					return;
-				}
+			if (isCanceledOrFailed()) {
+				throw new CancelTaskException();
 			}
-			else if (current == ExecutionState.RUNNING) {
-				// go to canceling and perform the actual task canceling
-				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELING)) {
-
-					notifyObservers(ExecutionState.CANCELING, null);
-					try {
-						this.environment.cancelExecution();
-					}
-					catch (Throwable e) {
-						LOG.error("Error while cancelling the task.", e);
-					}
 
-					return;
+			// ----------------------------------------------------------------
+			// register the task with the network stack
+			// this operation may fail if the system does not have enough
+			// memory to run the necessary data exchanges
+			// the registration must also strictly be undone
+			// ----------------------------------------------------------------
+
+			LOG.info("Registering task at network: " + this);
+			network.registerTask(this);
+
+			// next, kick off the background copying of files for the distributed cache
+			try {
+				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
+						DistributedCache.readFileInfoFromConfig(jobConfiguration))
+				{
+					LOG.info("Obtaining local cache file for '" + entry.getKey() + '\'');
+					Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
+					distributedCacheEntries.put(entry.getKey(), cp);
 				}
 			}
-			else {
-				throw new RuntimeException("unexpected state for cancelling: " + current);
+			catch (Exception e) {
+				throw new Exception("Exception while adding files to distributed cache.", e);
 			}
-		}
-	}
 
-	/**
-	 * Sets the tasks to be cancelled and reports a failure back to the master.
-	 */
-	public void failExternally(Throwable cause) {
-		while (true) {
-			ExecutionState current = this.executionState;
-
-			// if the task is already canceled (or canceling) or finished or failed,
-			// then we need not do anything
-			if (current == ExecutionState.CANCELED || current == ExecutionState.CANCELING || current == ExecutionState.FAILED) {
-				return;
+			if (isCanceledOrFailed()) {
+				throw new CancelTaskException();
 			}
 
-			if (current == ExecutionState.FINISHED) {
-				// Set state to failed in order to correctly unregister task from network environment
-				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-					notifyObservers(ExecutionState.FAILED, null);
+			// ----------------------------------------------------------------
+			//  call the user code initialization methods
+			// ----------------------------------------------------------------
 
-					return;
+			TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager,
+					jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout);
+
+			Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
+					taskName, taskNameWithSubtask, subtaskIndex, parallelism,
+					jobConfiguration, taskConfiguration,
+					userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager,
+					splitProvider, distributedCacheEntries,
+					writers, inputGates, jobManager);
+
+			// let the task code create its readers and writers
+			invokable.setEnvironment(env);
+			try {
+				invokable.registerInputOutput();
+			}
+			catch (Exception e) {
+				throw new Exception("Call to registerInputOutput() of invokable failed", e);
+			}
+
+			// the very last thing before the actual execution starts running is to inject
+			// the state into the task. the state is non-empty if this is an execution
+			// of a task that failed but had backed-up state from a checkpoint
+			if (operatorState != null) {
+				if (invokable instanceof OperatorStateCarrier) {
+					((OperatorStateCarrier) invokable).injectState(operatorState);
+				}
+				else {
+					throw new IllegalStateException("Found operator state for a non-stateful task invokable");
 				}
 			}
 
-			if (current == ExecutionState.DEPLOYING) {
-				// directly set to canceled
-				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-					this.failureCause = cause;
+			// ----------------------------------------------------------------
+			//  actual task core work
+			// ----------------------------------------------------------------
 
-					notifyObservers(ExecutionState.FAILED, null);
-					unregisterTask();
-					return;
+			// we must make strictly sure that the invokable is accessible to teh cancel() call
+			// by the time we switched to running.
+			this.invokable = invokable;
+
+			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
+			if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+				throw new CancelTaskException();
+			}
+			
+			// notify everyone that we switched to running. especially the TaskManager needs
+			// to know this!
+			notifyObservers(ExecutionState.RUNNING, null);
+			taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
+					new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)), ActorRef.noSender());
+
+			// make sure the user code classloader is accessible thread-locally
+			executingThread.setContextClassLoader(userCodeClassLoader);
+
+			// run the invokable
+			invokable.invoke();
+
+			// make sure, we enter the catch block if the task leaves the invoke() method due
+			// to the fact that it has been canceled
+			if (isCanceledOrFailed()) {
+				throw new CancelTaskException();
+			}
+
+			// ----------------------------------------------------------------
+			//  finalization of a successful execution
+			// ----------------------------------------------------------------
+
+			// finish the produced partitions. if this fails, we consider the execution failed.
+			for (ResultPartition partition : producedPartitions) {
+				if (partition != null) {
+					partition.finish();
 				}
 			}
-			else if (current == ExecutionState.RUNNING) {
-				// go to canceling and perform the actual task canceling
-				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-					try {
-						this.environment.cancelExecution();
+
+			// try to mark the task as finished
+			// if that fails, the task was canceled/failed in the meantime
+			if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+				notifyObservers(ExecutionState.FINISHED, null);
+			}
+			else {
+				throw new CancelTaskException();
+			}
+		}
+		catch (Throwable t) {
+
+			// ----------------------------------------------------------------
+			// the execution failed. either the invokable code properly failed, or
+			// an exception was thrown as a side effect of cancelling
+			// ----------------------------------------------------------------
+
+			try {
+				// transition into our final state. we should be either in RUNNING, CANCELING, or FAILED 
+				// loop for multiple retries during concurrent state changes via calls to cancel() or
+				// to failExternally()
+				while (true) {
+					ExecutionState current = this.executionState;
+					if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
+						if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+							// proper failure of the task. record the exception as the root cause
+							failureCause = t;
+							notifyObservers(ExecutionState.FAILED, t);
+
+							// in case of an exception during execution, we still call "cancel()" on the task
+							if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
+								try {
+									invokable.cancel();
+								}
+								catch (Throwable t2) {
+									LOG.error("Error while canceling task " + taskNameWithSubtask, t2);
+								}
+							}
+							break;
+						}
 					}
-					catch (Throwable e) {
-						LOG.error("Error while cancelling the task.", e);
+					else if (current == ExecutionState.CANCELING) {
+						if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+							notifyObservers(ExecutionState.CANCELED, null);
+							break;
+						}
 					}
+					else if (current == ExecutionState.FAILED) {
+						// in state failed already, no transition necessary any more
+						break;
+					}
+					// unexpected state, go to failed
+					else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+						LOG.error("Unexpected state in Task during an exception: " + current);
+						break;
+					}
+					// else fall through the loop and 
+				}
+			}
+			catch (Throwable tt) {
+				String message = "FATAL - exception in task exception handler";
+				LOG.error(message, tt);
+				notifyFatalError(message, tt);
+			}
+		}
+		finally {
+			try {
+				LOG.info("Freeing task resources for " + taskNameWithSubtask);
+				
+				// free the network resources
+				network.unregisterTask(this);
+
+				if (invokable != null) {
+					memoryManager.releaseAll(invokable);
+				}
 
-					this.failureCause = cause;
+				// remove all of the tasks library resources
+				libraryCache.unregisterTask(jobId, executionId);
 
-					notifyObservers(ExecutionState.FAILED, null);
-					unregisterTask();
+				// remove all files in the distributed cache
+				removeCachedFiles(distributedCacheEntries, fileCache);
 
-					return;
-				}
+				notifyFinalState();
 			}
-			else {
-				throw new RuntimeException("unexpected state for failing the task: " + current);
+			catch (Throwable t) {
+				// an error in the resource cleanup is fatal
+				String message = "FATAL - exception in task resource cleanup";
+				LOG.error(message, t);
+				notifyFatalError(message, t);
 			}
 		}
 	}
 
-	public void cancelingDone() {
-		while (true) {
-			ExecutionState current = this.executionState;
+	/**
+	 * Registers this task's required JAR files at the given library cache manager and returns
+	 * the user code class loader for this task's job.
+	 *
+	 * @param libraryCache The library cache manager to register the task at.
+	 * @return The user code class loader of the job; never null.
+	 * @throws Exception Thrown if no user code class loader is available for the job. Exceptions
+	 *                   thrown while registering the task are propagated as well.
+	 */
+	private ClassLoader createUserCodeClassloader(LibraryCacheManager libraryCache) throws Exception {
+		long startDownloadTime = System.currentTimeMillis();
 
-			if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
-				return;
-			}
-			if (!(current == ExecutionState.RUNNING || current == ExecutionState.CANCELING)) {
-				LOG.error(String.format("Unexpected state transition in Task: %s -> %s", current, ExecutionState.CANCELED));
-			}
+		// triggers the download of all missing jar files from the job manager
+		libraryCache.registerTask(jobId, executionId, requiredJarFiles);
 
-			if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
-				notifyObservers(ExecutionState.CANCELED, null);
-				unregisterTask();
-				return;
-			}
+		LOG.debug("Register task {} at library cache manager took {} milliseconds",
+				executionId, System.currentTimeMillis() - startDownloadTime);
+
+		ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);
+		if (userCodeClassLoader == null) {
+			throw new Exception("No user code classloader available.");
 		}
+		return userCodeClassLoader;
 	}
 
-	/**
-	 * Starts the execution of this task.
-	 */
-	public boolean startExecution() {
-		LOG.info("Starting execution of task {}", this.getTaskName());
-		if (STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
-			final Thread thread = this.environment.getExecutingThread();
-			thread.start();
-			return true;
+	/**
+	 * Loads (and initializes) the invokable class through the given class loader and creates
+	 * an instance of it via its no-argument constructor.
+	 *
+	 * @param classLoader The class loader used to load the invokable class.
+	 * @param className The name of the invokable class, as accepted by {@link Class#forName}.
+	 * @return A new instance of the invokable class.
+	 * @throws Exception Thrown if the class cannot be loaded, is not a subclass of
+	 *                   {@link AbstractInvokable}, or cannot be instantiated. The original
+	 *                   error is attached as the cause.
+	 */
+	private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) throws Exception {
+		Class<? extends AbstractInvokable> invokableClass;
+		try {
+			// asSubclass() throws if the class is not an AbstractInvokable; reported as a load failure
+			invokableClass = Class.forName(className, true, classLoader)
+					.asSubclass(AbstractInvokable.class);
 		}
-		else {
-			return false;
+		catch (Throwable t) {
+			throw new Exception("Could not load the task's invokable class.", t);
+		}
+		try {
+			return invokableClass.newInstance();
+		}
+		catch (Throwable t) {
+			throw new Exception("Could not instantiate the task's invokable class.", t);
 		}
 	}
 
-	/**
-	 * Unregisters the task from the central memory manager.
-	 */
-	public void unregisterMemoryManager(MemoryManager memoryManager) {
-		RuntimeEnvironment env = this.environment;
-		if (memoryManager != null && env != null) {
-			memoryManager.releaseAll(env.getInvokable());
+	/**
+	 * Deletes the temporary files of all given distributed cache entries. This is best-effort:
+	 * failures are logged (including their cause) and never propagated to the caller.
+	 *
+	 * @param entries The distributed cache entries of this task, keyed by registered file name.
+	 * @param fileCache The file cache holding the temporary files.
+	 */
+	private void removeCachedFiles(Map<String, Future<Path>> entries, FileCache fileCache) {
+		// release all distributed cache files; a failure for one file does not stop the others
+		try {
+			for (String name : entries.keySet()) {
+				try {
+					fileCache.deleteTmpFile(name, jobId);
+				}
+				catch (Exception e) {
+					// unpleasant, but we continue
+					LOG.error("Distributed Cache could not remove cached file registered under '"
+							+ name + "'.", e);
+				}
+			}
+		}
+		catch (Throwable t) {
+			// pass the throwable along, so the reason for the failed cleanup is not lost
+			LOG.error("Error while removing cached local files from distributed cache.", t);
 		}
 	}
 
-	protected void unregisterTask() {
-		taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender());
+	/**
+	 * Sends a {@code TaskInFinalState} message for this execution attempt to the TaskManager
+	 * actor, without a sender reference.
+	 */
+	private void notifyFinalState() {
+		taskManager.tell(new TaskInFinalState(executionId), ActorRef.noSender());
 	}
 
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                        Task Profiling
-	// -----------------------------------------------------------------------------------------------------------------
+	/**
+	 * Sends a {@code FatalError} message with the given description and cause to the
+	 * TaskManager actor. It is invoked when the task's exception handler or its resource
+	 * cleanup itself throws.
+	 *
+	 * @param message The description of the fatal problem.
+	 * @param cause The exception that caused the fatal problem.
+	 */
+	private void notifyFatalError(String message, Throwable cause) {
+		taskManager.tell(new FatalError(message, cause), ActorRef.noSender());
+	}
 
-	/**
-	 * Registers the task manager profiler with the task.
-	 */
-	public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration) {
-		taskManagerProfiler.registerTask(this, jobConfiguration);
+	// ----------------------------------------------------------------------------------------------------------------
+	//  Canceling / Failing the task from the outside
+	// ----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Cancels the task from the outside: switches it to CANCELING and, if it is already
+	 * running, triggers the cancellation of the invokable in a separate thread, so this call
+	 * does not block on user code. The CANCELING state is then reported to all registered
+	 * execution listeners. Has no effect if the task is already canceling or in a terminal state.
+	 */
+	public void cancelExecution() {
+		LOG.info("Attempting to cancel task " + taskNameWithSubtask);
+		if (cancelOrFailAndCancelInvokable(ExecutionState.CANCELING)) {
+			notifyObservers(ExecutionState.CANCELING, null);
+		}
 	}
 
 	/**
-	 * Unregisters the task from the task manager profiler.
+	 * Fails the task from the outside: switches it to FAILED (triggering the cancellation of
+	 * the invokable if it is already running), records the given cause, and reports the FAILED
+	 * state together with the cause to all registered execution listeners. Has no effect if
+	 * the task is already canceling or in a terminal state.
+	 *
+	 * @param cause The exception that caused the task to fail.
 	 */
-	public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) {
-		if (taskManagerProfiler != null) {
-			taskManagerProfiler.unregisterTask(this.executionId);
+	public void failExternally(Throwable cause) {
+		LOG.info("Attempting to fail task externally " + taskNameWithSubtask);
+		if (cancelOrFailAndCancelInvokable(ExecutionState.FAILED)) {
+			failureCause = cause;
+			notifyObservers(ExecutionState.FAILED, cause);
 		}
 	}
 
-	// ------------------------------------------------------------------------
-	// Intermediate result partitions
-	// ------------------------------------------------------------------------
-
-	public SingleInputGate[] getInputGates() {
-		return environment != null ? environment.getAllInputGates() : null;
-	}
+	/**
+	 * Atomically moves the task into the given target state (CANCELING for
+	 * {@link #cancelExecution()}, FAILED for {@link #failExternally(Throwable)}).
+	 * If the task was RUNNING, the invokable's cancellation is triggered at most once, from a
+	 * separate "Canceler for ..." thread, because canceling may block on user code.
+	 *
+	 * @param targetState The state to switch the task to.
+	 * @return True, if this call performed the state transition; false, if the task was already
+	 *         canceling or in a terminal state.
+	 * @throws IllegalStateException Thrown if the task is in a state not handled here.
+	 */
+	private boolean cancelOrFailAndCancelInvokable(ExecutionState targetState) {
+		while (true) {
+			// a failed compare-and-set below means the state changed concurrently: re-read and retry
+			ExecutionState current = this.executionState;
 
-	public ResultPartitionWriter[] getWriters() {
-		return environment != null ? environment.getAllWriters() : null;
-	}
+			// if the task is already canceled (or canceling) or finished or failed,
+			// then we need not do anything
+			if (current.isTerminal() || current == ExecutionState.CANCELING) {
+				return false;
+			}
 
-	public ResultPartition[] getProducedPartitions() {
-		return environment != null ? environment.getProducedPartitions() : null;
+			if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
+				if (STATE_UPDATER.compareAndSet(this, current, targetState)) {
+					// if we manage this state transition, the invokable never gets called,
+					// so we need not call cancel on it
+					return true;
+				}
+			}
+			else if (current == ExecutionState.RUNNING) {
+				if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) {
+					// we are canceling / failing out of the running state
+					// we need to cancel the invokable
+					if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
+						LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
+
+						// because the canceling may block on user code, we cancel from a separate thread
+						Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask);
+						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
+								"Canceler for " + taskNameWithSubtask);
+						cancelThread.start();
+					}
+					return true;
+				}
+			}
+			else {
+				throw new IllegalStateException("Unexpected task state: " + current);
+			}
+		}
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//                                     State Listeners
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  State Listeners
+	// ------------------------------------------------------------------------
 
 	public void registerExecutionListener(ActorRef listener) {
 		executionListenerActors.add(listener);
@@ -404,25 +777,146 @@ public class Task {
 		executionListenerActors.remove(listener);
 	}
 
-	private void notifyObservers(ExecutionState newState, String message) {
-		if (LOG.isInfoEnabled()) {
-			LOG.info(getTaskNameWithSubtasks() + " switched to " + newState + (message == null ? "" : " : " + message));
+	/**
+	 * Logs the state switch and sends an {@code UpdateTaskExecutionState} message carrying the
+	 * new state (and the optional error) to all registered execution listener actors.
+	 *
+	 * @param newState The state the task switched to.
+	 * @param error The exception associated with the state switch, or null if there is none.
+	 */
+	private void notifyObservers(ExecutionState newState, Throwable error) {
+		if (error == null) {
+			LOG.info(taskNameWithSubtask + " switched to " + newState);
+		}
+		else {
+			LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error);
 		}
 
+		// the message is built once and the same instance is sent to every listener
+		TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error);
+		TaskMessages.UpdateTaskExecutionState actorMessage = new
+				TaskMessages.UpdateTaskExecutionState(stateUpdate);
+
 		for (ActorRef listener : executionListenerActors) {
-			listener.tell(new ExecutionGraphMessages.ExecutionStateChanged(
-							jobId, vertexId, taskName, numberOfSubtasks, subtaskIndex,
-							executionId, newState, System.currentTimeMillis(), message),
-					ActorRef.noSender());
+			listener.tell(actorMessage, ActorRef.noSender());
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//                                       Utilities
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Notifications on the invokable
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Triggers the emission of checkpoint barriers for the given checkpoint. The request is only
+	 * acted upon while the task is RUNNING and its invokable implements {@link BarrierTransceiver};
+	 * otherwise it is logged and ignored. The barriers are broadcast from a separate daemon
+	 * thread, so this method returns without waiting for the emission. Errors during the
+	 * emission are logged, not propagated.
+	 *
+	 * @param checkpointID The ID of the checkpoint whose barriers should be emitted.
+	 */
+	public void triggerCheckpointBarrier(final long checkpointID) {
+		// read the field once, so that the checks and the use below see the same invokable
+		AbstractInvokable currentInvokable = this.invokable;
+
+		if (executionState == ExecutionState.RUNNING && currentInvokable != null) {
+			if (currentInvokable instanceof BarrierTransceiver) {
+				final BarrierTransceiver barrierTransceiver = (BarrierTransceiver) currentInvokable;
+				final Logger logger = LOG;
+
+				Thread caller = new Thread("Barrier emitter") {
+					@Override
+					public void run() {
+						try {
+							barrierTransceiver.broadcastBarrierFromSource(checkpointID);
+						}
+						catch (Throwable t) {
+							logger.error("Error while triggering checkpoint barriers", t);
+						}
+					}
+				};
+				caller.setDaemon(true);
+				caller.start();
+			}
+			else {
+				LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
+						+ taskNameWithSubtask);
+			}
+		}
+		else {
+			LOG.debug("Ignoring request to trigger a checkpoint barrier");
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 
+	/**
+	 * Returns {@code getTaskNameWithSubtasks()} followed by the current execution state in
+	 * square brackets.
+	 */
 	@Override
 	public String toString() {
 		return getTaskNameWithSubtasks() + " [" + executionState + ']';
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Task Names
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Formats a task name as {@code "name (i/n)"}, where {@code i} is the 1-based subtask index
+	 * ({@code subtask + 1}) and {@code n} is the number of subtasks.
+	 */
+	public static String getTaskNameWithSubtask(String name, int subtask, int numSubtasks) {
+		return name + " (" + (subtask+1) + '/' + numSubtasks + ')';
+	}
+
+	/**
+	 * Formats a task name as {@code "name (i/n) (id)"}, where {@code i} is the 1-based subtask
+	 * index ({@code subtask + 1}), {@code n} is the number of subtasks, and {@code id} is the
+	 * execution attempt ID.
+	 */
+	public static String getTaskNameWithSubtaskAndID(String name, int subtask, int numSubtasks, ExecutionAttemptID id) {
+		return name + " (" + (subtask+1) + '/' + numSubtasks + ") (" + id + ')';
+	}
+
+	/**
+	 * This runner calls cancel() on the invokable and then interrupts the executing thread until
+	 * it has terminated. After the first interrupt it waits up to 10 seconds for the thread to
+	 * exit. After that, it logs the thread's current stack trace and re-interrupts it, waiting up
+	 * to 5 seconds between attempts. Errors thrown by cancel() are logged and do not stop the
+	 * interrupting. Because cancel() is user code that may block, this runner is meant to be
+	 * run in its own thread.
+	 */
+	private static class TaskCanceler implements Runnable {
+
+		/** Logger for cancellation errors and stuck-thread warnings. */
+		private final Logger logger;
+		/** The invokable whose cancel() method is called. */
+		private final AbstractInvokable invokable;
+		/** The thread executing the invokable; it is interrupted until it terminates. */
+		private final Thread executer;
+		/** The task name, used in log messages. */
+		private final String taskName;
+
+		public TaskCanceler(Logger logger, AbstractInvokable invokable, Thread executer, String taskName) {
+			this.logger = logger;
+			this.invokable = invokable;
+			this.executer = executer;
+			this.taskName = taskName;
+		}
+
+		@Override
+		public void run() {
+			try {
+				// the user-defined cancel method may throw errors.
+				// we need to continue despite that
+				try {
+					invokable.cancel();
+				}
+				catch (Throwable t) {
+					logger.error("Error while canceling the task", t);
+				}
+
+				// interrupt the executing thread once and give it up to 10 seconds to exit
+				executer.interrupt();
+				try {
+					executer.join(10000);
+				}
+				catch (InterruptedException e) {
+					// we can ignore this; the loop below still checks whether the thread is alive
+				}
+
+				// it is possible that the user code does not react immediately. for that
+				// reason, this runner keeps interrupting the executing thread (logging where
+				// it is stuck) until it exits
+				while (executer.isAlive()) {
+
+					// build the stack trace of where the thread is stuck, for the log
+					StringBuilder bld = new StringBuilder();
+					StackTraceElement[] stack = executer.getStackTrace();
+					for (StackTraceElement e : stack) {
+						bld.append(e).append('\n');
+					}
+
+					logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
+							taskName, bld.toString());
+
+					executer.interrupt();
+					try {
+						executer.join(5000);
+					}
+					catch (InterruptedException e) {
+						// we can ignore this
+					}
+				}
+			}
+			catch (Throwable t) {
+				logger.error("Error in the task canceler", t);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index c81830c..b12f1b5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -24,7 +24,15 @@ import org.apache.flink.runtime.instance.InstanceID
  * Miscellaneous actor messages exchanged with the TaskManager.
  */
 object TaskManagerMessages {
-
+  
+  /**
+   * This message informs the TaskManager about a fatal error that prevents
+   * it from continuing. A Task sends it, for example, when its exception
+   * handler or its resource cleanup itself fails.
+   *
+   * @param description The description of the problem
+   * @param cause The exception that caused the fatal error
+   */
+  case class FatalError(description: String, cause: Throwable)
+  
   /**
    * Tells the task manager to send a heartbeat message to the job manager.
    */
@@ -49,7 +57,7 @@ object TaskManagerMessages {
 
 
   // --------------------------------------------------------------------------
-  //  Utility messages used for notifications during TaskManager startup
+  //  Reporting the current TaskManager stack trace
   // --------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
index c8c5726..b1a08ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
@@ -67,12 +67,12 @@ object TaskMessages {
     extends TaskMessage
 
   /**
-   * Unregister the task identified by [[executionID]] from the TaskManager.
-   * Sent to the TaskManager by futures and callbacks.
+   * Notifies the TaskManager that the task has reached its final state,
+   * either FINISHED, CANCELED, or FAILED. Sent by the Task itself after
+   * it has released its resources.
    *
    * @param executionID The task's execution attempt ID.
    */
-  case class UnregisterTask(executionID: ExecutionAttemptID)
+  case class TaskInFinalState(executionID: ExecutionAttemptID)
     extends TaskMessage