You are viewing a plain text version of this content; the canonical (HTML) version is available at the mailing-list archive.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:05 UTC
[flink] 06/08: [FLINK-10403] Port
JobManagerHAProcessFailureBatchRecoveryITCase to new code base
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6ace721e5515a8fcc085e501bcfc8586551d1d36
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 08:56:58 2018 +0200
[FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
This closes #6751.
---
.../flink/runtime/testutils/DispatcherProcess.java | 179 +++++++++++++++++++++
...ManagerHAProcessFailureBatchRecoveryITCase.java | 144 +++++++++--------
2 files changed, 253 insertions(+), 70 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
new file mode 100644
index 0000000..79b0dc3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Dispatcher} instance running in a separate JVM.
+ */
+public class DispatcherProcess extends TestJvmProcess {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
+
+ /** Pattern to parse the job manager port from the logs. */
+ private static final Pattern PORT_PATTERN = Pattern.compile(".*Actor system started at akka\\.tcp://flink@.*:(\\d+).*");
+
+ /** ID for this JobManager. */
+ private final int id;
+
+ /** The configuration for the JobManager. */
+ private final Configuration config;
+
+ /** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint}. */
+ private final String[] jvmArgs;
+
+ /** The port the JobManager listens on. */
+ private int jobManagerPort;
+
+ /**
+ * Creates a {@link JobManager} running in a separate JVM.
+ *
+ * @param id ID for the JobManager
+ * @param config Configuration for the job manager process
+ *
+ * @throws Exception
+ */
+ public DispatcherProcess(int id, Configuration config) throws Exception {
+ checkArgument(id >= 0, "Negative ID");
+ this.id = id;
+ this.config = checkNotNull(config, "Configuration");
+
+ ArrayList<String> args = new ArrayList<>();
+
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ args.add("--" + entry.getKey());
+ args.add(entry.getValue());
+ }
+
+ this.jvmArgs = new String[args.size()];
+ args.toArray(jvmArgs);
+ }
+
+ @Override
+ public String getName() {
+ return "JobManager " + id;
+ }
+
+ @Override
+ public String[] getJvmArgs() {
+ return jvmArgs;
+ }
+
+ @Override
+ public String getEntryPointClassName() {
+ return DispatcherProcessEntryPoint.class.getName();
+ }
+
+ public Configuration getConfig() {
+ return config;
+ }
+
+ /**
+ * Parses the port from the job manager logs and returns it.
+ *
+ * <p>If a call to this method succeeds, successive calls will directly
+ * return the port and re-parse the logs.
+ *
+ * @param timeout Timeout for log parsing.
+ * @return The port of the job manager
+ * @throws InterruptedException If interrupted while waiting before
+ * retrying to parse the logs
+ * @throws NumberFormatException If the parsed port is not a number
+ */
+ public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
+ if (jobManagerPort > 0) {
+ return jobManagerPort;
+ } else {
+ Deadline deadline = timeout.fromNow();
+ while (deadline.hasTimeLeft()) {
+ Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
+ if (matcher.find()) {
+ String port = matcher.group(1);
+ jobManagerPort = Integer.parseInt(port);
+ return jobManagerPort;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+
+ throw new RuntimeException("Could not parse port from logs");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort);
+ }
+
+ /**
+ * Entry point for the JobManager process.
+ */
+ public static class DispatcherProcessEntryPoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcessEntryPoint.class);
+
+ /**
+ * Entrypoint of the DispatcherProcessEntryPoint.
+ *
+ * <p>Other arguments are parsed to a {@link Configuration} and passed to the
+ * JobManager, for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum
+ * "xyz:123:456"</code>.
+ */
+ public static void main(String[] args) {
+ try {
+ ParameterTool params = ParameterTool.fromArgs(args);
+ Configuration config = params.getConfiguration();
+ LOG.info("Configuration: {}.", config);
+
+ config.setInteger(JobManagerOptions.PORT, 0);
+ config.setInteger(RestOptions.PORT, 0);
+
+ final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config);
+
+ ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
+ }
+ catch (Throwable t) {
+ LOG.error("Failed to start JobManager process", t);
+ System.exit(1);
+ }
+ }
+ }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d3accff..9e9ce07 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -22,36 +22,38 @@ import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -59,12 +61,15 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import scala.Option;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
@@ -90,23 +95,28 @@ import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
- private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+ private static ZooKeeperTestEnvironment zooKeeper;
private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
- @AfterClass
- public static void tearDown() throws Exception {
- if (ZooKeeper != null) {
- ZooKeeper.shutdown();
- }
+ @BeforeClass
+ public static void setup() {
+ zooKeeper = new ZooKeeperTestEnvironment(1);
}
@Before
public void cleanUp() throws Exception {
- ZooKeeper.deleteAll();
+ zooKeeper.deleteAll();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (zooKeeper != null) {
+ zooKeeper.shutdown();
+ }
}
protected static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -141,7 +151,6 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
*/
private void testJobManagerFailure(String zkQuorum, final File coordinateDir, final File zookeeperStoragePath) throws Exception {
Configuration config = new Configuration();
- config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, zookeeperStoragePath.getAbsolutePath());
@@ -149,7 +158,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"leader", 1, config);
env.setParallelism(PARALLELISM);
- env.setNumberOfExecutionRetries(1);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
env.getConfig().setExecutionMode(executionMode);
env.getConfig().disableSysoutLogging();
@@ -212,7 +221,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
}
@Test
- public void testJobManagerProcessFailure() throws Exception {
+ public void testDispatcherProcessFailure() throws Exception {
+ final Time timeout = Time.seconds(30L);
final File zookeeperStoragePath = temporaryFolder.newFolder();
// Config
@@ -222,15 +232,11 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
- // Setup
- // Test actor system
- ActorSystem testActorSystem;
-
// Job managers
- final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+ final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];
// Task managers
- final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+ TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];
HighAvailabilityServices highAvailabilityServices = null;
@@ -239,24 +245,25 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
// Coordination between the processes goes through a directory
File coordinateTempDir = null;
+ // Cluster config
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+ zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
+ // Task manager configuration
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+ final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
try {
final Deadline deadline = TestTimeOut.fromNow();
// Coordination directory
coordinateTempDir = temporaryFolder.newFolder();
- // Job Managers
- Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
- ZooKeeper.getConnectString(), zookeeperStoragePath.getPath());
-
// Start first process
- jmProcess[0] = new JobManagerProcess(0, config);
- jmProcess[0].startProcess();
-
- // Task manager configuration
- config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
- config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+ dispatcherProcesses[0] = new DispatcherProcess(0, config);
+ dispatcherProcesses[0].startProcess();
highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
config,
@@ -264,27 +271,13 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
// Start the task manager process
for (int i = 0; i < numberOfTaskManagers; i++) {
- tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
- TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- tmActorSystem[i],
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.<String>empty(),
- false,
- TaskManager.class);
+ taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
+ taskManagerRunners[i].start();
}
- // Test actor system
- testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-
- jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
-
// Leader listener
TestingListener leaderListener = new TestingListener();
- leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+ leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
leaderRetrievalService.start(leaderListener);
// Initial submission
@@ -293,13 +286,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
String leaderAddress = leaderListener.getAddress();
UUID leaderId = leaderListener.getLeaderSessionID();
- // Get the leader ref
- ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
- ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+ final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect(
+ leaderAddress,
+ DispatcherId.fromUuid(leaderId),
+ DispatcherGateway.class);
+ final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();
// Wait for all task managers to connect to the leading job manager
- JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
- deadline.timeLeft());
+ waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());
final File coordinateDirClosure = coordinateTempDir;
final Throwable[] errorRef = new Throwable[1];
@@ -309,7 +303,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
@Override
public void run() {
try {
- testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
+ testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
}
catch (Throwable t) {
t.printStackTrace();
@@ -326,12 +320,10 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
// Kill one of the job managers and trigger recovery
- jmProcess[0].destroy();
+ dispatcherProcesses[0].destroy();
- jmProcess[1] = new JobManagerProcess(1, config);
- jmProcess[1].startProcess();
-
- jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+ dispatcherProcesses[1] = new DispatcherProcess(1, config);
+ dispatcherProcesses[1].startProcess();
// we create the marker file which signals the program functions tasks that they can complete
AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
@@ -358,7 +350,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
// for Travis and the root problem is not shown)
t.printStackTrace();
- for (JobManagerProcess p : jmProcess) {
+ for (DispatcherProcess p : dispatcherProcesses) {
if (p != null) {
p.printProcessLog();
}
@@ -368,8 +360,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
}
finally {
for (int i = 0; i < numberOfTaskManagers; i++) {
- if (tmActorSystem[i] != null) {
- tmActorSystem[i].shutdown();
+ if (taskManagerRunners[i] != null) {
+ taskManagerRunners[i].close();
}
}
@@ -377,7 +369,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
leaderRetrievalService.stop();
}
- for (JobManagerProcess jmProces : jmProcess) {
+ for (DispatcherProcess jmProces : dispatcherProcesses) {
if (jmProces != null) {
jmProces.destroy();
}
@@ -387,6 +379,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
highAvailabilityServices.closeAndCleanupAllData();
}
+ RpcUtils.terminateRpcService(rpcService, timeout);
+
// Delete coordination directory
if (coordinateTempDir != null) {
try {
@@ -398,4 +392,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
}
}
+ private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
+ Time.milliseconds(50L),
+ org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
+ clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
+ new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
+ .get();
+ }
+
}