You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/29 21:44:41 UTC
[2/2] flink git commit: [FLINK-1669] [streaming] Clean up and fix
test for streaming fault tolerance with process failures
[FLINK-1669] [streaming] Clean up and fix test for streaming fault tolerance with process failures
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c284745e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c284745e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c284745e
Branch: refs/heads/master
Commit: c284745ee4612054339842789b0d87eb4f9a821d
Parents: 56afefc
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 19 12:07:05 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:40:27 2015 +0200
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 17 +-
.../api/function/sink/SinkFunction.java | 2 +-
.../util/ProcessFailureRecoveryTestBase.java | 456 -------------------
.../AbstractProcessFailureRecoveryTest.java | 419 +++++++++++++++++
.../ProcessFailureBatchRecoveryITCase.java | 52 +--
.../ProcessFailureStreamingRecoveryITCase.java | 228 ++++++----
6 files changed, 575 insertions(+), 599 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 1307b7a..62d9c07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -159,17 +159,6 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Sets the number of times that failed tasks are re-executed. A value of zero
- * effectively disables fault tolerance. A value of {@code -1} indicates that the system
- * default value (as defined in the configuration) should be used.
- *
- * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
- */
- public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
- config.setNumberOfExecutionRetries(numberOfExecutionRetries);
- }
-
- /**
* Sets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. By default the output buffers flush frequently to provide
* low latency and to aid smooth developer experience. Setting the parameter
@@ -397,10 +386,10 @@ public abstract class StreamExecutionEnvironment {
* The interval of file watching in milliseconds.
* @param watchType
* The watch type of file stream. When watchType is
- * {@link WatchType.ONLY_NEW_FILES}, the system processes only
- * new files. {@link WatchType.REPROCESS_WITH_APPENDED} means
+ * {@link WatchType#ONLY_NEW_FILES}, the system processes only
+ * new files. {@link WatchType#REPROCESS_WITH_APPENDED} means
* that the system re-processes all contents of appended file.
- * {@link WatchType.PROCESS_ONLY_APPENDED} means that the system
+ * {@link WatchType#PROCESS_ONLY_APPENDED} means that the system
* processes only appended contents of files.
*
* @return The DataStream containing the given directory.
http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index d4ce24e..eed4234 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
/**
* Interface for implementing user defined sink functionality.
*
- * @param <IN> INput type parameter.
+ * @param <IN> Input type parameter.
*/
public interface SinkFunction<IN> extends Function, Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java
deleted file mode 100644
index 3aef925..0000000
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.HashSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-/**
- * This is a testbase for tests verifying the behavior of the recovery in the
- * case when a TaskManager fails (process is killed) in the middle of a job execution.
- *
- * The test works with multiple task managers processes by spawning JVMs.
- * Initially, it starts a JobManager in process and two TaskManagers JVMs with
- * 2 task slots each.
- * It submits a program with parallelism 4 and waits until all tasks are brought up.
- * Coordination between the test and the tasks happens via checking for the
- * existence of temporary files. It then starts another TaskManager, which is
- * guaranteed to remain empty (all tasks are already deployed) and kills one of
- * the original task managers. The recovery should restart the tasks on the new TaskManager.
- */
-public abstract class ProcessFailureRecoveryTestBase {
-
- protected static final String READY_MARKER_FILE_PREFIX = "ready_";
- protected static final String PROCEED_MARKER_FILE = "proceed";
-
- protected static final int PARALLELISM = 4;
-
- @Test
- public void testTaskManagerProcessFailure() {
-
- final StringWriter processOutput1 = new StringWriter();
- final StringWriter processOutput2 = new StringWriter();
- final StringWriter processOutput3 = new StringWriter();
-
- ActorSystem jmActorSystem = null;
- Process taskManagerProcess1 = null;
- Process taskManagerProcess2 = null;
- Process taskManagerProcess3 = null;
-
- File coordinateTempDir = null;
-
- try {
- // check that we run this test only if the java command
- // is available on this machine
- String javaCommand = getJavaCommandPath();
- if (javaCommand == null) {
- System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable");
- return;
- }
-
- // create a logging file for the process
- File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
- tempLogFile.deleteOnExit();
- CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
- // coordination between the processes goes through a directory
- coordinateTempDir = createTempDirectory();
-
- // find a free port to start the JobManager
- final int jobManagerPort = NetUtils.getAvailablePort();
-
- // start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
- Configuration jmConfig = new Configuration();
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
- jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
-
- jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
-
- // the TaskManager java command
- String[] command = new String[]{
- javaCommand,
- "-Dlog.level=DEBUG",
- "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
- "-Xms80m", "-Xmx80m",
- "-classpath", getCurrentClasspath(),
- TaskManagerProcessEntryPoint.class.getName(),
- String.valueOf(jobManagerPort)
- };
-
- // start the first two TaskManager processes
- taskManagerProcess1 = new ProcessBuilder(command).start();
- new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
- taskManagerProcess2 = new ProcessBuilder(command).start();
- new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
-
- // we wait for the JobManager to have the two TaskManagers available
- // wait for at most 20 seconds
- waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
-
- // the program will set a marker file in each of its parallel tasks once they are ready, so that
- // this coordinating code is aware of this.
- // the program will very slowly consume elements until the marker file (later created by the
- // test driver code) is present
- final File coordinateDirClosure = coordinateTempDir;
- final Throwable[] errorRef = new Throwable[1];
-
- // get a trigger for the test program implemented by a subclass
- Thread programTrigger = testProgram(jobManagerPort, coordinateDirClosure, errorRef);
-
- //start the test program
- programTrigger.start();
-
- // wait until all marker files are in place, indicating that all tasks have started
- // max 20 seconds
- waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
-
- // start the third TaskManager
- taskManagerProcess3 = new ProcessBuilder(command).start();
- new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
-
- // we wait for the third TaskManager to register (20 seconds max)
- waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
-
- // kill one of the previous TaskManagers, triggering a failure and recovery
- taskManagerProcess1.destroy();
- taskManagerProcess1 = null;
-
- // we create the marker file which signals the program functions tasks that they can complete
- touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
-
- // wait for at most 2 minutes for the program to complete
- programTrigger.join(120000);
-
- // check that the program really finished
- assertFalse("The program did not finish in time", programTrigger.isAlive());
-
- // apply post submission checks specified by the subclass
- postSubmit();
-
- // check whether the program encountered an error
- if (errorRef[0] != null) {
- Throwable error = errorRef[0];
- error.printStackTrace();
- fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
- }
-
- // all seems well :-)
-
- } catch (Exception e) {
- e.printStackTrace();
- printProcessLog("TaskManager 1", processOutput1.toString());
- printProcessLog("TaskManager 2", processOutput2.toString());
- printProcessLog("TaskManager 3", processOutput3.toString());
- fail(e.getMessage());
- } catch (Error e) {
- e.printStackTrace();
- printProcessLog("TaskManager 1", processOutput1.toString());
- printProcessLog("TaskManager 2", processOutput2.toString());
- printProcessLog("TaskManager 3", processOutput3.toString());
- throw e;
- } finally {
- if (taskManagerProcess1 != null) {
- taskManagerProcess1.destroy();
- }
- if (taskManagerProcess2 != null) {
- taskManagerProcess2.destroy();
- }
- if (taskManagerProcess3 != null) {
- taskManagerProcess3.destroy();
- }
- if (jmActorSystem != null) {
- jmActorSystem.shutdown();
- }
- if (coordinateTempDir != null) {
- try {
- FileUtils.deleteDirectory(coordinateTempDir);
- }
- catch (Throwable t) {
- // we can ignore this
- }
- }
- }
- }
-
- /**
- * The test program should be implemented here in a form of a separate thread.
- * This provides a solution for checking that it has been terminated.
- *
- * @param jobManagerPort The port for submitting the topology to the local cluster
- * @param coordinateDirClosure taskmanager failure will be triggered only after proccesses
- * have successfully created file under this directory
- * @param errorRef Errors passed back to the superclass
- * @return thread containing the test program
- */
- abstract public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef);
-
- /**
- * Check to be carried out after the completion of the test program thread.
- * In case of failed checks {@link java.lang.AssertionError} should be thrown.
- *
- * @throws Error
- * @throws Exception
- */
- abstract public void postSubmit() throws Error, Exception;
-
-
- protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
- throws Exception
- {
- final long deadline = System.currentTimeMillis() + maxDelay;
- while (true) {
- long remaining = deadline - System.currentTimeMillis();
- if (remaining <= 0) {
- fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
- }
-
- FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
-
- try {
- Future<?> result = Patterns.ask(jobManager,
- JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- new Timeout(timeout));
- Integer numTMs = (Integer) Await.result(result, timeout);
- if (numTMs == numExpected) {
- break;
- }
- }
- catch (TimeoutException e) {
- // ignore and retry
- }
- catch (ClassCastException e) {
- fail("Wrong response: " + e.getMessage());
- }
- }
- }
-
- public static void fileBatchHasEveryNumberLower(int n, String path) throws IOException, AssertionError {
-
- HashSet<Integer> set = new HashSet<Integer>(n);
-
- int counter = 0;
- File file = new File(path + "-" + counter);
-
- while (file.exists()) {
-
- BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
-
- String line = bufferedReader.readLine();
-
- while (line != null) {
- int num = Integer.parseInt(line);
-
- set.add(num);
-
- line = bufferedReader.readLine();
- }
-
- bufferedReader.close();
- file.delete();
- counter++;
- file = new File(path + "-" + counter);
- }
-
- for (int i = 0; i < n; i++) {
- if (!set.contains(i)) {
- throw new AssertionError("Missing number: " + i);
- }
- }
- }
-
- protected static void printProcessLog(String processName, String log) {
- if (log == null || log.length() == 0) {
- return;
- }
-
- System.out.println("-----------------------------------------");
- System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
- System.out.println("-----------------------------------------");
- System.out.println(log);
- System.out.println("-----------------------------------------");
- System.out.println(" END SPAWNED PROCESS LOG");
- System.out.println("-----------------------------------------");
- }
-
- protected static File createTempDirectory() throws IOException {
- File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
- for (int i = 0; i < 10; i++) {
- File dir = new File(tempDir, UUID.randomUUID().toString());
- if (!dir.exists() && dir.mkdirs()) {
- return dir;
- }
- System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
- }
-
- throw new IOException("Could not create temporary file directory");
- }
-
- protected static void touchFile(File file) throws IOException {
- if (!file.exists()) {
- new FileOutputStream(file).close();
- }
- if (!file.setLastModified(System.currentTimeMillis())) {
- throw new IOException("Could not touch the file.");
- }
- }
-
- protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
- long now = System.currentTimeMillis();
- final long deadline = now + timeout;
-
-
- while (now < deadline) {
- boolean allFound = true;
-
- for (int i = 0; i < num; i++) {
- File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
- if (!nextToCheck.exists()) {
- allFound = false;
- break;
- }
- }
-
- if (allFound) {
- return;
- }
- else {
- // not all found, wait for a bit
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- now = System.currentTimeMillis();
- }
- }
-
- fail("The tasks were not started within time (" + timeout + "msecs)");
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
- */
- public static class TaskManagerProcessEntryPoint {
-
- private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
- public static void main(String[] args) {
- try {
- int jobManagerPort = Integer.parseInt(args[0]);
-
- Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-
- TaskManager.runTaskManager(cfg, TaskManager.class);
-
- // wait forever
- Object lock = new Object();
- synchronized (lock) {
- lock.wait();
- }
- }
- catch (Throwable t) {
- LOG.error("Failed to start TaskManager process", t);
- System.exit(1);
- }
- }
- }
-
- /**
- * Utility class to read the output of a process stream and forward it into a StringWriter.
- */
- protected static class PipeForwarder extends Thread {
-
- private final StringWriter target;
- private final InputStream source;
-
- public PipeForwarder(InputStream source, StringWriter target) {
- super("Pipe Forwarder");
- setDaemon(true);
-
- this.source = source;
- this.target = target;
-
- start();
- }
-
- @Override
- public void run() {
- try {
- int next;
- while ((next = source.read()) != -1) {
- target.write(next);
- }
- }
- catch (IOException e) {
- // terminate
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
new file mode 100644
index 0000000..4311e9c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Abstract base for tests verifying the behavior of the recovery in the
+ * case when a TaskManager fails (process is killed) in the middle of a job execution.
+ *
+ * The test works with multiple task managers processes by spawning JVMs.
+ * Initially, it starts a JobManager in process and two TaskManagers JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+public abstract class AbstractProcessFailureRecoveryTest {
+
+ protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+ protected static final String PROCEED_MARKER_FILE = "proceed";
+
+ protected static final int PARALLELISM = 4;
+
+ @Test
+ public void testTaskManagerProcessFailure() {
+
+ final StringWriter processOutput1 = new StringWriter();
+ final StringWriter processOutput2 = new StringWriter();
+ final StringWriter processOutput3 = new StringWriter();
+
+ ActorSystem jmActorSystem = null;
+ Process taskManagerProcess1 = null;
+ Process taskManagerProcess2 = null;
+ Process taskManagerProcess3 = null;
+
+ File coordinateTempDir = null;
+
+ try {
+ // check that we run this test only if the java command
+ // is available on this machine
+ String javaCommand = getJavaCommandPath();
+ if (javaCommand == null) {
+ System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
+ return;
+ }
+
+ // create a logging file for the process
+ File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+ tempLogFile.deleteOnExit();
+ CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+ // coordination between the processes goes through a directory
+ coordinateTempDir = createTempDirectory();
+
+ // find a free port to start the JobManager
+ final int jobManagerPort = NetUtils.getAvailablePort();
+
+ // start a JobManager
+ Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+ Configuration jmConfig = new Configuration();
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+ jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+ jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "2 s");
+
+ jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+ ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+
+ // the TaskManager java command
+ String[] command = new String[] {
+ javaCommand,
+ "-Dlog.level=DEBUG",
+ "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+ "-Xms80m", "-Xmx80m",
+ "-classpath", getCurrentClasspath(),
+ TaskManagerProcessEntryPoint.class.getName(),
+ String.valueOf(jobManagerPort)
+ };
+
+ // start the first two TaskManager processes
+ taskManagerProcess1 = new ProcessBuilder(command).start();
+ new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+ taskManagerProcess2 = new ProcessBuilder(command).start();
+ new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+ // we wait for the JobManager to have the two TaskManagers available
+ // wait for at most 20 seconds
+ waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
+
+ // the program will set a marker file in each of its parallel tasks once they are ready, so that
+ // this coordinating code is aware of this.
+ // the program will very slowly consume elements until the marker file (later created by the
+ // test driver code) is present
+ final File coordinateDirClosure = coordinateTempDir;
+ final Throwable[] errorRef = new Throwable[1];
+
+ // we trigger program execution in a separate thread
+ Thread programTrigger = new Thread("Program Trigger") {
+ @Override
+ public void run() {
+ try {
+ testProgram(jobManagerPort, coordinateDirClosure);
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ errorRef[0] = t;
+ }
+ }
+ };
+
+ //start the test program
+ programTrigger.start();
+
+ // wait until all marker files are in place, indicating that all tasks have started
+ // max 20 seconds
+ waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
+
+ // start the third TaskManager
+ taskManagerProcess3 = new ProcessBuilder(command).start();
+ new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+ // we wait for the third TaskManager to register (20 seconds max)
+ waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
+
+ // kill one of the previous TaskManagers, triggering a failure and recovery
+ taskManagerProcess1.destroy();
+ taskManagerProcess1 = null;
+
+ // we create the marker file which signals the program functions tasks that they can complete
+ touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+ // wait for at most 2 minutes for the program to complete
+ programTrigger.join(120000);
+
+ // check that the program really finished
+ assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+ // check whether the program encountered an error
+ if (errorRef[0] != null) {
+ Throwable error = errorRef[0];
+ error.printStackTrace();
+ fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+ }
+
+ // all seems well :-)
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ printProcessLog("TaskManager 1", processOutput1.toString());
+ printProcessLog("TaskManager 2", processOutput2.toString());
+ printProcessLog("TaskManager 3", processOutput3.toString());
+ fail(e.getMessage());
+ }
+ catch (Error e) {
+ e.printStackTrace();
+ printProcessLog("TaskManager 1", processOutput1.toString());
+ printProcessLog("TaskManager 2", processOutput2.toString());
+ printProcessLog("TaskManager 3", processOutput3.toString());
+ throw e;
+ }
+ finally {
+ if (taskManagerProcess1 != null) {
+ taskManagerProcess1.destroy();
+ }
+ if (taskManagerProcess2 != null) {
+ taskManagerProcess2.destroy();
+ }
+ if (taskManagerProcess3 != null) {
+ taskManagerProcess3.destroy();
+ }
+ if (jmActorSystem != null) {
+ jmActorSystem.shutdown();
+ }
+ if (coordinateTempDir != null) {
+ try {
+ FileUtils.deleteDirectory(coordinateTempDir);
+ }
+ catch (Throwable t) {
+ // we can ignore this
+ }
+ }
+ }
+ }
+
+ /**
+ * The test program should be implemented here in a form of a separate thread.
+ * This provides a solution for checking that it has been terminated.
+ *
+ * @param jobManagerPort The port for submitting the topology to the local cluster
+ * @param coordinateDir TaskManager failure will be triggered only after processes
+ * have successfully created file under this directory
+ */
+ public abstract void testProgram(int jobManagerPort, File coordinateDir) throws Exception;
+
+
+ protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+ throws Exception
+ {
+ final long deadline = System.currentTimeMillis() + maxDelay;
+ while (true) {
+ long remaining = deadline - System.currentTimeMillis();
+ if (remaining <= 0) {
+ fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+ }
+
+ FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+ try {
+ Future<?> result = Patterns.ask(jobManager,
+ JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+ new Timeout(timeout));
+ Integer numTMs = (Integer) Await.result(result, timeout);
+ if (numTMs == numExpected) {
+ break;
+ }
+ }
+ catch (TimeoutException e) {
+ // ignore and retry
+ }
+ catch (ClassCastException e) {
+ fail("Wrong response: " + e.getMessage());
+ }
+ }
+ }
+
+ protected static void printProcessLog(String processName, String log) {
+ if (log == null || log.length() == 0) {
+ return;
+ }
+
+ System.out.println("-----------------------------------------");
+ System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+ System.out.println("-----------------------------------------");
+ System.out.println(log);
+ System.out.println("-----------------------------------------");
+ System.out.println(" END SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ }
+
+ protected static File createTempDirectory() throws IOException {
+ File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+ for (int i = 0; i < 10; i++) {
+ File dir = new File(tempDir, UUID.randomUUID().toString());
+ if (!dir.exists() && dir.mkdirs()) {
+ return dir;
+ }
+ System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
+ }
+
+ throw new IOException("Could not create temporary file directory");
+ }
+
+ protected static void touchFile(File file) throws IOException {
+ if (!file.exists()) {
+ new FileOutputStream(file).close();
+ }
+ if (!file.setLastModified(System.currentTimeMillis())) {
+ throw new IOException("Could not touch the file.");
+ }
+ }
+
+ protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
+ long now = System.currentTimeMillis();
+ final long deadline = now + timeout;
+
+
+ while (now < deadline) {
+ boolean allFound = true;
+
+ for (int i = 0; i < num; i++) {
+ File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
+ if (!nextToCheck.exists()) {
+ allFound = false;
+ break;
+ }
+ }
+
+ if (allFound) {
+ return;
+ }
+ else {
+ // not all found, wait for a bit
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ now = System.currentTimeMillis();
+ }
+ }
+
+ fail("The tasks were not started within time (" + timeout + "msecs)");
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+ */
+ public static class TaskManagerProcessEntryPoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+ public static void main(String[] args) {
+ try {
+ int jobManagerPort = Integer.parseInt(args[0]);
+
+ Configuration cfg = new Configuration();
+ cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+ TaskManager.runTaskManager(cfg, TaskManager.class);
+
+ // wait forever
+ Object lock = new Object();
+ synchronized (lock) {
+ lock.wait();
+ }
+ }
+ catch (Throwable t) {
+ LOG.error("Failed to start TaskManager process", t);
+ System.exit(1);
+ }
+ }
+ }
+
+ /**
+ * Utility class to read the output of a process stream and forward it into a StringWriter.
+ */
+ protected static class PipeForwarder extends Thread {
+
+ private final StringWriter target;
+ private final InputStream source;
+
+ public PipeForwarder(InputStream source, StringWriter target) {
+ super("Pipe Forwarder");
+ setDaemon(true);
+
+ this.source = source;
+ this.target = target;
+
+ start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ int next;
+ while ((next = source.read()) != -1) {
+ target.write(next);
+ }
+ }
+ catch (IOException e) {
+ // terminate
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index c013fa8..cdee8ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -18,24 +18,32 @@
package org.apache.flink.test.recovery;
+import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.ProcessFailureRecoveryTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
import static org.junit.Assert.assertEquals;
/**
- * Test for streaming program behaviour in case of taskmanager failure
- * based on {@link ProcessFailureRecoveryTestBase}.
+ * Test the recovery of a simple batch program in the case of TaskManager process failure.
*/
@SuppressWarnings("serial")
-public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRecoveryITCase {
+@RunWith(Parameterized.class)
+public class ProcessFailureBatchRecoveryITCase extends AbstractProcessFailureRecoveryTest {
- private ExecutionMode executionMode;
+ // --------------------------------------------------------------------------------------------
+ // Parametrization (run pipelined and batch)
+ // --------------------------------------------------------------------------------------------
+
+ private final ExecutionMode executionMode;
public ProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
this.executionMode = executionMode;
@@ -48,12 +56,17 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
{ExecutionMode.BATCH}});
}
+ // --------------------------------------------------------------------------------------------
+ // Test the program
+ // --------------------------------------------------------------------------------------------
+
@Override
- public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef) {
+ public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
- env.setDegreeOfParallelism(PARALLELISM);
+ env.setParallelism(PARALLELISM);
env.setNumberOfExecutionRetries(1);
+ env.getConfig().setExecutionMode(executionMode);
final long NUM_ELEMENTS = 100000L;
final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)
@@ -63,7 +76,7 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
// the majority of the behavior is in the MapFunction
.map(new RichMapFunction<Long, Long>() {
- private final File proceedFile = new File(coordinateDirClosure, PROCEED_MARKER_FILE);
+ private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
private boolean markerCreated = false;
private boolean checkForProceedFile = true;
@@ -72,7 +85,7 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
public Long map(Long value) throws Exception {
if (!markerCreated) {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
- touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
+ touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
markerCreated = true;
}
@@ -95,24 +108,7 @@ public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRe
}
});
- // we trigger program execution in a separate thread
- return new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") {
- @Override
- public void run() {
- try {
- long sum = result.collect().get(0);
- assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
- } catch (Throwable t) {
- t.printStackTrace();
- errorRef[0] = t;
- }
- }
- };
- }
-
- @Override
- public void postSubmit() throws Exception, Error {
- // unnecessary
+ long sum = result.collect().get(0);
+ assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c284745e/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 1933766..82b9d6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -18,7 +18,9 @@
package org.apache.flink.test.recovery;
+import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -26,58 +28,70 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.test.util.ProcessFailureRecoveryTestBase;
import org.apache.flink.util.Collector;
-import java.io.BufferedWriter;
+import java.io.BufferedReader;
import java.io.File;
-import java.io.FileWriter;
+import java.io.FileReader;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
/**
- * Test for streaming program behaviour in case of taskmanager failure
- * based on {@link ProcessFailureRecoveryTestBase}.
+ * Test for streaming program behaviour in case of TaskManager failure
+ * based on {@link AbstractProcessFailureRecoveryTest}.
+ *
+ * The logic in this test is as follows:
+ * - The source slowly emits records (every 50 msecs) until the test driver
+ * gives the "go" for regular execution
+ * - The "go" is given after the first taskmanager has been killed, so it can only
+ * happen in the recovery run
+ * - The mapper must not be slow, because otherwise the checkpoint barrier cannot pass
+ * the mapper and no checkpoint will be completed before the killing of the first
+ * TaskManager.
*/
@SuppressWarnings("serial")
-public class ProcessFailureStreamingRecoveryITCase extends ProcessFailureRecoveryTestBase {
+public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailureRecoveryTest {
- private static final String RESULT_PATH = "tempTestOutput";
- private static final int DATA_COUNT = 252;
+ private static final int DATA_COUNT = 10000;
@Override
- public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef) {
+ public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
+
+ final File tempTestOutput = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
+ UUID.randomUUID().toString());
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
- env.setDegreeOfParallelism(PARALLELISM);
- env.setNumberOfExecutionRetries(1);
- env.enableMonitoring(100);
+ assertTrue("Cannot create directory for temp output", tempTestOutput.mkdirs());
- final DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(DATA_COUNT))
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createRemoteEnvironment("localhost", jobManagerPort);
+ env.setParallelism(PARALLELISM);
+ env.setNumberOfExecutionRetries(1);
+ env.enableCheckpointing(200);
+
+ DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
// make sure every mapper is involved
- .shuffle()
+// .shuffle()
- // populate the coordinate directory so we can proceed to taskmanager failure
+ // populate the coordinate directory so we can proceed to TaskManager failure
.map(new RichMapFunction<Long, Long>() {
private boolean markerCreated = false;
@Override
- public void open(Configuration parameters) throws IOException {
-
+ public Long map(Long value) throws Exception {
if (!markerCreated) {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
- try {
- touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
- } catch (IOException e) {
- e.printStackTrace();
- }
+ touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
markerCreated = true;
}
- }
-
- @Override
- public Long map(Long value) throws Exception {
return value;
}
});
@@ -85,124 +99,138 @@ public class ProcessFailureStreamingRecoveryITCase extends ProcessFailureRecover
//write result to temporary file
result.addSink(new RichSinkFunction<Long>() {
- private transient File output;
- private transient int outputIndex;
- private transient BufferedWriter writer;
+ // the sink needs to do its write operations synchronized with
+ // the disk FS, otherwise the process kill will discard data
+ // in buffers in the process
+ private transient FileChannel writer;
@Override
public void open(Configuration parameters) throws IOException {
- outputIndex = 0;
- do {
- output = new File(RESULT_PATH + "-" + outputIndex);
- outputIndex++;
- } while (output.exists());
- writer = new BufferedWriter(new FileWriter(output));
- }
+ int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ File output = new File(tempTestOutput, "task-" + taskIndex + "-" + UUID.randomUUID().toString());
- @Override
- public void invoke(Long value) throws Exception {
- writer.write(value.toString());
- writer.newLine();
+ // "rws" causes writes to go synchronously to the filesystem, nothing is cached
+ RandomAccessFile outputFile = new RandomAccessFile(output, "rws");
+ this.writer = outputFile.getChannel();
}
@Override
- public void close(){
- try {
- writer.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
+ public void invoke(Long value) throws Exception {
+ String text = value + "\n";
+ byte[] bytes = text.getBytes(Charset.defaultCharset());
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ writer.write(buffer);
}
@Override
- public void cancel() {
- close();
+ public void close() throws Exception {
+ writer.close();
}
-
});
- // we trigger program execution in a separate thread
- return new ProgramTrigger(env, errorRef);
- }
+ try {
+ // blocking call until execution is done
+ env.execute();
- @Override
- public void postSubmit() throws Exception, Error {
-
- // checks at least once processing guarantee of the output stream
- fileBatchHasEveryNumberLower(DATA_COUNT, RESULT_PATH);
+ // validate
+ fileBatchHasEveryNumberLower(PARALLELISM, DATA_COUNT, tempTestOutput);
+ }
+ finally {
+ // clean up
+ if (tempTestOutput.exists()) {
+ FileUtils.deleteDirectory(tempTestOutput);
+ }
+ }
}
public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
- private static final long SLEEP_TIME = 10;
+ private static final long SLEEP_TIME = 50;
- private long end;
- private long collected = 0L;
- private long toCollect;
- private long coungrence;
- private long stepSize;
- private transient OperatorState<Long> collectedState;
+ private final File coordinateDir;
+ private final long end;
- public SleepyDurableGenerateSequence(long end){
+ public SleepyDurableGenerateSequence(File coordinateDir, long end) {
+ this.coordinateDir = coordinateDir;
this.end = end;
}
- public void open(Configuration parameters) throws Exception {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run(Collector<Long> collector) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-
+ OperatorState<Long> collectedState;
if (context.containsState("collected")) {
collectedState = (OperatorState<Long>) context.getState("collected");
- collected = collectedState.getState();
- } else {
- collectedState = new OperatorState<Long>(collected);
+
+// if (collected == 0) {
+// throw new RuntimeException("The state did not capture a completed checkpoint");
+// }
+ }
+ else {
+ collectedState = new OperatorState<Long>(0L);
context.registerState("collected", collectedState);
}
- super.open(parameters);
- stepSize = context.getNumberOfParallelSubtasks();
- coungrence = context.getIndexOfThisSubtask();
- toCollect = (end % stepSize > coungrence) ? (end / stepSize + 1) : (end / stepSize);
- }
+ final long stepSize = context.getNumberOfParallelSubtasks();
+ final long congruence = context.getIndexOfThisSubtask();
+ final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
+ long collected = collectedState.getState();
+
+ final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
+ boolean checkForProceedFile = true;
+
+ while (collected < toCollect) {
+ // check if the proceed file exists (then we go full speed)
+ // if not, we always recheck and sleep
+ if (checkForProceedFile) {
+ if (proceedFile.exists()) {
+ checkForProceedFile = false;
+ } else {
+ // otherwise wait so that we make slow progress
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
- @Override
- public void run(Collector<Long> collector) throws Exception {
- while (collected < toCollect){
- collector.collect(collected * stepSize + coungrence);
+ collector.collect(collected * stepSize + congruence);
collectedState.update(collected);
collected++;
- Thread.sleep(SLEEP_TIME);
}
-
}
@Override
- public void cancel() {
- }
+ public void cancel() {}
}
- public class ProgramTrigger extends Thread {
- StreamExecutionEnvironment env;
- Throwable[] errorRef;
+ private static void fileBatchHasEveryNumberLower(int numFiles, int numbers, File path) throws IOException {
- ProgramTrigger(StreamExecutionEnvironment env, Throwable[] errorRef){
- super("ProcessFailureStreamingRecoveryITCase Program Trigger");
- this.env = env;
- this.errorRef = errorRef;
- }
+ HashSet<Integer> set = new HashSet<Integer>(numbers);
- @Override
- public void run() {
- try {
- env.execute();
- }
- catch (Throwable t) {
- t.printStackTrace();
- errorRef[0] = t;
+ File[] files = path.listFiles();
+ assertNotNull(files);
+ assertTrue("Not enough output files", files.length >= numFiles);
+
+ for (File file : files) {
+ assertTrue("Output file does not exist", file.exists());
+
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
+
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ int num = Integer.parseInt(line);
+ set.add(num);
}
+
+ bufferedReader.close();
}
+ for (int i = 0; i < numbers; i++) {
+ if (!set.contains(i)) {
+ fail("Missing number: " + i);
+ }
+ }
}
}