Posted to commits@flink.apache.org by se...@apache.org on 2015/03/09 20:27:19 UTC

[1/4] flink git commit: [FLINK-1668] [core] Add a config option to specify delays between restarts

Repository: flink
Updated Branches:
  refs/heads/master 0b15bc3c5 -> 0df5601ad


[FLINK-1668] [core] Add a config option to specify delays between restarts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abbb0a93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abbb0a93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abbb0a93

Branch: refs/heads/master
Commit: abbb0a93ca67da17197dc5372e6d95edd8149d44
Parents: 500ddff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 19:28:54 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java      |  8 +++++++-
 .../flink/runtime/jobmanager/JobManager.scala     | 18 +++++++++++++++---
 .../flink/runtime/jobmanager/RecoveryITCase.scala |  1 +
 .../flink/test/misc/AutoParallelismITCase.java    |  2 --
 .../ProcessFailureBatchRecoveryITCase.java        |  1 +
 .../flink/test/recovery/SimpleRecoveryITCase.java |  2 +-
 .../jobmanager/JobManagerFailsITCase.scala        |  1 +
 .../taskmanager/TaskManagerFailsITCase.scala      |  1 +
 8 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0f42a17..028c258 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -40,6 +40,12 @@ public final class ConfigConstants {
 	 * value to 0 effectively disables fault tolerance.
 	 */
 	public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
+
+	/**
+	 * Config parameter for the delay between execution retries. The value must be specified in the
+	 * notation "10 s" or "1 min" (style of Scala Finite Durations)
+	 */
+	public static final String DEFAULT_EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
 	
 	// -------------------------------- Runtime -------------------------------
 	
@@ -339,7 +345,7 @@ public final class ConfigConstants {
 	public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
 
 	/**
-	 * Timeout for all blocking calls
+	 * Timeout for all blocking calls that look up remote actors
 	 */
 	public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
 	

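The new "execution-retries.delay" value uses Scala's duration notation, the same format the tests further down set via Configuration.setString. As a minimal sketch (not part of this commit) of how a setup could use the new constant:

    import org.apache.flink.configuration.{ConfigConstants, Configuration}

    // sketch: retry failed executions three times, waiting 10 seconds between attempts
    val config = new Configuration()
    config.setInteger(ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, 3)
    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s") // e.g. "500 ms" or "1 min" also parse
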
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e3e96e5..7ba06e7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -850,9 +850,21 @@ object JobManager {
       ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
       ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
-    val delayBetweenRetries = 2 * Duration(configuration.getString(
-      ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
+    // configure the delay between execution retries.
+    // unless explicitly specified, this depends on the heartbeat timeout
+    val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+                                              ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
+    val delayString = configuration.getString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY,
+                                              pauseString)
+
+    val delayBetweenRetries: Long = try {
+        Duration(delayString).toMillis
+      }
+      catch {
+        case n: NumberFormatException => throw new Exception(
+          s"Invalid config value for ${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " +
+            s"${pauseString}. Value must be a valid duration (such as 100 milli or 1 min)");
+      }
 
     val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
 

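The delay string is parsed with scala.concurrent.duration.Duration, so any notation that Duration accepts works here. A standalone illustration (not part of the commit) of the conversion performed above:

    import scala.concurrent.duration.Duration

    // the JobManager converts the configured string to milliseconds like this
    val delayMillis: Long = Duration("10 s").toMillis   // 10000
    val pauseMillis: Long = Duration("100 ms").toMillis // 100
    // a malformed value such as "ten seconds" makes Duration(...) throw, which the
    // code above rethrows as a descriptive configuration error
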
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index e7d1d83..c201d08 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -49,6 +49,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, heartbeatTimeout)
     new TestingCluster(config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index ea79a3a..8ddd7bc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -56,8 +56,6 @@ public class AutoParallelismITCase {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
-		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
-
 		cluster = new ForkableFlinkMiniCluster(config, false);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index 6866fbc..cceeb47 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -125,6 +125,7 @@ public class ProcessFailureBatchRecoveryITCase {
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
 			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
 			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index df6fbba..48afce1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -51,7 +51,7 @@ public class SimpleRecoveryITCase {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+		config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms");
 
 		cluster = new ForkableFlinkMiniCluster(config, false);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index f9b1b4c..625ca07 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -137,6 +137,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "8000 ms")
     config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
     new ForkableFlinkMiniCluster(config, singleActorSystem = false)

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 245bcd9..659262c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -232,6 +232,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4000 ms")
     config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
     new ForkableFlinkMiniCluster(config, singleActorSystem = false)


[3/4] flink git commit: [tests] Add comments to recovery tests

Posted by se...@apache.org.
[tests] Add comments to recovery tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9edc804e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9edc804e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9edc804e

Branch: refs/heads/master
Commit: 9edc804e15d0155450ef2b7f710a125545f94062
Parents: 0b15bc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 14:15:25 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100

----------------------------------------------------------------------
 .../flink/test/recovery/SimpleRecoveryITCase.java     |  4 ++++
 .../recovery/TaskManagerFailureRecoveryITCase.java    | 14 ++++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 8330109..df6fbba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,6 +37,10 @@ import java.util.List;
 
 import static org.junit.Assert.*;
 
+/**
+ * A series of tests (reusing one FlinkMiniCluster) where tasks fail (one or more times)
+ * and the recovery should restart them so that the jobs still complete.
+ */
 @SuppressWarnings("serial")
 public class SimpleRecoveryITCase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 85856ba..eb04234 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -45,6 +45,18 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
+/**
+ * This test verifies the recovery behavior when a TaskManager fails (is shut down)
+ * in the middle of a job execution.
+ *
+ * The test works with multiple in-process task managers. Initially, it starts a JobManager
+ * and two TaskManagers with 2 slots each. It submits a program with parallelism 4
+ * and waits until all tasks are brought up (coordination between the test and the tasks
+ * happens via shared blocking queues). It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+@SuppressWarnings("serial")
 public class TaskManagerFailureRecoveryITCase {
 
 	@Test
@@ -165,11 +177,13 @@ public class TaskManagerFailureRecoveryITCase {
 	}
 
 	private static class FailingMapper<T> extends RichMapFunction<T, T> {
+		private static final long serialVersionUID = 4435412404173331157L;
 
 		private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>();
 
 		private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>();
 
+
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			TASK_TO_COORD_QUEUE.add(new Object());

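Only the task side of the queue handshake is visible in this hunk. As a rough sketch of the pattern the Javadoc describes (helper names and the exact protocol are assumptions, not taken from the commit), the coordinating test code drains one "ready" token per parallel task before killing a TaskManager, and later releases the tasks through the second queue:

    import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

    // assumed shape of the coordination described in the Javadoc above
    val taskToCoord: BlockingQueue[AnyRef] = new LinkedBlockingQueue[AnyRef]()
    val coordToTask: BlockingQueue[AnyRef] = new LinkedBlockingQueue[AnyRef]()

    // coordinator: wait until every parallel task has signalled readiness via open()
    def waitForAllTasksReady(parallelism: Int): Unit =
      for (_ <- 0 until parallelism) taskToCoord.take()

    // coordinator: let the (possibly restarted) tasks proceed to completion
    def releaseAllTasks(parallelism: Int): Unit =
      for (_ <- 0 until parallelism) coordToTask.add(new Object)
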

[4/4] flink git commit: [hotfix] Fix erroneous testing log4j configuration for flink-runtime

Posted by se...@apache.org.
[hotfix] Fix erroneous testing log4j configuration for flink-runtime


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0df5601a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0df5601a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0df5601a

Branch: refs/heads/master
Commit: 0df5601adad965e6a6fed10a57f1e1da149eeba5
Parents: abbb0a9
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 19:37:53 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:37:53 2015 +0100

----------------------------------------------------------------------
 flink-runtime/src/test/resources/log4j-test.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0df5601a/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 505e8fa..491b219 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -35,4 +35,4 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console


[2/4] flink git commit: [FLINK-1667] [runtime] Add test for recovery after TaskManager process failure

Posted by se...@apache.org.
[FLINK-1667] [runtime] Add test for recovery after TaskManager process failure


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/500ddff4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/500ddff4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/500ddff4

Branch: refs/heads/master
Commit: 500ddff4e3b5b47c7244411e14d76b65eb68563c
Parents: 9edc804
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 18:20:59 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100

----------------------------------------------------------------------
 flink-tests/pom.xml                             |   8 +
 .../ProcessFailureBatchRecoveryITCase.java      | 460 +++++++++++++++++++
 2 files changed, 468 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/500ddff4/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 17c6b3e..95ca77c 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -125,6 +125,14 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>org.scalatest</groupId>
 			<artifactId>scalatest_2.10</artifactId>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/500ddff4/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
new file mode 100644
index 0000000..6866fbc
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.commons.io.FileUtils;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test verifies the recovery behavior when a TaskManager fails (its process
+ * is killed) in the middle of a job execution.
+ *
+ * The test works with multiple TaskManager processes, spawned as separate JVMs.
+ * Initially, it starts an in-process JobManager and two TaskManager JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+@SuppressWarnings("serial")
+public class ProcessFailureBatchRecoveryITCase {
+
+	private static final String READY_MARKER_FILE_PREFIX = "ready_";
+	private static final String PROCEED_MARKER_FILE = "proceed";
+
+	private static final int PARALLELISM = 4;
+
+	@Test
+	public void testTaskManagerProcessFailure() {
+
+		final StringWriter processOutput1 = new StringWriter();
+		final StringWriter processOutput2 = new StringWriter();
+		final StringWriter processOutput3 = new StringWriter();
+
+		ActorSystem jmActorSystem = null;
+		Process taskManagerProcess1 = null;
+		Process taskManagerProcess2 = null;
+		Process taskManagerProcess3 = null;
+
+		File coordinateTempDir = null;
+
+		try {
+			// check that we run this test only if the java command
+			// is available on this machine
+			String javaCommand = getJavaCommandPath();
+			if (javaCommand == null) {
+				System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable");
+				return;
+			}
+
+			// create a logging file for the process
+			File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+			tempLogFile.deleteOnExit();
+			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+			// coordination between the processes goes through a directory
+			coordinateTempDir = createTempDirectory();
+
+			// find a free port to start the JobManager
+			final int jobManagerPort = NetUtils.getAvailablePort();
+
+			// start a JobManager
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+			Configuration jmConfig = new Configuration();
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+
+			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+
+			// the TaskManager java command
+			String[] command = new String[] {
+					javaCommand,
+					"-Dlog.level=DEBUG",
+					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+					"-Xms80m", "-Xmx80m",
+					"-classpath", getCurrentClasspath(),
+					TaskManagerProcessEntryPoint.class.getName(),
+					String.valueOf(jobManagerPort)
+			};
+
+			// start the first two TaskManager processes
+			taskManagerProcess1 = new ProcessBuilder(command).start();
+			new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+			taskManagerProcess2 = new ProcessBuilder(command).start();
+			new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+			// we wait for the JobManager to have the two TaskManagers available
+			// wait for at most 20 seconds
+			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
+
+			// each parallel task of the program creates a marker file once it is ready, so that
+			// this coordinating code knows when all tasks are up and running.
+			// the program will very slowly consume elements until the marker file (later created by the
+			// test driver code) is present
+			final File coordinateDirClosure = coordinateTempDir;
+			final Throwable[] errorRef = new Throwable[1];
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+			env.setDegreeOfParallelism(PARALLELISM);
+			env.setNumberOfExecutionRetries(1);
+
+			final long NUM_ELEMENTS = 1000000L;
+			final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)
+
+				// make sure every mapper is involved (no one is skipped because of lazy split assignment)
+				.rebalance()
+				// the majority of the behavior is in the MapFunction
+				.map(new RichMapFunction<Long, Long>() {
+
+					private final File proceedFile = new File(coordinateDirClosure, PROCEED_MARKER_FILE);
+
+					private boolean markerCreated = false;
+					private boolean checkForProceedFile = true;
+
+					@Override
+					public Long map(Long value) throws Exception {
+						if (!markerCreated) {
+							int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+							touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
+							markerCreated = true;
+						}
+
+						// check if the proceed file exists
+						if (checkForProceedFile) {
+							if (proceedFile.exists()) {
+								checkForProceedFile = false;
+							}
+							else {
+								// otherwise wait so that we make slow progress
+								Thread.sleep(10);
+							}
+						}
+						return value;
+					}
+				})
+				.reduce(new ReduceFunction<Long>() {
+					@Override
+					public Long reduce(Long value1, Long value2) {
+						return value1 + value2;
+					}
+				});
+
+			// we trigger a program now (in a separate thread)
+			Thread programTrigger = new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") {
+				@Override
+				public void run() {
+					try {
+						long sum = result.collect().get(0);
+						assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
+					}
+					catch (Throwable t) {
+						t.printStackTrace();
+						errorRef[0] = t;
+					}
+				}
+			};
+			programTrigger.start();
+
+			// wait until all marker files are in place, indicating that all tasks have started
+			// max 20 seconds
+			waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
+
+			// start the third TaskManager
+			taskManagerProcess3 = new ProcessBuilder(command).start();
+			new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+			// we wait for the third TaskManager to register (20 seconds max)
+			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
+
+			// kill one of the previous TaskManagers, triggering a failure and recovery
+			taskManagerProcess1.destroy();
+			taskManagerProcess1 = null;
+
+			// we create the marker file which signals the program's tasks that they can complete
+			touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+			// wait for at most 30 seconds for the program to complete
+			programTrigger.join(30000);
+
+			// check that the program really finished
+			assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+			// check whether the program encountered an error
+			if (errorRef[0] != null) {
+				Throwable error = errorRef[0];
+				error.printStackTrace();
+				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+			}
+
+			// all seems well :-)
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			printProcessLog("TaskManager 1", processOutput1.toString());
+			printProcessLog("TaskManager 2", processOutput2.toString());
+			printProcessLog("TaskManager 3", processOutput3.toString());
+			fail(e.getMessage());
+		}
+		catch (Error e) {
+			e.printStackTrace();
+			printProcessLog("TaskManager 1", processOutput1.toString());
+			printProcessLog("TaskManager 2", processOutput2.toString());
+			printProcessLog("TaskManager 3", processOutput3.toString());
+			throw e;
+		}
+		finally {
+			if (taskManagerProcess1 != null) {
+				taskManagerProcess1.destroy();
+			}
+			if (taskManagerProcess2 != null) {
+				taskManagerProcess2.destroy();
+			}
+			if (taskManagerProcess3 != null) {
+				taskManagerProcess3.destroy();
+			}
+			if (jmActorSystem != null) {
+				jmActorSystem.shutdown();
+			}
+			if (coordinateTempDir != null) {
+				try {
+					FileUtils.deleteDirectory(coordinateTempDir);
+				}
+				catch (Throwable t) {
+					// we can ignore this
+				}
+			}
+		}
+	}
+
+	private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+		throws Exception
+	{
+		final long deadline = System.currentTimeMillis() + maxDelay;
+		while (true) {
+			long remaining = deadline - System.currentTimeMillis();
+			if (remaining <= 0) {
+				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+			}
+
+			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+			try {
+				Future<?> result = Patterns.ask(jobManager,
+												JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+												new Timeout(timeout));
+				Integer numTMs = (Integer) Await.result(result, timeout);
+				if (numTMs == numExpected) {
+					break;
+				}
+			}
+			catch (TimeoutException e) {
+				// ignore and retry
+			}
+			catch (ClassCastException e) {
+				fail("Wrong response: " + e.getMessage());
+			}
+		}
+	}
+
+	private static void printProcessLog(String processName, String log) {
+		if (log == null || log.length() == 0) {
+			return;
+		}
+
+		System.out.println("-----------------------------------------");
+		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+		System.out.println("-----------------------------------------");
+		System.out.println(log);
+		System.out.println("-----------------------------------------");
+		System.out.println("        END SPAWNED PROCESS LOG");
+		System.out.println("-----------------------------------------");
+	}
+
+	private static File createTempDirectory() throws IOException {
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+		for (int i = 0; i < 10; i++) {
+			File dir = new File(tempDir, UUID.randomUUID().toString());
+			if (!dir.exists() && dir.mkdirs()) {
+				return dir;
+			}
+			System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
+		}
+
+		throw new IOException("Could not create temporary file directory");
+	}
+
+	private static void touchFile(File file) throws IOException {
+		if (!file.exists()) {
+			new FileOutputStream(file).close();
+		}
+		if (!file.setLastModified(System.currentTimeMillis())) {
+			throw new IOException("Could not touch the file.");
+		}
+	}
+
+	private static void waitForMarkerFiles(File basedir, int num, long timeout) {
+		long now = System.currentTimeMillis();
+		final long deadline = now + timeout;
+
+
+		while (now < deadline) {
+			boolean allFound = true;
+
+			for (int i = 0; i < num; i++) {
+				File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
+				if (!nextToCheck.exists()) {
+					allFound = false;
+					break;
+				}
+			}
+
+			if (allFound) {
+				return;
+			}
+			else {
+				// not all found, wait for a bit
+				try {
+					Thread.sleep(10);
+				}
+				catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				}
+
+				now = System.currentTimeMillis();
+			}
+		}
+
+		fail("The tasks were not started within time (" + timeout + "msecs)");
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+	 */
+	public static class TaskManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+		public static void main(String[] args) {
+			try {
+				int jobManagerPort = Integer.parseInt(args[0]);
+
+				Configuration cfg = new Configuration();
+				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+				TaskManager.runTaskManager(cfg, TaskManager.class);
+
+				// wait forever
+				Object lock = new Object();
+				synchronized (lock) {
+					lock.wait();
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+	/**
+	 * Utility class to read the output of a process stream and forward it into a StringWriter.
+	 */
+	private static class PipeForwarder extends Thread {
+
+		private final StringWriter target;
+		private final InputStream source;
+
+		public PipeForwarder(InputStream source, StringWriter target) {
+			super("Pipe Forwarder");
+			setDaemon(true);
+
+			this.source = source;
+			this.target = target;
+
+			start();
+		}
+
+		@Override
+		public void run() {
+			try {
+				int next;
+				while ((next = source.read()) != -1) {
+					target.write(next);
+				}
+			}
+			catch (IOException e) {
+				// terminate
+			}
+		}
+	}
+}