Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:09 UTC

[1/5] git commit: [FLINK-1203] Deactivate fork reuse in tests (workaround for potential surefire bug)

Repository: incubator-flink
Updated Branches:
  refs/heads/master f5898a01e -> f0fd8823e


[FLINK-1203] Deactivate fork reuse in tests (workaround for potential surefire bug)

This closes #174


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f42dcc35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f42dcc35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f42dcc35

Branch: refs/heads/master
Commit: f42dcc3573dd595c4ee11f089f7ea5d68905ebca
Parents: f5898a0
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 12:27:03 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:04:30 2014 +0100

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f42dcc35/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 60e87da..698f04e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@ under the License.
 			 forkCount is not exposed as a property. With this we can set
 			 it on the "mvn" commandline in travis. -->
 		<flink.forkCount>1.5C</flink.forkCount>
-		<flink.reuseForks>true</flink.reuseForks>
+		<flink.reuseForks>false</flink.reuseForks>
 		<slf4j.version>1.7.7</slf4j.version>
 		<guava.version>17.0</guava.version>
 	</properties>
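
Since flink.forkCount and flink.reuseForks are plain Maven properties, they can also be overridden per invocation, e.g. "mvn -Dflink.forkCount=1C -Dflink.reuseForks=false verify" (the goal and the forkCount value here are only illustrative).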


[5/5] git commit: Introduce a delay before restarts to make sure that taskmanager failures are detected before restart.

Posted by se...@apache.org.
Introduce a delay before restarts to make sure that taskmanager failures are detected before restart.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f0fd8823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f0fd8823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f0fd8823

Branch: refs/heads/master
Commit: f0fd8823ee8d157f9a5a00b2687827b94206d7b7
Parents: dd687bc
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 6 22:01:36 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100

----------------------------------------------------------------------
 .../api/common/typeutils/TypeSerializer.java      |  4 ++--
 .../flink/configuration/ConfigConstants.java      |  2 +-
 .../runtime/executiongraph/ExecutionGraph.java    | 18 ++++++++++++++++++
 .../flink/runtime/instance/InstanceManager.java   |  8 ++++++--
 .../flink/runtime/jobmanager/JobManager.java      |  8 ++++++++
 .../runtime/instance/InstanceManagerTest.java     | 12 +++++-------
 .../runtime/jobgraph/JobManagerTestUtils.java     |  2 ++
 .../flink/runtime/jobmanager/RecoveryITCase.java  |  4 ++--
 8 files changed, 44 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 87d7e20..5e32c86 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -117,7 +117,7 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * De-serializes a record from the given source input view.
 	 * 
 	 * @param source The input view from which to read the data.
-	 * @result The deserialized element.
+	 * @return The deserialized element.
 	 * 
 	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
 	 *                     input view, which may have an underlying I/O channel from which it reads.
@@ -129,7 +129,7 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * 
 	 * @param reuse The record instance into which to de-serialize the data.
 	 * @param source The input view from which to read the data.
-	 * @result The deserialized element.
+	 * @return The deserialized element.
 	 * 
 	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
 	 *                     input view, which may have an underlying I/O channel from which it reads.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 75ebe54..dcd9342 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -344,7 +344,7 @@ public final class ConfigConstants {
 	 * Default number of seconds after which a task manager is marked as failed.
 	 */
 	// 30 seconds (its enough to get to mars, should be enough to detect failure)
-	public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30;
+	public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000;
 	
 	/**
 	 * The default network port the task manager expects incoming IPC connections. The {@code -1} means that

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9a33dbf..3df3452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -107,6 +107,8 @@ public class ExecutionGraph {
 	
 	private int numberOfRetriesLeft;
 	
+	private long delayBeforeRetrying;
+	
 	private volatile JobStatus state = JobStatus.CREATED;
 	
 	private volatile Throwable failureCause;
@@ -159,6 +161,17 @@ public class ExecutionGraph {
 		return numberOfRetriesLeft;
 	}
 	
+	public void setDelayBeforeRetrying(long delayBeforeRetrying) {
+		if (delayBeforeRetrying < 0) {
+			throw new IllegalArgumentException("Delay before retry must be non-negative.");
+		}
+		this.delayBeforeRetrying = delayBeforeRetrying;
+	}
+	
+	public long getDelayBeforeRetrying() {
+		return delayBeforeRetrying;
+	}
+	
 	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -428,6 +441,11 @@ public class ExecutionGraph {
 								execute(new Runnable() {
 									@Override
 									public void run() {
+										try {
+											Thread.sleep(delayBeforeRetrying);
+										} catch (InterruptedException e) {
+											// should only happen on shutdown
+										}
 										restart();
 									}
 								});
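
The sleep inside the restart Runnable above is the heart of this change: the restart is handed to the executor and only runs after the delay has elapsed, so that dead task managers can be detected before tasks are redeployed. A minimal, self-contained sketch of that pattern (the class name, delay value, and println are illustrative only, not Flink runtime code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class DelayedRestartSketch {

        // assumed value for this sketch; the commit derives the real delay
        // from the dead-taskmanager timeout (see the JobManager change below)
        private static final long DELAY_BEFORE_RETRYING = 2 * 1000L;

        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();

            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // give failure detection time to notice dead task managers
                        Thread.sleep(DELAY_BEFORE_RETRYING);
                    } catch (InterruptedException e) {
                        // should only happen on shutdown
                    }
                    System.out.println("restarting job now");
                }
            });

            // the already-submitted task still runs to completion
            executor.shutdown();
        }
    }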

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 8127f1d..ced1afe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -77,7 +77,7 @@ public class InstanceManager {
 	 * where a task manager is still considered alive.
 	 */
 	public InstanceManager() {
-		this(1000 * GlobalConfiguration.getLong(
+		this(GlobalConfiguration.getLong(
 				ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
 				ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
 	}
@@ -98,6 +98,10 @@ public class InstanceManager {
 
 		new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
 	}
+	
+	public long getHeartbeatTimeout() {
+		return heartbeatTimeout;
+	}
 
 	public void shutdown() {
 		synchronized (this.lock) {
@@ -126,7 +130,7 @@ public class InstanceManager {
 		
 		synchronized (this.lock) {
 			if (this.shutdown) {
-				throw new IllegalStateException("InstanceManager is shut down.");
+				return false;
 			}
 			
 			Instance host = registeredHostsById.get(instanceId);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 5a32244..4bce4a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -142,6 +142,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	private final int defaultExecutionRetries;
 	
+	private final long delayBetweenRetries;
+	
 	private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
 	
 	private volatile boolean isShutDown;
@@ -179,6 +181,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		this.defaultExecutionRetries = GlobalConfiguration.getInteger(
 			ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
 
+		// delay between retries should be one heartbeat timeout
+		this.delayBetweenRetries = 2 * GlobalConfiguration.getLong(
+				ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT);
+		
 		// Load the job progress collector
 		this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
 
@@ -334,6 +341,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 
 				executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
 						job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+				executionGraph.setDelayBeforeRetrying(this.delayBetweenRetries);
 
 				ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
 				if (previous != null) {
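
With the milliseconds-based default introduced above (DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30 * 1000) and no override of the timeout key, this works out to 2 * 30000 ms = 60000 ms, i.e. a failed job waits one minute before the restart is attempted.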

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 34d86b4..ecdf891 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -43,6 +43,10 @@ public class InstanceManagerTest {
 		try {
 			InstanceManager cm = new InstanceManager();
 			
+			// catches error that some parts assumed config values in seconds, others in
+			// milliseconds by verifying that the timeout is not larger than 2 minutes.
+			assertTrue(cm.getHeartbeatTimeout() < 2 * 60 * 1000);
+			
 			final int ipcPort = 10000;
 			final int dataPort = 20000;
 
@@ -182,13 +186,7 @@ public class InstanceManagerTest {
 				// expected
 			}
 			
-			try {
-				cm.reportHeartBeat(new InstanceID());
-				fail("Should raise exception in shutdown state");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
+			assertFalse(cm.reportHeartBeat(new InstanceID()));
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 168c454..cc767c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -46,6 +46,8 @@ public class JobManagerTestUtils {
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 		cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+		cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
 		
 		if (additionalParams != null) {
 			cfg.addAll(additionalParams);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
index 0b8518f..9e259f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -200,8 +200,8 @@ public class RecoveryITCase {
 			
 			// make sure we have fast heartbeats and failure detection
 			Configuration cfg = new Configuration();
-			cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 3000);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 1000);
 			
 			jm = startJobManager(2, NUM_TASKS, cfg);
 			


[3/5] git commit: Remove obsolete collection execution example. Correct remote collector format example.

Posted by se...@apache.org.
Remove obsolete collection execution example.
Correct remote collector format example.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2557832a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2557832a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2557832a

Branch: refs/heads/master
Commit: 2557832af50e5d7bb479d568b370bfdd96b54fef
Parents: a747b61
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 5 21:02:33 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:13 2014 +0100

----------------------------------------------------------------------
 .../CollectionExecutionExample.java             |  48 --------
 .../RemoteCollectorOutputFormatExample.java     | 114 +++++++++++++++++++
 .../RemoteCollectorOutputFormatExample.java     | 114 -------------------
 3 files changed, 114 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2557832a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java
deleted file mode 100644
index 1ce3e7a..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.environments;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.CollectionEnvironment;
-
-/**
- * This example shows how to use Flink's collection execution functionality.
- * Collection-based execution is an extremely lightweight, non-parallel way to
- * execute programs on small data: The programs are s
- * 
- * Because this method of execution spawns no background threads, managed memory, 
- * coordinator, or parallel worker, it has a minimal execution footprint. 
- */
-public class CollectionExecutionExample {
-
-	public static void main(String[] args) throws Exception {
-		
-		CollectionEnvironment env = new CollectionEnvironment();
-		
-		env.fromElements("A", "B", "C", "D")
-			.map(new MapFunction<String, String>() {
-				public String map(String value) {
-					return value + " " + 1;
-				};
-			})
-		.print();
-		
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2557832a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
new file mode 100644
index 0000000..f524718
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.misc;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.io.RemoteCollectorConsumer;
+import org.apache.flink.api.java.io.RemoteCollectorImpl;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over some sample data and collects the results with an
+ * implementation of a {@link RemoteCollectorConsumer}.
+ */
+@SuppressWarnings("serial")
+public class RemoteCollectorOutputFormatExample {
+
+	public static void main(String[] args) throws Exception {
+
+		/**
+		 * We create a remote {@link ExecutionEnvironment} here, because this
+		 * OutputFormat is designed for use in a distributed setting. For local
+		 * use you should consider using the {@link LocalCollectionOutputFormat
+		 * <T>}.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
+				"/path/to/your/file.jar");
+
+		// get input data
+		DataSet<String> text = env.fromElements(
+				"To be, or not to be,--that is the question:--",
+				"Whether 'tis nobler in the mind to suffer",
+				"The slings and arrows of outrageous fortune",
+				"Or to take arms against a sea of troubles,");
+
+		DataSet<Tuple2<String, Integer>> counts =
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new LineSplitter())
+		// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0).aggregate(Aggregations.SUM, 1);
+
+		// emit result
+		RemoteCollectorImpl.collectLocal(counts,
+				new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
+					// user defined IRemoteCollectorConsumer
+					@Override
+					public void collect(Tuple2<String, Integer> element) {
+						System.out.println("word/occurrences:" + element);
+					}
+				});
+
+		// local collection to store results in
+		Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
+		// collect results from remote in local collection
+		RemoteCollectorImpl.collectLocal(counts, collection);
+
+		// execute program
+		env.execute("WordCount Example with RemoteCollectorOutputFormat");
+
+		System.out.println(collection);
+		
+		RemoteCollectorImpl.shutdownAll();
+	}
+
+	//
+	// User Functions
+	//
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class LineSplitter implements
+			FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2557832a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
deleted file mode 100644
index 36b5c82..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.remotecollectoroutputformat;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.io.RemoteCollectorConsumer;
-import org.apache.flink.api.java.io.RemoteCollectorImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over some sample data and collects the results with an
- * implementation of a {@link RemoteCollectorConsumer}.
- */
-@SuppressWarnings("serial")
-public class RemoteCollectorOutputFormatExample {
-
-	public static void main(String[] args) throws Exception {
-
-		/**
-		 * We create a remote {@link ExecutionEnvironment} here, because this
-		 * OutputFormat is designed for use in a distributed setting. For local
-		 * use you should consider using the {@link LocalCollectionOutputFormat
-		 * <T>}.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
-				"/path/to/your/file.jar");
-
-		// get input data
-		DataSet<String> text = env.fromElements(
-				"To be, or not to be,--that is the question:--",
-				"Whether 'tis nobler in the mind to suffer",
-				"The slings and arrows of outrageous fortune",
-				"Or to take arms against a sea of troubles,");
-
-		DataSet<Tuple2<String, Integer>> counts =
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		text.flatMap(new LineSplitter())
-		// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).aggregate(Aggregations.SUM, 1);
-
-		// emit result
-		RemoteCollectorImpl.collectLocal(counts,
-				new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
-					// user defined IRemoteCollectorConsumer
-					@Override
-					public void collect(Tuple2<String, Integer> element) {
-						System.out.println("word/occurrences:" + element);
-					}
-				});
-
-		// local collection to store results in
-		Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
-		// collect results from remote in local collection
-		RemoteCollectorImpl.collectLocal(counts, collection);
-
-		// execute program
-		env.execute("WordCount Example with RemoteCollectorOutputFormat");
-
-		System.out.println(collection);
-		
-		RemoteCollectorImpl.shutdownAll();
-	}
-
-	//
-	// User Functions
-	//
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
-	 * Integer>).
-	 */
-	public static final class LineSplitter implements
-			FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-}
\ No newline at end of file


[4/5] git commit: Implement coarse-grained fault tolerance

Posted by se...@apache.org.
Implement coarse-grained fault tolerance


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd687bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd687bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd687bc6

Branch: refs/heads/master
Commit: dd687bc6729d9539e05db9761e22a2aadc707341
Parents: 2557832
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 5 20:48:56 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |   1 +
 .../java/org/apache/flink/api/common/Plan.java  |  30 +++
 .../flink/configuration/ConfigConstants.java    |  12 +-
 .../flink/api/java/ExecutionEnvironment.java    |  28 +++
 .../flink/runtime/executiongraph/Execution.java |  24 +-
 .../runtime/executiongraph/ExecutionEdge.java   |  18 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  75 +++++-
 .../executiongraph/ExecutionJobVertex.java      |  54 +++-
 .../runtime/executiongraph/ExecutionVertex.java |  17 +-
 .../apache/flink/runtime/instance/Instance.java |   4 +
 .../runtime/io/network/ChannelManager.java      |  16 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  31 +++
 .../flink/runtime/jobgraph/JobStatus.java       |   5 +-
 .../flink/runtime/jobmanager/JobManager.java    |  15 +-
 .../scheduler/CoLocationConstraint.java         |   6 +
 .../jobmanager/scheduler/CoLocationGroup.java   |  11 +-
 .../jobmanager/scheduler/SlotSharingGroup.java  |   9 +
 .../BlobLibraryCacheManagerTest.java            |  15 +-
 .../ExecutionGraphDeploymentTest.java           |   3 -
 .../ExecutionGraphRestartTest.java              | 127 ++++++++++
 .../executiongraph/ExecutionGraphTestUtils.java |   6 +-
 .../ExecutionStateProgressTest.java             |   3 -
 .../ExecutionVertexCancelTest.java              |   3 -
 .../ExecutionVertexDeploymentTest.java          |   4 -
 .../runtime/jobgraph/JobManagerTestUtils.java   |   8 +
 .../jobmanager/CoLocationConstraintITCase.java  |   4 -
 .../runtime/jobmanager/JobManagerITCase.java    |   4 -
 .../runtime/jobmanager/RecoveryITCase.java      | 247 +++++++++++++++++++
 .../runtime/jobmanager/SlotSharingITCase.java   |   4 -
 .../jobmanager/TaskManagerFailsITCase.java      |   3 -
 .../jobmanager/tasks/ReceiverBlockingOnce.java  |  52 ++++
 .../jobmanager/tasks/ReceiverFailingOnce.java   |  50 ++++
 .../flink/api/scala/ExecutionEnvironment.scala  |  16 ++
 33 files changed, 828 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 3dd9685..d5f9b94 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -183,6 +183,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		
 		// create the jobgraph object
 		JobGraph graph = new JobGraph(program.getJobName());
+		graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
 		graph.setAllowQueuedScheduling(false);
 		
 		// add vertices to the graph

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index f7e93b4..f299ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -67,6 +67,11 @@ public class Plan implements Visitable<Operator<?>> {
 	 * The default parallelism to use for nodes that have no explicitly specified parallelism.
 	 */
 	protected int defaultParallelism = DEFAULT_PARALELLISM;
+	
+	/**
+	 * The number of times failed tasks are re-executed.
+	 */
+	protected int numberOfExecutionRetries;
 
 	/**
 	 * Hash map for files in the distributed cache: registered name to cache entry.
@@ -259,6 +264,31 @@ public class Plan implements Visitable<Operator<?>> {
 	}
 	
 	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 * 
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+	 */
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		if (numberOfExecutionRetries < -1) {
+			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+		}
+		this.numberOfExecutionRetries = numberOfExecutionRetries;
+	}
+	
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 * 
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return numberOfExecutionRetries;
+	}
+	
+	/**
 	 * Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
 	 * for data types and is specific to a particular data model (record, tuple, Scala, ...)
 	 *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 05b7047..75ebe54 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.configuration;
 
 /**
@@ -36,6 +35,12 @@ public final class ConfigConstants {
 	 */
 	public static final String DEFAULT_PARALLELIZATION_DEGREE_KEY = "parallelization.degree.default";
 	
+	/**
+	 * Config parameter for the number of re-tries for failed tasks. Setting this
+	 * value to 0 effectively disables fault tolerance.
+	 */
+	public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
+	
 	// -------------------------------- Runtime -------------------------------
 
 	/**
@@ -313,6 +318,11 @@ public final class ConfigConstants {
 	 */
 	public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;
 	
+	/**
+	 * The default number of execution retries.
+	 */
+	public static final int DEFAULT_EXECUTION_RETRIES = 0;
+	
 	// ------------------------------ Runtime ---------------------------------
 
 	/**
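
The new key is read like any other Flink configuration entry; a hedged sketch using the Configuration API that the test utilities in this series already use (the value 3 is illustrative):

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    public class ExecutionRetriesDefaultSketch {

        public static void main(String[] args) {
            Configuration cfg = new Configuration();

            // jobs that request -1 retries fall back to this cluster-wide default
            cfg.setInteger(ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, 3);

            System.out.println(cfg.getInteger(
                    ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
                    ConfigConstants.DEFAULT_EXECUTION_RETRIES));
        }
    }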

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 54e36c0..6b95ad8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -96,6 +96,8 @@ public abstract class ExecutionEnvironment {
 
 	private int degreeOfParallelism = -1;
 	
+	private int numberOfExecutionRetries = -1;
+	
 	
 	// --------------------------------------------------------------------------------------------
 	//  Constructor and Properties
@@ -144,6 +146,31 @@ public abstract class ExecutionEnvironment {
 	}
 	
 	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 * 
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+	 */
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		if (numberOfExecutionRetries < -1) {
+			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+		}
+		this.numberOfExecutionRetries = numberOfExecutionRetries;
+	}
+	
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 * 
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return numberOfExecutionRetries;
+	}
+	
+	/**
 	 * Gets the UUID by which this environment is identified. The UUID sets the execution context
 	 * in the cluster or local environment.
 	 *
@@ -652,6 +679,7 @@ public abstract class ExecutionEnvironment {
 		if (getDegreeOfParallelism() > 0) {
 			plan.setDefaultParallelism(getDegreeOfParallelism());
 		}
+		plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);
 		
 		try {
 			registerCachedFilesWithPlan(plan);
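
For API users the new setting is exposed directly on the ExecutionEnvironment. A minimal usage sketch, modeled on the example programs elsewhere in this repository (the retry count of 3 and the toy pipeline are illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ExecutionRetriesExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // re-execute failed tasks up to three times; 0 disables fault tolerance,
            // -1 falls back to the "execution-retries.default" configuration value
            env.setNumberOfExecutionRetries(3);

            env.fromElements("A", "B", "C", "D")
                .map(new MapFunction<String, String>() {
                    public String map(String value) {
                        return value + " " + 1;
                    }
                })
                .print();

            env.execute();
        }
    }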

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2f881d7..3cc6b02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -391,13 +391,13 @@ public class Execution {
 			
 				if (transitionState(current, FINISHED)) {
 					try {
-						vertex.executionFinished();
-						return;
+						assignedResource.releaseSlot();
+						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.getExecutionGraph().deregisterExecution(this);
-						assignedResource.releaseSlot();
+						vertex.executionFinished();
 					}
+					return;
 				}
 			}
 			else if (current == CANCELING) {
@@ -433,14 +433,14 @@ public class Execution {
 			if (current == CANCELED) {
 				return;
 			}
-			else if (current == CANCELING || current == RUNNING) {
+			else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
 				if (transitionState(current, CANCELED)) {
 					try {
-						vertex.executionCanceled();
+						assignedResource.releaseSlot();
+						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.getExecutionGraph().deregisterExecution(this);
-						assignedResource.releaseSlot();
+						vertex.executionCanceled();
 					}
 					return;
 				}
@@ -493,13 +493,13 @@ public class Execution {
 				this.failureCause = t;
 				
 				try {
-					vertex.getExecutionGraph().deregisterExecution(this);
-					vertex.executionFailed(t);
-				}
-				finally {
 					if (assignedResource != null) {
 						assignedResource.releaseSlot();
 					}
+					vertex.getExecutionGraph().deregisterExecution(this);
+				}
+				finally {
+					vertex.executionFailed(t);
 				}
 				
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
index 918a0ca..92ca394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -28,9 +28,9 @@ public class ExecutionEdge {
 	
 	private final int inputNum;
 
-	private final ChannelID inputChannelId;
+	private ChannelID inputChannelId;
 	
-	private final ChannelID outputChannelId;
+	private ChannelID outputChannelId;
 	
 	
 	public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum) {
@@ -42,15 +42,6 @@ public class ExecutionEdge {
 		this.outputChannelId = new ChannelID();
 	}
 	
-	public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum, ChannelID inputChannelId, ChannelID outputChannelId) {
-		this.source = source;
-		this.target = target;
-		this.inputNum = inputNum;
-		
-		this.inputChannelId = inputChannelId;
-		this.outputChannelId = outputChannelId;
-	}
-	
 	
 	public IntermediateResultPartition getSource() {
 		return source;
@@ -71,4 +62,9 @@ public class ExecutionEdge {
 	public ChannelID getOutputChannelId() {
 		return outputChannelId;
 	}
+	
+	public void assignNewChannelIDs() {
+		inputChannelId = new ChannelID();
+		outputChannelId = new ChannelID();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1954d70..9a33dbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -106,6 +105,8 @@ public class ExecutionGraph {
 	
 	private int nextVertexToFinish;
 	
+	private int numberOfRetriesLeft;
+	
 	private volatile JobStatus state = JobStatus.CREATED;
 	
 	private volatile Throwable failureCause;
@@ -147,6 +148,17 @@ public class ExecutionGraph {
 
 	// --------------------------------------------------------------------------------------------
 	
+	public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
+		if (numberOfRetriesLeft < -1) {
+			throw new IllegalArgumentException();
+		}
+		this.numberOfRetriesLeft = numberOfRetriesLeft;
+	}
+	
+	public int getNumberOfRetriesLeft() {
+		return numberOfRetriesLeft;
+	}
+	
 	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -344,8 +356,14 @@ public class ExecutionGraph {
 	
 	public void waitForJobEnd(long timeout) throws InterruptedException {
 		synchronized (progressLock) {
-			while (nextVertexToFinish < verticesInCreationOrder.size()) {
-				progressLock.wait(timeout);
+			
+			long now = System.currentTimeMillis();
+			long deadline = timeout == 0 ? Long.MAX_VALUE : now + timeout;
+			
+			
+			while (now < deadline && !state.isTerminalState()) {
+				progressLock.wait(deadline - now);
+				now = System.currentTimeMillis();
 			}
 		}
 	}
@@ -403,8 +421,21 @@ public class ExecutionGraph {
 						if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
 							break;
 						}
-						if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED, failureCause)) {
-							break;
+						if (current == JobStatus.FAILING) {
+							if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
+								numberOfRetriesLeft--;
+								
+								execute(new Runnable() {
+									@Override
+									public void run() {
+										restart();
+									}
+								});
+								break;
+							}
+							else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+								break;
+							}
 						}
 						if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
 							fail(new Exception("ExecutionGraph went into final state from state " + current));
@@ -659,4 +690,38 @@ public class ExecutionGraph {
 			action.run();
 		}
 	}
+	
+	public void restart() {
+		try {
+			if (state == JobStatus.FAILED) {
+				transitionState(JobStatus.FAILED, JobStatus.RESTARTING);
+			}
+			synchronized (progressLock) {
+				if (state != JobStatus.RESTARTING) {
+					throw new IllegalStateException("Can only restart job from state restarting.");
+				}
+				if (scheduler == null) {
+					throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null.");
+				}
+				
+				this.currentExecutions.clear();
+				this.edges.clear();
+				
+				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
+					jv.resetForNewExecution();
+				}
+				
+				for (int i = 0; i < stateTimestamps.length; i++) {
+					stateTimestamps[i] = 0;
+				}
+				nextVertexToFinish = 0;
+				transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
+			}
+			
+			scheduleForExecution(scheduler);
+		}
+		catch (Throwable t) {
+			fail(t);
+		}
+	}
 }
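
The interplay between the retry counter and the restart path can be summarized with a small, self-contained sketch (the enum and the failure loop are illustrative only; the real transitions go through transitionState() and the scheduler):

    public class RetryLoopSketch {

        enum Status { CREATED, RUNNING, FAILING, RESTARTING, FAILED }

        public static void main(String[] args) {
            int retriesLeft = 2;   // assumed value, e.g. from "execution-retries.default"
            Status state = Status.RUNNING;

            // simulate three consecutive job failures
            for (int failure = 1; failure <= 3; failure++) {
                state = Status.FAILING;
                if (retriesLeft > 0) {
                    retriesLeft--;
                    // reset the vertices, go back to CREATED, and reschedule
                    state = Status.RESTARTING;
                    state = Status.RUNNING;
                    System.out.println("failure " + failure + ": restarted, retries left = " + retriesLeft);
                } else {
                    // no retries left: the job fails for good
                    state = Status.FAILED;
                    System.out.println("failure " + failure + ": job is now " + state);
                }
            }
        }
    }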

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 37a1893..73534f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -56,19 +56,20 @@ public class ExecutionJobVertex {
 	
 	private final List<IntermediateResult> inputs;
 	
-	private final InputSplitAssigner splitAssigner;
-	
 	private final int parallelism;
 	
 	private final boolean[] finishedSubtasks;
 			
 	private volatile int numSubtasksInFinalState;
 	
-	
 	private final SlotSharingGroup slotSharingGroup;
 	
 	private final CoLocationGroup coLocationGroup;
 	
+	private final InputSplit[] inputSplits;
+	
+	private InputSplitAssigner splitAssigner;
+	
 	
 	public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, int defaultParallelism) throws JobException {
 		this(graph, jobVertex, defaultParallelism, System.currentTimeMillis());
@@ -126,9 +127,10 @@ public class ExecutionJobVertex {
 			@SuppressWarnings("unchecked")
 			InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
 			if (splitSource != null) {
-				InputSplit[] splits = splitSource.createInputSplits(numTaskVertices);
-				this.splitAssigner = splitSource.getInputSplitAssigner(splits);
+				this.inputSplits = splitSource.createInputSplits(numTaskVertices);
+				this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
 			} else {
+				this.inputSplits = null;
 				this.splitAssigner = null;
 			}
 		}
@@ -259,6 +261,48 @@ public class ExecutionJobVertex {
 		}
 	}
 	
+	public void resetForNewExecution() {
+		if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
+			throw new IllegalStateException("Cannot reset vertex that is not in final state");
+		}
+		
+		synchronized (stateMonitor) {
+			// check and reset the sharing groups with scheduler hints
+			if (slotSharingGroup != null) {
+				slotSharingGroup.clearTaskAssignment();
+			}
+			if (coLocationGroup != null) {
+				coLocationGroup.resetConstraints();
+			}
+			
+			// reset vertices one by one. if one reset fails, the "vertices in final state"
+			// fields will be consistent to handle triggered cancel calls
+			for (int i = 0; i < parallelism; i++) {
+				taskVertices[i].resetForNewExecution();
+				if (finishedSubtasks[i]) {
+					finishedSubtasks[i] = false;
+					numSubtasksInFinalState--;
+				}
+			}
+			
+			if (numSubtasksInFinalState != 0) {
+				throw new RuntimeException("Bug: resetting the execution job vertex failed.");
+			}
+			
+			// set up the input splits again
+			try {
+				if (this.inputSplits != null) {
+					@SuppressWarnings("unchecked")
+					InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+					this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
+				}
+			}
+			catch (Throwable t) {
+				throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
+			}
+		}
+	}
+	
 	//---------------------------------------------------------------------------------------------
 	//  Notifications
 	//---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3ea1afc..26dd19e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -67,7 +67,7 @@ public class ExecutionVertex {
 	
 	private final List<Execution> priorExecutions;
 	
-	private final CoLocationConstraint locationConstraint;
+	private volatile CoLocationConstraint locationConstraint;
 	
 	private volatile Execution currentExecution;	// this field must never be null
 	
@@ -316,6 +316,21 @@ public class ExecutionVertex {
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis());
+				
+				CoLocationGroup grp = jobVertex.getCoLocationGroup();
+				if (grp != null) {
+					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
+				}
+				
+				// temp: assign new channel IDs.
+				ExecutionGraph graph = getExecutionGraph();
+				
+				for (ExecutionEdge[] input : this.inputEdges) {
+					for (ExecutionEdge e : input) {
+						e.assignNewChannelIDs();
+						graph.registerExecutionEdge(e);
+					}
+				}
 			}
 			else {
 				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 88450c2..0cafcec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -157,6 +157,10 @@ public class Instance {
 	// --------------------------------------------------------------------------------------------
 	
 	public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+		if (isDead) {
+			throw new IOException("Instance has died");
+		}
+		
 		TaskOperationProtocol tm = this.taskManager;
 		
 		if (tm == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index e48f3af..5f302e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -396,7 +396,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			}
 		}
 
-		this.receiverCache.put(sourceChannelID, receiverList);
+		if (channels.containsKey(sourceChannelID)) {
+			this.receiverCache.put(sourceChannelID, receiverList);
+		}
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Receiver for %s: %s [%s])",
@@ -659,4 +661,16 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			}
 		}
 	}
+	
+	public void verifyAllCachesEmpty() {
+		if (!channels.isEmpty()) {
+			throw new IllegalStateException("Channel manager caches not empty: There are still registered channels.");
+		}
+		if (!localBuffersPools.isEmpty()) {
+			throw new IllegalStateException("Channel manager caches not empty: There are still local buffer pools.");
+		}
+		if (!receiverCache.isEmpty()) {
+			throw new IllegalStateException("Channel manager caches not empty: There are still entries in the receiver cache.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index c42bf92..4a8ca11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -71,6 +71,10 @@ public class JobGraph implements IOReadableWritable {
 	/** Name of this job. */
 	private String jobName;
 	
+	/** The number of times that failed tasks should be re-executed */
+	private int numExecutionRetries;
+	
+	/** flag to enable queued scheduling */
 	private boolean allowQueuedScheduling;
 	
 	// --------------------------------------------------------------------------------------------
@@ -165,6 +169,31 @@ public class JobGraph implements IOReadableWritable {
 		return this.jobConfiguration;
 	}
 	
+	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 * 
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+	 */
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		if (numberOfExecutionRetries < -1) {
+			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+		}
+		this.numExecutionRetries = numberOfExecutionRetries;
+	}
+	
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 * 
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return numExecutionRetries;
+	}
+	
 	public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
 		this.allowQueuedScheduling = allowQueuedScheduling;
 	}
@@ -318,6 +347,7 @@ public class JobGraph implements IOReadableWritable {
 		this.jobID.read(in);
 		this.jobName = StringValue.readString(in);
 		this.jobConfiguration.read(in);
+		this.numExecutionRetries = in.readInt();
 		this.allowQueuedScheduling = in.readBoolean();
 		
 		final int numVertices = in.readInt();
@@ -347,6 +377,7 @@ public class JobGraph implements IOReadableWritable {
 		this.jobID.write(out);
 		StringValue.writeString(this.jobName, out);
 		this.jobConfiguration.write(out);
+		out.writeInt(numExecutionRetries);
 		out.writeBoolean(allowQueuedScheduling);
 		
 		// write the task vertices using java serialization (to resolve references in the object graph)

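For illustration, a minimal sketch of how the new per-job setting is used when assembling a job graph; the vertex name, parallelism, and retry count below are arbitrary and not taken from the commit.

    import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;

    public class RetryingJobSketch {
        // Sketch only: a one-vertex job whose failed tasks are re-executed at most once.
        public static JobGraph buildJob() {
            AbstractJobVertex vertex = new AbstractJobVertex("Task");
            vertex.setInvokableClass(NoOpInvokable.class);
            vertex.setParallelism(4);

            JobGraph jobGraph = new JobGraph("Retrying Job", vertex);
            // 0 would disable re-execution entirely, -1 defers to the JobManager default
            jobGraph.setNumberOfExecutionRetries(1);
            return jobGraph;
        }
    }
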
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 857d999..3722945 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -42,7 +42,10 @@ public enum JobStatus {
 	CANCELED(true),
 
 	/** All of the job's tasks have successfully finished. */
-	FINISHED(true);
+	FINISHED(true),
+	
+	/** The job is currently undergoing a reset and total restart */
+	RESTARTING(false);
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index c93eee3..5a32244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -115,8 +115,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	
 	/** Executor service for asynchronous commands (to relieve the RPC threads of work) */
-	private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware
-			.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
+	private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
 	
 
 	/** The RPC end point through which the JobManager gets its calls */
@@ -140,7 +139,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	private final int recommendedClientPollingInterval;
 	// end: these will be consolidated / removed
-
+	
+	private final int defaultExecutionRetries;
+	
 	private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
 	
 	private volatile boolean isShutDown;
@@ -173,6 +174,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		// Read the suggested client polling interval
 		this.recommendedClientPollingInterval = GlobalConfiguration.getInteger(
 			ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL);
+		
+		// read the default number of times that failed tasks should be re-executed
+		this.defaultExecutionRetries = GlobalConfiguration.getInteger(
+			ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
 
 		// Load the job progress collector
 		this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
@@ -326,6 +331,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 				
 				executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
 						job.getJobConfiguration(), job.getUserJarBlobKeys(), this.executorService);
+
+				executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
+						job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+
 				ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
 				if (previous != null) {
 					throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());

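The precedence applied above is easy to miss in the ternary; a hypothetical helper (not in the commit) that states the same rule:

    // A non-negative per-job value wins; -1 on the JobGraph means
    // "fall back to the JobManager's configured default".
    static int resolveRetries(int jobSetting, int configuredDefault) {
        return jobSetting >= 0 ? jobSetting : configuredDefault;
    }

    // e.g. resolveRetries(2, 3) == 2,  resolveRetries(0, 3) == 0,  resolveRetries(-1, 3) == 3
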
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index f554bbb..36430de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -63,7 +63,13 @@ public class CoLocationConstraint {
 		return this.sharedSlot == null;
 	}
 	
+	public boolean isUnassignedOrDisposed() {
+		return this.sharedSlot == null || this.sharedSlot.isDisposed();
+	}
+	
 	public AbstractID getGroupId() {
 		return this.group.getId();
 	}
+	
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index d1c3bd5..fa379cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -74,7 +74,7 @@ public class CoLocationGroup implements java.io.Serializable {
 		return constraints.get(subtask);
 	}
 	
-	public void ensureConstraints(int num) {
+	private void ensureConstraints(int num) {
 		if (constraints == null) {
 			constraints = new ArrayList<CoLocationConstraint>(num);
 		} else {
@@ -92,4 +92,13 @@ public class CoLocationGroup implements java.io.Serializable {
 	public AbstractID getId() {
 		return id;
 	}
+	
+	public void resetConstraints() {
+		for (CoLocationConstraint c : this.constraints) {
+			if (!c.isUnassignedOrDisposed()) {
+				throw new IllegalStateException("Cannot reset co-location group: some constraints still have executing vertices.");
+			}
+		}
+		this.constraints.clear();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index c5a88f3..dcde6b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -70,6 +70,15 @@ public class SlotSharingGroup implements java.io.Serializable {
 		return this.taskAssignment;
 	}
 	
+	public void clearTaskAssignment() {
+		if (this.taskAssignment != null) {
+			if (this.taskAssignment.getNumberOfSlots() > 0) {
+				throw new IllegalStateException("SlotSharingGroup cannot clear task assignment, group still has allocated resources.");
+			}
+		}
+		this.taskAssignment = null;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 606fff1..df32a81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -21,11 +21,9 @@ package org.apache.flink.runtime.execution.librarycache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -33,7 +31,6 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -90,11 +87,15 @@ public class BlobLibraryCacheManagerTest {
 			}
 
 			assertEquals(2, caughtExceptions);
-		}catch(Exception e){
+			
+			bc.close();
+		}
+		catch(Exception e){
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
-			if(server != null){
+		}
+		finally{
+			if (server != null){
 				try {
 					server.shutdown();
 				} catch (IOException e) {
@@ -102,7 +103,7 @@ public class BlobLibraryCacheManagerTest {
 				}
 			}
 
-			if(libraryCacheManager != null){
+			if (libraryCacheManager != null){
 				try {
 					libraryCacheManager.shutdown();
 				} catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 74ab08b..4cddcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,17 +27,14 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.doAnswer;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
new file mode 100644
index 0000000..f1855f2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.junit.Test;
+
+public class ExecutionGraphRestartTest {
+	
+	@Test
+	public void testRestartManually() {
+		final int NUM_TASKS = 31;
+		
+		try {
+			TaskOperationProtocol tm = getSimpleAcknowledgingTaskmanager();
+			Instance instance = getInstance(tm);
+			
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance);
+			
+			// The job:
+			
+			final AbstractJobVertex sender = new AbstractJobVertex("Task");
+			sender.setInvokableClass(NoOpInvokable.class);
+			sender.setParallelism(NUM_TASKS);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender);
+			
+			ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration());
+			eg.setNumberOfRetriesLeft(0);
+			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+			
+			assertEquals(JobStatus.CREATED, eg.getState());
+			
+			eg.scheduleForExecution(scheduler);
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+			assertEquals(JobStatus.FAILED, eg.getState());
+			
+			eg.restart();
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+				v.executionFinished();
+			}
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRestartSelf() {
+		final int NUM_TASKS = 31;
+		
+		try {
+			TaskOperationProtocol tm = getSimpleAcknowledgingTaskmanager();
+			Instance instance = getInstance(tm);
+			
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance);
+			
+			// The job:
+			
+			final AbstractJobVertex sender = new AbstractJobVertex("Task");
+			sender.setInvokableClass(NoOpInvokable.class);
+			sender.setParallelism(NUM_TASKS);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender);
+			
+			ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration());
+			eg.setNumberOfRetriesLeft(1);
+			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+			
+			assertEquals(JobStatus.CREATED, eg.getState());
+			
+			eg.scheduleForExecution(scheduler);
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+			
+			// should have restarted itself
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+				v.executionFinished();
+			}
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 7eefa7e..30b05ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -96,11 +96,15 @@ public class ExecutionGraphTestUtils {
 	// --------------------------------------------------------------------------------------------
 	
 	public static Instance getInstance(final TaskOperationProtocol top) throws Exception {
+		return getInstance(top, 1);
+	}
+	
+	public static Instance getInstance(final TaskOperationProtocol top, int numSlots) throws Exception {
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001);
 		
-		return new Instance(connection, new InstanceID(), hardwareDescription, 1) {
+		return new Instance(connection, new InstanceID(), hardwareDescription, numSlots) {
 			@Override
 			public TaskOperationProtocol getTaskManagerProxy() {
 				return top;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f5a4d39..2848466 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -22,12 +22,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 1f74ae3..9769529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,12 +26,9 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index efb2af4..f3081bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -28,10 +28,8 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 import static org.mockito.Matchers.any;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -43,8 +41,6 @@ import org.junit.Test;
 
 import org.mockito.Matchers;
 
-import java.util.ArrayList;
-
 public class ExecutionVertexDeploymentTest {
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 8ed7a6d..168c454 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -36,6 +36,10 @@ public class JobManagerTestUtils {
 	}
 	
 	public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
+		return startJobManager(numTaskManagers, numSlotsPerTaskManager, null);
+	}
+	
+	public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager, Configuration additionalParams) throws Exception {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
@@ -43,6 +47,10 @@ public class JobManagerTestUtils {
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 		cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
 		
+		if (additionalParams != null) {
+			cfg.addAll(additionalParams);
+		}
+		
 		GlobalConfiguration.includeConfiguration(cfg);
 		
 		JobManager jm = new JobManager(ExecutionMode.LOCAL);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
index f8b229f..23a75cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -38,8 +36,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
 import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.junit.Test;
 
-import java.util.ArrayList;
-
 public class CoLocationConstraintITCase {
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 0952f60..ae7857f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -24,10 +24,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.api.RecordReader;
@@ -47,8 +45,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.Test;
 
-import java.util.ArrayList;
-
 /**
  * This test is intended to cover the basic functionality of the {@link JobManager}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
new file mode 100644
index 0000000..0b8518f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverBlockingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverFailingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+/**
+ * This test is intended to cover the basic functionality of the {@link JobManager}.
+ */
+public class RecoveryITCase {
+	
+	@Test
+	public void testForwardJob() {
+		
+		ReceiverFailingOnce.resetFailedBefore();
+		
+		final int NUM_TASKS = 31;
+		
+		JobManager jm = null;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(ReceiverFailingOnce.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			jobGraph.setNumberOfExecutionRetries(1);
+			
+			jm = startJobManager(2 * NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			JobSubmissionResult result = jm.submitJob(jobGraph);
+
+			if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+				System.out.println(result.getDescription());
+			}
+			assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+			
+			// monitor the execution
+			ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+			
+			if (eg != null) {
+				eg.waitForJobEnd();
+				assertEquals(JobStatus.FINISHED, eg.getState());
+			}
+			else {
+				// already done, that was fast;
+			}
+			
+			// make sure that in any case, the network buffers are all returned
+			waitForTaskThreadsToBeTerminated();
+			assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+	
+	@Test
+	public void testForwardJobWithSlotSharing() {
+		
+		ReceiverFailingOnce.resetFailedBefore();
+		
+		final int NUM_TASKS = 31;
+		
+		JobManager jm = null;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(ReceiverFailingOnce.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			jobGraph.setNumberOfExecutionRetries(1);
+			
+			jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			JobSubmissionResult result = jm.submitJob(jobGraph);
+
+			if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+				System.out.println(result.getDescription());
+			}
+			assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+			
+			// monitor the execution
+			ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+			
+			if (eg != null) {
+				eg.waitForJobEnd();
+				assertEquals(JobStatus.FINISHED, eg.getState());
+			}
+			else {
+				// already done, that was fast;
+			}
+			
+			// make sure that in any case, the network buffers are all returned
+			waitForTaskThreadsToBeTerminated();
+			assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+	
+	@Test
+	public void testRecoverTaskManagerFailure() {
+		
+		final int NUM_TASKS = 31;
+		
+		JobManager jm = null;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(ReceiverBlockingOnce.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			jobGraph.setNumberOfExecutionRetries(1);
+			
+			// make sure we have fast heartbeats and failure detection
+			Configuration cfg = new Configuration();
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+			
+			jm = startJobManager(2, NUM_TASKS, cfg);
+			
+			JobSubmissionResult result = jm.submitJob(jobGraph);
+
+			if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+				System.out.println(result.getDescription());
+			}
+			assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+			
+			// monitor the execution
+			ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+			
+			// wait for a bit until all is running, make sure the second attempt does not block
+			Thread.sleep(300);
+			ReceiverBlockingOnce.setShouldNotBlock();
+			
+			// shutdown one of the taskmanagers
+			((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0].shutdown();
+			
+			// wait for the recovery to do its work
+			if (eg != null) {
+				eg.waitForJobEnd();
+				assertEquals(JobStatus.FINISHED, eg.getState());
+			}
+			else {
+				// already done, that was fast;
+			}
+			
+			// make sure that in any case, the network buffers are all returned
+			waitForTaskThreadsToBeTerminated();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
index 29293da..5d79aab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -39,8 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
 import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.junit.Test;
 
-import java.util.ArrayList;
-
 public class SlotSharingITCase {
 
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
index 19ff690..6b8be15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -22,11 +22,9 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobMana
 import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
 import static org.junit.Assert.*;
 
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
@@ -39,7 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.junit.Test;
 
-import java.util.ArrayList;
 
 public class TaskManagerFailsITCase {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
new file mode 100644
index 0000000..3425842
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class ReceiverBlockingOnce extends AbstractInvokable {
+	
+	private static boolean shouldBlock = true;
+
+	private RecordReader<IntegerRecord> reader;
+	
+	@Override
+	public void registerInputOutput() {
+		reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		if (shouldBlock) {
+			
+			Object o = new Object();
+			synchronized (o) {
+				o.wait();
+			}
+		}
+		
+		while (reader.next() != null);
+	}
+	
+	public static void setShouldNotBlock() {
+		shouldBlock = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
new file mode 100644
index 0000000..3fad6b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class ReceiverFailingOnce extends AbstractInvokable {
+	
+	private static boolean hasFailedBefore = false;
+
+	private RecordReader<IntegerRecord> reader;
+	
+	@Override
+	public void registerInputOutput() {
+		reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		if (!hasFailedBefore && getEnvironment().getIndexInSubtaskGroup() == 0) {
+			hasFailedBefore = true;
+			throw new Exception("Test exception");
+		}
+		
+		while (reader.next() != null);
+	}
+	
+	
+	public static void resetFailedBefore() {
+		hasFailedBefore = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 53d57d2..ff96519 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -75,6 +75,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * value can be overridden by individual operations using [[DataSet.setParallelism]]
    */
   def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+  
+  /**
+   * Sets the number of times that failed tasks are re-executed. A value of zero
+   * effectively disables fault tolerance. A value of "-1" indicates that the system
+   * default value (as defined in the configuration) should be used.
+   */
+  def setNumberOfExecutionRetries(numRetries: Int): Unit = {
+    javaEnv.setNumberOfExecutionRetries(numRetries)
+  }
+
+  /**
+   * Gets the number of times the system will try to re-execute failed tasks. A value
+   * of "-1" indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
   /**
    * Gets the UUID by which this environment is identified. The UUID sets the execution context

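The Scala methods above only delegate to the underlying Java environment, so a job author enables retries the same way in either API. A short fragment as a sketch, relying on org.apache.flink.api.java.ExecutionEnvironment and the corresponding setter added in this change; the retry count is arbitrary.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setNumberOfExecutionRetries(2);   // re-execute failed tasks up to two times
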

[2/5] git commit: [FLINK-1202] Remove incomplete file outputs on failure

Posted by se...@apache.org.
[FLINK-1202] Remove incomplete file outputs on failure

This closes #175


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a747b614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a747b614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a747b614

Branch: refs/heads/master
Commit: a747b6146ed5d5766b42e6bed3c2e7a811e8d00e
Parents: f42dcc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 11:47:51 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:04:46 2014 +0100

----------------------------------------------------------------------
 .../api/common/io/CleanupWhenUnsuccessful.java  | 32 +++++++++++++++++
 .../flink/api/common/io/FileOutputFormat.java   | 37 +++++++++++++++-----
 .../flink/runtime/operators/DataSinkTask.java   | 26 ++++++++++++++
 .../runtime/operators/DataSinkTaskTest.java     |  7 ++--
 4 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
new file mode 100644
index 0000000..4b912e1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+/**
+ * {@link OutputFormat}s may implement this interface to run a cleanup hook when the execution is not successful.
+ */
+public interface CleanupWhenUnsuccessful {
+	
+	/**
+	 * Hook that is called upon an unsuccessful execution.
+	 * 
+	 * @throws Exception The method may forward exceptions when the cleanup fails.
+	 */
+	void tryCleanupOnError() throws Exception;
+}

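A minimal sketch of an implementation of the hook outside of FileOutputFormat (whose real implementation follows next); the class and its field are illustrative only. In practice the interface is meant to be implemented by an OutputFormat, as the Javadoc above states.

    import java.io.File;
    import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;

    // Illustrative only: remembers one partially written file and removes it
    // when the runtime reports an unsuccessful execution.
    public class PartialFileCleanup implements CleanupWhenUnsuccessful {

        private File partialOutput;

        public void setPartialOutput(File file) {
            this.partialOutput = file;
        }

        @Override
        public void tryCleanupOnError() throws Exception {
            if (partialOutput != null && partialOutput.exists() && !partialOutput.delete()) {
                throw new Exception("Could not remove partial output " + partialOutput);
            }
        }
    }
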
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index a9beddb..bc7ab73 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.slf4j.Logger;
@@ -35,7 +36,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
  * The abstract base class for all output formats that are file based. Contains the logic to open/close the target
  * file streams.
  */
-public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster {
+public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster, CleanupWhenUnsuccessful {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -106,10 +107,14 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 	
 	// --------------------------------------------------------------------------------------------
 	
-	/**
-	 * The stream to which the data is written;
-	 */
+	/** The stream to which the data is written; */
 	protected transient FSDataOutputStream stream;
+	
+	/** The path that is actually written to (may be a file in the directory defined by {@code outputFilePath}) */
+	private transient Path actualFilePath;
+	
+	/** Flag indicating whether this format actually created a file, which should be removed on cleanup. */
+	private transient boolean fileCreated;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -231,12 +236,13 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 			
 			
 		// Suffix the path with the parallel instance index, if needed
-		if (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) {
-			p = p.suffix("/" + (taskNumber+1));
-		}
+		this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p;
 
 		// create output file
-		this.stream = fs.create(p, writeMode == WriteMode.OVERWRITE);
+		this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
+		
+		// at this point, the file creation must have succeeded, or an exception has been thrown
+		this.fileCreated = true;
 	}
 
 	@Override
@@ -283,6 +289,21 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 		}
 	}
 	
+	@Override
+	public void tryCleanupOnError() {
+		if (this.fileCreated) {
+			this.fileCreated = false;
+			
+			try {
+				FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
+			} catch (FileNotFoundException e) {
+				// ignore, may not be visible yet or may be already removed
+			} catch (Throwable t) {
+				LOG.error("Could not remove the incomplete file " + actualFilePath, t);
+			}
+		}
+	}
+	
 	// ============================================================================================
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 1a378b2..b1185c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -78,6 +79,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 	// cancel flag
 	private volatile boolean taskCanceled;
 	
+	private volatile boolean cleanupCalled;
+	
 
 	@Override
 	public void registerInputOutput() {
@@ -180,6 +183,18 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 			}
 		}
 		catch (Exception ex) {
+			
+			// make a best effort to clean up
+			try {
+				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+					cleanupCalled = true;
+					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Cleanup on error failed.", t);
+			}
+			
 			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
 
 			if (ex instanceof CancelTaskException) {
@@ -237,6 +252,17 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 			try {
 				this.format.close();
 			} catch (Throwable t) {}
+			
+			// make a best effort to clean up
+			try {
+				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+					cleanupCalled = true;
+					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Cleanup on error failed.", t);
+			}
 		}
 		
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index f5381e4..3219a21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -306,7 +306,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
-		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
 		
 	}
 	
@@ -347,7 +347,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
-		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
 		
 	}
 	
@@ -388,8 +388,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
-		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-				
+		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
 	}
 	
 	@Test