Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:05 UTC

[flink] 06/08: [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ace721e5515a8fcc085e501bcfc8586551d1d36
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 08:56:58 2018 +0200

    [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
    
    This closes #6751.
---
 .../flink/runtime/testutils/DispatcherProcess.java | 179 +++++++++++++++++++++
 ...ManagerHAProcessFailureBatchRecoveryITCase.java | 144 +++++++++--------
 2 files changed, 253 insertions(+), 70 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
new file mode 100644
index 0000000..79b0dc3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Dispatcher} instance running in a separate JVM.
+ */
+public class DispatcherProcess extends TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcess.class);
+
+	/** Pattern to parse the job manager port from the logs. */
+	private static final Pattern PORT_PATTERN = Pattern.compile(".*Actor system started at akka\\.tcp://flink@.*:(\\d+).*");
+
+	/** ID for this dispatcher process. */
+	private final int id;
+
+	/** The configuration for the dispatcher process. */
+	private final Configuration config;
+
+	/** Configuration parsed as args for {@link DispatcherProcessEntryPoint}. */
+	private final String[] jvmArgs;
+
+	/** The port the JobManager listens on. */
+	private int jobManagerPort;
+
+	/**
+	 * Creates a {@link Dispatcher} (replacing the {@link JobManager}) running in a separate JVM.
+	 *
+	 * @param id     ID for the dispatcher process
+	 * @param config Configuration for the dispatcher process
+	 *
+	 * @throws Exception
+	 */
+	public DispatcherProcess(int id, Configuration config) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+
+		ArrayList<String> args = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = new String[args.size()];
+		args.toArray(jvmArgs);
+	}
+
+	@Override
+	public String getName() {
+		return "JobManager " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return DispatcherProcessEntryPoint.class.getName();
+	}
+
+	public Configuration getConfig() {
+		return config;
+	}
+
+	/**
+	 * Parses the port from the job manager logs and returns it.
+	 *
+	 * <p>If a call to this method succeeds, successive calls will directly
+	 * return the cached port without re-parsing the logs.
+	 *
+	 * @param timeout Timeout for log parsing.
+	 * @return The port of the job manager
+	 * @throws InterruptedException  If interrupted while waiting before
+	 *                               retrying to parse the logs
+	 * @throws NumberFormatException If the parsed port is not a number
+	 */
+	public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
+		if (jobManagerPort > 0) {
+			return jobManagerPort;
+		} else {
+			Deadline deadline = timeout.fromNow();
+			while (deadline.hasTimeLeft()) {
+				Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
+				if (matcher.find()) {
+					String port = matcher.group(1);
+					jobManagerPort = Integer.parseInt(port);
+					return jobManagerPort;
+				} else {
+					Thread.sleep(100);
+				}
+			}
+
+			throw new RuntimeException("Could not parse port from logs");
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("DispatcherProcess(id=%d, port=%d)", id, jobManagerPort);
+	}
+
+	/**
+	 * Entry point for the dispatcher process.
+	 */
+	public static class DispatcherProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcessEntryPoint.class);
+
+		/**
+		 * Main method of the DispatcherProcessEntryPoint.
+		 *
+		 * <p>The arguments are parsed into a {@link Configuration} and passed to the
+		 * cluster entrypoint, for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum
+		 * "xyz:123:456"</code>.
+		 */
+		public static void main(String[] args) {
+			try {
+				ParameterTool params = ParameterTool.fromArgs(args);
+				Configuration config = params.getConfiguration();
+				LOG.info("Configuration: {}.", config);
+
+				config.setInteger(JobManagerOptions.PORT, 0);
+				config.setInteger(RestOptions.PORT, 0);
+
+				final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config);
+
+				ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start JobManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+}
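
For orientation, here is a minimal usage sketch (not part of this commit) of how a test could drive
the new DispatcherProcess: build an HA configuration, start the process, wait until its port has
been parsed from the captured log output, and tear it down. The ZooKeeper connect string, storage
path and class name below are placeholders; the actual usage is in the ITCase changes that follow.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.DispatcherProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;

import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;

public class DispatcherProcessUsageSketch {

	public static void main(String[] args) throws Exception {
		// Placeholder ZooKeeper quorum and HA storage path, purely for illustration.
		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
			"localhost:2181", "/tmp/flink-ha-storage");

		DispatcherProcess dispatcher = new DispatcherProcess(0, config);
		try {
			// Spawns the dispatcher in a separate JVM via DispatcherProcessEntryPoint.
			dispatcher.startProcess();

			// Blocks until the port has been parsed from the process output, or the timeout expires.
			int port = dispatcher.getJobManagerPort(new FiniteDuration(2, TimeUnit.MINUTES));
			System.out.println("Dispatcher process listens on port " + port);
		} finally {
			dispatcher.destroy();
		}
	}
}
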
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d3accff..9e9ce07 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -22,36 +22,38 @@ import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,12 +61,15 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -90,23 +95,28 @@ import static org.junit.Assert.fail;
 @RunWith(Parameterized.class)
 public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
-	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+	private static ZooKeeperTestEnvironment zooKeeper;
 
 	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
 
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
-		}
+	@BeforeClass
+	public static void setup() {
+		zooKeeper = new ZooKeeperTestEnvironment(1);
 	}
 
 	@Before
 	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
+		zooKeeper.deleteAll();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (zooKeeper != null) {
+			zooKeeper.shutdown();
+		}
 	}
 
 	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -141,7 +151,6 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	 */
 	private void testJobManagerFailure(String zkQuorum, final File coordinateDir, final File zookeeperStoragePath) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, zookeeperStoragePath.getAbsolutePath());
@@ -149,7 +158,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);
 		env.setParallelism(PARALLELISM);
-		env.setNumberOfExecutionRetries(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
@@ -212,7 +221,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	}
 
 	@Test
-	public void testJobManagerProcessFailure() throws Exception {
+	public void testDispatcherProcessFailure() throws Exception {
+		final Time timeout = Time.seconds(30L);
 		final File zookeeperStoragePath = temporaryFolder.newFolder();
 
 		// Config
@@ -222,15 +232,11 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
 		assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
 
-		// Setup
-		// Test actor system
-		ActorSystem testActorSystem;
-
 		// Job managers
-		final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+		final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];
 
 		// Task managers
-		final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+		TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];
 
 		HighAvailabilityServices highAvailabilityServices = null;
 
@@ -239,24 +245,25 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		// Coordination between the processes goes through a directory
 		File coordinateTempDir = null;
 
+		// Cluster config
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
+		// Task manager configuration
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
 
 			// Coordination directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// Job Managers
-			Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-					ZooKeeper.getConnectString(), zookeeperStoragePath.getPath());
-
 			// Start first process
-			jmProcess[0] = new JobManagerProcess(0, config);
-			jmProcess[0].startProcess();
-
-			// Task manager configuration
-			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+			dispatcherProcesses[0] = new DispatcherProcess(0, config);
+			dispatcherProcesses[0].startProcess();
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
 				config,
@@ -264,27 +271,13 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
 			// Start the task manager process
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-				TaskManager.startTaskManagerComponentsAndActor(
-					config,
-					ResourceID.generate(),
-					tmActorSystem[i],
-					highAvailabilityServices,
-					NoOpMetricRegistry.INSTANCE,
-					"localhost",
-					Option.<String>empty(),
-					false,
-					TaskManager.class);
+				taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
+				taskManagerRunners[i].start();
 			}
 
-			// Test actor system
-			testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-
-			jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
-
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+			leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 			leaderRetrievalService.start(leaderListener);
 
 			// Initial submission
@@ -293,13 +286,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			String leaderAddress = leaderListener.getAddress();
 			UUID leaderId = leaderListener.getLeaderSessionID();
 
-			// Get the leader ref
-			ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
-			ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+			final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect(
+				leaderAddress,
+				DispatcherId.fromUuid(leaderId),
+				DispatcherGateway.class);
+			final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();
 
 			// Wait for all task managers to connect to the leading job manager
-			JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
-					deadline.timeLeft());
+			waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());
 
 			final File coordinateDirClosure = coordinateTempDir;
 			final Throwable[] errorRef = new Throwable[1];
@@ -309,7 +303,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 				@Override
 				public void run() {
 					try {
-						testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
+						testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -326,12 +320,10 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 					READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
 
 			// Kill one of the job managers and trigger recovery
-			jmProcess[0].destroy();
+			dispatcherProcesses[0].destroy();
 
-			jmProcess[1] = new JobManagerProcess(1, config);
-			jmProcess[1].startProcess();
-
-			jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+			dispatcherProcesses[1] = new DispatcherProcess(1, config);
+			dispatcherProcesses[1].startProcess();
 
 			// we create the marker file which signals the program functions tasks that they can complete
 			AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
@@ -358,7 +350,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			// for Travis and the root problem is not shown)
 			t.printStackTrace();
 
-			for (JobManagerProcess p : jmProcess) {
+			for (DispatcherProcess p : dispatcherProcesses) {
 				if (p != null) {
 					p.printProcessLog();
 				}
@@ -368,8 +360,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		}
 		finally {
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				if (tmActorSystem[i] != null) {
-					tmActorSystem[i].shutdown();
+				if (taskManagerRunners[i] != null) {
+					taskManagerRunners[i].close();
 				}
 			}
 
@@ -377,7 +369,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 				leaderRetrievalService.stop();
 			}
 
-			for (JobManagerProcess jmProces : jmProcess) {
+			for (DispatcherProcess jmProces : dispatcherProcesses) {
 				if (jmProces != null) {
 					jmProces.destroy();
 				}
@@ -387,6 +379,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 				highAvailabilityServices.closeAndCleanupAllData();
 			}
 
+			RpcUtils.terminateRpcService(rpcService, timeout);
+
 			// Delete coordination directory
 			if (coordinateTempDir != null) {
 				try {
@@ -398,4 +392,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		}
 	}
 
+	private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
+			Time.milliseconds(50L),
+			org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
+			new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
+			.get();
+	}
+
 }
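
For reference, the retry in waitForTaskManagers boils down to polling the dispatcher's cluster
overview until enough task managers have registered or a deadline expires. Below is a simplified,
synchronous stand-in for that pattern in plain Java (no Flink types; the class and method names are
illustrative only, and the toy supplier replaces the real gateway call).

import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

public final class PollUntilTaskManagersConnected {

	/**
	 * Polls the supplier until it reports at least {@code expected} task managers or the
	 * timeout expires. A simplified stand-in for the retry-with-delay used above.
	 */
	static int pollUntilAtLeast(
			Supplier<Integer> connectedTaskManagers,
			int expected,
			Duration timeout,
			Duration pollInterval) throws InterruptedException {

		final Instant deadline = Instant.now().plus(timeout);

		while (Instant.now().isBefore(deadline)) {
			final int connected = connectedTaskManagers.get();
			if (connected >= expected) {
				return connected;
			}
			Thread.sleep(pollInterval.toMillis());
		}

		throw new IllegalStateException("Task managers did not connect within " + timeout);
	}

	public static void main(String[] args) throws InterruptedException {
		// Toy supplier standing in for requesting the cluster overview from the dispatcher gateway.
		int connected = pollUntilAtLeast(() -> 2, 2, Duration.ofSeconds(30), Duration.ofMillis(50));
		System.out.println("Connected task managers: " + connected);
	}
}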