You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:21 UTC
[10/10] flink git commit: [tests] Add test that checks canceling
against a failed TaskManager process
[tests] Add test that checks canceling against a failed TaskManager process
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4bd35259
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4bd35259
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4bd35259
Branch: refs/heads/master
Commit: 4bd352599d0b4ff97ae8cf7eebc528dbcbd33dff
Parents: 959d227
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 12 16:45:05 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 21:35:58 2015 +0200
----------------------------------------------------------------------
.../recovery/ProcessFailureCancelingITCase.java | 289 +++++++++++++++++++
1 file changed, 289 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4bd35259/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
new file mode 100644
index 0000000..5482fc5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test makes sure that jobs are canceled properly in cases where
+ * the task manager went down and did not respond to cancel messages.
+ */
+@SuppressWarnings("serial")
+public class ProcessFailureCancelingITCase {
+
+	/**
+	 * Starts a JobManager inside this JVM and a single TaskManager in a separate process,
+	 * kills the TaskManager process and then immediately submits a job whose map function
+	 * blocks forever. The job is canceled right away, and the submitting client is expected
+	 * to return with a {@link ProgramInvocationException} within two minutes, i.e. long
+	 * before the configured heartbeat pause (2000 s) would elapse.
+	 *
+	 * <p>The test is skipped (it returns without asserting anything) when no java
+	 * executable can be found on this machine.
+	 */
+	@Test
+	public void testCancelingOnProcessFailure() {
+		// collects the stderr output of the spawned TaskManager process for diagnostics
+		final StringWriter processOutput = new StringWriter();
+
+		ActorSystem jmActorSystem = null;
+		Process taskManagerProcess = null;
+
+		try {
+			// check that we run this test only if the java command
+			// is available on this machine
+			String javaCommand = getJavaCommandPath();
+			if (javaCommand == null) {
+				System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
+				return;
+			}
+
+			// create a logging file for the process
+			File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+			tempLogFile.deleteOnExit();
+			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+			// find a free port to start the JobManager
+			final int jobManagerPort = NetUtils.getAvailablePort();
+
+			// start a JobManager
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+			// the heartbeat pause is chosen far longer than the test runs, while the
+			// ask timeout is short (see the timing note before the join below)
+			Configuration jmConfig = new Configuration();
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s");
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s");
+			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10);
+			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
+
+			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+
+			// the TaskManager java command
+			String[] command = new String[] {
+					javaCommand,
+					"-Dlog.level=DEBUG",
+					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+					"-Xms80m", "-Xmx80m",
+					"-classpath", getCurrentClasspath(),
+					AbstractProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+					String.valueOf(jobManagerPort)
+			};
+
+			// start the (single) TaskManager process and forward its stderr into processOutput
+			taskManagerProcess = new ProcessBuilder(command).start();
+			new AbstractProcessFailureRecoveryTest.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
+
+			// we wait for the JobManager to have the TaskManager available
+			// wait for at most 30 seconds
+			waitUntilNumTaskManagersAreRegistered(jmActor, 1, 30000);
+
+			// single-element holder so the program thread can hand its failure back to this thread
+			final Throwable[] errorRef = new Throwable[1];
+
+			// start the test program, which infinitely blocks
+			Runnable programRunner = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+						env.setParallelism(2);
+						env.setNumberOfExecutionRetries(0);
+						env.getConfig().disableSysoutLogging();
+
+						env.generateSequence(0, Long.MAX_VALUE)
+
+								.map(new MapFunction<Long, Long>() {
+
+									@Override
+									public Long map(Long value) throws Exception {
+										// block forever; nothing ever notifies this monitor
+										synchronized (this) {
+											wait();
+										}
+										return 0L;
+									}
+								})
+
+								.output(new DiscardingOutputFormat<Long>());
+
+						env.execute();
+					}
+					catch (Throwable t) {
+						errorRef[0] = t;
+					}
+				}
+			};
+
+			Thread programThread = new Thread(programRunner);
+
+			// kill the TaskManager
+			taskManagerProcess.destroy();
+			// already destroyed, so the finally block must not destroy it a second time
+			taskManagerProcess = null;
+
+			// immediately submit the job. this should hit the case
+			// where the JobManager still thinks it has the TaskManager and tries to send it tasks
+			programThread.start();
+
+			// try to cancel the job
+			cancelRunningJob(jmActor);
+
+			// we should see a failure within reasonable time (10s is the ask timeout).
+			// since the CI environment is often slow, we conservatively give it up to 2 minutes
+			// to fail, which is much lower than the failure time given by the heartbeats ( > 2000s)
+
+			programThread.join(120000);
+
+			assertFalse("The program did not cancel in time (2 minutes)", programThread.isAlive());
+
+			Throwable error = errorRef[0];
+			assertNotNull("The program did not fail properly", error);
+
+			// include the actual error so an unexpected failure type can be diagnosed
+			assertTrue("Unexpected failure type: " + error, error instanceof ProgramInvocationException);
+			// all seems well :-)
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			printProcessLog("TaskManager", processOutput.toString());
+			// e.getMessage() may be null; report the exception type together with its message
+			fail("Test failed with an exception: " + e);
+		}
+		catch (Error e) {
+			// also reached for assertion failures; dump the process log, then propagate
+			e.printStackTrace();
+			printProcessLog("TaskManager", processOutput.toString());
+			throw e;
+		}
+		finally {
+			if (taskManagerProcess != null) {
+				taskManagerProcess.destroy();
+			}
+			if (jmActorSystem != null) {
+				jmActorSystem.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Polls the JobManager for its running jobs and, as soon as exactly one job is
+	 * reported, sends a cancel message for that job.
+	 *
+	 * <p>Polling goes on for roughly 30 seconds, with a short pause between requests.
+	 * If no single running job shows up in that time, the method returns without
+	 * canceling anything.
+	 *
+	 * @param jobManager the JobManager actor to query and to send the cancel message to
+	 * @throws Exception if the running jobs cannot be retrieved from the JobManager,
+	 *                   or if the thread is interrupted while pausing between polls
+	 */
+	private void cancelRunningJob(ActorRef jobManager) throws Exception {
+		final FiniteDuration askTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+		// pause between two status requests, so the JobManager is not flooded with messages
+		final long pollIntervalMillis = 50;
+
+		// try at most for 30 seconds
+		final long deadline = System.currentTimeMillis() + 30000;
+
+		JobID jobId = null;
+
+		do {
+			Future<Object> response = Patterns.ask(jobManager,
+					JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
+
+			Object result;
+			try {
+				result = Await.result(response, askTimeout);
+			}
+			catch (Exception e) {
+				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+			}
+
+			if (result instanceof JobManagerMessages.RunningJobsStatus) {
+
+				List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+
+				if (jobs.size() == 1) {
+					jobId = jobs.get(0).getJobId();
+					break;
+				}
+			}
+
+			// job not visible yet: wait a moment before asking again
+			Thread.sleep(pollIntervalMillis);
+		}
+		while (System.currentTimeMillis() < deadline);
+
+		if (jobId == null) {
+			// we never found it running, must have failed already
+			return;
+		}
+
+		// tell the JobManager to cancel the job
+		jobManager.tell(new JobManagerMessages.CancelJob(jobId), ActorRef.noSender());
+	}
+
+	/**
+	 * Blocks until the JobManager reports exactly {@code numExpected} registered
+	 * TaskManagers, and fails the test if that does not happen within {@code maxDelay}.
+	 *
+	 * <p>The JobManager is polled repeatedly, with a short pause between requests so
+	 * that the loop does not spin at full speed while the TaskManagers are starting.
+	 *
+	 * @param jobManager  the JobManager actor to query
+	 * @param numExpected the number of registered TaskManagers to wait for
+	 * @param maxDelay    the maximum time to wait, in milliseconds
+	 * @throws Exception if the request to the JobManager fails for a reason other than
+	 *                   a timeout, or if the thread is interrupted while pausing
+	 */
+	private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+			throws Exception
+	{
+		// pause between two requests, so the JobManager is not flooded with messages
+		final long pollIntervalMillis = 50;
+
+		final long deadline = System.currentTimeMillis() + maxDelay;
+		while (true) {
+			long remaining = deadline - System.currentTimeMillis();
+			if (remaining <= 0) {
+				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+			}
+
+			// never wait longer for a single response than the time left until the deadline
+			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+			try {
+				Future<?> result = Patterns.ask(jobManager,
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+						new Timeout(timeout));
+				Integer numTMs = (Integer) Await.result(result, timeout);
+				if (numTMs == numExpected) {
+					break;
+				}
+			}
+			catch (TimeoutException e) {
+				// ignore and retry
+			}
+			catch (ClassCastException e) {
+				fail("Wrong response: " + e.getMessage());
+			}
+
+			// not there yet: wait a moment before asking again
+			Thread.sleep(pollIntervalMillis);
+		}
+	}
+
+ private void printProcessLog(String processName, String log) {
+ if (log == null || log.length() == 0) {
+ return;
+ }
+
+ System.out.println("-----------------------------------------");
+ System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+ System.out.println("-----------------------------------------");
+ System.out.println(log);
+ System.out.println("-----------------------------------------");
+ System.out.println(" END SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ }
+}