You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:01 UTC
[flink] 02/08: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93b042da45a0bbf418e7df96059fe36eb4f18b9e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 22:05:28 2018 +0200
[FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
This closes #6750.
---
.../runtime/entrypoint/ClusterEntrypoint.java | 4 +-
...tractTaskManagerProcessFailureRecoveryTest.java | 134 ++++-----------------
...skManagerProcessFailureBatchRecoveryITCase.java | 4 +-
...nagerProcessFailureStreamingRecoveryITCase.java | 2 -
4 files changed, 28 insertions(+), 116 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c9a1722..5665500 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -147,7 +147,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
return terminationFuture;
}
- protected void startCluster() throws ClusterEntrypointException {
+ public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
@@ -392,7 +392,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
return resultConfiguration;
}
- private CompletableFuture<ApplicationStatus> shutDownAsync(
+ public CompletableFuture<ApplicationStatus> shutDownAsync(
ApplicationStatus applicationStatus,
@Nullable String diagnostics,
boolean cleanupHaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5cd6f30..0962ddf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -19,29 +19,21 @@
package org.apache.flink.test.recovery;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -52,17 +44,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
import static org.junit.Assert.assertFalse;
@@ -92,6 +75,9 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
+ public final BlobServerResource blobServerResource = new BlobServerResource();
+
@Test
public void testTaskManagerProcessFailure() throws Exception {
@@ -99,14 +85,25 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
final StringWriter processOutput2 = new StringWriter();
final StringWriter processOutput3 = new StringWriter();
- ActorSystem jmActorSystem = null;
- HighAvailabilityServices highAvailabilityServices = null;
Process taskManagerProcess1 = null;
Process taskManagerProcess2 = null;
Process taskManagerProcess3 = null;
File coordinateTempDir = null;
+ final int jobManagerPort = NetUtils.getAvailablePort();
+ final int restPort = NetUtils.getAvailablePort();
+
+ Configuration jmConfig = new Configuration();
+ jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+ jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+ jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+ jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
+ jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+ jmConfig.setInteger(RestOptions.PORT, restPort);
+
+ final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig);
+
try {
// check that we run this test only if the java command
// is available on this machine
@@ -124,37 +121,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
// coordination between the processes goes through a directory
coordinateTempDir = temporaryFolder.newFolder();
- // find a free port to start the JobManager
- final int jobManagerPort = NetUtils.getAvailablePort();
-
- // start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
- Configuration jmConfig = new Configuration();
- jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
- jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
- jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
- jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
- jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
- jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
- jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
- jmConfig,
- TestingUtils.defaultExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
- jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(
- jmConfig,
- jmActorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- JobManager.class,
- MemoryArchivist.class)._1();
+ clusterEntrypoint.startCluster();
// the TaskManager java command
String[] command = new String[] {
@@ -163,7 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
"-Xms80m", "-Xmx80m",
"-classpath", getCurrentClasspath(),
- TaskManagerProcessEntryPoint.class.getName(),
+ TaskExecutorProcessEntryPoint.class.getName(),
String.valueOf(jobManagerPort)
};
@@ -173,10 +140,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
taskManagerProcess2 = new ProcessBuilder(command).start();
new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
- // we wait for the JobManager to have the two TaskManagers available
- // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
- waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
-
// the program will set a marker file in each of its parallel tasks once they are ready, so that
// this coordinating code is aware of this.
// the program will very slowly consume elements until the marker file (later created by the
@@ -189,7 +152,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
@Override
public void run() {
try {
- testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+ testTaskManagerFailure(restPort, coordinateDirClosure);
}
catch (Throwable t) {
t.printStackTrace();
@@ -220,10 +183,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
taskManagerProcess3 = new ProcessBuilder(command).start();
new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
- // we wait for the third TaskManager to register
- // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
- waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
-
// kill one of the previous TaskManagers, triggering a failure and recovery
taskManagerProcess1.destroy();
taskManagerProcess1 = null;
@@ -270,13 +229,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
if (taskManagerProcess3 != null) {
taskManagerProcess3.destroy();
}
- if (jmActorSystem != null) {
- jmActorSystem.shutdown();
- }
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
+ clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
}
}
@@ -290,44 +244,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
*/
public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
- protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis)
- throws Exception {
- final long pollInterval = 10_000_000; // 10 ms = 10,000,000 nanos
- final long deadline = System.nanoTime() + maxDelayMillis * 1_000_000;
-
- long time;
-
- while ((time = System.nanoTime()) < deadline) {
- FiniteDuration timeout = new FiniteDuration(pollInterval, TimeUnit.NANOSECONDS);
-
- try {
- Future<?> result = Patterns.ask(jobManager,
- JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- new Timeout(timeout));
-
- int numTMs = (Integer) Await.result(result, timeout);
-
- if (numTMs == numExpected) {
- return;
- }
- }
- catch (TimeoutException e) {
- // ignore and retry
- }
- catch (ClassCastException e) {
- fail("Wrong response: " + e.getMessage());
- }
-
- long timePassed = System.nanoTime() - time;
- long remainingMillis = (pollInterval - timePassed) / 1_000_000;
- if (remainingMillis > 0) {
- Thread.sleep(remainingMillis);
- }
- }
-
- fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)");
- }
-
protected static void printProcessLog(String processName, String log) {
if (log == null || log.length() == 0) {
return;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 7dc6f0c..4815c49 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -68,10 +67,9 @@ public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskMa
public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
final Configuration configuration = new Configuration();
- configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
env.setParallelism(PARALLELISM);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
env.getConfig().setExecutionMode(executionMode);
env.getConfig().disableSysoutLogging();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 766a799..fbf6b5b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -67,7 +66,6 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
final File tempCheckpointDir = tempFolder.newFolder();
final Configuration configuration = new Configuration();
- configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost",
jobManagerPort,