You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:00 UTC

[flink] 01/08: [FLINK-10401] Port ProcessFailureCancelingITCase to new code base

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff97d1cb3efbac4f47abc0a38bac932a55fc1288
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 00:28:30 2018 +0200

    [FLINK-10401] Port ProcessFailureCancelingITCase to new code base
    
    This closes #6749.
---
 .../DispatcherResourceManagerComponent.java        |  10 +
 .../flink/runtime/util/BlobServerResource.java     |   4 +
 ...tractTaskManagerProcessFailureRecoveryTest.java |  29 +++
 .../recovery/ProcessFailureCancelingITCase.java    | 256 ++++++++++-----------
 4 files changed, 159 insertions(+), 140 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index 94925b2..b07095c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -116,6 +116,16 @@ public class DispatcherResourceManagerComponent<T extends Dispatcher> implements
 		return shutDownFuture;
 	}
 
+	@Nonnull
+	public T getDispatcher() {
+		return dispatcher;
+	}
+
+	@Nonnull
+	public WebMonitorEndpoint<?> getWebMonitorEndpoint() {
+		return webMonitorEndpoint;
+	}
+
 	@Override
 	public CompletableFuture<Void> closeAsync() {
 		if (isRunning.compareAndSet(true, false)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
index 080ecf8..654b2bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -62,4 +62,8 @@ public class BlobServerResource extends ExternalResource {
 	public int getBlobServerPort() {
 		return blobServer.getPort();
 	}
+
+	public BlobServer getBlobServer() {
+		return blobServer;
+	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 56327ad..5cd6f30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -421,4 +422,32 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 		}
 	}
 
+	/**
+	 * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor.
+	 */
+	public static class TaskExecutorProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);
+
+		public static void main(String[] args) {
+			try {
+				int jobManagerPort = Integer.parseInt(args[0]);
+
+				Configuration cfg = new Configuration();
+				cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+				cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
+				cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+
+				TaskManagerRunner.runTaskManager(cfg, ResourceID.generate());
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b85a410..afca8f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -21,51 +21,58 @@ package org.apache.flink.test.recovery;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.StringWriter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This test makes sure that jobs are canceled properly in cases where
@@ -74,15 +81,36 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class ProcessFailureCancelingITCase extends TestLogger {
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testCancelingOnProcessFailure() throws Exception {
 		final StringWriter processOutput = new StringWriter();
+		final Time timeout = Time.minutes(2L);
 
-		ActorSystem jmActorSystem = null;
+		RestClusterClient<String> clusterClient = null;
 		Process taskManagerProcess = null;
-		HighAvailabilityServices highAvailabilityServices = null;
+		final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(RestOptions.PORT, 0);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+		final int jobManagerPort = rpcService.getPort();
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+		final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory(
+			StandaloneResourceManagerFactory.INSTANCE);
+		DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null;
+
+		try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				jmConfig,
+				TestingUtils.defaultExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
 
-		try {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -96,36 +124,22 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
+			dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(
 				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
+				rpcService,
+				haServices,
+				blobServerResource.getBlobServer(),
+				new HeartbeatServices(100L, 1000L),
 				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+				new MemoryArchivedExecutionGraphStore(),
+				fatalErrorHandler);
+
+			// update the rest ports
+			final int restPort = dispatcherResourceManagerComponent
+				.getWebMonitorEndpoint()
+				.getServerAddress()
+				.getPort();
+			jmConfig.setInteger(RestOptions.PORT, restPort);
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -134,7 +148,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+					AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -142,21 +156,14 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			taskManagerProcess = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 1, 120000);
-
 			final Throwable[] errorRef = new Throwable[1];
 
-			final Configuration configuration = new Configuration();
-			configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
-
 			// start the test program, which infinitely blocks
 			Runnable programRunner = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
+						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration());
 						env.setParallelism(2);
 						env.setRestartStrategy(RestartStrategies.noRestart());
 						env.getConfig().disableSysoutLogging();
@@ -187,15 +194,30 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			Thread programThread = new Thread(programRunner);
 
 			// kill the TaskManager
+			programThread.start();
+
+			final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L));
+
+			final DispatcherGateway dispatcherGateway = rpcService.connect(
+				leaderConnectionInfo.getAddress(),
+				DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
+				DispatcherGateway.class).get();
+
+			waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
+
+			clusterClient = new RestClusterClient<>(jmConfig, "standalone");
+
+			final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout);
+
+			assertThat(jobIds, hasSize(1));
+			final JobID jobId = jobIds.iterator().next();
+
+			// kill the TaskManager after the job started to run
 			taskManagerProcess.destroy();
 			taskManagerProcess = null;
 
-			// immediately submit the job. this should hit the case
-			// where the JobManager still thinks it has the TaskManager and tries to send it tasks
-			programThread.start();
-
 			// try to cancel the job
-			cancelRunningJob(jmActor);
+			clusterClient.cancel(jobId);
 
 			// we should see a failure within reasonable time (10s is the ask timeout).
 			// since the CI environment is often slow, we conservatively give it up to 2 minutes,
@@ -223,88 +245,42 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			if (taskManagerProcess != null) {
 				taskManagerProcess.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
-		}
-	}
-
-	private void cancelRunningJob(ActorRef jobManager) throws Exception {
-		final FiniteDuration askTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		// try at most for 30 seconds
-		final long deadline = System.currentTimeMillis() + 30000;
-
-		JobID jobId = null;
-
-		do {
-			Future<Object> response = Patterns.ask(jobManager,
-					JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
-
-			Object result;
-			try {
-				result = Await.result(response, askTimeout);
+			if (clusterClient != null) {
+				clusterClient.shutdown();
 			}
-			catch (Exception e) {
-				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+			if (dispatcherResourceManagerComponent != null) {
+				dispatcherResourceManagerComponent.close();
 			}
 
-			if (result instanceof JobManagerMessages.RunningJobsStatus) {
-
-				List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+			fatalErrorHandler.rethrowError();
 
-				if (jobs.size() == 1) {
-					jobId = jobs.get(0).getJobId();
-					break;
-				}
-			}
-		}
-		while (System.currentTimeMillis() < deadline);
-
-		if (jobId == null) {
-			// we never found it running, must have failed already
-			return;
+			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
 		}
-
-		// tell the JobManager to cancel the job
-		jobManager.tell(
-			new JobManagerMessages.LeaderSessionMessage(
-				HighAvailabilityServices.DEFAULT_LEADER_ID,
-				new JobManagerMessages.CancelJob(jobId)),
-			ActorRef.noSender());
 	}
 
-	private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
-			throws Exception {
-		final long deadline = System.currentTimeMillis() + maxDelay;
-		while (true) {
-			long remaining = deadline - System.currentTimeMillis();
-			if (remaining <= 0) {
-				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
-			}
-
-			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+	private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(timeout),
+			Time.milliseconds(50L),
+			Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= 1 &&
+				clusterOverview.getNumSlotsAvailable() == 0 &&
+				clusterOverview.getNumSlotsTotal() == 2,
+			TestingUtils.defaultScheduledExecutor())
+			.get();
+	}
 
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-				Integer numTMs = (Integer) Await.result(result, timeout);
-				if (numTMs == numExpected) {
-					break;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-		}
+	private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException {
+		return FutureUtils.retrySuccesfulWithDelay(
+				CheckedSupplier.unchecked(clusterClient::listJobs),
+				Time.milliseconds(50L),
+				Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+				jobs -> !jobs.isEmpty(),
+				TestingUtils.defaultScheduledExecutor())
+			.get()
+			.stream()
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
 	}
 
 	private void printProcessLog(String processName, String log) {