You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:01 UTC

[flink] 02/08: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93b042da45a0bbf418e7df96059fe36eb4f18b9e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 22:05:28 2018 +0200

    [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
    
    This closes #6750.
---
 .../runtime/entrypoint/ClusterEntrypoint.java      |   4 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java | 134 ++++-----------------
 ...skManagerProcessFailureBatchRecoveryITCase.java |   4 +-
 ...nagerProcessFailureStreamingRecoveryITCase.java |   2 -
 4 files changed, 28 insertions(+), 116 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c9a1722..5665500 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -147,7 +147,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return terminationFuture;
 	}
 
-	protected void startCluster() throws ClusterEntrypointException {
+	public void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
@@ -392,7 +392,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return resultConfiguration;
 	}
 
-	private CompletableFuture<ApplicationStatus> shutDownAsync(
+	public CompletableFuture<ApplicationStatus> shutDownAsync(
 			ApplicationStatus applicationStatus,
 			@Nullable String diagnostics,
 			boolean cleanupHaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5cd6f30..0962ddf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -19,29 +19,21 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -52,17 +44,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.junit.Assert.assertFalse;
@@ -92,6 +75,9 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testTaskManagerProcessFailure() throws Exception {
 
@@ -99,14 +85,25 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 		final StringWriter processOutput2 = new StringWriter();
 		final StringWriter processOutput3 = new StringWriter();
 
-		ActorSystem jmActorSystem = null;
-		HighAvailabilityServices highAvailabilityServices = null;
 		Process taskManagerProcess1 = null;
 		Process taskManagerProcess2 = null;
 		Process taskManagerProcess3 = null;
 
 		File coordinateTempDir = null;
 
+		final int jobManagerPort = NetUtils.getAvailablePort();
+		final int restPort = NetUtils.getAvailablePort();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+		jmConfig.setInteger(RestOptions.PORT, restPort);
+
+		final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig);
+
 		try {
 			// check that we run this test only if the java command
 			// is available on this machine
@@ -124,37 +121,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			// coordination between the processes goes through a directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
-			jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
-				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
-				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+			clusterEntrypoint.startCluster();
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -163,7 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					TaskManagerProcessEntryPoint.class.getName(),
+					TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -173,10 +140,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			taskManagerProcess2 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
-
 			// the program will set a marker file in each of its parallel tasks once they are ready, so that
 			// this coordinating code is aware of this.
 			// the program will very slowly consume elements until the marker file (later created by the
@@ -189,7 +152,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				@Override
 				public void run() {
 					try {
-						testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+						testTaskManagerFailure(restPort, coordinateDirClosure);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -220,10 +183,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			taskManagerProcess3 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
 
-			// we wait for the third TaskManager to register
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
-
 			// kill one of the previous TaskManagers, triggering a failure and recovery
 			taskManagerProcess1.destroy();
 			taskManagerProcess1 = null;
@@ -270,13 +229,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			if (taskManagerProcess3 != null) {
 				taskManagerProcess3.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
 
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
+			clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
 		}
 	}
 
@@ -290,44 +244,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 	 */
 	public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
 
-	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis)
-			throws Exception {
-		final long pollInterval = 10_000_000; // 10 ms = 10,000,000 nanos
-		final long deadline = System.nanoTime() + maxDelayMillis * 1_000_000;
-
-		long time;
-
-		while ((time = System.nanoTime()) < deadline) {
-			FiniteDuration timeout = new FiniteDuration(pollInterval, TimeUnit.NANOSECONDS);
-
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-
-				int numTMs = (Integer) Await.result(result, timeout);
-
-				if (numTMs == numExpected) {
-					return;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-
-			long timePassed = System.nanoTime() - time;
-			long remainingMillis = (pollInterval - timePassed) / 1_000_000;
-			if (remainingMillis > 0) {
-				Thread.sleep(remainingMillis);
-			}
-		}
-
-		fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)");
-	}
-
 	protected static void printProcessLog(String processName, String log) {
 		if (log == null || log.length() == 0) {
 			return;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 7dc6f0c..4815c49 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,10 +67,9 @@ public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskMa
 	public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
 		env.setParallelism(PARALLELISM);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 766a799..fbf6b5b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -67,7 +66,6 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 		final File tempCheckpointDir = tempFolder.newFolder();
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 			"localhost",
 			jobManagerPort,