You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/04 18:37:28 UTC

[2/3] flink git commit: [tests] Add test for restart recovery

[tests] Add test for restart recovery


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/785f2041
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/785f2041
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/785f2041

Branch: refs/heads/release-0.8
Commit: 785f2041fdf8f8409e7cfcc66451b79d7bf57673
Parents: 4ff3f4c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 17:49:59 2015 +0100

----------------------------------------------------------------------
 .../client/minicluster/NepheleMiniCluster.java  |  40 ++-
 .../client/program/AutoParallelismITCase.java   | 118 --------
 .../flink/api/java/ExecutionEnvironment.java    |   3 +
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../test/recovery/AutoParallelismITCase.java    | 122 ++++++++
 .../test/recovery/SimpleRecoveryITCase.java     | 287 +++++++++++++++++++
 6 files changed, 448 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index a40b733..79099c7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -82,6 +82,9 @@ public class NepheleMiniCluster {
 	
 	private boolean defaultAlwaysCreateDirectory = false;
 
+	private long heartbeatInterval = ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL;
+
+	private long heartbeatTimeout = ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT;
 	
 	private JobManager jobManager;
 
@@ -163,11 +166,33 @@ public class NepheleMiniCluster {
 
 	public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
 
-	public int getNumTaskManager() { return numTaskManager; }
+	public int getNumTaskManager() {
+		return numTaskManager;
+	}
 
-	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
+		this.taskManagerNumSlots = taskManagerNumSlots;
+	}
 
-	public int getTaskManagerNumSlots() { return taskManagerNumSlots; }
+	public int getTaskManagerNumSlots() {
+		return taskManagerNumSlots;
+	}
+
+	public void setHeartbeatInterval(long heartbeatInterval) {
+		this.heartbeatInterval = heartbeatInterval;
+	}
+
+	public long getHeartbeatInterval() {
+		return heartbeatInterval;
+	}
+
+	public void setHeartbeatTimeout(long heartbeatTimeout) {
+		this.heartbeatTimeout = heartbeatTimeout;
+	}
+
+	public long getHeartbeatTimeout() {
+		return heartbeatTimeout;
+	}
 
 	// ------------------------------------------------------------------------
 	// Life cycle and Job Submission
@@ -206,7 +231,8 @@ public class NepheleMiniCluster {
 			} else {
 				Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
 					taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
-						defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager);
+						defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager,
+					heartbeatInterval, heartbeatTimeout);
 				GlobalConfiguration.includeConfiguration(conf);
 			}
 
@@ -297,7 +323,8 @@ public class NepheleMiniCluster {
 	public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
 			int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory,
 			boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory,
-			int taskManagerNumSlots, int numTaskManager)
+			int taskManagerNumSlots, int numTaskManager,
+			long heartbeatInterval, long heartbeatTimeout)
 	{
 		final Configuration config = new Configuration();
 		
@@ -350,6 +377,9 @@ public class NepheleMiniCluster {
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager);
 
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+
+		config.setLong(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
+		config.setLong(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, heartbeatTimeout);
 		
 		return config;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
deleted file mode 100644
index c1fa888..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.program;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.minicluster.NepheleMiniCluster;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * This test verifies that the auto parallelism is properly forwarded to the runtime.
- */
-public class AutoParallelismITCase {
-
-	private static final int NUM_TM = 2;
-	private static final int SLOTS_PER_TM = 7;
-	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
-
-	@Test
-	public void testProgramWithAutoParallelism() {
-
-		NepheleMiniCluster cluster = new NepheleMiniCluster();
-		cluster.setNumTaskManager(NUM_TM);
-		cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
-
-		try {
-			cluster.start();
-
-			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
-			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
-
-			DataSet<Integer> result = env
-					.createInput(new ParallelismDependentInputFormat())
-					.mapPartition(new ParallelismDependentMapPartition());
-
-			List<Integer> resultCollection = new ArrayList<Integer>();
-			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
-
-			env.execute();
-
-			assertEquals(PARALLELISM, resultCollection.size());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			try {
-				cluster.stop();
-			}
-			catch (Throwable t) {
-				// ignore exceptions on shutdown
-			}
-		}
-	}
-
-	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
-
-		private transient boolean emitted;
-
-		@Override
-		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
-			assertEquals(PARALLELISM, numSplits);
-			return super.createInputSplits(numSplits);
-		}
-
-		@Override
-		public boolean reachedEnd() {
-			return emitted;
-		}
-
-		@Override
-		public Integer nextRecord(Integer reuse) {
-			if (emitted) {
-				return null;
-			}
-			emitted = true;
-			return 1;
-		}
-	}
-
-	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
-
-		@Override
-		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
-			out.collect(getRuntimeContext().getIndexOfThisSubtask());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 61a74b9..2026ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -813,6 +813,9 @@ public abstract class ExecutionEnvironment {
 		if (getDegreeOfParallelism() > 0) {
 			plan.setDefaultParallelism(getDegreeOfParallelism());
 		}
+		if (getNumberOfExecutionRetries() >= 0) {
+			plan.setNumberOfExecutionRetries(getNumberOfExecutionRetries());
+		}
 
 		try {
 			registerCachedFilesWithPlan(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1f9cf26..dfd2e39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -735,7 +735,7 @@ public class ExecutionGraph {
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
 				if (scheduler == null) {
-					throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null.");
+					throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
 				}
 				
 				this.currentExecutions.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
new file mode 100644
index 0000000..bbfe0a9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ *
+ * <p>The program runs with {@link ExecutionConfig#PARALLELISM_AUTO_MAX} on a minicluster with
+ * {@code NUM_TM} TaskManagers of {@code SLOTS_PER_TM} slots each. The input format asserts that it
+ * is asked for exactly {@code PARALLELISM} splits, and the map-partition function emits one record
+ * per parallel instance, so the size of the result must equal {@code PARALLELISM}.
+ */
+public class AutoParallelismITCase {
+
+	private static final int NUM_TM = 2;
+	private static final int SLOTS_PER_TM = 7;
+
+	/** Total number of slots in the minicluster; the parallelism expected under "auto max". */
+	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+	@Test
+	public void testProgramWithAutoParallelism() {
+
+		NepheleMiniCluster cluster = new NepheleMiniCluster();
+		cluster.setNumTaskManager(NUM_TM);
+		cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
+
+		try {
+			cluster.start();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
+			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+			DataSet<Integer> result = env
+					.createInput(new ParallelismDependentInputFormat())
+					.rebalance()
+					.mapPartition(new ParallelismDependentMapPartition());
+
+			List<Integer> resultCollection = new ArrayList<Integer>();
+			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+			env.execute();
+
+			// one record per parallel instance of the map-partition function
+			assertEquals(PARALLELISM, resultCollection.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			try {
+				cluster.stop();
+			}
+			catch (Throwable t) {
+				// ignore exceptions on shutdown
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Utility classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Input format that emits a single record ({@code 1}) per input split and asserts that it is
+	 * asked for exactly {@code PARALLELISM} splits.
+	 */
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		/** Whether the record for the current split has already been emitted. */
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public void open(GenericInputSplit split) throws IOException {
+			super.open(split);
+			// open() is called once per split, and an instance may read several splits in turn,
+			// so reset the flag to emit exactly one record for every split
+			emitted = false;
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+
+	/** Emits the index of its parallel instance once per call, ignoring the input values. */
+	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+		@Override
+		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+			out.collect(getRuntimeContext().getIndexOfThisSubtask());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
new file mode 100644
index 0000000..bec9c3f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.ProgramInvocationException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that jobs on a {@link NepheleMiniCluster} fail without retries and recover through
+ * automatic restarts when execution retries are configured.
+ *
+ * <p>Every test runs the same program: the sequence 1..10 is rebalanced, passed through a mapper
+ * whose parallel instance with index 1 throws while its static failure budget lasts, and summed
+ * with a reduce. A successful run therefore always produces {@code 55}.
+ */
+@SuppressWarnings("serial")
+public class SimpleRecoveryITCase {
+
+	/** Parallelism of the test program; must be at least 2 so that the failing subtask 1 exists. */
+	private static final int PARALLELISM = 4;
+
+	/** Result of a successful run: the sum of the numbers 1..10. */
+	private static final long EXPECTED_SUM = 55;
+
+	private static NepheleMiniCluster cluster;
+
+	@BeforeClass
+	public static void setupCluster() {
+		try {
+			cluster = new NepheleMiniCluster();
+			cluster.setNumTaskManager(2);
+			cluster.setTaskManagerNumSlots(2);
+
+			// these two parameters determine how fast the restart happens
+			cluster.setHeartbeatInterval(500);
+			cluster.setHeartbeatTimeout(2000);
+
+			cluster.start();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Could not start test minicluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDownCluster() {
+		// cluster stays null if the NepheleMiniCluster constructor in setupCluster() threw
+		if (cluster == null) {
+			return;
+		}
+		try {
+			cluster.stop();
+		}
+		catch (Exception e) {
+			System.err.println("Error stopping cluster on shutdown");
+			e.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Runs a failing program without retries, which must fail, and then the same program again,
+	 * which must succeed because the first run used up the mapper's failure budget.
+	 */
+	@Test
+	public void testFailedRunThenSuccessfulRun() {
+		try {
+			// attempt 1: the mapper fails once and no retries are allowed, so the job must fail
+			try {
+				List<Long> resultCollection = new ArrayList<Long>();
+				JobExecutionResult res = runSumProgram(new FailingMapper1<Long>(), 0, resultCollection);
+				String msg = res == null ? "null result" : "result in " + res.getNetRuntime();
+				fail("The program should have failed, but returned " + msg);
+			}
+			catch (ProgramInvocationException e) {
+				// expected
+			}
+
+			// attempt 2: FailingMapper1 has no failures left, so the same program must now succeed
+			runSumProgramExpectingSuccess(new FailingMapper1<Long>(), 0,
+					"The program should have succeeded on the second run");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/** A single task failure must be recovered by one automatic restart. */
+	@Test
+	public void testRestart() {
+		try {
+			runSumProgramExpectingSuccess(new FailingMapper2<Long>(), 1,
+					"The program should have succeeded after one restart");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/** Three consecutive task failures must be recovered by three automatic restarts. */
+	@Test
+	public void testRestartMultipleTimes() {
+		try {
+			runSumProgramExpectingSuccess(new FailingMapper3<Long>(), 3,
+					"The program should have succeeded after three restarts");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------------------
+
+	/**
+	 * Builds and executes the test program on the minicluster: the sequence 1..10 is rebalanced,
+	 * passed through {@code mapper}, summed, and written to {@code resultCollection}.
+	 *
+	 * @param mapper the (possibly failing) identity mapper
+	 * @param numberOfRetries the number of execution retries to configure
+	 * @param resultCollection the list that receives the program's output
+	 * @return the execution result of the program
+	 * @throws Exception if the program cannot be executed; a failed job is expected to surface
+	 *                   as a {@link ProgramInvocationException}
+	 */
+	private static JobExecutionResult runSumProgram(RichMapFunction<Long, Long> mapper,
+			int numberOfRetries, List<Long> resultCollection) throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getJobManagerRpcPort());
+
+		env.setDegreeOfParallelism(PARALLELISM);
+		env.setNumberOfExecutionRetries(numberOfRetries);
+
+		env.generateSequence(1, 10)
+				.rebalance()
+				.map(mapper)
+				.reduce(new SumReducer())
+				.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+		return env.execute();
+	}
+
+	/**
+	 * Executes the test program and asserts that it succeeds and produces {@link #EXPECTED_SUM}.
+	 *
+	 * @param mapper the (possibly failing) identity mapper
+	 * @param numberOfRetries the number of execution retries to configure
+	 * @param expectation the assertion message used if the program fails
+	 * @throws Exception if the program cannot be executed for reasons other than a failed job
+	 */
+	private static void runSumProgramExpectingSuccess(RichMapFunction<Long, Long> mapper,
+			int numberOfRetries, String expectation) throws Exception {
+
+		List<Long> resultCollection = new ArrayList<Long>();
+
+		try {
+			JobExecutionResult result = runSumProgram(mapper, numberOfRetries, resultCollection);
+			assertTrue(result.getNetRuntime() >= 0);
+			assertNotNull(result.getAllAccumulatorResults());
+			assertTrue(result.getAllAccumulatorResults().isEmpty());
+		}
+		catch (ProgramInvocationException e) {
+			// the remote environment reports a failed job this way (not as JobExecutionException)
+			e.printStackTrace();
+			fail(expectation + ", but it failed: " + e.getMessage());
+		}
+
+		long sum = 0;
+		for (long l : resultCollection) {
+			sum += l;
+		}
+		assertEquals(EXPECTED_SUM, sum);
+	}
+
+	/** Sums two values; used as a non-grouped reduce over the whole data set. */
+	private static class SumReducer implements ReduceFunction<Long> {
+
+		@Override
+		public Long reduce(Long value1, Long value2) {
+			return value1 + value2;
+		}
+	}
+
+	/**
+	 * Identity mapper whose parallel instance with index 1 throws, consuming one unit of its
+	 * failure budget, whenever it is called while the budget is positive. The budget is static,
+	 * so it is shared by all instances of the class in this JVM and is consumed across restarts
+	 * and across separate job runs. Each test uses its own mapper class for an independent budget.
+	 */
+	private static class FailingMapper1<T> extends RichMapFunction<T, T> {
+
+		private static int failuresBeforeSuccess = 1;
+
+		@Override
+		public T map(T value) throws Exception {
+			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				failuresBeforeSuccess--;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	/** Like {@link FailingMapper1}, with its own budget of one failure (used by testRestart). */
+	private static class FailingMapper2<T> extends RichMapFunction<T, T> {
+
+		private static int failuresBeforeSuccess = 1;
+
+		@Override
+		public T map(T value) throws Exception {
+			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				failuresBeforeSuccess--;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	/** Like {@link FailingMapper1}, with its own budget of three failures (used by testRestartMultipleTimes). */
+	private static class FailingMapper3<T> extends RichMapFunction<T, T> {
+
+		private static int failuresBeforeSuccess = 3;
+
+		@Override
+		public T map(T value) throws Exception {
+			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				failuresBeforeSuccess--;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+}