You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:34:59 UTC

[flink] branch master updated (cc334db -> a8434d6)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from cc334db  [hotfix] Remove StandaloneMiniCluster from ScalaShellITCase
     new ff97d1c  [FLINK-10401] Port ProcessFailureCancelingITCase to new code base
     new 93b042d  [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
     new 2a798e4  [hotfix] Remove TaskManagerProcess
     new 87c27e4  [hotfix] Remove TaskManagerProcessEntryPoint
     new 20447b2  [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync
     new 6ace721  [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
     new b0d5e99  [hotfix] Remove DispatcherProcess#getJobManagerPort function
     new a8434d6  [FLINK-10397] Remove CoreOptions#MODE

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/_includes/generated/core_configuration.html   |   5 -
 .../org/apache/flink/client/LocalExecutor.java     |  50 ++--
 .../org/apache/flink/client/RemoteExecutor.java    |   8 +-
 .../org/apache/flink/client/cli/CliFrontend.java   |  12 +-
 .../flink/client/cli/CliFrontendTestBase.java      |  26 +--
 .../apache/flink/configuration/CoreOptions.java    |  22 --
 .../runtime/entrypoint/ClusterEntrypoint.java      |  13 +-
 .../DispatcherResourceManagerComponent.java        |  10 +
 .../flink/runtime/testutils/DispatcherProcess.java | 136 +++++++++++
 .../runtime/testutils/TaskManagerProcess.java      | 132 -----------
 .../flink/runtime/util/BlobServerResource.java     |   4 +
 .../org/apache/flink/api/scala/FlinkShell.scala    |  34 +--
 .../apache/flink/api/scala/ScalaShellITCase.scala  |   1 -
 .../api/environment/RemoteStreamEnvironment.java   |   8 +-
 .../environment/StreamExecutionEnvironment.java    |   9 +-
 .../test/operators/RemoteEnvironmentITCase.java    |  69 +-----
 ...tractTaskManagerProcessFailureRecoveryTest.java | 150 +++---------
 ...ManagerHAProcessFailureBatchRecoveryITCase.java | 144 ++++++------
 .../recovery/ProcessFailureCancelingITCase.java    | 256 ++++++++++-----------
 ...skManagerProcessFailureBatchRecoveryITCase.java |   4 +-
 ...nagerProcessFailureStreamingRecoveryITCase.java |   2 -
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |   5 -
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  32 +--
 23 files changed, 436 insertions(+), 696 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java


[flink] 04/08: [hotfix] Remove TaskManagerProcessEntryPoint

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87c27e4ca82ad8ce48cef743b8da1f7845199b0e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 09:58:23 2018 +0200

    [hotfix] Remove TaskManagerProcessEntryPoint
---
 ...tractTaskManagerProcessFailureRecoveryTest.java | 36 ----------------------
 1 file changed, 36 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 0962ddf..1374b70 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
-import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.util.NetUtils;
@@ -304,41 +303,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
-	 */
-	public static class TaskManagerProcessEntryPoint {
-
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-		public static void main(String[] args) {
-			try {
-				int jobManagerPort = Integer.parseInt(args[0]);
-
-				Configuration cfg = new Configuration();
-				cfg.setString(JobManagerOptions.ADDRESS, "localhost");
-				cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
-				cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
-				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
-					ResourceID.generate(), TaskManager.class);
-
-				// wait forever
-				Object lock = new Object();
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
-			catch (Throwable t) {
-				LOG.error("Failed to start TaskManager process", t);
-				System.exit(1);
-			}
-		}
-	}
-
-	/**
 	 * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor.
 	 */
 	public static class TaskExecutorProcessEntryPoint {


[flink] 06/08: [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ace721e5515a8fcc085e501bcfc8586551d1d36
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 08:56:58 2018 +0200

    [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
    
    This closes #6751.
---
 .../flink/runtime/testutils/DispatcherProcess.java | 179 +++++++++++++++++++++
 ...ManagerHAProcessFailureBatchRecoveryITCase.java | 144 +++++++++--------
 2 files changed, 253 insertions(+), 70 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
new file mode 100644
index 0000000..79b0dc3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Dispatcher} instance running in a separate JVM.
+ */
+public class DispatcherProcess extends TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcess.class);
+
+	/** Pattern to parse the RPC port of the dispatcher process from its "Actor system started" log line. */
+	private static final Pattern PORT_PATTERN = Pattern.compile(".*Actor system started at akka\\.tcp://flink@.*:(\\d+).*");
+
+	/** ID for this dispatcher process. */
+	private final int id;
+
+	/** The configuration passed to the dispatcher process. */
+	private final Configuration config;
+
+	/** Configuration parsed as args for {@link DispatcherProcessEntryPoint}. */
+	private final String[] jvmArgs;
+
+	/** The RPC port parsed from the process logs; 0 until {@link #getJobManagerPort} succeeds. */
+	private int jobManagerPort;
+
+	/**
+	 * Creates a {@link Dispatcher} process (successor of the legacy {@link JobManager} process) running in a separate JVM.
+	 *
+	 * @param id     Non-negative ID for the dispatcher process
+	 * @param config Configuration for the dispatcher process; each entry is passed as a {@code --key value} argument
+	 *
+	 * @throws Exception if {@code id} is negative or {@code config} is null
+	 */
+	public DispatcherProcess(int id, Configuration config) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+
+		ArrayList<String> args = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = new String[args.size()];
+		args.toArray(jvmArgs);
+	}
+
+	@Override
+	public String getName() {
+		return "Dispatcher " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return DispatcherProcessEntryPoint.class.getName();
+	}
+
+	public Configuration getConfig() {
+		return config;
+	}
+
+	/**
+	 * Parses the RPC port from the dispatcher process logs and returns it.
+	 *
+	 * <p>If a call to this method succeeds, successive calls will directly
+	 * return the cached port without re-parsing the logs.
+	 *
+	 * @param timeout Timeout for log parsing.
+	 * @return The port parsed from the dispatcher process logs
+	 * @throws InterruptedException  If interrupted while waiting before retrying to parse the logs
+	 * @throws RuntimeException      If no port could be parsed before the timeout expired
+	 * @throws NumberFormatException If the parsed port is not a number
+	 */
+	public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
+		if (jobManagerPort > 0) {
+			return jobManagerPort;
+		} else {
+			Deadline deadline = timeout.fromNow();
+			while (deadline.hasTimeLeft()) {
+				Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
+				if (matcher.find()) {
+					String port = matcher.group(1);
+					jobManagerPort = Integer.parseInt(port);
+					return jobManagerPort;
+				} else {
+					Thread.sleep(100);
+				}
+			}
+
+			throw new RuntimeException("Could not parse port from logs within " + timeout + '.');
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("DispatcherProcess(id=%d, port=%d)", id, jobManagerPort);
+	}
+
+	/**
+	 * Entry point for the dispatcher process.
+	 */
+	public static class DispatcherProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcessEntryPoint.class);
+
+		/**
+		 * Starts a {@link StandaloneSessionClusterEntrypoint} whose JobManager RPC and REST ports are set to 0 (any free port).
+		 *
+		 * <p>All arguments are parsed to a {@link Configuration} and passed to the
+		 * cluster entrypoint, for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum
+		 * "xyz:123:456"</code>.
+		 */
+		public static void main(String[] args) {
+			try {
+				ParameterTool params = ParameterTool.fromArgs(args);
+				Configuration config = params.getConfiguration();
+
+				config.setInteger(JobManagerOptions.PORT, 0);
+				config.setInteger(RestOptions.PORT, 0);
+				LOG.info("Configuration: {}.", config);
+
+				final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config);
+
+				ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start Dispatcher process", t);
+				System.exit(1);
+			}
+		}
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d3accff..9e9ce07 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -22,36 +22,38 @@ import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,12 +61,15 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -90,23 +95,28 @@ import static org.junit.Assert.fail;
 @RunWith(Parameterized.class)
 public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
-	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+	private static ZooKeeperTestEnvironment zooKeeper;
 
 	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
 
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
-		}
+	@BeforeClass
+	public static void setup() {
+		zooKeeper = new ZooKeeperTestEnvironment(1);
 	}
 
 	@Before
 	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
+		zooKeeper.deleteAll();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (zooKeeper != null) {
+			zooKeeper.shutdown();
+		}
 	}
 
 	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -141,7 +151,6 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	 */
 	private void testJobManagerFailure(String zkQuorum, final File coordinateDir, final File zookeeperStoragePath) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, zookeeperStoragePath.getAbsolutePath());
@@ -149,7 +158,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);
 		env.setParallelism(PARALLELISM);
-		env.setNumberOfExecutionRetries(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
@@ -212,7 +221,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	}
 
 	@Test
-	public void testJobManagerProcessFailure() throws Exception {
+	public void testDispatcherProcessFailure() throws Exception {
+		final Time timeout = Time.seconds(30L);
 		final File zookeeperStoragePath = temporaryFolder.newFolder();
 
 		// Config
@@ -222,15 +232,11 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
 		assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
 
-		// Setup
-		// Test actor system
-		ActorSystem testActorSystem;
-
 		// Job managers
-		final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+		final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];
 
 		// Task managers
-		final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+		TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];
 
 		HighAvailabilityServices highAvailabilityServices = null;
 
@@ -239,24 +245,25 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		// Coordination between the processes goes through a directory
 		File coordinateTempDir = null;
 
+		// Cluster config
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
+		// Task manager configuration
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
 
 			// Coordination directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// Job Managers
-			Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-					ZooKeeper.getConnectString(), zookeeperStoragePath.getPath());
-
 			// Start first process
-			jmProcess[0] = new JobManagerProcess(0, config);
-			jmProcess[0].startProcess();
-
-			// Task manager configuration
-			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+			dispatcherProcesses[0] = new DispatcherProcess(0, config);
+			dispatcherProcesses[0].startProcess();
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
 				config,
@@ -264,27 +271,13 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
 			// Start the task manager process
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-				TaskManager.startTaskManagerComponentsAndActor(
-					config,
-					ResourceID.generate(),
-					tmActorSystem[i],
-					highAvailabilityServices,
-					NoOpMetricRegistry.INSTANCE,
-					"localhost",
-					Option.<String>empty(),
-					false,
-					TaskManager.class);
+				taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
+				taskManagerRunners[i].start();
 			}
 
-			// Test actor system
-			testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-
-			jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
-
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+			leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 			leaderRetrievalService.start(leaderListener);
 
 			// Initial submission
@@ -293,13 +286,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			String leaderAddress = leaderListener.getAddress();
 			UUID leaderId = leaderListener.getLeaderSessionID();
 
-			// Get the leader ref
-			ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
-			ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+			final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect(
+				leaderAddress,
+				DispatcherId.fromUuid(leaderId),
+				DispatcherGateway.class);
+			final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();
 
 			// Wait for all task managers to connect to the leading job manager
-			JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
-					deadline.timeLeft());
+			waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());
 
 			final File coordinateDirClosure = coordinateTempDir;
 			final Throwable[] errorRef = new Throwable[1];
@@ -309,7 +303,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 				@Override
 				public void run() {
 					try {
-						testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
+						testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -326,12 +320,10 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 					READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
 
 			// Kill one of the job managers and trigger recovery
-			jmProcess[0].destroy();
+			dispatcherProcesses[0].destroy();
 
-			jmProcess[1] = new JobManagerProcess(1, config);
-			jmProcess[1].startProcess();
-
-			jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+			dispatcherProcesses[1] = new DispatcherProcess(1, config);
+			dispatcherProcesses[1].startProcess();
 
 			// we create the marker file which signals the program functions tasks that they can complete
 			AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
@@ -358,7 +350,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			// for Travis and the root problem is not shown)
 			t.printStackTrace();
 
-			for (JobManagerProcess p : jmProcess) {
+			for (DispatcherProcess p : dispatcherProcesses) {
 				if (p != null) {
 					p.printProcessLog();
 				}
@@ -368,8 +360,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		}
 		finally {
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				if (tmActorSystem[i] != null) {
-					tmActorSystem[i].shutdown();
+				if (taskManagerRunners[i] != null) {
+					taskManagerRunners[i].close();
 				}
 			}
 
@@ -377,7 +369,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 				leaderRetrievalService.stop();
 			}
 
-			for (JobManagerProcess jmProces : jmProcess) {
+			for (DispatcherProcess jmProces : dispatcherProcesses) {
 				if (jmProces != null) {
 					jmProces.destroy();
 				}
@@ -387,6 +379,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 				highAvailabilityServices.closeAndCleanupAllData();
 			}
 
+			RpcUtils.terminateRpcService(rpcService, timeout);
+
 			// Delete coordination directory
 			if (coordinateTempDir != null) {
 				try {
@@ -398,4 +392,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		}
 	}
 
+	private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
+			Time.milliseconds(50L),
+			org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
+			new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
+			.get();
+	}
+
 }


[flink] 03/08: [hotfix] Remove TaskManagerProcess

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2a798e471db68c360d23d7d9284cee652997e5d9
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 09:57:38 2018 +0200

    [hotfix] Remove TaskManagerProcess
---
 .../runtime/testutils/TaskManagerProcess.java      | 132 ---------------------
 1 file changed, 132 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
deleted file mode 100644
index b381f62..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TaskManager} instance running in a separate JVM.
- */
-public class TaskManagerProcess extends TestJvmProcess {
-
-	/** ID for this TaskManager */
-	private final int id;
-
-	/** The configuration for the TaskManager */
-	private final Configuration config;
-
-	/** Configuration parsed as args for {@link TaskManagerProcess.TaskManagerProcessEntryPoint} */
-	private final String[] jvmArgs;
-
-	public TaskManagerProcess(int id, Configuration config) throws Exception {
-		checkArgument(id >= 0, "Negative ID");
-		this.id = id;
-		this.config = checkNotNull(config, "Configuration");
-
-		ArrayList<String> args = new ArrayList<>();
-
-		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
-			args.add("--" + entry.getKey());
-			args.add(entry.getValue());
-		}
-
-		this.jvmArgs = new String[args.size()];
-		args.toArray(jvmArgs);
-	}
-
-	@Override
-	public String getName() {
-		return "TaskManager " + id;
-	}
-
-	@Override
-	public String[] getJvmArgs() {
-		return jvmArgs;
-	}
-
-	@Override
-	public String getEntryPointClassName() {
-		return TaskManagerProcessEntryPoint.class.getName();
-	}
-
-	public int getId() {
-		return id;
-	}
-
-	@Override
-	public String toString() {
-		return String.format("TaskManagerProcess(id=%d)", id);
-	}
-
-	/**
-	 * Entry point for the TaskManager process.
-	 */
-	public static class TaskManagerProcessEntryPoint {
-
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-		/**
-		 * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum "xyz:123:456"</code>.
-		 */
-		public static void main(String[] args) throws Exception {
-			try {
-				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
-
-				if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-					config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-				}
-
-				if (!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
-					config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-				}
-
-
-				LOG.info("Configuration: {}.", config);
-
-				// Run the TaskManager
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(
-					config,
-					ResourceID.generate(),
-					TaskManager.class);
-
-				// Run forever
-				new CountDownLatch(1).await();
-			}
-			catch (Throwable t) {
-				LOG.error("Failed to start TaskManager process", t);
-				System.exit(1);
-			}
-		}
-	}
-
-}


[flink] 08/08: [FLINK-10397] Remove CoreOptions#MODE

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8434d686473a088a876967dc2fe9b5dee0e3169
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 22 23:16:50 2018 +0200

    [FLINK-10397] Remove CoreOptions#MODE
    
    Removes the MODE option used to switch between the new and legacy mode.
    
    This closes #6752.
---
 docs/_includes/generated/core_configuration.html   |  5 --
 .../org/apache/flink/client/LocalExecutor.java     | 50 ++++++----------
 .../org/apache/flink/client/RemoteExecutor.java    |  8 +--
 .../org/apache/flink/client/cli/CliFrontend.java   | 12 +---
 .../flink/client/cli/CliFrontendTestBase.java      | 26 +-------
 .../apache/flink/configuration/CoreOptions.java    | 22 -------
 .../org/apache/flink/api/scala/FlinkShell.scala    | 34 ++++-------
 .../apache/flink/api/scala/ScalaShellITCase.scala  |  1 -
 .../api/environment/RemoteStreamEnvironment.java   |  8 +--
 .../environment/StreamExecutionEnvironment.java    |  9 +--
 .../test/operators/RemoteEnvironmentITCase.java    | 69 ++++------------------
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |  5 --
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 32 +++-------
 13 files changed, 57 insertions(+), 224 deletions(-)

diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 98cca91..4366e8b 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -28,11 +28,6 @@
             <td></td>
         </tr>
         <tr>
-            <td><h5>mode</h5></td>
-            <td style="word-wrap: break-word;">"new"</td>
-            <td>Switch to select the execution mode. Possible values are 'new' and 'legacy'.</td>
-        </tr>
-        <tr>
             <td><h5>parallelism.default</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 4e4993a..14d3ee5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -37,7 +37,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -125,39 +124,28 @@ public class LocalExecutor extends PlanExecutor {
 	}
 
 	private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
-		final JobExecutorService newJobExecutorService;
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+		if (!configuration.contains(RestOptions.PORT)) {
+			configuration.setInteger(RestOptions.PORT, 0);
+		}
 
-			if (!configuration.contains(RestOptions.PORT)) {
-				configuration.setInteger(RestOptions.PORT, 0);
-			}
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(
+				configuration.getInteger(
+					ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+					ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
+			.setRpcServiceSharing(RpcServiceSharing.SHARED)
+			.setNumSlotsPerTaskManager(
+				configuration.getInteger(
+					TaskManagerOptions.NUM_TASK_SLOTS, 1))
+			.build();
 
-			final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-				.setConfiguration(configuration)
-				.setNumTaskManagers(
-					configuration.getInteger(
-						ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
-						ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-				.setRpcServiceSharing(RpcServiceSharing.SHARED)
-				.setNumSlotsPerTaskManager(
-					configuration.getInteger(
-						TaskManagerOptions.NUM_TASK_SLOTS, 1))
-				.build();
-
-			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
-			miniCluster.start();
-
-			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
-
-			newJobExecutorService = miniCluster;
-		} else {
-			final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
-			localFlinkMiniCluster.start();
-
-			newJobExecutorService = localFlinkMiniCluster;
-		}
+		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
+		miniCluster.start();
+
+		configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 
-		return newJobExecutorService;
+		return miniCluster;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 0a2f1b4..a4424eb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.optimizer.DataStatistics;
@@ -151,11 +149,7 @@ public class RemoteExecutor extends PlanExecutor {
 	public void start() throws Exception {
 		synchronized (lock) {
 			if (client == null) {
-				if (CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
-					client = new StandaloneClusterClient(clientConfiguration);
-				} else {
-					client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
-				}
+				client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
 				client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 			}
 			else {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c7e6344..c7c664d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -123,8 +123,6 @@ public class CliFrontend {
 
 	private final int defaultParallelism;
 
-	private final boolean isNewMode;
-
 	public CliFrontend(
 			Configuration configuration,
 			List<CustomCommandLine<?>> customCommandLines) throws Exception {
@@ -147,8 +145,6 @@ public class CliFrontend {
 
 		this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
 		this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-
-		this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -233,7 +229,7 @@ public class CliFrontend {
 			final ClusterClient<T> client;
 
 			// directly deploy the job if the cluster is started in job mode and detached
-			if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {
+			if (clusterId == null && runOptions.getDetachedMode()) {
 				int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
@@ -1200,11 +1196,7 @@ public class CliFrontend {
 			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
 		}
 
-		if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
-			customCommandLines.add(new DefaultCLI(configuration));
-		} else {
-			customCommandLines.add(new LegacyCLI(configuration));
-		}
+		customCommandLines.add(new DefaultCLI(configuration));
 
 		return customCommandLines;
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
index 3c24376..8ff426c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
@@ -19,43 +19,21 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
 /**
- * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode.
+ * Base test class for {@link CliFrontend} tests.
  */
-@RunWith(Parameterized.class)
 public abstract class CliFrontendTestBase extends TestLogger {
-	@Parameterized.Parameter
-	public String mode;
-
-	@Parameterized.Parameters(name = "Mode = {0}")
-	public static List<String> parameters() {
-		return Arrays.asList(CoreOptions.LEGACY_MODE, CoreOptions.NEW_MODE);
-	}
 
 	protected Configuration getConfiguration() {
 		final Configuration configuration = GlobalConfiguration
 			.loadConfiguration(CliFrontendTestUtils.getConfigDir());
-		configuration.setString(CoreOptions.MODE, mode);
 		return configuration;
 	}
 
 	static AbstractCustomCommandLine<?> getCli(Configuration configuration) {
-		switch (configuration.getString(CoreOptions.MODE)) {
-			case CoreOptions.LEGACY_MODE:
-				return new LegacyCLI(configuration);
-			case CoreOptions.NEW_MODE:
-				return new DefaultCLI(configuration);
-		}
-		throw new IllegalStateException();
+		return new DefaultCLI(configuration);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 9ae807e..4c928fe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -304,26 +304,4 @@ public class CoreOptions {
 	public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
 		return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
 	}
-
-	// ------------------------------------------------------------------------
-	//  Distributed architecture
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Constant value for the new execution mode.
-	 */
-	public static final String NEW_MODE = "new";
-
-	/**
-	 * Constant value for the old execution mode.
-	 */
-	public static final String LEGACY_MODE = "legacy";
-
-	/**
-	 * Switch to select the execution mode. Possible values are {@link CoreOptions#NEW_MODE}
-	 * and {@link CoreOptions#LEGACY_MODE}.
-	 */
-	public static final ConfigOption<String> MODE = key("mode")
-		.defaultValue(NEW_MODE)
-		.withDescription("Switch to select the execution mode. Possible values are 'new' and 'legacy'.");
 }
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index c04e845..d493495 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -139,36 +139,24 @@ object FlinkShell {
     }
   }
 
-  private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
-
   def fetchConnectionInfo(
     configuration: Configuration,
     config: Config
-  ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
+  ): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
-        val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
-          case CoreOptions.LEGACY_MODE => {
-            val cluster = new StandaloneMiniCluster(config)
-
-            (Left(cluster), cluster.getPort)
-          }
-          case CoreOptions.NEW_MODE => {
-            val miniClusterConfig = new MiniClusterConfiguration.Builder()
-              .setConfiguration(config)
-              .build()
-            val cluster = new MiniCluster(miniClusterConfig)
-            cluster.start()
-
-            (Right(cluster), cluster.getRestAddress.getPort)
-          }
-        }
+        val miniClusterConfig = new MiniClusterConfiguration.Builder()
+          .setConfiguration(config)
+          .build()
+        val cluster = new MiniCluster(miniClusterConfig)
+        cluster.start()
+        val port = cluster.getRestAddress.getPort
 
         println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
-        ("localhost", port, Some(Left(miniCluster)))
+        ("localhost", port, Some(Left(cluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -211,8 +199,7 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(_)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -242,8 +229,7 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
+        case Some(Left(miniCluster)) => miniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 54bb16f..731bbf6 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -319,7 +319,6 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
     // set to different than default so not to interfere with ScalaShellLocalStartupITCase
     configuration.setInteger(RestOptions.PORT, 8082)
     val miniConfig = new MiniClusterConfiguration.Builder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9c36dab..0af6d93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -206,11 +204,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 		final ClusterClient<?> client;
 		try {
-			if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-				client = new StandaloneClusterClient(configuration);
-			} else {
-				client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
-			}
+			client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
 		}
 		catch (Exception e) {
 			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7259de..d4e14f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,7 +47,6 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1653,13 +1652,9 @@ public abstract class StreamExecutionEnvironment {
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
 		final LocalStreamEnvironment currentEnvironment;
 
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-			currentEnvironment = new LocalStreamEnvironment(configuration);
-		} else {
-			currentEnvironment = new LegacyLocalStreamEnvironment(configuration);
-		}
-
+		currentEnvironment = new LocalStreamEnvironment(configuration);
 		currentEnvironment.setParallelism(parallelism);
+
 		return currentEnvironment;
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index c2d6341..451108b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -20,37 +20,27 @@ package org.apache.flink.test.operators;
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
@@ -78,31 +68,22 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	public static void setupCluster() throws Exception {
 		configuration = new Configuration();
 
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-			configuration.setInteger(WebOptions.PORT, 0);
-			final MiniCluster miniCluster = new MiniCluster(
-				new MiniClusterConfiguration.Builder()
-					.setConfiguration(configuration)
-					.setNumSlotsPerTaskManager(TM_SLOTS)
-					.build());
+		configuration.setInteger(WebOptions.PORT, 0);
+		final MiniCluster miniCluster = new MiniCluster(
+			new MiniClusterConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumSlotsPerTaskManager(TM_SLOTS)
+				.build());
 
-			miniCluster.start();
+		miniCluster.start();
 
-			final URI uri = miniCluster.getRestAddress();
-			hostname = uri.getHost();
-			port = uri.getPort();
+		final URI uri = miniCluster.getRestAddress();
+		hostname = uri.getHost();
+		port = uri.getPort();
 
-			configuration.setInteger(WebOptions.PORT, port);
+		configuration.setInteger(WebOptions.PORT, port);
 
-			resource = miniCluster;
-		} else {
-			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
-			final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
-			hostname = standaloneMiniCluster.getHostname();
-			port = standaloneMiniCluster.getPort();
-
-			resource = standaloneMiniCluster;
-		}
+		resource = miniCluster;
 	}
 
 	@AfterClass
@@ -111,32 +92,6 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	}
 
 	/**
-	 * Ensure that that Akka configuration parameters can be set.
-	 */
-	@Test(expected = FlinkException.class)
-	public void testInvalidAkkaConfiguration() throws Throwable {
-		assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
-		Configuration config = new Configuration();
-		config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				hostname,
-				port,
-				config
-		);
-		env.getConfig().disableSysoutLogging();
-
-		DataSet<String> result = env.createInput(new TestNonRichInputFormat());
-		result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
-		try {
-			env.execute();
-			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-		} catch (ProgramInvocationException ex) {
-			throw ex.getCause();
-		}
-	}
-
-	/**
 	 * Ensure that the program parallelism can be set even if the configuration is supplied.
 	 */
 	@Test
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index d6a029f..75204d9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliFrontendTestUtils;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
@@ -40,8 +39,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
@@ -53,7 +50,6 @@ import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
  *
  * @see org.apache.flink.client.cli.CliFrontendRunTest
  */
-@RunWith(Parameterized.class)
 public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 
 	@Rule
@@ -74,7 +70,6 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 		String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
 		Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, mode);
 		configuration.setString(JobManagerOptions.ADDRESS, "localhost");
 		configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 65f813e..7ba2150 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -41,7 +41,6 @@ import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -163,8 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private final String yarnPropertiesFileLocation;
 
-	private final boolean isNewMode;
-
 	private final YarnConfiguration yarnConfiguration;
 
 	public FlinkYarnSessionCli(
@@ -185,8 +182,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 		this.acceptInteractiveInput = acceptInteractiveInput;
 
-		this.isNewMode = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
-
 		// Create the command line options
 
 		query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
@@ -375,10 +370,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	}
 
 	private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
-		if (!isNewMode && !cmd.hasOption(container.getOpt())) { // number of containers is required option!
-			LOG.error("Missing required argument {}", container.getOpt());
-			printUsage();
-			throw new IllegalArgumentException("Missing required argument " + container.getOpt());
+		if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
+			LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
 		}
 
 		// TODO: The number of task manager should be deprecated soon
@@ -989,20 +982,11 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		yarnClient.init(yarnConfiguration);
 		yarnClient.start();
 
-		if (isNewMode) {
-			return new YarnClusterDescriptor(
-				configuration,
-				yarnConfiguration,
-				configurationDirectory,
-				yarnClient,
-				false);
-		} else {
-			return new LegacyYarnClusterDescriptor(
-				configuration,
-				yarnConfiguration,
-				configurationDirectory,
-				yarnClient,
-				false);
-		}
+		return new YarnClusterDescriptor(
+			configuration,
+			yarnConfiguration,
+			configurationDirectory,
+			yarnClient,
+			false);
 	}
 }


[flink] 02/08: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93b042da45a0bbf418e7df96059fe36eb4f18b9e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 22:05:28 2018 +0200

    [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
    
    This closes #6750.
---
 .../runtime/entrypoint/ClusterEntrypoint.java      |   4 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java | 134 ++++-----------------
 ...skManagerProcessFailureBatchRecoveryITCase.java |   4 +-
 ...nagerProcessFailureStreamingRecoveryITCase.java |   2 -
 4 files changed, 28 insertions(+), 116 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c9a1722..5665500 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -147,7 +147,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return terminationFuture;
 	}
 
-	protected void startCluster() throws ClusterEntrypointException {
+	public void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
@@ -392,7 +392,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return resultConfiguration;
 	}
 
-	private CompletableFuture<ApplicationStatus> shutDownAsync(
+	public CompletableFuture<ApplicationStatus> shutDownAsync(
 			ApplicationStatus applicationStatus,
 			@Nullable String diagnostics,
 			boolean cleanupHaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5cd6f30..0962ddf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -19,29 +19,21 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -52,17 +44,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.junit.Assert.assertFalse;
@@ -92,6 +75,9 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testTaskManagerProcessFailure() throws Exception {
 
@@ -99,14 +85,25 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 		final StringWriter processOutput2 = new StringWriter();
 		final StringWriter processOutput3 = new StringWriter();
 
-		ActorSystem jmActorSystem = null;
-		HighAvailabilityServices highAvailabilityServices = null;
 		Process taskManagerProcess1 = null;
 		Process taskManagerProcess2 = null;
 		Process taskManagerProcess3 = null;
 
 		File coordinateTempDir = null;
 
+		final int jobManagerPort = NetUtils.getAvailablePort();
+		final int restPort = NetUtils.getAvailablePort();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+		jmConfig.setInteger(RestOptions.PORT, restPort);
+
+		final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig);
+
 		try {
 			// check that we run this test only if the java command
 			// is available on this machine
@@ -124,37 +121,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			// coordination between the processes goes through a directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
-			jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
-				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
-				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+			clusterEntrypoint.startCluster();
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -163,7 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					TaskManagerProcessEntryPoint.class.getName(),
+					TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -173,10 +140,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			taskManagerProcess2 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
-
 			// the program will set a marker file in each of its parallel tasks once they are ready, so that
 			// this coordinating code is aware of this.
 			// the program will very slowly consume elements until the marker file (later created by the
@@ -189,7 +152,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				@Override
 				public void run() {
 					try {
-						testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+						testTaskManagerFailure(restPort, coordinateDirClosure);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -220,10 +183,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			taskManagerProcess3 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
 
-			// we wait for the third TaskManager to register
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
-
 			// kill one of the previous TaskManagers, triggering a failure and recovery
 			taskManagerProcess1.destroy();
 			taskManagerProcess1 = null;
@@ -270,13 +229,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			if (taskManagerProcess3 != null) {
 				taskManagerProcess3.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
 
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
+			clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
 		}
 	}
 
@@ -290,44 +244,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 	 */
 	public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
 
-	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis)
-			throws Exception {
-		final long pollInterval = 10_000_000; // 10 ms = 10,000,000 nanos
-		final long deadline = System.nanoTime() + maxDelayMillis * 1_000_000;
-
-		long time;
-
-		while ((time = System.nanoTime()) < deadline) {
-			FiniteDuration timeout = new FiniteDuration(pollInterval, TimeUnit.NANOSECONDS);
-
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-
-				int numTMs = (Integer) Await.result(result, timeout);
-
-				if (numTMs == numExpected) {
-					return;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-
-			long timePassed = System.nanoTime() - time;
-			long remainingMillis = (pollInterval - timePassed) / 1_000_000;
-			if (remainingMillis > 0) {
-				Thread.sleep(remainingMillis);
-			}
-		}
-
-		fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)");
-	}
-
 	protected static void printProcessLog(String processName, String log) {
 		if (log == null || log.length() == 0) {
 			return;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 7dc6f0c..4815c49 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,10 +67,9 @@ public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskMa
 	public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
 		env.setParallelism(PARALLELISM);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 766a799..fbf6b5b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -67,7 +66,6 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 		final File tempCheckpointDir = tempFolder.newFolder();
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 			"localhost",
 			jobManagerPort,


[flink] 07/08: [hotfix] Remove DispatcherProcess#getJobManagerPort function

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0d5e99319f6dc38da890e389f8b1a7e2cabe6cf
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 10:31:46 2018 +0200

    [hotfix] Remove DispatcherProcess#getJobManagerPort function
---
 .../flink/runtime/testutils/DispatcherProcess.java | 45 +---------------------
 1 file changed, 1 insertion(+), 44 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
index 79b0dc3..85d3caa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -32,11 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -48,9 +43,6 @@ public class DispatcherProcess extends TestJvmProcess {
 
 	private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
 
-	/** Pattern to parse the job manager port from the logs. */
-	private static final Pattern PORT_PATTERN = Pattern.compile(".*Actor system started at akka\\.tcp://flink@.*:(\\d+).*");
-
 	/** ID for this JobManager. */
 	private final int id;
 
@@ -60,9 +52,6 @@ public class DispatcherProcess extends TestJvmProcess {
 	/** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint}. */
 	private final String[] jvmArgs;
 
-	/** The port the JobManager listens on. */
-	private int jobManagerPort;
-
 	/**
 	 * Creates a {@link JobManager} running in a separate JVM.
 	 *
@@ -106,41 +95,9 @@ public class DispatcherProcess extends TestJvmProcess {
 		return config;
 	}
 
-	/**
-	 * Parses the port from the job manager logs and returns it.
-	 *
-	 * <p>If a call to this method succeeds, successive calls will directly
-	 * return the port and re-parse the logs.
-	 *
-	 * @param timeout Timeout for log parsing.
-	 * @return The port of the job manager
-	 * @throws InterruptedException  If interrupted while waiting before
-	 *                               retrying to parse the logs
-	 * @throws NumberFormatException If the parsed port is not a number
-	 */
-	public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
-		if (jobManagerPort > 0) {
-			return jobManagerPort;
-		} else {
-			Deadline deadline = timeout.fromNow();
-			while (deadline.hasTimeLeft()) {
-				Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
-				if (matcher.find()) {
-					String port = matcher.group(1);
-					jobManagerPort = Integer.parseInt(port);
-					return jobManagerPort;
-				} else {
-					Thread.sleep(100);
-				}
-			}
-
-			throw new RuntimeException("Could not parse port from logs");
-		}
-	}
-
 	@Override
 	public String toString() {
-		return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort);
+		return String.format("DispatcherProcess(id=%d)", id);
 	}
 
 	/**


[flink] 01/08: [FLINK-10401] Port ProcessFailureCancelingITCase to new code base

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff97d1cb3efbac4f47abc0a38bac932a55fc1288
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 00:28:30 2018 +0200

    [FLINK-10401] Port ProcessFailureCancelingITCase to new code base
    
    This closes #6749.
---
 .../DispatcherResourceManagerComponent.java        |  10 +
 .../flink/runtime/util/BlobServerResource.java     |   4 +
 ...tractTaskManagerProcessFailureRecoveryTest.java |  29 +++
 .../recovery/ProcessFailureCancelingITCase.java    | 256 ++++++++++-----------
 4 files changed, 159 insertions(+), 140 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index 94925b2..b07095c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -116,6 +116,16 @@ public class DispatcherResourceManagerComponent<T extends Dispatcher> implements
 		return shutDownFuture;
 	}
 
+	@Nonnull
+	public T getDispatcher() {
+		return dispatcher;
+	}
+
+	@Nonnull
+	public WebMonitorEndpoint<?> getWebMonitorEndpoint() {
+		return webMonitorEndpoint;
+	}
+
 	@Override
 	public CompletableFuture<Void> closeAsync() {
 		if (isRunning.compareAndSet(true, false)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
index 080ecf8..654b2bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -62,4 +62,8 @@ public class BlobServerResource extends ExternalResource {
 	public int getBlobServerPort() {
 		return blobServer.getPort();
 	}
+
+	public BlobServer getBlobServer() {
+		return blobServer;
+	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 56327ad..5cd6f30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -421,4 +422,32 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 		}
 	}
 
+	/**
+	 * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor.
+	 */
+	public static class TaskExecutorProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);
+
+		public static void main(String[] args) {
+			try {
+				int jobManagerPort = Integer.parseInt(args[0]);
+
+				Configuration cfg = new Configuration();
+				cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+				cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
+				cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+
+				TaskManagerRunner.runTaskManager(cfg, ResourceID.generate());
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b85a410..afca8f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -21,51 +21,58 @@ package org.apache.flink.test.recovery;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.StringWriter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This test makes sure that jobs are canceled properly in cases where
@@ -74,15 +81,36 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class ProcessFailureCancelingITCase extends TestLogger {
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testCancelingOnProcessFailure() throws Exception {
 		final StringWriter processOutput = new StringWriter();
+		final Time timeout = Time.minutes(2L);
 
-		ActorSystem jmActorSystem = null;
+		RestClusterClient<String> clusterClient = null;
 		Process taskManagerProcess = null;
-		HighAvailabilityServices highAvailabilityServices = null;
+		final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(RestOptions.PORT, 0);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+		final int jobManagerPort = rpcService.getPort();
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+		final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory(
+			StandaloneResourceManagerFactory.INSTANCE);
+		DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null;
+
+		try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				jmConfig,
+				TestingUtils.defaultExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
 
-		try {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -96,36 +124,22 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
+			dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(
 				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
+				rpcService,
+				haServices,
+				blobServerResource.getBlobServer(),
+				new HeartbeatServices(100L, 1000L),
 				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+				new MemoryArchivedExecutionGraphStore(),
+				fatalErrorHandler);
+
+			// update the rest ports
+			final int restPort = dispatcherResourceManagerComponent
+				.getWebMonitorEndpoint()
+				.getServerAddress()
+				.getPort();
+			jmConfig.setInteger(RestOptions.PORT, restPort);
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -134,7 +148,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+					AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -142,21 +156,14 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			taskManagerProcess = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 1, 120000);
-
 			final Throwable[] errorRef = new Throwable[1];
 
-			final Configuration configuration = new Configuration();
-			configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
-
 			// start the test program, which infinitely blocks
 			Runnable programRunner = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
+						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration());
 						env.setParallelism(2);
 						env.setRestartStrategy(RestartStrategies.noRestart());
 						env.getConfig().disableSysoutLogging();
@@ -187,15 +194,30 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			Thread programThread = new Thread(programRunner);
 
 			// kill the TaskManager
+			programThread.start();
+
+			final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L));
+
+			final DispatcherGateway dispatcherGateway = rpcService.connect(
+				leaderConnectionInfo.getAddress(),
+				DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
+				DispatcherGateway.class).get();
+
+			waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
+
+			clusterClient = new RestClusterClient<>(jmConfig, "standalone");
+
+			final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout);
+
+			assertThat(jobIds, hasSize(1));
+			final JobID jobId = jobIds.iterator().next();
+
+			// kill the TaskManager after the job started to run
 			taskManagerProcess.destroy();
 			taskManagerProcess = null;
 
-			// immediately submit the job. this should hit the case
-			// where the JobManager still thinks it has the TaskManager and tries to send it tasks
-			programThread.start();
-
 			// try to cancel the job
-			cancelRunningJob(jmActor);
+			clusterClient.cancel(jobId);
 
 			// we should see a failure within reasonable time (10s is the ask timeout).
 			// since the CI environment is often slow, we conservatively give it up to 2 minutes,
@@ -223,88 +245,42 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			if (taskManagerProcess != null) {
 				taskManagerProcess.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
-		}
-	}
-
-	private void cancelRunningJob(ActorRef jobManager) throws Exception {
-		final FiniteDuration askTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		// try at most for 30 seconds
-		final long deadline = System.currentTimeMillis() + 30000;
-
-		JobID jobId = null;
-
-		do {
-			Future<Object> response = Patterns.ask(jobManager,
-					JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
-
-			Object result;
-			try {
-				result = Await.result(response, askTimeout);
+			if (clusterClient != null) {
+				clusterClient.shutdown();
 			}
-			catch (Exception e) {
-				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+			if (dispatcherResourceManagerComponent != null) {
+				dispatcherResourceManagerComponent.close();
 			}
 
-			if (result instanceof JobManagerMessages.RunningJobsStatus) {
-
-				List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+			fatalErrorHandler.rethrowError();
 
-				if (jobs.size() == 1) {
-					jobId = jobs.get(0).getJobId();
-					break;
-				}
-			}
-		}
-		while (System.currentTimeMillis() < deadline);
-
-		if (jobId == null) {
-			// we never found it running, must have failed already
-			return;
+			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
 		}
-
-		// tell the JobManager to cancel the job
-		jobManager.tell(
-			new JobManagerMessages.LeaderSessionMessage(
-				HighAvailabilityServices.DEFAULT_LEADER_ID,
-				new JobManagerMessages.CancelJob(jobId)),
-			ActorRef.noSender());
 	}
 
-	private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
-			throws Exception {
-		final long deadline = System.currentTimeMillis() + maxDelay;
-		while (true) {
-			long remaining = deadline - System.currentTimeMillis();
-			if (remaining <= 0) {
-				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
-			}
-
-			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+	private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(timeout),
+			Time.milliseconds(50L),
+			Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= 1 &&
+				clusterOverview.getNumSlotsAvailable() == 0 &&
+				clusterOverview.getNumSlotsTotal() == 2,
+			TestingUtils.defaultScheduledExecutor())
+			.get();
+	}
 
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-				Integer numTMs = (Integer) Await.result(result, timeout);
-				if (numTMs == numExpected) {
-					break;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-		}
+	private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException {
+		return FutureUtils.retrySuccesfulWithDelay(
+				CheckedSupplier.unchecked(clusterClient::listJobs),
+				Time.milliseconds(50L),
+				Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+				jobs -> !jobs.isEmpty(),
+				TestingUtils.defaultScheduledExecutor())
+			.get()
+			.stream()
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
 	}
 
 	private void printProcessLog(String processName, String log) {


[flink] 05/08: [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20447b23848c2f69afef2742e06922fe174ddc47
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 27 17:14:00 2018 +0200

    [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync
---
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java  | 13 +++++++++++--
 .../AbstractTaskManagerProcessFailureRecoveryTest.java      |  7 +------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 5665500..9eaef34 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -88,7 +89,7 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * <p>Specialization of this class can be used for the session mode and the per-job mode
  */
-public abstract class ClusterEntrypoint implements FatalErrorHandler {
+public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
 
 	public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions
 		.key("internal.cluster.execution-mode")
@@ -312,6 +313,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 	}
 
+	@Override
+	public CompletableFuture<Void> closeAsync() { // AutoCloseableAsync hook: delegates to shutDownAsync with status UNKNOWN
+		return shutDownAsync(
+			ApplicationStatus.UNKNOWN,
+			"Cluster entrypoint has been closed externally.",
+			true).thenAccept(ignored -> {}); // true = cleanupHaData; the ApplicationStatus result is discarded to yield a Void future
+	}
+
+
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		synchronized (lock) {
 			Throwable exception = null;
@@ -392,7 +401,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return resultConfiguration;
 	}
 
-	public CompletableFuture<ApplicationStatus> shutDownAsync(
+	private CompletableFuture<ApplicationStatus> shutDownAsync(
 			ApplicationStatus applicationStatus,
 			@Nullable String diagnostics,
 			boolean cleanupHaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 1374b70..5d7f26b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -101,9 +100,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
 		jmConfig.setInteger(RestOptions.PORT, restPort);
 
-		final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig);
-
-		try {
+		try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -228,8 +225,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			if (taskManagerProcess3 != null) {
 				taskManagerProcess3.destroy();
 			}
-
-			clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
 		}
 	}