You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:12 UTC

[4/5] git commit: Implement coarse-grained fault tolerance

Implement coarse-grained fault tolerance


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd687bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd687bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd687bc6

Branch: refs/heads/master
Commit: dd687bc6729d9539e05db9761e22a2aadc707341
Parents: 2557832
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 5 20:48:56 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |   1 +
 .../java/org/apache/flink/api/common/Plan.java  |  30 +++
 .../flink/configuration/ConfigConstants.java    |  12 +-
 .../flink/api/java/ExecutionEnvironment.java    |  28 +++
 .../flink/runtime/executiongraph/Execution.java |  24 +-
 .../runtime/executiongraph/ExecutionEdge.java   |  18 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  75 +++++-
 .../executiongraph/ExecutionJobVertex.java      |  54 +++-
 .../runtime/executiongraph/ExecutionVertex.java |  17 +-
 .../apache/flink/runtime/instance/Instance.java |   4 +
 .../runtime/io/network/ChannelManager.java      |  16 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  31 +++
 .../flink/runtime/jobgraph/JobStatus.java       |   5 +-
 .../flink/runtime/jobmanager/JobManager.java    |  15 +-
 .../scheduler/CoLocationConstraint.java         |   6 +
 .../jobmanager/scheduler/CoLocationGroup.java   |  11 +-
 .../jobmanager/scheduler/SlotSharingGroup.java  |   9 +
 .../BlobLibraryCacheManagerTest.java            |  15 +-
 .../ExecutionGraphDeploymentTest.java           |   3 -
 .../ExecutionGraphRestartTest.java              | 127 ++++++++++
 .../executiongraph/ExecutionGraphTestUtils.java |   6 +-
 .../ExecutionStateProgressTest.java             |   3 -
 .../ExecutionVertexCancelTest.java              |   3 -
 .../ExecutionVertexDeploymentTest.java          |   4 -
 .../runtime/jobgraph/JobManagerTestUtils.java   |   8 +
 .../jobmanager/CoLocationConstraintITCase.java  |   4 -
 .../runtime/jobmanager/JobManagerITCase.java    |   4 -
 .../runtime/jobmanager/RecoveryITCase.java      | 247 +++++++++++++++++++
 .../runtime/jobmanager/SlotSharingITCase.java   |   4 -
 .../jobmanager/TaskManagerFailsITCase.java      |   3 -
 .../jobmanager/tasks/ReceiverBlockingOnce.java  |  52 ++++
 .../jobmanager/tasks/ReceiverFailingOnce.java   |  50 ++++
 .../flink/api/scala/ExecutionEnvironment.scala  |  16 ++
 33 files changed, 828 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 3dd9685..d5f9b94 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -183,6 +183,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		
 		// create the jobgraph object
 		JobGraph graph = new JobGraph(program.getJobName());
+		graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
 		graph.setAllowQueuedScheduling(false);
 		
 		// add vertices to the graph

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index f7e93b4..f299ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -67,6 +67,11 @@ public class Plan implements Visitable<Operator<?>> {
 	 * The default parallelism to use for nodes that have no explicitly specified parallelism.
 	 */
 	protected int defaultParallelism = DEFAULT_PARALELLISM;
+	
+	/**
+	 * The number of times failed tasks are re-executed; -1 (the default) defers to the system configuration.
+	 */
+	protected int numberOfExecutionRetries = -1;
 
 	/**
 	 * Hash map for files in the distributed cache: registered name to cache entry.
@@ -259,6 +264,31 @@ public class Plan implements Visitable<Operator<?>> {
 	}
 	
 	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 * 
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+	 */
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		if (numberOfExecutionRetries < -1) {
+			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+		}
+		this.numberOfExecutionRetries = numberOfExecutionRetries;
+	}
+	
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 * 
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return numberOfExecutionRetries;
+	}
+	
+	/**
 	 * Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
 	 * for data types and is specific to a particular data model (record, tuple, Scala, ...)
 	 *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 05b7047..75ebe54 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.configuration;
 
 /**
@@ -36,6 +35,12 @@ public final class ConfigConstants {
 	 */
 	public static final String DEFAULT_PARALLELIZATION_DEGREE_KEY = "parallelization.degree.default";
 	
+	/**
+	 * Config parameter for the number of re-tries for failed tasks. Setting this
+	 * value to 0 effectively disables fault tolerance.
+	 */
+	public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
+	
 	// -------------------------------- Runtime -------------------------------
 
 	/**
@@ -313,6 +318,11 @@ public final class ConfigConstants {
 	 */
 	public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;
 	
+	/**
+	 * The default number of execution retries.
+	 */
+	public static final int DEFAULT_EXECUTION_RETRIES = 0;
+	
 	// ------------------------------ Runtime ---------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 54e36c0..6b95ad8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -96,6 +96,8 @@ public abstract class ExecutionEnvironment {
 
 	private int degreeOfParallelism = -1;
 	
+	private int numberOfExecutionRetries = -1;
+	
 	
 	// --------------------------------------------------------------------------------------------
 	//  Constructor and Properties
@@ -144,6 +146,31 @@ public abstract class ExecutionEnvironment {
 	}
 	
 	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 * 
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+	 */
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		if (numberOfExecutionRetries < -1) {
+			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+		}
+		this.numberOfExecutionRetries = numberOfExecutionRetries;
+	}
+	
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 * 
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return numberOfExecutionRetries;
+	}
+	
+	/**
 	 * Gets the UUID by which this environment is identified. The UUID sets the execution context
 	 * in the cluster or local environment.
 	 *
@@ -652,6 +679,7 @@ public abstract class ExecutionEnvironment {
 		if (getDegreeOfParallelism() > 0) {
 			plan.setDefaultParallelism(getDegreeOfParallelism());
 		}
+		plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);
 		
 		try {
 			registerCachedFilesWithPlan(plan);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2f881d7..3cc6b02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -391,13 +391,13 @@ public class Execution {
 			
 				if (transitionState(current, FINISHED)) {
 					try {
-						vertex.executionFinished();
-						return;
+						assignedResource.releaseSlot();
+						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.getExecutionGraph().deregisterExecution(this);
-						assignedResource.releaseSlot();
+						vertex.executionFinished();
 					}
+					return;
 				}
 			}
 			else if (current == CANCELING) {
@@ -433,14 +433,14 @@ public class Execution {
 			if (current == CANCELED) {
 				return;
 			}
-			else if (current == CANCELING || current == RUNNING) {
+			else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
 				if (transitionState(current, CANCELED)) {
 					try {
-						vertex.executionCanceled();
+						assignedResource.releaseSlot();
+						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.getExecutionGraph().deregisterExecution(this);
-						assignedResource.releaseSlot();
+						vertex.executionCanceled();
 					}
 					return;
 				}
@@ -493,13 +493,13 @@ public class Execution {
 				this.failureCause = t;
 				
 				try {
-					vertex.getExecutionGraph().deregisterExecution(this);
-					vertex.executionFailed(t);
-				}
-				finally {
 					if (assignedResource != null) {
 						assignedResource.releaseSlot();
 					}
+					vertex.getExecutionGraph().deregisterExecution(this);
+				}
+				finally {
+					vertex.executionFailed(t);
 				}
 				
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
index 918a0ca..92ca394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -28,9 +28,9 @@ public class ExecutionEdge {
 	
 	private final int inputNum;
 
-	private final ChannelID inputChannelId;
+	private ChannelID inputChannelId;
 	
-	private final ChannelID outputChannelId;
+	private ChannelID outputChannelId;
 	
 	
 	public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum) {
@@ -42,15 +42,6 @@ public class ExecutionEdge {
 		this.outputChannelId = new ChannelID();
 	}
 	
-	public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum, ChannelID inputChannelId, ChannelID outputChannelId) {
-		this.source = source;
-		this.target = target;
-		this.inputNum = inputNum;
-		
-		this.inputChannelId = inputChannelId;
-		this.outputChannelId = outputChannelId;
-	}
-	
 	
 	public IntermediateResultPartition getSource() {
 		return source;
@@ -71,4 +62,9 @@ public class ExecutionEdge {
 	public ChannelID getOutputChannelId() {
 		return outputChannelId;
 	}
+	
+	public void assignNewChannelIDs() {
+		inputChannelId = new ChannelID();
+		outputChannelId = new ChannelID();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1954d70..9a33dbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -106,6 +105,8 @@ public class ExecutionGraph {
 	
 	private int nextVertexToFinish;
 	
+	private int numberOfRetriesLeft;
+	
 	private volatile JobStatus state = JobStatus.CREATED;
 	
 	private volatile Throwable failureCause;
@@ -147,6 +148,17 @@ public class ExecutionGraph {
 
 	// --------------------------------------------------------------------------------------------
 	
+	public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
+		if (numberOfRetriesLeft < -1) {
+			throw new IllegalArgumentException("The number of retries left must be non-negative, or -1, but was " + numberOfRetriesLeft);
+		}
+		this.numberOfRetriesLeft = numberOfRetriesLeft;
+	}
+	/** Number of restarts still allowed; once this is not positive, a failing job goes to FAILED. */
+	public int getNumberOfRetriesLeft() {
+		return numberOfRetriesLeft;
+	}
+	
 	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -344,8 +356,14 @@ public class ExecutionGraph {
 	
 	public void waitForJobEnd(long timeout) throws InterruptedException {
 		synchronized (progressLock) {
-			while (nextVertexToFinish < verticesInCreationOrder.size()) {
-				progressLock.wait(timeout);
+			// timeout == 0 means "wait indefinitely" (as in Object.wait); a huge timeout must not overflow into the past
+			long now = System.currentTimeMillis();
+			long deadline = (timeout == 0 || timeout > Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + timeout;
+			
+			// return once the job reached a terminal state, or the deadline has passed
+			while (now < deadline && !state.isTerminalState()) {
+				progressLock.wait(deadline - now);
+				now = System.currentTimeMillis();
 			}
 		}
 	}
@@ -403,8 +421,21 @@ public class ExecutionGraph {
 						if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
 							break;
 						}
-						if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED, failureCause)) {
-							break;
+						if (current == JobStatus.FAILING) {
+							if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
+								numberOfRetriesLeft--;
+								
+								execute(new Runnable() {
+									@Override
+									public void run() {
+										restart();
+									}
+								});
+								break;
+							}
+							else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+								break;
+							}
 						}
 						if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
 							fail(new Exception("ExecutionGraph went into final state from state " + current));
@@ -659,4 +690,38 @@ public class ExecutionGraph {
 			action.run();
 		}
 	}
+	
+	public void restart() {
+		try {
+			if (state == JobStatus.FAILED) {
+				transitionState(JobStatus.FAILED, JobStatus.RESTARTING);
+			}
+			synchronized (progressLock) {
+				if (state != JobStatus.RESTARTING) {
+					throw new IllegalStateException("Can only restart job from state restarting.");
+				}
+				if (scheduler == null) {
+					throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
+				}
+				// clear the bookkeeping of the previous attempt before resetting the job vertices
+				this.currentExecutions.clear();
+				this.edges.clear();
+				
+				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
+					jv.resetForNewExecution();
+				}
+				
+				for (int i = 0; i < stateTimestamps.length; i++) {
+					stateTimestamps[i] = 0;
+				}
+				nextVertexToFinish = 0;
+				transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
+			}
+			
+			scheduleForExecution(scheduler);
+		}
+		catch (Throwable t) {
+			fail(t);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 37a1893..73534f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -56,19 +56,20 @@ public class ExecutionJobVertex {
 	
 	private final List<IntermediateResult> inputs;
 	
-	private final InputSplitAssigner splitAssigner;
-	
 	private final int parallelism;
 	
 	private final boolean[] finishedSubtasks;
 			
 	private volatile int numSubtasksInFinalState;
 	
-	
 	private final SlotSharingGroup slotSharingGroup;
 	
 	private final CoLocationGroup coLocationGroup;
 	
+	private final InputSplit[] inputSplits;
+	
+	private InputSplitAssigner splitAssigner;
+	
 	
 	public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, int defaultParallelism) throws JobException {
 		this(graph, jobVertex, defaultParallelism, System.currentTimeMillis());
@@ -126,9 +127,10 @@ public class ExecutionJobVertex {
 			@SuppressWarnings("unchecked")
 			InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
 			if (splitSource != null) {
-				InputSplit[] splits = splitSource.createInputSplits(numTaskVertices);
-				this.splitAssigner = splitSource.getInputSplitAssigner(splits);
+				this.inputSplits = splitSource.createInputSplits(numTaskVertices);
+				this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
 			} else {
+				this.inputSplits = null;
 				this.splitAssigner = null;
 			}
 		}
@@ -259,6 +261,48 @@ public class ExecutionJobVertex {
 		}
 	}
 	
+	public void resetForNewExecution() {
+		if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
+			throw new IllegalStateException("Cannot reset vertex that is not in final state");
+		}
+		
+		synchronized (stateMonitor) {
+			// check and reset the sharing groups with scheduler hints
+			if (slotSharingGroup != null) {
+				slotSharingGroup.clearTaskAssignment();
+			}
+			if (coLocationGroup != null) {
+				coLocationGroup.resetConstraints();
+			}
+			
+			// reset vertices one by one. if one reset fails, the "vertices in final state"
+			// fields will be consistent to handle triggered cancel calls
+			for (int i = 0; i < parallelism; i++) {
+				taskVertices[i].resetForNewExecution();
+				if (finishedSubtasks[i]) {
+					finishedSubtasks[i] = false;
+					numSubtasksInFinalState--;
+				}
+			}
+			
+			if (numSubtasksInFinalState != 0) {
+				throw new RuntimeException("Bug: resetting the execution job vertex failed.");
+			}
+			
+			// set up the input splits again
+			try {
+				if (this.inputSplits != null) {
+					@SuppressWarnings("unchecked")
+					InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+					this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
+				}
+			}
+			catch (Throwable t) {
+				throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
+			}
+		}
+	}
+	
 	//---------------------------------------------------------------------------------------------
 	//  Notifications
 	//---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3ea1afc..26dd19e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -67,7 +67,7 @@ public class ExecutionVertex {
 	
 	private final List<Execution> priorExecutions;
 	
-	private final CoLocationConstraint locationConstraint;
+	private volatile CoLocationConstraint locationConstraint;
 	
 	private volatile Execution currentExecution;	// this field must never be null
 	
@@ -316,6 +316,21 @@ public class ExecutionVertex {
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis());
+				
+				CoLocationGroup grp = jobVertex.getCoLocationGroup();
+				if (grp != null) {
+					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
+				}
+				
+				// temp: assign new channel IDs.
+				ExecutionGraph graph = getExecutionGraph();
+				
+				for (ExecutionEdge[] input : this.inputEdges) {
+					for (ExecutionEdge e : input) {
+						e.assignNewChannelIDs();
+						graph.registerExecutionEdge(e);
+					}
+				}
 			}
 			else {
 				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 88450c2..0cafcec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -157,6 +157,10 @@ public class Instance {
 	// --------------------------------------------------------------------------------------------
 	
 	public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+		if (isDead) {
+			throw new IOException("Instance has died");
+		}
+		
 		TaskOperationProtocol tm = this.taskManager;
 		
 		if (tm == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index e48f3af..5f302e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -396,7 +396,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			}
 		}
 
-		this.receiverCache.put(sourceChannelID, receiverList);
+		if (channels.containsKey(sourceChannelID)) {
+			this.receiverCache.put(sourceChannelID, receiverList);
+		}
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Receiver for %s: %s [%s])",
@@ -659,4 +661,16 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			}
 		}
 	}
+	
+	public void verifyAllCachesEmpty() {
+		if (!channels.isEmpty()) {
+			throw new IllegalStateException("Channel manager caches not empty: There are still registered channels.");
+		}
+		if (!localBuffersPools.isEmpty()) {
+			throw new IllegalStateException("Channel manager caches not empty: There are still local buffer pools.");
+		}
+		if (!receiverCache.isEmpty()) {
+			throw new IllegalStateException("Channel manager caches not empty: There are still entries in the receiver cache.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index c42bf92..4a8ca11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -71,6 +71,10 @@ public class JobGraph implements IOReadableWritable {
 	/** Name of this job. */
 	private String jobName;
 	
+	/** The number of times that failed tasks should be re-executed */
+	private int numExecutionRetries;
+	
+	/** flag to enable queued scheduling */
 	private boolean allowQueuedScheduling;
 	
 	// --------------------------------------------------------------------------------------------
@@ -165,6 +169,31 @@ public class JobGraph implements IOReadableWritable {
 		return this.jobConfiguration;
 	}
 	
+	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 * 
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+	 */
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		if (numberOfExecutionRetries < -1) {
+			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+		}
+		this.numExecutionRetries = numberOfExecutionRetries;
+	}
+	
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 * 
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return numExecutionRetries;
+	}
+	
 	public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
 		this.allowQueuedScheduling = allowQueuedScheduling;
 	}
@@ -318,6 +347,7 @@ public class JobGraph implements IOReadableWritable {
 		this.jobID.read(in);
 		this.jobName = StringValue.readString(in);
 		this.jobConfiguration.read(in);
+		this.numExecutionRetries = in.readInt();
 		this.allowQueuedScheduling = in.readBoolean();
 		
 		final int numVertices = in.readInt();
@@ -347,6 +377,7 @@ public class JobGraph implements IOReadableWritable {
 		this.jobID.write(out);
 		StringValue.writeString(this.jobName, out);
 		this.jobConfiguration.write(out);
+		out.writeInt(numExecutionRetries);
 		out.writeBoolean(allowQueuedScheduling);
 		
 		// write the task vertices using java serialization (to resolve references in the object graph)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 857d999..3722945 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -42,7 +42,10 @@ public enum JobStatus {
 	CANCELED(true),
 
 	/** All of the job's tasks have successfully finished. */
-	FINISHED(true);
+	FINISHED(true),
+	
+	/** The job is currently undergoing a reset and total restart */
+	RESTARTING(false);
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index c93eee3..5a32244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -115,8 +115,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	
 	/** Executor service for asynchronous commands (to relieve the RPC threads of work) */
-	private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware
-			.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
+	private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
 	
 
 	/** The RPC end point through which the JobManager gets its calls */
@@ -140,7 +139,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	private final int recommendedClientPollingInterval;
 	// end: these will be consolidated / removed
-
+	
+	private final int defaultExecutionRetries;
+	
 	private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
 	
 	private volatile boolean isShutDown;
@@ -173,6 +174,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		// Read the suggested client polling interval
 		this.recommendedClientPollingInterval = GlobalConfiguration.getInteger(
 			ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL);
+		
+		// read the default number of times that failed tasks should be re-executed
+		this.defaultExecutionRetries = GlobalConfiguration.getInteger(
+			ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
 
 		// Load the job progress collector
 		this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
@@ -326,6 +331,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 				
 				executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
 						job.getJobConfiguration(), job.getUserJarBlobKeys(), this.executorService);
+
+				executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
+						job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+
 				ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
 				if (previous != null) {
 					throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index f554bbb..36430de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -63,7 +63,17 @@ public class CoLocationConstraint {
 		return this.sharedSlot == null;
 	}
 	
+	/**
+	 * Checks whether this constraint has no live shared slot: either no shared slot has been
+	 * assigned, or the assigned shared slot reports that it is disposed.
+	 * 
+	 * @return True, if no shared slot is assigned or the assigned shared slot is disposed.
+	 */
+	public boolean isUnassignedOrDisposed() {
+		return this.sharedSlot == null || this.sharedSlot.isDisposed();
+	}
+	
 	public AbstractID getGroupId() {
 		return this.group.getId();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index d1c3bd5..fa379cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -74,7 +74,7 @@ public class CoLocationGroup implements java.io.Serializable {
 		return constraints.get(subtask);
 	}
 	
-	public void ensureConstraints(int num) {
+	private void ensureConstraints(int num) {
 		if (constraints == null) {
 			constraints = new ArrayList<CoLocationConstraint>(num);
 		} else {
@@ -92,4 +92,22 @@ public class CoLocationGroup implements java.io.Serializable {
 	public AbstractID getId() {
 		return id;
 	}
+	
+	/**
+	 * Removes all co-location constraints from this group. Does nothing if no constraints have been created yet.
+	 * 
+	 * @throws IllegalStateException Thrown, if a constraint still holds a shared slot that is not disposed.
+	 */
+	public void resetConstraints() {
+		// the list is created lazily in ensureConstraints(), so it may not exist yet
+		if (this.constraints == null) {
+			return;
+		}
+		for (CoLocationConstraint c : this.constraints) {
+			if (!c.isUnassignedOrDisposed()) {
+				throw new IllegalStateException("Cannot reset co-location group: some constraints still have executing vertices.");
+			}
+		}
+		this.constraints.clear();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index c5a88f3..dcde6b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -70,6 +70,20 @@ public class SlotSharingGroup implements java.io.Serializable {
 		return this.taskAssignment;
 	}
 	
+	/**
+	 * Drops the current task assignment of this group. This is only permitted while the
+	 * assignment (if there is one) holds no slots.
+	 * 
+	 * @throws IllegalStateException Thrown, if the current task assignment still holds slots.
+	 */
+	public void clearTaskAssignment() {
+		boolean stillHoldsSlots = this.taskAssignment != null && this.taskAssignment.getNumberOfSlots() > 0;
+		if (stillHoldsSlots) {
+			throw new IllegalStateException("SlotSharingGroup cannot clear task assignment, group still has allocated resources.");
+		}
+		this.taskAssignment = null;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 606fff1..df32a81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -21,11 +21,9 @@ package org.apache.flink.runtime.execution.librarycache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -33,7 +31,6 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -90,11 +87,15 @@ public class BlobLibraryCacheManagerTest {
 			}
 
 			assertEquals(2, caughtExceptions);
-		}catch(Exception e){
+			
+			bc.close();
+		}
+		catch(Exception e){
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
-			if(server != null){
+		}
+		finally{
+			if (server != null){
 				try {
 					server.shutdown();
 				} catch (IOException e) {
@@ -102,7 +103,7 @@ public class BlobLibraryCacheManagerTest {
 				}
 			}
 
-			if(libraryCacheManager != null){
+			if (libraryCacheManager != null){
 				try {
 					libraryCacheManager.shutdown();
 				} catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 74ab08b..4cddcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,17 +27,14 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.doAnswer;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
new file mode 100644
index 0000000..f1855f2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.junit.Test;
+
+/**
+ * Tests for restarting an {@link ExecutionGraph} after a task failure, both when the
+ * restart is triggered manually and when the graph restarts itself because a retry is left.
+ */
+public class ExecutionGraphRestartTest {
+	
+	/** Parallelism of the single job vertex used by the tests. */
+	private static final int NUM_TASKS = 31;
+	
+	@Test
+	public void testRestartManually() {
+		try {
+			Scheduler scheduler = createScheduler();
+			ExecutionGraph eg = createSingleVertexGraph(0);
+			
+			assertEquals(JobStatus.CREATED, eg.getState());
+			
+			eg.scheduleForExecution(scheduler);
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			// no retries are left, so the failure must leave the job FAILED
+			failFirstVertex(eg);
+			assertEquals(JobStatus.FAILED, eg.getState());
+			
+			eg.restart();
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			finishAllVertices(eg);
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRestartSelf() {
+		try {
+			Scheduler scheduler = createScheduler();
+			ExecutionGraph eg = createSingleVertexGraph(1);
+			
+			assertEquals(JobStatus.CREATED, eg.getState());
+			
+			eg.scheduleForExecution(scheduler);
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			// one retry is left, so the graph should have restarted itself
+			failFirstVertex(eg);
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			finishAllVertices(eg);
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a scheduler that knows a single instance, backed by the TaskManager from
+	 * {@code getSimpleAcknowledgingTaskmanager()}. Note: {@code getInstance(tm)} gives the instance
+	 * only one slot, fewer than {@link #NUM_TASKS}; the expected job states in these tests may
+	 * depend on that - verify before changing it.
+	 */
+	private static Scheduler createScheduler() throws Exception {
+		TaskOperationProtocol tm = getSimpleAcknowledgingTaskmanager();
+		Instance instance = getInstance(tm);
+		
+		Scheduler scheduler = new Scheduler();
+		scheduler.newInstanceAvailable(instance);
+		return scheduler;
+	}
+	
+	/**
+	 * Creates an execution graph for a job with a single {@link NoOpInvokable} vertex of
+	 * parallelism {@link #NUM_TASKS}.
+	 * 
+	 * @param numRetries The number of retries left to set on the execution graph.
+	 * @return The execution graph, with the job graph attached.
+	 */
+	private static ExecutionGraph createSingleVertexGraph(int numRetries) throws Exception {
+		final AbstractJobVertex vertex = new AbstractJobVertex("Task");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(NUM_TASKS);
+		
+		final JobGraph jobGraph = new JobGraph("Pointwise Job", vertex);
+		
+		ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration());
+		eg.setNumberOfRetriesLeft(numRetries);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+		return eg;
+	}
+	
+	/** Fails the first execution vertex of the graph with a test exception. */
+	private static void failFirstVertex(ExecutionGraph eg) throws Exception {
+		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+	}
+	
+	/** Calls {@code executionFinished()} on every execution vertex of the graph. */
+	private static void finishAllVertices(ExecutionGraph eg) throws Exception {
+		for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+			v.executionFinished();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 7eefa7e..30b05ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -96,11 +96,15 @@ public class ExecutionGraphTestUtils {
 	// --------------------------------------------------------------------------------------------
 	
 	public static Instance getInstance(final TaskOperationProtocol top) throws Exception {
+		return getInstance(top, 1);
+	}
+	
+	public static Instance getInstance(final TaskOperationProtocol top, int numSlots) throws Exception {
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001);
 		
-		return new Instance(connection, new InstanceID(), hardwareDescription, 1) {
+		return new Instance(connection, new InstanceID(), hardwareDescription, numSlots) {
 			@Override
 			public TaskOperationProtocol getTaskManagerProxy() {
 				return top;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f5a4d39..2848466 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -22,12 +22,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 1f74ae3..9769529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,12 +26,9 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index efb2af4..f3081bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -28,10 +28,8 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 import static org.mockito.Matchers.any;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -43,8 +41,6 @@ import org.junit.Test;
 
 import org.mockito.Matchers;
 
-import java.util.ArrayList;
-
 public class ExecutionVertexDeploymentTest {
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 8ed7a6d..168c454 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -36,6 +36,10 @@ public class JobManagerTestUtils {
 	}
 	
 	public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
+		return startJobManager(numTaskManagers, numSlotsPerTaskManager, null);
+	}
+	
+	public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager, Configuration additionalParams) throws Exception {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
@@ -43,6 +47,10 @@ public class JobManagerTestUtils {
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 		cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
 		
+		if (additionalParams != null) {
+			cfg.addAll(additionalParams);
+		}
+		
 		GlobalConfiguration.includeConfiguration(cfg);
 		
 		JobManager jm = new JobManager(ExecutionMode.LOCAL);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
index f8b229f..23a75cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -38,8 +36,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
 import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.junit.Test;
 
-import java.util.ArrayList;
-
 public class CoLocationConstraintITCase {
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 0952f60..ae7857f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -24,10 +24,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.api.RecordReader;
@@ -47,8 +45,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.Test;
 
-import java.util.ArrayList;
-
 /**
  * This test is intended to cover the basic functionality of the {@link JobManager}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
new file mode 100644
index 0000000..0b8518f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverBlockingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverFailingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+/**
+ * Integration tests for the recovery of jobs by the {@link JobManager}: jobs that are allowed
+ * one execution retry must still reach {@link JobStatus#FINISHED} when a task fails once, or
+ * when a TaskManager is shut down while the job is running.
+ */
+public class RecoveryITCase {
+	
+	@Test
+	public void testForwardJob() {
+		
+		ReceiverFailingOnce.resetFailedBefore();
+		
+		final int NUM_TASKS = 31;
+		
+		JobManager jm = null;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(ReceiverFailingOnce.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			jobGraph.setNumberOfExecutionRetries(1);
+			
+			jm = startJobManager(2 * NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			ExecutionGraph eg = submitAndGetExecutionGraph(jm, jobGraph);
+			waitForJobToFinish(eg);
+			
+			// make sure that in any case, the network buffers are all returned
+			waitForTaskThreadsToBeTerminated();
+			assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+	
+	@Test
+	public void testForwardJobWithSlotSharing() {
+		
+		ReceiverFailingOnce.resetFailedBefore();
+		
+		final int NUM_TASKS = 31;
+		
+		JobManager jm = null;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(ReceiverFailingOnce.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			jobGraph.setNumberOfExecutionRetries(1);
+			
+			jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			ExecutionGraph eg = submitAndGetExecutionGraph(jm, jobGraph);
+			waitForJobToFinish(eg);
+			
+			// make sure that in any case, the network buffers are all returned
+			waitForTaskThreadsToBeTerminated();
+			assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+	
+	@Test
+	public void testRecoverTaskManagerFailure() {
+		
+		final int NUM_TASKS = 31;
+		
+		JobManager jm = null;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(ReceiverBlockingOnce.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			jobGraph.setNumberOfExecutionRetries(1);
+			
+			// make sure we have fast heartbeats and failure detection
+			Configuration cfg = new Configuration();
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+			
+			jm = startJobManager(2, NUM_TASKS, cfg);
+			
+			ExecutionGraph eg = submitAndGetExecutionGraph(jm, jobGraph);
+			
+			// wait for a bit until all is running, make sure the second attempt does not block
+			Thread.sleep(300);
+			ReceiverBlockingOnce.setShouldNotBlock();
+			
+			// shutdown one of the taskmanagers
+			((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0].shutdown();
+			
+			// wait for the recovery to do its work
+			waitForJobToFinish(eg);
+			
+			// make sure that in any case, all task threads have terminated
+			waitForTaskThreadsToBeTerminated();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Submits the job to the JobManager and asserts that the submission succeeded. The
+	 * submission result's description is used as the assertion message on failure.
+	 * 
+	 * @return The job's execution graph, or null if the JobManager no longer lists the job
+	 *         among its current jobs (for example because it already completed).
+	 */
+	private static ExecutionGraph submitAndGetExecutionGraph(JobManager jm, JobGraph jobGraph) throws Exception {
+		JobSubmissionResult result = jm.submitJob(jobGraph);
+		assertEquals(result.getDescription(), AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+		
+		return jm.getCurrentJobs().get(jobGraph.getJobID());
+	}
+	
+	/**
+	 * Blocks in {@code waitForJobEnd()} on the given execution graph and then asserts that the
+	 * job is FINISHED. Does nothing if the graph is null (the job was already gone).
+	 */
+	private static void waitForJobToFinish(ExecutionGraph eg) throws Exception {
+		if (eg != null) {
+			eg.waitForJobEnd();
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
index 29293da..5d79aab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -39,8 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
 import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.junit.Test;
 
-import java.util.ArrayList;
-
 public class SlotSharingITCase {
 
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
index 19ff690..6b8be15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -22,11 +22,9 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobMana
 import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
 import static org.junit.Assert.*;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
@@ -39,7 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.junit.Test;
 
-import java.util.ArrayList;
 
 public class TaskManagerFailsITCase {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
new file mode 100644
index 0000000..3425842
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/** Test receiver: while blocking is enabled (until {@link #setShouldNotBlock()}), invoke() waits until interrupted; otherwise it drains its input. */
+public final class ReceiverBlockingOnce extends AbstractInvokable {
+	// volatile: written by the test thread via setShouldNotBlock(), read by task threads in invoke()
+	private static volatile boolean shouldBlock = true;
+
+	private RecordReader<IntegerRecord> reader;
+	
+	@Override
+	public void registerInputOutput() {
+		reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		// the lock never escapes, so only an interrupt (InterruptedException) ends the wait; the loop absorbs spurious wakeups
+		final Object lock = new Object();
+		synchronized (lock) {
+			while (shouldBlock) {
+				lock.wait();
+			}
+		}
+		while (reader.next() != null);
+	}
+	
+	public static void setShouldNotBlock() {
+		shouldBlock = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
new file mode 100644
index 0000000..3fad6b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/** Test receiver: the first invocation of subtask 0 throws; afterwards (until resetFailedBefore()) every invocation drains its input. */
+public final class ReceiverFailingOnce extends AbstractInvokable {
+	// volatile: the flag is shared between task threads of different invocations and resetFailedBefore() callers
+	private static volatile boolean hasFailedBefore = false;
+
+	private RecordReader<IntegerRecord> reader;
+	
+	@Override
+	public void registerInputOutput() {
+		reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		if (!hasFailedBefore && getEnvironment().getIndexInSubtaskGroup() == 0) {
+			hasFailedBefore = true;
+			throw new Exception("Test exception");
+		}
+		while (reader.next() != null);
+	}
+	
+	/** Re-arms the one-time failure so the next invocation of subtask 0 throws again. */
+	public static void resetFailedBefore() {
+		hasFailedBefore = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 53d57d2..ff96519 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -75,6 +75,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * value can be overridden by individual operations using [[DataSet.setParallelism]]
    */
   def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+  
+  /**
+   * Sets how many times a failed task is re-executed. Zero effectively disables fault
+   * tolerance; "-1" means the system default (as defined in the configuration) applies.
+   *
+   * @param numRetries number of re-executions, or -1 for the configured default
+   */
+  def setNumberOfExecutionRetries(numRetries: Int): Unit =
+    javaEnv.setNumberOfExecutionRetries(numRetries)
+
+  /**
+   * Gets how many times the system will re-execute failed tasks. A value of "-1"
+   * indicates that the system default (as defined in the configuration) is used.
+   * @return the number of execution retries, or -1 for the configured default
+   */
+  def getNumberOfExecutionRetries: Int = javaEnv.getNumberOfExecutionRetries
 
   /**
    * Gets the UUID by which this environment is identified. The UUID sets the execution context