You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:07 UTC

[43/63] [abbrv] git commit: Adjust ExecutionGraph state machine to TaskManager's failing model (direct transitions to canceled)

Adjust ExecutionGraph state machine to TaskManager's failing model (direct transitions to canceled)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/25acb6ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/25acb6ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/25acb6ba

Branch: refs/heads/master
Commit: 25acb6ba7724f40ac041a499b607ac0206eadc97
Parents: ae139f5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 11 16:31:50 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200

----------------------------------------------------------------------
 .../flink/core/memory/DataInputViewStream.java  |   2 +-
 .../deployment/TaskDeploymentDescriptor.java    |   4 +
 .../runtime/execution/RuntimeEnvironment.java   |  62 +-
 .../flink/runtime/executiongraph/Execution.java | 181 +++--
 .../runtime/executiongraph/ExecutionGraph.java  | 267 ++++----
 .../executiongraph/ExecutionJobVertex.java      |   4 +-
 .../runtime/executiongraph/ExecutionVertex.java |   5 +-
 .../executiongraph/IntermediateResult.java      |   9 +
 .../IntermediateResultPartition.java            |   1 +
 .../runtime/instance/InstanceDiedException.java |  25 +-
 .../runtime/io/network/ChannelManager.java      |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  12 +
 .../flink/runtime/jobgraph/JobStatus.java       |  27 +-
 .../runtime/jobmanager/EventCollector.java      |   6 +
 .../flink/runtime/jobmanager/JobManager.java    |  20 +-
 .../jobmanager/scheduler/DefaultScheduler.java  | 413 ------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 447 +++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  35 +-
 .../flink/runtime/taskmanager/TaskManager.java  |  39 +-
 .../runtime/util/ExecutorThreadFactory.java     |   4 +-
 .../ExecutionGraphDeploymentTest.java           |   4 +-
 .../ExecutionVertexCancelTest.java              |  45 +-
 .../ExecutionVertexSchedulingTest.java          |   8 +-
 .../runtime/jobmanager/JobManagerITCase.java    | 670 ++++++++++++++++++-
 .../scheduler/SchedulerIsolatedTasksTest.java   |  12 +-
 .../scheduler/SchedulerSlotSharingTest.java     |  14 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  14 -
 .../util/EnvironmentInformationTest.java        |  25 +-
 .../runtime/util/KeyGroupedIteratorTest.java    |   1 -
 29 files changed, 1623 insertions(+), 735 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
index 25835f5..8fe7b03 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
@@ -36,7 +36,7 @@ public final class DataInputViewStream extends InputStream {
 		return inputView.readByte();
 	}
 	
-	public int read(byte b[], int off, int len) throws IOException {
+	public int read(byte[] b, int off, int len) throws IOException {
 		inputView.readFully(b, off, len);
 		return len;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 2d00f40..e1e80f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -228,6 +228,10 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 		return inputGates;
 	}
 	
+	public String[] getRequiredJarFiles() {
+		return requiredJarFiles;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Serialization
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index eef081c..79a4aaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -113,7 +113,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	
 	/** The thread executing the task in the environment. */
-	private volatile Thread executingThread;
+	private Thread executingThread;
 
 	/**
 	 * The RPC proxy to report accumulators to JobManager
@@ -221,7 +221,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	@Override
 	public void run() {
-
		// quick fail in case the task was cancelled while the thread was started
 		if (owner.isCanceled()) {
 			owner.cancelingDone();
@@ -238,6 +237,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			}
 		}
 		catch (Throwable t) {
+			
 			if (!this.owner.isCanceled()) {
 
 				// Perform clean up when the task failed and has not been canceled by the user
@@ -260,7 +260,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 			return;
 		}
-
+		
 		try {
 			// If there is any unclosed input gate, close it and propagate close operation to corresponding output gate
 			closeInputGates();
@@ -275,6 +275,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			waitForOutputChannelsToBeClosed();
 		}
 		catch (Throwable t) {
+			
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
@@ -396,37 +397,38 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		}
 		
 		final Thread executingThread = this.executingThread;
-		
-		// interrupt the running thread and wait for it to die
-		executingThread.interrupt();
-		
-		try {
-			executingThread.join(5000);
-		} catch (InterruptedException e) {}
-		
-		if (!executingThread.isAlive()) {
-			return;
-		}
-		
-		// Continuously interrupt the user thread until it changed to state CANCELED
-		while (executingThread != null && executingThread.isAlive()) {
-			LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
-
-			if (LOG.isDebugEnabled()) {
-				StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
-				
-				StackTraceElement[] stack = executingThread.getStackTrace();
-				for (StackTraceElement e : stack) {
-					bld.append(e).append('\n');
-				}
-				LOG.debug(bld.toString());
-			}
-			
+		if (executingThread != null) {
+			// interrupt the running thread and wait for it to die
 			executingThread.interrupt();
 			
 			try {
-				executingThread.join(1000);
+				executingThread.join(5000);
 			} catch (InterruptedException e) {}
+			
+			if (!executingThread.isAlive()) {
+				return;
+			}
+			
+			// Continuously interrupt the user thread until it changed to state CANCELED
+			while (executingThread != null && executingThread.isAlive()) {
+				LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
+	
+				if (LOG.isDebugEnabled()) {
+					StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
+					
+					StackTraceElement[] stack = executingThread.getStackTrace();
+					for (StackTraceElement e : stack) {
+						bld.append(e).append('\n');
+					}
+					LOG.debug(bld.toString());
+				}
+				
+				executingThread.interrupt();
+				
+				try {
+					executingThread.join(1000);
+				} catch (InterruptedException e) {}
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8cfc7fd..c290883 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,7 +18,14 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.execution.ExecutionState.*;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
+import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
+import static org.apache.flink.runtime.execution.ExecutionState.DEPLOYING;
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
+import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -27,9 +34,9 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
@@ -124,6 +131,10 @@ public class Execution {
 		return failureCause;
 	}
 	
+	public long[] getStateTimestamps() {
+		return stateTimestamps;
+	}
+	
 	public long getStateTimestamp(ExecutionState state) {
 		return this.stateTimestamps[state.ordinal()];
 	}
@@ -146,16 +157,13 @@ public class Execution {
 	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
 	 * @throws NoResourceAvailableException Thrown if no queued scheduling is allowed and no resources are currently available.
 	 */
-	public void scheduleForExecution(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+	public void scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
 		if (scheduler == null) {
 			throw new NullPointerException();
 		}
 		
 		if (transitionState(CREATED, SCHEDULED)) {
 			
-			// record that we were scheduled
-			vertex.notifyStateTransition(attemptId, SCHEDULED, null);
-			
 			ScheduledUnit toSchedule = new ScheduledUnit(this, vertex.getJobVertex().getSlotSharingGroup());
 		
 			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
@@ -221,8 +229,6 @@ public class Execution {
 				// this should actually not happen and indicates a race somewhere else
 				throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
 			}
-			
-			vertex.notifyStateTransition(attemptId, DEPLOYING, null);
 		}
 		else {
 			// vertex may have been cancelled, or it was already scheduled
@@ -236,9 +242,15 @@ public class Execution {
 			}
 			this.assignedResource = slot;
 			
+			// race double check, did we fail/cancel and do we need to release the slot?
+			if (this.state != DEPLOYING) {
+				slot.releaseSlot();
+				return;
+			}
+			
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot);
 			
-			// register this execution at the execution graph, to receive callbacks
+			// register this execution at the execution graph, to receive call backs
 			vertex.getExecutionGraph().registerExecution(this);
 			
 			// we execute the actual deploy call in a concurrent action to prevent this call from blocking for long
@@ -300,7 +312,6 @@ public class Execution {
 			else if (current == RUNNING || current == DEPLOYING) {
 				// try to transition to canceling, if successful, send the cancel call
 				if (transitionState(current, CANCELING)) {
-					vertex.notifyStateTransition(attemptId, CANCELING, null);
 					sendCancelRpcCall();
 					return;
 				}
@@ -318,7 +329,7 @@ public class Execution {
 					
 					// we skip the canceling state. set the timestamp, for a consistent appearance
 					markTimestamp(CANCELING, getStateTimestamp(CANCELED));
-					vertex.notifyStateTransition(attemptId, CANCELED, null);
+					vertex.executionCanceled();
 					return;
 				}
 				// else: fall through the loop
@@ -336,11 +347,7 @@ public class Execution {
 	 * @param t The exception that caused the task to fail.
 	 */
 	public void fail(Throwable t) {
-		if (processFail(t, false)) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Task " + getVertexWithAttempt() + " was failed.", t);
-			}
-		}
+		processFail(t, false);
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -355,15 +362,11 @@ public class Execution {
 	 * @param t The exception that caused the task to fail.
 	 */
 	void markFailed(Throwable t) {
-		// the call returns true if it actually made the state transition (was not already failed before, etc)
-		if (processFail(t, true)) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Task " + getVertexWithAttempt() + " failed.", t);
-			}
-		}
+		processFail(t, true);
 	}
 	
 	void markFinished() {
+		
 		// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)
 		while (true) {
 			ExecutionState current = this.state;
@@ -372,7 +375,6 @@ public class Execution {
 			
 				if (transitionState(current, FINISHED)) {
 					try {
-						vertex.notifyStateTransition(attemptId, FINISHED, null);
 						vertex.executionFinished();
 						return;
 					}
@@ -382,41 +384,60 @@ public class Execution {
 					}
 				}
 			}
-			else {
-				if (current == CANCELED || current == CANCELING || current == FAILED) {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Task FINISHED, but concurrently went to state " + state);
-					}
-					return;
-				}
-				else {
-					// this should not happen, we need to fail this
-					markFailed(new Exception("Vertex received FINISHED message while being in state " + state));
-					return;
+			else if (current == CANCELING) {
+				// we sent a cancel call, and the task manager finished before it arrived. We
+				// will never get a CANCELED call back from the task manager
+				cancelingComplete();
+				return;
+			}
+			else if (current == CANCELED || current == FAILED) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Task FINISHED, but concurrently went to state " + state);
 				}
+				return;
+			}
+			else {
+				// this should not happen, we need to fail this
+				markFailed(new Exception("Vertex received FINISHED message while being in state " + state));
+				return;
 			}
 		}
 	}
 	
 	void cancelingComplete() {
-		if (transitionState(CANCELING, CANCELED)) {
-			try {
-				vertex.executionCanceled();
-				vertex.notifyStateTransition(attemptId, CANCELED, null);
-			}
-			finally {
-				vertex.getExecutionGraph().deregisterExecution(this);
-				assignedResource.releaseSlot();
+		
+		// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
+		// network stack is canceled (for example by a failing / canceling receiver or sender).
+		// This is an artifact of the old network runtime, but for now we need to support task transitions
+		// from running directly to canceled
+		
+		while (true) {
+			ExecutionState current = this.state;
+			
+			if (current == CANCELED) {
+				return;
 			}
-		}
-		else {
-			ExecutionState actualState = this.state;
-			// failing in the meantime may happen and is no problem.
-			// anything else is a serious problem !!!
-			if (actualState != FAILED) {
-				String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
-				LOG.error(message);
-				vertex.getExecutionGraph().fail(new Exception(message));
+			else if (current == CANCELING || current == RUNNING) {
+				if (transitionState(current, CANCELED)) {
+					try {
+						vertex.executionCanceled();
+					}
+					finally {
+						vertex.getExecutionGraph().deregisterExecution(this);
+						assignedResource.releaseSlot();
+					}
+					return;
+				}
+			} 
+			else {
+				// failing in the meantime may happen and is no problem.
+				// anything else is a serious problem !!!
+				if (current != FAILED) {
+					String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
+					LOG.error(message);
+					vertex.getExecutionGraph().fail(new Exception(message));
+				}
+				return;
 			}
 		}
 	}
@@ -440,17 +461,29 @@ public class Execution {
 				return false;
 			}
 			
-			if (current == CANCELED || (current == CANCELING && isCallback)) {
+			if (current == CANCELED) {
 				// we are already aborting or are already aborted
 				if (LOG.isDebugEnabled()) {
 					LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s", 
-							getVertexWithAttempt(), FAILED, current));
+							getVertexWithAttempt(), FAILED, CANCELED));
 				}
 				return false;
 			}
 			
-			if (transitionState(current, FAILED)) {
+			if (transitionState(current, FAILED, t)) {
 				// success (in a manner of speaking)
+				this.failureCause = t;
+				
+				try {
+					vertex.getExecutionGraph().deregisterExecution(this);
+					vertex.executionFailed(t);
+				}
+				finally {
+					if (assignedResource != null) {
+						assignedResource.releaseSlot();
+					}
+				}
+				
 				
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
 					if (LOG.isDebugEnabled()) {
@@ -467,29 +500,16 @@ public class Execution {
 					}
 				}
 				
-				try {
-					this.failureCause = t;
-					vertex.executionFailed(t);
-					vertex.notifyStateTransition(attemptId, FAILED, t);
-				}
-				finally {
-					if (assignedResource != null) {
-						assignedResource.releaseSlot();
-					}
-					vertex.getExecutionGraph().deregisterExecution(this);
-				}
-				
 				// leave the loop
 				return true;
 			}
 		}
 	}
 	
-	private void switchToRunning() {
+	private boolean switchToRunning() {
 		
-		// transition state, the common case
 		if (transitionState(DEPLOYING, RUNNING)) {
-			vertex.notifyStateTransition(attemptId, RUNNING, null);
+			return true;
 		}
 		else {
 			// something happened while the call was in progress.
@@ -501,10 +521,10 @@ public class Execution {
 			ExecutionState currentState = this.state;
 			
 			if (currentState == FINISHED || currentState == CANCELED) {
-				// do nothing, this is nice, the task was really fast
+				// do nothing, the task was really fast (nice)
+				// or it was canceled really fast
 			}
-			
-			if (currentState == CANCELING || currentState == FAILED) {
+			else if (currentState == CANCELING || currentState == FAILED) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", getVertexWithAttempt()));
 				}
@@ -524,13 +544,15 @@ public class Execution {
 				// record the failure
 				markFailed(new Exception(message));
 			}
+			
+			return false;
 		}
 	}
 	
 	private void sendCancelRpcCall() {
 		final AllocatedSlot slot = this.assignedResource;
 		if (slot == null) {
-			throw new IllegalStateException("Cannot cancel when task was not running or deployed.");
+			return;
 		}
 		
 		Runnable cancelAction = new Runnable() {
@@ -578,8 +600,21 @@ public class Execution {
 	// --------------------------------------------------------------------------------------------
 	
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
+		return transitionState(currentState, targetState, null);
+	}
+	
+	private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
 		if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
 			markTimestamp(targetState);
+			
+			// make sure that the state transition completes normally.
+			// potential errors (in listeners) must not affect the main logic
+			try {
+				vertex.notifyStateTransition(attemptId, targetState, error);
+			}
+			catch (Throwable t) {
+				LOG.error("Error while notifying execution graph of execution state trnsition.", t);
+			}
 			return true;
 		} else {
 			return false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 72525e9..3dab13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,15 +38,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
+import org.apache.flink.runtime.io.network.RemoteReceiver;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -105,7 +108,9 @@ public class ExecutionGraph {
 	private volatile JobStatus state = JobStatus.CREATED;
 	
 	
-	private boolean allowQueuedScheduling = false;
+	private Scheduler scheduler;
+	
+	private boolean allowQueuedScheduling = true;
 	
 	
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig) {
@@ -265,12 +270,17 @@ public class ExecutionGraph {
 	//  Actions
 	// --------------------------------------------------------------------------------------------
 	
-	public void scheduleForExecution(DefaultScheduler scheduler) throws JobException {
+	public void scheduleForExecution(Scheduler scheduler) throws JobException {
 		if (scheduler == null) {
 			throw new IllegalArgumentException("Scheduler must not be null.");
 		}
 		
+		if (this.scheduler != null && this.scheduler != scheduler) {
+			throw new IllegalArgumentException("Cann not use different schedulers for the same job");
+		}
+		
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
+			this.scheduler = scheduler;
 			
 			// initially, we simply take the ones without inputs.
 			// next, we implement the logic to go back from vertices that need computation
@@ -289,32 +299,35 @@ public class ExecutionGraph {
 	public void cancel() {
 		while (true) {
 			JobStatus current = state;
+			
 			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
 					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
 						ejv.cancel();
 					}
+					return;
 				}
 			}
-			
-			// no need to treat other states
+			else {
+				// no need to treat other states
+				return;
+			}
 		}
 	}
 	
 	public void fail(Throwable t) {
-		if (LOG.isErrorEnabled()) {
-			LOG.error(String.format("Failing ExecutionGraph %s (%s): ", getJobID(), getJobName()), t);
-		}
-		
 		while (true) {
 			JobStatus current = state;
-			if (current != JobStatus.FAILED) {
-				if (transitionState(current, JobStatus.FAILED, t)) {
-					// cancel all. what is failed will not cancel but stay failed
-					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-						ejv.cancel();
-					}
+			if (current == JobStatus.FAILED || current == JobStatus.FAILING) {
+				return;
+			}
+			else if (transitionState(current, JobStatus.FAILING, t)) {
+				// cancel all. what is failed will not cancel but stay failed
+				for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+					ejv.cancel();
 				}
+				
+				return;
 			}
 			
 			// no need to treat other states
@@ -362,6 +375,7 @@ public class ExecutionGraph {
 				nextVertexToFinish = nextPos;
 				
 				if (nextPos == verticesInCreationOrder.size()) {
+					
 					// we are done, transition to the final state
 					
 					while (true) {
@@ -372,7 +386,7 @@ public class ExecutionGraph {
 						if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
 							break;
 						}
-						if (current == JobStatus.FAILED) {
+						if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED)) {
 							break;
 						}
 						if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
@@ -417,99 +431,118 @@ public class ExecutionGraph {
 	}
 	
 	public ConnectionInfoLookupResponse lookupConnectionInfoAndDeployReceivers(InstanceConnectionInfo caller, ChannelID sourceChannelID) {
-		//TODO
-		return null;
 		
-//		final InternalJobStatus jobStatus = eg.getJobStatus();
-//		if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) {
-//			return ConnectionInfoLookupResponse.createJobIsAborting();
-//		}
-//
-//		final ExecutionEdge edge = eg.getEdgeByID(sourceChannelID);
-//		if (edge == null) {
-//			LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
-//			return ConnectionInfoLookupResponse.createReceiverNotFound();
-//		}
-//
-//		if (sourceChannelID.equals(edge.getInputChannelID())) {
-//			// Request was sent from an input channel
-//
-//			final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
-//
-//			final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
-//			if (assignedInstance == null) {
-//				LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex()
-//					+ " but no instance assigned");
-//				// LOG.info("Created receiverNotReady for " + connectedVertex + " 1");
-//				return ConnectionInfoLookupResponse.createReceiverNotReady();
-//			}
-//
-//			// Check execution state
-//			final ExecutionState executionState = connectedVertex.getExecutionState();
-//			if (executionState == ExecutionState.FINISHED) {
-//				// that should not happen. if there is data pending, the receiver cannot be ready
-//				return ConnectionInfoLookupResponse.createReceiverNotFound();
-//			}
-//
-//			// running is common, finishing is happens when the lookup is for the close event
-//			if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
-//				// LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2");
-//				return ConnectionInfoLookupResponse.createReceiverNotReady();
-//			}
-//
-//			if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
-//				// Receiver runs on the same task manager
-//				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
-//			} else {
-//				// Receiver runs on a different task manager
-//
-//				final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-//				final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-//
-//				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
-//			}
-//		}
-//		// else, the request is for an output channel
-//		// Find vertex of connected input channel
-//		final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
-//
-//		// Check execution state
-//		final ExecutionState executionState = targetVertex.getExecutionState();
-//
-//		// check whether the task needs to be deployed
-//		if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
-//
-//			if (executionState == ExecutionState.ASSIGNED) {
-//				final Runnable command = new Runnable() {
-//					@Override
-//					public void run() {
-//						scheduler.deployAssignedVertices(targetVertex);
-//					}
-//				};
-//				eg.executeCommand(command);
-//			}
-//
-//			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
-//			return ConnectionInfoLookupResponse.createReceiverNotReady();
-//		}
-//
-//		final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
-//		if (assignedInstance == null) {
-//			LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
-//			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
-//			return ConnectionInfoLookupResponse.createReceiverNotReady();
-//		}
-//
-//		if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
-//			// Receiver runs on the same task manager
-//			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
-//		} else {
-//			// Receiver runs on a different task manager
-//			final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-//			final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-//
-//			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
-//		}
+		final ExecutionEdge edge = edges.get(sourceChannelID);
+		if (edge == null) {
+			// that is bad, we need to fail the job
+			LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
+			fail(new Exception("Channels are not correctly registered"));
+			return ConnectionInfoLookupResponse.createReceiverNotFound();
+		}
+		
+		
+		//  ----- Request was sent from an input channel (receiver side), requesting the output channel (sender side) ------
+		//  -----                               This is the case for backwards events                                 ------
+
+		if (sourceChannelID.equals(edge.getInputChannelId())) {
+			final ExecutionVertex targetVertex = edge.getSource().getProducer();
+			final ExecutionState executionState = targetVertex.getExecutionState();
+			
+			// common case - found task running
+			if (executionState == ExecutionState.RUNNING) {
+				Instance location = targetVertex.getCurrentAssignedResource().getInstance();
+				
+				if (location.getInstanceConnectionInfo().equals(caller)) {
+					// Receiver runs on the same task manager
+					return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelId());
+				}
+				else {
+					// Receiver runs on a different task manager
+					final InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
+					final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
+
+					int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
+					return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
+				}
+			}
+			else if (executionState == ExecutionState.FINISHED) {
+				// that should not happen. if there is data pending, the sender cannot yet be done
+				// we need to fail the whole affair
+				LOG.error("Receiver " + targetVertex + " set to FINISHED even though data is pending");
+				fail(new Exception("Channels are not correctly registered"));
+				return ConnectionInfoLookupResponse.createReceiverNotFound();
+			}
+			else if (executionState == ExecutionState.FAILED || executionState == ExecutionState.CANCELED ||
+					executionState == ExecutionState.CANCELING)
+			{
+				return ConnectionInfoLookupResponse.createJobIsAborting();
+			}
+			else {
+				// no other state should occur here, because the sender cannot be in CREATED, SCHEDULED, or DEPLOYING
+				// state when the receiver is already running
+				LOG.error("Channel lookup (backwards) - sender " + targetVertex + " found in inconsistent state " + executionState);
+				fail(new Exception("Channels are not correctly registered"));
+				return ConnectionInfoLookupResponse.createReceiverNotFound();
+			}
+		}
+		
+		//  ----- Request was sent from an output channel (sender side), requesting the input channel (receiver side) ------
+		//  -----                                 This is the case for forward data                                   ------
+		
+		final ExecutionVertex targetVertex = edge.getTarget();
+		final ExecutionState executionState = targetVertex.getExecutionState();
+
+		if (executionState == ExecutionState.RUNNING) {
+			
+			// already online
+			Instance location = targetVertex.getCurrentAssignedResource().getInstance();
+			
+			if (location.getInstanceConnectionInfo().equals(caller)) {
+				// Receiver runs on the same task manager
+				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelId());
+			}
+			else {
+				// Receiver runs on a different task manager
+				final InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
+				final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
+
+				final int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
+				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
+			}
+		}
+		else if (executionState == ExecutionState.DEPLOYING || executionState == ExecutionState.SCHEDULED) {
+			return ConnectionInfoLookupResponse.createReceiverNotReady();
+		}
+		else if (executionState == ExecutionState.CREATED) {
+			// bring the receiver online
+			try {
+				edge.getTarget().scheduleForExecution(scheduler, false);
+				
+				// delay the requester
+				return ConnectionInfoLookupResponse.createReceiverNotReady();
+			}
+			catch (JobException e) {
+				fail(new Exception("Cannot schedule the receivers, not enough resources."));
+				return ConnectionInfoLookupResponse.createJobIsAborting();
+			}
+		}
+		else if (executionState == ExecutionState.CANCELED || executionState == ExecutionState.CANCELING ||
+				executionState == ExecutionState.FAILED)
+		{
+			return ConnectionInfoLookupResponse.createJobIsAborting();
+		}
+		else {
+			// every other state is handled above, so the only remaining state is FINISHED, which is
+			// inconsistent for a receiver whose input channel is still being looked up by the sender
+			String message = "Channel lookup (forward) - receiver " + targetVertex + " found in inconsistent state " + executionState;
+			LOG.error(message);
+			fail(new Exception(message));
+			return ConnectionInfoLookupResponse.createReceiverNotFound();
+		}
+	}
+	
+	public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
+		return Collections.unmodifiableMap(currentExecutions);
 	}
 	
 	void registerExecution(Execution exec) {
@@ -521,18 +554,15 @@ public class ExecutionGraph {
 	
 	void deregisterExecution(Execution exec) {
 		Execution contained = currentExecutions.remove(exec.getAttemptId());
+		
 		if (contained != null && contained != exec) {
 			fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
 		}
 	}
 	
-	Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
-		return Collections.unmodifiableMap(currentExecutions);
-	}
-	
 	void registerExecutionEdge(ExecutionEdge edge) {
-		ChannelID source = edge.getInputChannelId();
 		ChannelID target = edge.getInputChannelId();
+		ChannelID source = edge.getOutputChannelId();
 		edges.put(source, edge);
 		edges.put(target, edge);
 	}
@@ -581,6 +611,7 @@ public class ExecutionGraph {
 	 */
 	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable error) {
 		
+		// we must be very careful here with exceptions 
 		if (this.executionListeners.size() > 0) {
 			
 			String message = error == null ? null : ExceptionUtils.stringifyException(error);
@@ -592,11 +623,11 @@ public class ExecutionGraph {
 					LOG.error("Notification of execution state change caused an error.", t);
 				}
 			}
-			
-			// see what this means for us. currently, the first FAILED state means -> FAILED
-			if (newExecutionState == ExecutionState.FAILED) {
-				fail(error);
-			}
+		}
+		
+		// see what this means for us. currently, the first FAILED execution fails the whole job (-> FAILING)
+		if (newExecutionState == ExecutionState.FAILED) {
+			fail(error);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 440566d..1884ce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
@@ -225,7 +225,7 @@ public class ExecutionJobVertex {
 	//  Actions
 	//---------------------------------------------------------------------------------------------
 	
-	public void scheduleAll(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+	public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
 		for (ExecutionVertex ev : getTaskVertices()) {
 			ev.scheduleForExecution(scheduler, queued);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index bbc0c97..3c65f2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.slf4j.Logger;
-
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -39,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 
 /**
@@ -305,7 +304,7 @@ public class ExecutionVertex {
 		}
 	}
 	
-	public void scheduleForExecution(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+	public void scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
 		this.currentExecution.scheduleForExecution(scheduler, queued);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 0b822ab..f770b87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -34,6 +34,8 @@ public class IntermediateResult {
 	
 	private int numConsumers;
 	
+	private final int connectionIndex;
+	
 	
 	public IntermediateResult(IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers) {
 		this.id = id;
@@ -43,6 +45,9 @@ public class IntermediateResult {
 		
 		// we do not set the intermediate result partitions here, because we let them be initialized by
 		// the execution vertex that produces them
+		
+		// assign a random connection index
+		this.connectionIndex = (int) (Math.random() * Integer.MAX_VALUE);
 	}
 	
 	public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
@@ -87,4 +92,8 @@ public class IntermediateResult {
 		}
 		return index;
 	}
+	
+	public int getConnectionIndex() {
+		return connectionIndex;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 1c4e1fb..1cc5e13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -39,6 +39,7 @@ public class IntermediateResultPartition {
 		this.consumers = new ArrayList<List<ExecutionEdge>>(0);
 	}
 	
+	
 	public ExecutionVertex getProducer() {
 		return producer;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
index 42b9817..69e41d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.instance;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 3066bb5..602f88b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -366,7 +366,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			}
 			else if (lookupResponse.receiverNotReady()) {
 				try {
-					Thread.sleep(500);
+					Thread.sleep(100);
 				} catch (InterruptedException e) {
 					if (reportException) {
 						throw new IOException("Lookup was interrupted.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f8b5ab9..85978fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -71,6 +71,8 @@ public class JobGraph implements IOReadableWritable {
 	/** Name of this job. */
 	private String jobName;
 	
+	private boolean allowQueuedScheduling;
+	
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -162,6 +164,14 @@ public class JobGraph implements IOReadableWritable {
 	public Configuration getJobConfiguration() {
 		return this.jobConfiguration;
 	}
+	
+	public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
+		this.allowQueuedScheduling = allowQueuedScheduling;
+	}
+	
+	public boolean getAllowQueuedScheduling() {
+		return allowQueuedScheduling;
+	}
 
 	/**
 	 * Adds a new task vertex to the job graph if it is not already included.
@@ -304,6 +314,7 @@ public class JobGraph implements IOReadableWritable {
 		this.jobID.read(in);
 		this.jobName = StringValue.readString(in);
 		this.jobConfiguration.read(in);
+		this.allowQueuedScheduling = in.readBoolean();
 		
 		final int numVertices = in.readInt();
 		
@@ -332,6 +343,7 @@ public class JobGraph implements IOReadableWritable {
 		this.jobID.write(out);
 		StringValue.writeString(this.jobName, out);
 		this.jobConfiguration.write(out);
+		out.writeBoolean(allowQueuedScheduling);
 		
 		// write the task vertices using java serialization (to resolve references in the object graph)
 		out.writeInt(taskVertices.size());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 60b2edf..f5a2e9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -24,20 +24,35 @@ package org.apache.flink.runtime.jobgraph;
 public enum JobStatus {
 
 	/** Job is newly created, no task has started to run. */
-	CREATED,
+	CREATED(false),
 
 	/** Some tasks are scheduled or running, some may be pending, some may be finished. */
-	RUNNING,
+	RUNNING(false),
 
+	/** The job has failed and is currently waiting for the cleanup to complete */
+	FAILING(false),
+	
 	/** The job has failed to to non-recoverable task failure */
-	FAILED,
+	FAILED(true),
 
 	/** Job is being cancelled */
-	CANCELLING,
+	CANCELLING(false),
 	
 	/** Job has been cancelled */
-	CANCELED,
+	CANCELED(true),
 
 	/** All of the job's tasks have successfully finished. */
-	FINISHED
+	FINISHED(true);
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private final boolean terminalState;
+	
+	private JobStatus(boolean terminalState) {
+		this.terminalState = terminalState;
+	}
+	
+	public boolean isTerminalState() {
+		return terminalState;
+	}
 };

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
index a45507a..551dce2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
 import org.apache.flink.runtime.event.job.JobEvent;
@@ -52,6 +54,8 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent;
  */
 public final class EventCollector extends TimerTask implements ProfilingListener {
 
+	private static final Log LOG = LogFactory.getLog(EventCollector.class);
+
 	/**
 	 * The execution listener wrapper is an auxiliary class. It is required
 	 * because the job vertex ID and the management vertex ID cannot be accessed from
@@ -93,6 +97,8 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 					executionId, newExecutionState);
 
 			this.eventCollector.addEvent(jobID, executionStateChangeEvent);
+			
+			LOG.info(vertexEvent.toString());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index d3a920c..3526e15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -79,7 +79,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
 import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
 import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
 import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
@@ -122,7 +122,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	private final InstanceManager instanceManager;
 	
 	/** Assigns tasks to slots and keeps track on available and allocated task slots*/
-	private final DefaultScheduler scheduler;
+	private final Scheduler scheduler;
 	
 	/** The currently running jobs */
 	private final ConcurrentHashMap<JobID, ExecutionGraph> currentJobs;
@@ -143,7 +143,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	private volatile boolean isShutDown;
 	
-	
 	private WebInfoServer server;
 	
 	
@@ -222,7 +221,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		}
 
 		// create the scheduler and make it listen at the availability of new instances
-		this.scheduler = new DefaultScheduler();
+		this.scheduler = new Scheduler(this.executorService);
 		this.instanceManager.addInstanceListener(this.scheduler);
 	}
 
@@ -349,6 +348,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			if (LOG.isDebugEnabled()) {
 				LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", job.getJobID(), job.getName()));
 			}
+			
+			// should the job fail if a vertex cannot be deployed immediately (streams, closed iterations)
+			executionGraph.setQueuedSchedulingAllowed(job.getAllowQueuedScheduling());
 	
 			// Register job with the progress collector
 			if (this.eventCollector != null) {
@@ -461,12 +463,12 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		
 		if (LOG.isInfoEnabled()) {
 			String message = optionalMessage == null ? "." : ": " + optionalMessage;
-			LOG.info(String.format("Status of job %s (%s) changed to %s%s", 
+			LOG.info(String.format("Job %s (%s) switched to %s%s", 
 					jid, executionGraph.getJobName(), newJobStatus, message));
 		}
 
 		// remove the job graph if the state is any terminal state
-		if (newJobStatus == JobStatus.FINISHED || newJobStatus == JobStatus.CANCELED || newJobStatus == JobStatus.FAILED) {
+		if (newJobStatus.isTerminalState()) {
 			this.currentJobs.remove(jid);
 			
 			try {
@@ -643,7 +645,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 
 	@Override
 	public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
-		return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+		if (this.instanceManager != null) {
+			return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+		} else {
+			return null;
+		}
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
deleted file mode 100644
index 54e16b9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceDiedException;
-import org.apache.flink.runtime.instance.InstanceListener;
-
-/**
- * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
- * slots.
- */
-public class DefaultScheduler implements InstanceListener, SlotAvailablilityListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
-	
-	
-	private final Object globalLock = new Object();
-	
-	
-	/** All instances that the scheduler can deploy to */
-	private final Set<Instance> allInstances = new HashSet<Instance>();
-	
-	/** All instances that still have available resources */
-	private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
-	
-	/** All tasks pending to be scheduled */
-	private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
-	
-	
-	private int unconstrainedAssignments = 0;
-	
-	private int localizedAssignments = 0;
-	
-	private int nonLocalizedAssignments = 0;
-	
-	
-	public DefaultScheduler() {
-	}
-	
-	/**
-	 * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
-	 */
-	public void shutdown() {
-		synchronized (globalLock) {
-			for (Instance i : allInstances) {
-				i.removeSlotListener();
-				i.cancelAndReleaseAllSlots();
-			}
-			allInstances.clear();
-			instancesWithAvailableResources.clear();
-			taskQueue.clear();
-		}
-	}
-
-	/**
-	 * 
-	 * NOTE: In the presence of multi-threaded operations, this number may be inexact.
-	 * 
-	 * @return The number of empty slots, for tasks.
-	 */
-	public int getNumberOfAvailableSlots() {
-		int count = 0;
-		
-		synchronized (globalLock) {
-			for (Instance instance : instancesWithAvailableResources) {
-				count += instance.getNumberOfAvailableSlots();
-			}
-		}
-		
-		return count;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Scheduling
-	// --------------------------------------------------------------------------------------------
-	
-	public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
-		Object ret = scheduleTask(task, false);
-		if (ret instanceof AllocatedSlot) {
-			return (AllocatedSlot) ret;
-		}
-		else {
-			throw new RuntimeException();
-		}
-	}
-	
-	public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
-		Object ret = scheduleTask(task, true);
-		if (ret instanceof AllocatedSlot) {
-			return new SlotAllocationFuture((AllocatedSlot) ret);
-		}
-		if (ret instanceof SlotAllocationFuture) {
-			return (SlotAllocationFuture) ret;
-		}
-		else {
-			throw new RuntimeException();
-		}
-	}
-	
-	/**
-	 * Returns either an {@link AllocatedSlot}, or an {@link SlotAllocationFuture}.
-	 */
-	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
-		if (task == null) {
-			throw new IllegalArgumentException();
-		}
-		
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Scheduling task " + task);
-		}
-		
-		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
-	
-		synchronized (globalLock) {
-			// 1) If the task has a strict co-schedule hint, obey it, if it has been assigned.
-//			CoLocationHint hint = task.getCoScheduleHint();
-//			if (hint != null) {
-//				
-//				// try to add to the slot, or make it wait on the hint and schedule the hint itself
-//				if () {
-//					return slot;
-//				}
-//			}
-		
-			// 2) See if we can place the task somewhere together with another existing task.
-			//    This is defined by the slot sharing groups
-			SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
-			if (sharingUnit != null) {
-				// see if we can add the task to the current sharing group.
-				SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
-				AllocatedSlot slot = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
-				if (slot != null) {
-					return slot;
-				}
-			}
-		
-			// 3) We could not schedule it to an existing slot, so we need to get a new one or queue the task
-			
-			// we need potentially to loop multiple times, because there may be false positives
-			// in the set-with-available-instances
-			while (true) {
-				
-				
-				Instance instanceToUse = getFreeInstanceForTask(task.getTaskToExecute().getVertex());
-			
-				if (instanceToUse != null) {
-					try {
-						AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId());
-						
-						// if the instance has further available slots, re-add it to the set of available resources.
-						if (instanceToUse.hasResourcesAvailable()) {
-							this.instancesWithAvailableResources.add(instanceToUse);
-						}
-						
-						if (slot != null) {
-							
-							// if the task is in a shared group, assign the slot to that group
-							// and get a sub slot in turn
-							if (sharingUnit != null) {
-								slot = sharingUnit.getTaskAssignment().addSlotWithTask(slot, task.getJobVertexId());
-							}
-							
-							return slot;
-						}
-					}
-					catch (InstanceDiedException e) {
-						// the instance died it has not yet been propagated to this scheduler
-						// remove the instance from the set of available instances
-						this.allInstances.remove(instanceToUse);
-						this.instancesWithAvailableResources.remove(instanceToUse);
-					}
-				}
-				else {
-					// no resource available now, so queue the request
-					if (queueIfNoResource) {
-						SlotAllocationFuture future = new SlotAllocationFuture();
-						this.taskQueue.add(new QueuedTask(task, future));
-						return future;
-					}
-					else {
-						throw new NoResourceAvailableException(task);
-					}
-				}
-			}
-		}
-	}
-		
-	/**
-	 * Gets a suitable instance to schedule the vertex execution to.
-	 * <p>
-	 * NOTE: This method does is not thread-safe, it needs to be synchronized by the caller.
-	 * 
-	 * @param vertex The task to run. 
-	 * @return The instance to run the vertex on, it {@code null}, if no instance is available.
-	 */
-	protected Instance getFreeInstanceForTask(ExecutionVertex vertex) {
-		if (this.instancesWithAvailableResources.isEmpty()) {
-			return null;
-		}
-		
-		Iterable<Instance> locationsIterable = vertex.getPreferredLocations();
-		Iterator<Instance> locations = locationsIterable == null ? null : locationsIterable.iterator();
-		
-		if (locations != null && locations.hasNext()) {
-			
-			while (locations.hasNext()) {
-				Instance location = locations.next();
-				
-				if (location != null && this.instancesWithAvailableResources.remove(location)) {
-					
-					localizedAssignments++;
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location);
-					}
-					
-					return location;
-				}
-			}
-			
-			Instance instance = this.instancesWithAvailableResources.poll();
-			nonLocalizedAssignments++;
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instance);
-			}
-			return instance;
-		}
-		else {
-			Instance instance = this.instancesWithAvailableResources.poll();
-			unconstrainedAssignments++;
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instance);
-			}
-			
-			return instance;
-		}
-	}
-	
-	@Override
-	public void newSlotAvailable(Instance instance) {
-		
-		// global lock before instance lock, so that the order of acquiring locks is always 1) global, 2) instance
-		synchronized (globalLock) {
-			QueuedTask queued = taskQueue.peek();
-			
-			// the slot was properly released, we can allocate a new one from that instance
-			
-			if (queued != null) {
-				ScheduledUnit task = queued.getTask();
-				ExecutionVertex vertex = task.getTaskToExecute().getVertex();
-				
-				try {
-					AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId());
-					if (newSlot != null) {
-						
-						// success, remove from the task queue and notify the future
-						taskQueue.poll();
-						if (queued.getFuture() != null) {
-							try {
-								queued.getFuture().setSlot(newSlot);
-							}
-							catch (Throwable t) {
-								LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
-								task.getTaskToExecute().fail(t);
-							}
-						}
-					}
-				}
-				catch (InstanceDiedException e) {
-					this.allInstances.remove(instance);
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Instance " + instance + " was marked dead asynchronously.");
-					}
-				}
-			}
-			else {
-				this.instancesWithAvailableResources.add(instance);
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Instance Availability
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void newInstanceAvailable(Instance instance) {
-		if (instance == null) {
-			throw new IllegalArgumentException();
-		}
-		if (instance.getNumberOfAvailableSlots() <= 0) {
-			throw new IllegalArgumentException("The given instance has no resources.");
-		}
-		if (!instance.isAlive()) {
-			throw new IllegalArgumentException("The instance is not alive.");
-		}
-		
-		// synchronize globally for instance changes
-		synchronized (this.globalLock) {
-			
-			// check we do not already use this instance
-			if (!this.allInstances.add(instance)) {
-				throw new IllegalArgumentException("The instance is already contained.");
-			}
-			
-			try {
-				instance.setSlotAvailabilityListener(this);
-			}
-			catch (IllegalStateException e) {
-				this.allInstances.remove(instance);
-				LOG.error("Scheduler could not attach to the instance as a listener.");
-			}
-			
-			
-			// add it to the available resources and let potential waiters know
-			this.instancesWithAvailableResources.add(instance);
-			
-			for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
-				newSlotAvailable(instance);
-			}
-		}
-	}
-	
-	@Override
-	public void instanceDied(Instance instance) {
-		if (instance == null) {
-			throw new IllegalArgumentException();
-		}
-		
-		instance.markDead();
-		
-		// we only remove the instance from the pools, we do not care about the 
-		synchronized (this.globalLock) {
-			// the instance must not be available anywhere any more
-			this.allInstances.remove(instance);
-			this.instancesWithAvailableResources.remove(instance);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Status reporting
-	// --------------------------------------------------------------------------------------------
-
-	public int getNumberOfAvailableInstances() {
-		return allInstances.size();
-	}
-	
-	public int getNumberOfInstancesWithAvailableSlots() {
-		return instancesWithAvailableResources.size();
-	}
-	
-	public int getNumberOfUnconstrainedAssignments() {
-		return unconstrainedAssignments;
-	}
-	
-	public int getNumberOfLocalizedAssignments() {
-		return localizedAssignments;
-	}
-	
-	public int getNumberOfNonLocalizedAssignments() {
-		return nonLocalizedAssignments;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private static final class QueuedTask {
-		
-		private final ScheduledUnit task;
-		
-		private final SlotAllocationFuture future;
-		
-		
-		public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
-			this.task = task;
-			this.future = future;
-		}
-
-		public ScheduledUnit getTask() {
-			return task;
-		}
-
-		public SlotAllocationFuture getFuture() {
-			return future;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
new file mode 100644
index 0000000..cd57454
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceDiedException;
+import org.apache.flink.runtime.instance.InstanceListener;
+
+/**
+ * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
+ * slots.
+ */
+public class Scheduler implements InstanceListener, SlotAvailablilityListener {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+	
+	
+	private final Object globalLock = new Object();
+	
+	private final ExecutorService executor;
+	
+	
+	/** All instances that the scheduler can deploy to */
+	private final Set<Instance> allInstances = new HashSet<Instance>();
+	
+	/** All instances that still have available resources */
+	private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
+	
+	/** All tasks pending to be scheduled */
+	private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
+	
+	
+	private int unconstrainedAssignments = 0;
+	
+	private int localizedAssignments = 0;
+	
+	private int nonLocalizedAssignments = 0;
+	
+	
+	public Scheduler() {
+		this(null);
+	}
+	
+	public Scheduler(ExecutorService executorService) {
+		this.executor = executorService;
+	}
+	
+	
+	/**
+	 * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
+	 */
+	public void shutdown() {
+		synchronized (globalLock) {
+			for (Instance i : allInstances) {
+				i.removeSlotListener();
+				i.cancelAndReleaseAllSlots();
+			}
+			allInstances.clear();
+			instancesWithAvailableResources.clear();
+			taskQueue.clear();
+		}
+	}
+
+	/**
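+	 * Gets the total number of available slots over all instances that currently have available resources.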
+	 * NOTE: In the presence of multi-threaded operations, this number may be inexact.
+	 * 
+	 * @return The number of empty slots, for tasks.
+	 */
+	public int getNumberOfAvailableSlots() {
+		int count = 0;
+		
+		synchronized (globalLock) {
+			for (Instance instance : instancesWithAvailableResources) {
+				count += instance.getNumberOfAvailableSlots();
+			}
+		}
+		
+		return count;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Scheduling
+	// --------------------------------------------------------------------------------------------
+	
+	public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
+		Object ret = scheduleTask(task, false);
+		if (ret instanceof AllocatedSlot) {
+			return (AllocatedSlot) ret;
+		}
+		else {
+			throw new RuntimeException();
+		}
+	}
+	
+	public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
+		Object ret = scheduleTask(task, true);
+		if (ret instanceof AllocatedSlot) {
+			return new SlotAllocationFuture((AllocatedSlot) ret);
+		}
+		if (ret instanceof SlotAllocationFuture) {
+			return (SlotAllocationFuture) ret;
+		}
+		else {
+			throw new RuntimeException();
+		}
+	}
+	
+	/**
+	 * Returns either an {@link AllocatedSlot}, or an {@link SlotAllocationFuture}.
+	 */
+	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
+		if (task == null) {
+			throw new IllegalArgumentException();
+		}
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Scheduling task " + task);
+		}
+		
+		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
+	
+		synchronized (globalLock) {
+			// 1) If the task has a strict co-schedule hint, obey it, if it has been assigned.
+//			CoLocationHint hint = task.getCoScheduleHint();
+//			if (hint != null) {
+//				
+//				// try to add to the slot, or make it wait on the hint and schedule the hint itself
+//				if () {
+//					return slot;
+//				}
+//			}
+		
+			// 2) See if we can place the task somewhere together with another existing task.
+			//    This is defined by the slot sharing groups
+			SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+			if (sharingUnit != null) {
+				// see if we can add the task to the current sharing group.
+				SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
+				AllocatedSlot slot = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
+				if (slot != null) {
+					return slot;
+				}
+			}
+		
+			// 3) We could not schedule it to an existing slot, so we need to get a new one or queue the task
+			
+			// we may need to loop multiple times, because there may be false positives
+			// in the set of instances with available resources
+			while (true) {
+				
+				
+				Instance instanceToUse = getFreeInstanceForTask(task.getTaskToExecute().getVertex());
+			
+				if (instanceToUse != null) {
+					try {
+						AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId());
+						
+						// if the instance has further available slots, re-add it to the set of available resources.
+						if (instanceToUse.hasResourcesAvailable()) {
+							this.instancesWithAvailableResources.add(instanceToUse);
+						}
+						
+						if (slot != null) {
+							
+							// if the task is in a shared group, assign the slot to that group
+							// and get a sub slot in turn
+							if (sharingUnit != null) {
+								slot = sharingUnit.getTaskAssignment().addSlotWithTask(slot, task.getJobVertexId());
+							}
+							
+							return slot;
+						}
+					}
+					catch (InstanceDiedException e) {
+						// the instance died, but that has not yet been propagated to this scheduler;
+						// remove the instance from the set of available instances
+						this.allInstances.remove(instanceToUse);
+						this.instancesWithAvailableResources.remove(instanceToUse);
+					}
+				}
+				else {
+					// no resource available now, so queue the request
+					if (queueIfNoResource) {
+						SlotAllocationFuture future = new SlotAllocationFuture();
+						this.taskQueue.add(new QueuedTask(task, future));
+						return future;
+					}
+					else {
+						throw new NoResourceAvailableException(task);
+					}
+				}
+			}
+		}
+	}
+		
+	/**
+	 * Gets a suitable instance to schedule the vertex execution to.
+	 * <p>
+	 * NOTE: This method is not thread-safe; it needs to be synchronized by the caller.
+	 * 
+	 * @param vertex The task to run. 
+	 * @return The instance to run the vertex on, or {@code null} if no instance is available.
+	 */
+	protected Instance getFreeInstanceForTask(ExecutionVertex vertex) {
+		if (this.instancesWithAvailableResources.isEmpty()) {
+			return null;
+		}
+		
+		Iterable<Instance> locationsIterable = vertex.getPreferredLocations();
+		Iterator<Instance> locations = locationsIterable == null ? null : locationsIterable.iterator();
+		
+		if (locations != null && locations.hasNext()) {
+			
+			while (locations.hasNext()) {
+				Instance location = locations.next();
+				
+				if (location != null && this.instancesWithAvailableResources.remove(location)) {
+					
+					localizedAssignments++;
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location);
+					}
+					
+					return location;
+				}
+			}
+			
+			Instance instance = this.instancesWithAvailableResources.poll();
+			nonLocalizedAssignments++;
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instance);
+			}
+			return instance;
+		}
+		else {
+			Instance instance = this.instancesWithAvailableResources.poll();
+			unconstrainedAssignments++;
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instance);
+			}
+			
+			return instance;
+		}
+	}
+	
+	@Override
+	public void newSlotAvailable(final Instance instance) {
+		
+		// WARNING: The asynchrony here is necessary, because we cannot guarantee a consistent order
+		// of lock acquisition (global scheduler, instance); without it we risk deadlocks:
+		// 
+		// -> The scheduler grabs  (1) the global scheduler lock, then
+		//                         (2) the slot/instance lock
+		// -> Slot releasing grabs (1) the slot/instance lock (for releasing), then
+		//                         (2) the scheduler lock (to check whether to take a new task item)
+		// 
+		// With synchronous handling, that leads to deadlocks with high probability when scheduling fast.
+		
+		if (this.executor != null) {
+			this.executor.execute(new Runnable() {
+				@Override
+				public void run() {
+					handleNewSlot(instance);
+				}
+			});
+		}
+		else {
+			// for tests, we use the synchronous variant
+			handleNewSlot(instance);
+		}
+	}
+	
+	private void handleNewSlot(Instance instance) {
+		
+		synchronized (globalLock) {
+			QueuedTask queued = taskQueue.peek();
+			
+			// the slot was properly released, we can allocate a new one from that instance
+			
+			if (queued != null) {
+				ScheduledUnit task = queued.getTask();
+				ExecutionVertex vertex = task.getTaskToExecute().getVertex();
+				
+				try {
+					AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId());
+					if (newSlot != null) {
+						
+						// success, remove from the task queue and notify the future
+						taskQueue.poll();
+						if (queued.getFuture() != null) {
+							try {
+								queued.getFuture().setSlot(newSlot);
+							}
+							catch (Throwable t) {
+								LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
+								task.getTaskToExecute().fail(t);
+							}
+						}
+					}
+				}
+				catch (InstanceDiedException e) {
+					this.allInstances.remove(instance);
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Instance " + instance + " was marked dead asynchronously.");
+					}
+				}
+			}
+			else {
+				this.instancesWithAvailableResources.add(instance);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Instance Availability
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void newInstanceAvailable(Instance instance) {
+		if (instance == null) {
+			throw new IllegalArgumentException();
+		}
+		if (instance.getNumberOfAvailableSlots() <= 0) {
+			throw new IllegalArgumentException("The given instance has no resources.");
+		}
+		if (!instance.isAlive()) {
+			throw new IllegalArgumentException("The instance is not alive.");
+		}
+		
+		// synchronize globally for instance changes
+		synchronized (this.globalLock) {
+			
+			// check we do not already use this instance
+			if (!this.allInstances.add(instance)) {
+				throw new IllegalArgumentException("The instance is already contained.");
+			}
+			
+			try {
+				instance.setSlotAvailabilityListener(this);
+			}
+			catch (IllegalStateException e) {
+				this.allInstances.remove(instance);
+				LOG.error("Scheduler could not attach to the instance as a listener.");
+			}
+			
+			
+			// add it to the available resources and let potential waiters know
+			this.instancesWithAvailableResources.add(instance);
+			
+			for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
+				newSlotAvailable(instance);
+			}
+		}
+	}
+	
+	@Override
+	public void instanceDied(Instance instance) {
+		if (instance == null) {
+			throw new IllegalArgumentException();
+		}
+		
+		instance.markDead();
+		
+		// beyond marking it dead (above), we only remove the instance from the pools here
+		synchronized (this.globalLock) {
+			// the instance must not be available anywhere any more
+			this.allInstances.remove(instance);
+			this.instancesWithAvailableResources.remove(instance);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Status reporting
+	// --------------------------------------------------------------------------------------------
+
+	public int getNumberOfAvailableInstances() {
+		return allInstances.size();
+	}
+	
+	public int getNumberOfInstancesWithAvailableSlots() {
+		return instancesWithAvailableResources.size();
+	}
+	
+	public int getNumberOfUnconstrainedAssignments() {
+		return unconstrainedAssignments;
+	}
+	
+	public int getNumberOfLocalizedAssignments() {
+		return localizedAssignments;
+	}
+	
+	public int getNumberOfNonLocalizedAssignments() {
+		return nonLocalizedAssignments;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class QueuedTask {
+		
+		private final ScheduledUnit task;
+		
+		private final SlotAllocationFuture future;
+		
+		
+		public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
+			this.task = task;
+			this.future = future;
+		}
+
+		public ScheduledUnit getTask() {
+			return task;
+		}
+
+		public SlotAllocationFuture getFuture() {
+			return future;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7b692e3..6ac7c54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -185,11 +185,15 @@ public final class Task {
 		while (true) {
 			ExecutionState current = this.executionState;
 			
-			if (current == ExecutionState.CANCELED || current == ExecutionState.CANCELING) {
+			// if canceled, fine. we are done, and the jobmanager has been told
+			if (current == ExecutionState.CANCELED) {
 				return;
 			}
 			
-			if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+			// if canceling, we are done, but we cannot be sure that the jobmanager has been told:
+			// we may have recognized our failure before the canceling completed and never sent a
+			// canceled message back. hence we still switch to FAILED and report it below
+			else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
 				notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error));
 				taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, error);
 				return;
@@ -201,16 +205,18 @@ public final class Task {
 		while (true) {
 			ExecutionState current = this.executionState;
 			
-			// if the task is already canceled (or canceling) or finished, then we
-			// need not do anything
+			// if the task is already canceled (or canceling) or finished or failed,
+			// then we need not do anything
 			if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED ||
-					current == ExecutionState.CANCELING) {
+					current == ExecutionState.CANCELING || current == ExecutionState.FAILED)
+			{
 				return;
 			}
 			
 			if (current == ExecutionState.DEPLOYING) {
 				// directly set to canceled
 				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+					
 					notifyObservers(ExecutionState.CANCELED, null);
 					taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
 					return;
@@ -219,6 +225,7 @@ public final class Task {
 			else if (current == ExecutionState.RUNNING) {
 				// go to canceling and perform the actual task canceling
 				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELING)) {
+					
 					notifyObservers(ExecutionState.CANCELING, null);
 					try {
 						this.environment.cancelExecution();
@@ -236,9 +243,21 @@ public final class Task {
 	}
 	
 	public void cancelingDone() {
-		if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
-			notifyObservers(ExecutionState.CANCELED, null);
-			taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
+		while (true) {
+			ExecutionState current = this.executionState;
+			
+			if (current == ExecutionState.CANCELED) {
+				return;
+			}
+			if (!(current == ExecutionState.RUNNING || current == ExecutionState.CANCELING)) {
+				LOG.error(String.format("Unexpected state transition in Task: %s -> %s", current, ExecutionState.CANCELED));
+			}
+			
+			if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+				notifyObservers(ExecutionState.CANCELED, null);
+				taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
+				return;
+			}
 		}
 	}