You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/09 20:27:19 UTC
[1/4] flink git commit: [FLINK-1668] [core] Add a config option to specify delays between restarts
Repository: flink
Updated Branches:
refs/heads/master 0b15bc3c5 -> 0df5601ad
[FLINK-1668] [core] Add a config option to specify delays between restarts
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abbb0a93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abbb0a93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abbb0a93
Branch: refs/heads/master
Commit: abbb0a93ca67da17197dc5372e6d95edd8149d44
Parents: 500ddff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 19:28:54 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 8 +++++++-
.../flink/runtime/jobmanager/JobManager.scala | 18 +++++++++++++++---
.../flink/runtime/jobmanager/RecoveryITCase.scala | 1 +
.../flink/test/misc/AutoParallelismITCase.java | 2 --
.../ProcessFailureBatchRecoveryITCase.java | 1 +
.../flink/test/recovery/SimpleRecoveryITCase.java | 2 +-
.../jobmanager/JobManagerFailsITCase.scala | 1 +
.../taskmanager/TaskManagerFailsITCase.scala | 1 +
8 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0f42a17..028c258 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -40,6 +40,12 @@ public final class ConfigConstants {
* value to 0 effectively disables fault tolerance.
*/
public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
+
+ /**
+ * Config parameter for the delay between execution retries. The value must be specified in a
+ * duration notation such as "10 s" or "1 min" (the format of Scala's FiniteDuration).
+ */
+ public static final String DEFAULT_EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
// -------------------------------- Runtime -------------------------------
@@ -339,7 +345,7 @@ public final class ConfigConstants {
public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
/**
- * Timeout for all blocking calls
+ * Timeout for all blocking calls that look up remote actors
*/
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e3e96e5..7ba06e7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -850,9 +850,21 @@ object JobManager {
ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
ConfigConstants.DEFAULT_EXECUTION_RETRIES)
- val delayBetweenRetries = 2 * Duration(configuration.getString(
- ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
- ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
+ // configure the delay between execution retries.
+ // unless explicitly specified, this defaults to the heartbeat pause
+ val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+ ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
+ val delayString = configuration.getString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY,
+ pauseString)
+
+ val delayBetweenRetries: Long = try {
+ Duration(delayString).toMillis
+ }
+ catch {
+ case n: NumberFormatException => throw new Exception(
+ s"Invalid config value for ${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " +
+ s"${pauseString}. Value must be a valid duration (such as 100 milli or 1 min)");
+ }
val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index e7d1d83..c201d08 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -49,6 +49,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, heartbeatTimeout)
new TestingCluster(config)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index ea79a3a..8ddd7bc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -56,8 +56,6 @@ public class AutoParallelismITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
-
cluster = new ForkableFlinkMiniCluster(config, false);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index 6866fbc..cceeb47 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -125,6 +125,7 @@ public class ProcessFailureBatchRecoveryITCase {
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+ jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index df6fbba..48afce1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -51,7 +51,7 @@ public class SimpleRecoveryITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms");
cluster = new ForkableFlinkMiniCluster(config, false);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index f9b1b4c..625ca07 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -137,6 +137,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "8000 ms")
config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
new ForkableFlinkMiniCluster(config, singleActorSystem = false)
http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 245bcd9..659262c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -232,6 +232,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4000 ms")
config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
new ForkableFlinkMiniCluster(config, singleActorSystem = false)
[3/4] flink git commit: [tests] Add comments and to recovery tests
Posted by se...@apache.org.
[tests] Add comments and to recovery tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9edc804e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9edc804e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9edc804e
Branch: refs/heads/master
Commit: 9edc804e15d0155450ef2b7f710a125545f94062
Parents: 0b15bc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 14:15:25 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100
----------------------------------------------------------------------
.../flink/test/recovery/SimpleRecoveryITCase.java | 4 ++++
.../recovery/TaskManagerFailureRecoveryITCase.java | 14 ++++++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 8330109..df6fbba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,6 +37,10 @@ import java.util.List;
import static org.junit.Assert.*;
+/**
+ * A series of tests (reusing one FlinkMiniCluster) where tasks fail (one or more times)
+ * and which verify that the recovery restarts them so that the job completes.
+ */
@SuppressWarnings("serial")
public class SimpleRecoveryITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 85856ba..eb04234 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -45,6 +45,18 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
+/**
+ * This test verifies the behavior of the recovery in the case when a TaskManager
+ * fails (is shut down) in the middle of a job execution.
+ *
+ * The test works with multiple in-process task managers. Initially, it starts a JobManager
+ * and two TaskManagers with 2 slots each. It submits a program with parallelism 4
+ * and waits until all tasks are brought up (coordination between the test and the tasks
+ * happens via shared blocking queues). It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+@SuppressWarnings("serial")
public class TaskManagerFailureRecoveryITCase {
@Test
@@ -165,11 +177,13 @@ public class TaskManagerFailureRecoveryITCase {
}
private static class FailingMapper<T> extends RichMapFunction<T, T> {
+ private static final long serialVersionUID = 4435412404173331157L;
private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>();
private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>();
+
@Override
public void open(Configuration parameters) throws Exception {
TASK_TO_COORD_QUEUE.add(new Object());
[4/4] flink git commit: [hotfix] Fix erroneous testing log4j configuration for flink-runtime
Posted by se...@apache.org.
[hotfix] Fix erroneous testing log4j configuration for flink-runtime
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0df5601a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0df5601a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0df5601a
Branch: refs/heads/master
Commit: 0df5601adad965e6a6fed10a57f1e1da149eeba5
Parents: abbb0a9
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 19:37:53 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:37:53 2015 +0100
----------------------------------------------------------------------
flink-runtime/src/test/resources/log4j-test.properties | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0df5601a/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 505e8fa..491b219 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -35,4 +35,4 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
[2/4] flink git commit: [FLINK-1667] [runtime] Add test for recovery after TaskManager process failure
Posted by se...@apache.org.
[FLINK-1667] [runtime] Add test for recovery after TaskManager process failure
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/500ddff4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/500ddff4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/500ddff4
Branch: refs/heads/master
Commit: 500ddff4e3b5b47c7244411e14d76b65eb68563c
Parents: 9edc804
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 18:20:59 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100
----------------------------------------------------------------------
flink-tests/pom.xml | 8 +
.../ProcessFailureBatchRecoveryITCase.java | 460 +++++++++++++++++++
2 files changed, 468 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/500ddff4/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 17c6b3e..95ca77c 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -125,6 +125,14 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/500ddff4/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
new file mode 100644
index 0000000..6866fbc
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.commons.io.FileUtils;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test verifies the behavior of the recovery in the case when a TaskManager
+ * fails (process is killed) in the middle of a job execution.
+ *
+ * The test works with multiple TaskManager processes by spawning JVMs.
+ * Initially, it starts a JobManager in-process and two TaskManager JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+@SuppressWarnings("serial")
+public class ProcessFailureBatchRecoveryITCase {
+
+ private static final String READY_MARKER_FILE_PREFIX = "ready_";
+ private static final String PROCEED_MARKER_FILE = "proceed";
+
+ private static final int PARALLELISM = 4;
+
+ @Test
+ public void testTaskManagerProcessFailure() {
+
+ final StringWriter processOutput1 = new StringWriter();
+ final StringWriter processOutput2 = new StringWriter();
+ final StringWriter processOutput3 = new StringWriter();
+
+ ActorSystem jmActorSystem = null;
+ Process taskManagerProcess1 = null;
+ Process taskManagerProcess2 = null;
+ Process taskManagerProcess3 = null;
+
+ File coordinateTempDir = null;
+
+ try {
+ // check that we run this test only if the java command
+ // is available on this machine
+ String javaCommand = getJavaCommandPath();
+ if (javaCommand == null) {
+ System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable");
+ return;
+ }
+
+ // create a logging file for the process
+ File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+ tempLogFile.deleteOnExit();
+ CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+ // coordination between the processes goes through a directory
+ coordinateTempDir = createTempDirectory();
+
+ // find a free port to start the JobManager
+ final int jobManagerPort = NetUtils.getAvailablePort();
+
+ // start a JobManager
+ Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+ Configuration jmConfig = new Configuration();
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+ jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+
+ jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+ ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+
+ // the TaskManager java command
+ String[] command = new String[] {
+ javaCommand,
+ "-Dlog.level=DEBUG",
+ "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+ "-Xms80m", "-Xmx80m",
+ "-classpath", getCurrentClasspath(),
+ TaskManagerProcessEntryPoint.class.getName(),
+ String.valueOf(jobManagerPort)
+ };
+
+ // start the first two TaskManager processes
+ taskManagerProcess1 = new ProcessBuilder(command).start();
+ new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+ taskManagerProcess2 = new ProcessBuilder(command).start();
+ new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+ // we wait for the JobManager to have the two TaskManagers available
+ // wait for at most 20 seconds
+ waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
+
+ // the program will set a marker file in each of its parallel tasks once they are ready, so that
+ // this coordinating code is aware of this.
+ // the program will very slowly consume elements until the marker file (later created by the
+ // test driver code) is present
+ final File coordinateDirClosure = coordinateTempDir;
+ final Throwable[] errorRef = new Throwable[1];
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+ env.setDegreeOfParallelism(PARALLELISM);
+ env.setNumberOfExecutionRetries(1);
+
+ final long NUM_ELEMENTS = 1000000L;
+ final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)
+
+ // make sure every mapper is involved (no one is skipped because of lazy split assignment)
+ .rebalance()
+ // the majority of the behavior is in the MapFunction
+ .map(new RichMapFunction<Long, Long>() {
+
+ private final File proceedFile = new File(coordinateDirClosure, PROCEED_MARKER_FILE);
+
+ private boolean markerCreated = false;
+ private boolean checkForProceedFile = true;
+
+ @Override
+ public Long map(Long value) throws Exception {
+ if (!markerCreated) {
+ int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
+ markerCreated = true;
+ }
+
+ // check if the proceed file exists
+ if (checkForProceedFile) {
+ if (proceedFile.exists()) {
+ checkForProceedFile = false;
+ }
+ else {
+ // otherwise wait so that we make slow progress
+ Thread.sleep(10);
+ }
+ }
+ return value;
+ }
+ })
+ .reduce(new ReduceFunction<Long>() {
+ @Override
+ public Long reduce(Long value1, Long value2) {
+ return value1 + value2;
+ }
+ });
+
+ // we trigger a program now (in a separate thread)
+ Thread programTrigger = new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") {
+ @Override
+ public void run() {
+ try {
+ long sum = result.collect().get(0);
+ assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ errorRef[0] = t;
+ }
+ }
+ };
+ programTrigger.start();
+
+ // wait until all marker files are in place, indicating that all tasks have started
+ // max 20 seconds
+ waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
+
+ // start the third TaskManager
+ taskManagerProcess3 = new ProcessBuilder(command).start();
+ new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+ // we wait for the third TaskManager to register (20 seconds max)
+ waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
+
+ // kill one of the previous TaskManagers, triggering a failure and recovery
+ taskManagerProcess1.destroy();
+ taskManagerProcess1 = null;
+
+ // we create the marker file that signals the program's tasks that they can complete
+ touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+ // wait for at most 30 seconds for the program to complete
+ programTrigger.join(30000);
+
+ // check that the program really finished
+ assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+ // check whether the program encountered an error
+ if (errorRef[0] != null) {
+ Throwable error = errorRef[0];
+ error.printStackTrace();
+ fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+ }
+
+ // all seems well :-)
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ printProcessLog("TaskManager 1", processOutput1.toString());
+ printProcessLog("TaskManager 2", processOutput2.toString());
+ printProcessLog("TaskManager 3", processOutput3.toString());
+ fail(e.getMessage());
+ }
+ catch (Error e) {
+ e.printStackTrace();
+ printProcessLog("TaskManager 1", processOutput1.toString());
+ printProcessLog("TaskManager 2", processOutput2.toString());
+ printProcessLog("TaskManager 3", processOutput3.toString());
+ throw e;
+ }
+ finally {
+ if (taskManagerProcess1 != null) {
+ taskManagerProcess1.destroy();
+ }
+ if (taskManagerProcess2 != null) {
+ taskManagerProcess2.destroy();
+ }
+ if (taskManagerProcess3 != null) {
+ taskManagerProcess3.destroy();
+ }
+ if (jmActorSystem != null) {
+ jmActorSystem.shutdown();
+ }
+ if (coordinateTempDir != null) {
+ try {
+ FileUtils.deleteDirectory(coordinateTempDir);
+ }
+ catch (Throwable t) {
+ // we can ignore this
+ }
+ }
+ }
+ }
+
+ private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+ throws Exception
+ {
+ final long deadline = System.currentTimeMillis() + maxDelay;
+ while (true) {
+ long remaining = deadline - System.currentTimeMillis();
+ if (remaining <= 0) {
+ fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+ }
+
+ FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+ try {
+ Future<?> result = Patterns.ask(jobManager,
+ JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+ new Timeout(timeout));
+ Integer numTMs = (Integer) Await.result(result, timeout);
+ if (numTMs == numExpected) {
+ break;
+ }
+ }
+ catch (TimeoutException e) {
+ // ignore and retry
+ }
+ catch (ClassCastException e) {
+ fail("Wrong response: " + e.getMessage());
+ }
+ }
+ }
+
+ private static void printProcessLog(String processName, String log) {
+ if (log == null || log.length() == 0) {
+ return;
+ }
+
+ System.out.println("-----------------------------------------");
+ System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+ System.out.println("-----------------------------------------");
+ System.out.println(log);
+ System.out.println("-----------------------------------------");
+ System.out.println(" END SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ }
+
+ private static File createTempDirectory() throws IOException {
+ File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+ for (int i = 0; i < 10; i++) {
+ File dir = new File(tempDir, UUID.randomUUID().toString());
+ if (!dir.exists() && dir.mkdirs()) {
+ return dir;
+ }
+ System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
+ }
+
+ throw new IOException("Could not create temporary file directory");
+ }
+
+ private static void touchFile(File file) throws IOException {
+ if (!file.exists()) {
+ new FileOutputStream(file).close();
+ }
+ if (!file.setLastModified(System.currentTimeMillis())) {
+ throw new IOException("Could not touch the file.");
+ }
+ }
+
+ private static void waitForMarkerFiles(File basedir, int num, long timeout) {
+ long now = System.currentTimeMillis();
+ final long deadline = now + timeout;
+
+
+ while (now < deadline) {
+ boolean allFound = true;
+
+ for (int i = 0; i < num; i++) {
+ File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
+ if (!nextToCheck.exists()) {
+ allFound = false;
+ break;
+ }
+ }
+
+ if (allFound) {
+ return;
+ }
+ else {
+ // not all found, wait for a bit
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ now = System.currentTimeMillis();
+ }
+ }
+
+ fail("The tasks were not started within time (" + timeout + "msecs)");
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+ */
+ public static class TaskManagerProcessEntryPoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+ public static void main(String[] args) {
+ try {
+ int jobManagerPort = Integer.parseInt(args[0]);
+
+ Configuration cfg = new Configuration();
+ cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+ TaskManager.runTaskManager(cfg, TaskManager.class);
+
+ // wait forever
+ Object lock = new Object();
+ synchronized (lock) {
+ lock.wait();
+ }
+ }
+ catch (Throwable t) {
+ LOG.error("Failed to start TaskManager process", t);
+ System.exit(1);
+ }
+ }
+ }
+
+ /**
+ * Utility class to read the output of a process stream and forward it into a StringWriter.
+ */
+ private static class PipeForwarder extends Thread {
+
+ private final StringWriter target;
+ private final InputStream source;
+
+ public PipeForwarder(InputStream source, StringWriter target) {
+ super("Pipe Forwarder");
+ setDaemon(true);
+
+ this.source = source;
+ this.target = target;
+
+ start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ int next;
+ while ((next = source.read()) != -1) {
+ target.write(next);
+ }
+ }
+ catch (IOException e) {
+ // terminate
+ }
+ }
+ }
+}