You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/28 14:36:56 UTC

[GitHub] asfgit closed pull request #6752: [FLINK-10397] Remove CoreOptions#MODE

asfgit closed pull request #6752: [FLINK-10397] Remove CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 98cca9125a0..4366e8b246f 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -27,11 +27,6 @@
             <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
             <td></td>
         </tr>
-        <tr>
-            <td><h5>mode</h5></td>
-            <td style="word-wrap: break-word;">"new"</td>
-            <td>Switch to select the execution mode. Possible values are 'new' and 'legacy'.</td>
-        </tr>
         <tr>
             <td><h5>parallelism.default</h5></td>
             <td style="word-wrap: break-word;">1</td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 4e4993a88db..14d3ee50a14 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -37,7 +37,6 @@
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -125,39 +124,28 @@ public void start() throws Exception {
 	}
 
 	private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
-		final JobExecutorService newJobExecutorService;
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+		if (!configuration.contains(RestOptions.PORT)) {
+			configuration.setInteger(RestOptions.PORT, 0);
+		}
 
-			if (!configuration.contains(RestOptions.PORT)) {
-				configuration.setInteger(RestOptions.PORT, 0);
-			}
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(
+				configuration.getInteger(
+					ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+					ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
+			.setRpcServiceSharing(RpcServiceSharing.SHARED)
+			.setNumSlotsPerTaskManager(
+				configuration.getInteger(
+					TaskManagerOptions.NUM_TASK_SLOTS, 1))
+			.build();
 
-			final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-				.setConfiguration(configuration)
-				.setNumTaskManagers(
-					configuration.getInteger(
-						ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
-						ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-				.setRpcServiceSharing(RpcServiceSharing.SHARED)
-				.setNumSlotsPerTaskManager(
-					configuration.getInteger(
-						TaskManagerOptions.NUM_TASK_SLOTS, 1))
-				.build();
-
-			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
-			miniCluster.start();
-
-			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
-
-			newJobExecutorService = miniCluster;
-		} else {
-			final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
-			localFlinkMiniCluster.start();
-
-			newJobExecutorService = localFlinkMiniCluster;
-		}
+		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
+		miniCluster.start();
+
+		configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 
-		return newJobExecutorService;
+		return miniCluster;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 0a2f1b49411..a4424eb0b96 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -24,10 +24,8 @@
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.optimizer.DataStatistics;
@@ -151,11 +149,7 @@ public int getDefaultParallelism() {
 	public void start() throws Exception {
 		synchronized (lock) {
 			if (client == null) {
-				if (CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
-					client = new StandaloneClusterClient(clientConfiguration);
-				} else {
-					client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
-				}
+				client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
 				client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 			}
 			else {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c7e6344447e..c7c664d3f86 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -123,8 +123,6 @@
 
 	private final int defaultParallelism;
 
-	private final boolean isNewMode;
-
 	public CliFrontend(
 			Configuration configuration,
 			List<CustomCommandLine<?>> customCommandLines) throws Exception {
@@ -147,8 +145,6 @@ public CliFrontend(
 
 		this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
 		this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-
-		this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -233,7 +229,7 @@ protected void run(String[] args) throws Exception {
 			final ClusterClient<T> client;
 
 			// directly deploy the job if the cluster is started in job mode and detached
-			if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {
+			if (clusterId == null && runOptions.getDetachedMode()) {
 				int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
@@ -1200,11 +1196,7 @@ static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress
 			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
 		}
 
-		if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
-			customCommandLines.add(new DefaultCLI(configuration));
-		} else {
-			customCommandLines.add(new LegacyCLI(configuration));
-		}
+		customCommandLines.add(new DefaultCLI(configuration));
 
 		return customCommandLines;
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
index 3c243763a0b..8ff426c8057 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
@@ -19,43 +19,21 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
 /**
- * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode.
+ * Base test class for {@link CliFrontend} tests.
  */
-@RunWith(Parameterized.class)
 public abstract class CliFrontendTestBase extends TestLogger {
-	@Parameterized.Parameter
-	public String mode;
-
-	@Parameterized.Parameters(name = "Mode = {0}")
-	public static List<String> parameters() {
-		return Arrays.asList(CoreOptions.LEGACY_MODE, CoreOptions.NEW_MODE);
-	}
 
 	protected Configuration getConfiguration() {
 		final Configuration configuration = GlobalConfiguration
 			.loadConfiguration(CliFrontendTestUtils.getConfigDir());
-		configuration.setString(CoreOptions.MODE, mode);
 		return configuration;
 	}
 
 	static AbstractCustomCommandLine<?> getCli(Configuration configuration) {
-		switch (configuration.getString(CoreOptions.MODE)) {
-			case CoreOptions.LEGACY_MODE:
-				return new LegacyCLI(configuration);
-			case CoreOptions.NEW_MODE:
-				return new DefaultCLI(configuration);
-		}
-		throw new IllegalStateException();
+		return new DefaultCLI(configuration);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 9ae807ef145..4c928fef686 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -304,26 +304,4 @@
 	public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
 		return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
 	}
-
-	// ------------------------------------------------------------------------
-	//  Distributed architecture
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Constant value for the new execution mode.
-	 */
-	public static final String NEW_MODE = "new";
-
-	/**
-	 * Constant value for the old execution mode.
-	 */
-	public static final String LEGACY_MODE = "legacy";
-
-	/**
-	 * Switch to select the execution mode. Possible values are {@link CoreOptions#NEW_MODE}
-	 * and {@link CoreOptions#LEGACY_MODE}.
-	 */
-	public static final ConfigOption<String> MODE = key("mode")
-		.defaultValue(NEW_MODE)
-		.withDescription("Switch to select the execution mode. Possible values are 'new' and 'legacy'.");
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c9a17227f79..9eaef34a33a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -88,7 +89,7 @@
  *
  * <p>Specialization of this class can be used for the session mode and the per-job mode
  */
-public abstract class ClusterEntrypoint implements FatalErrorHandler {
+public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
 
 	public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions
 		.key("internal.cluster.execution-mode")
@@ -147,7 +148,7 @@ protected ClusterEntrypoint(Configuration configuration) {
 		return terminationFuture;
 	}
 
-	protected void startCluster() throws ClusterEntrypointException {
+	public void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
@@ -312,6 +313,14 @@ protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
 		return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 	}
 
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return shutDownAsync(
+			ApplicationStatus.UNKNOWN,
+			"Cluster entrypoint has been closed externally.",
+			true).thenAccept(ignored -> {});
+	}
+
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		synchronized (lock) {
 			Throwable exception = null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index 94925b2aba0..b07095c2b6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -116,6 +116,16 @@ private void registerShutDownFuture() {
 		return shutDownFuture;
 	}
 
+	@Nonnull
+	public T getDispatcher() {
+		return dispatcher;
+	}
+
+	@Nonnull
+	public WebMonitorEndpoint<?> getWebMonitorEndpoint() {
+		return webMonitorEndpoint;
+	}
+
 	@Override
 	public CompletableFuture<Void> closeAsync() {
 		if (isRunning.compareAndSet(true, false)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
new file mode 100644
index 00000000000..85d3caa26fa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Dispatcher} instance running in a separate JVM.
+ */
+public class DispatcherProcess extends TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcess.class);
+
+	/** ID of this process; it is part of {@link #getName()} and {@link #toString()}. */
+	private final int id;
+
+	/** The configuration from which the process arguments are derived. */
+	private final Configuration config;
+
+	/** Configuration flattened into args for {@link DispatcherProcessEntryPoint}. */
+	private final String[] jvmArgs;
+
+	/**
+	 * Creates a dispatcher process handle; {@link #getName()} still labels it like a {@link JobManager}.
+	 *
+	 * @param id     non-negative ID of the process
+	 * @param config configuration whose entries become "--key value" argument pairs
+	 *
+	 * @throws Exception NOTE(review): presumably raised by the {@link TestJvmProcess} constructor - confirm
+	 */
+	public DispatcherProcess(int id, Configuration config) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+
+		ArrayList<String> args = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = new String[args.size()];
+		args.toArray(jvmArgs);
+	}
+
+	@Override
+	public String getName() {
+		return "JobManager " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return DispatcherProcessEntryPoint.class.getName();
+	}
+
+	public Configuration getConfig() {
+		return config;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("JobManagerProcess(id=%d)", id);
+	}
+
+	/**
+	 * Entry point for the dispatcher process.
+	 */
+	public static class DispatcherProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcessEntryPoint.class);
+
+		/**
+		 * Parses the arguments into a {@link Configuration} and runs a standalone session cluster entrypoint.
+		 *
+		 * <p>All arguments are interpreted as configuration entries, for instance:
+		 * <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum "xyz:123:456"</code>.
+		 * {@code JobManagerOptions.PORT} and {@code RestOptions.PORT} are always overridden with 0.
+		 */
+		public static void main(String[] args) {
+			try {
+				ParameterTool params = ParameterTool.fromArgs(args);
+				Configuration config = params.getConfiguration();
+				LOG.info("Configuration: {}.", config);
+
+				config.setInteger(JobManagerOptions.PORT, 0);
+				config.setInteger(RestOptions.PORT, 0);
+
+				final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config);
+
+				ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start JobManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
deleted file mode 100644
index b381f62b97e..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TaskManager} instance running in a separate JVM.
- */
-public class TaskManagerProcess extends TestJvmProcess {
-
-	/** ID for this TaskManager */
-	private final int id;
-
-	/** The configuration for the TaskManager */
-	private final Configuration config;
-
-	/** Configuration parsed as args for {@link TaskManagerProcess.TaskManagerProcessEntryPoint} */
-	private final String[] jvmArgs;
-
-	public TaskManagerProcess(int id, Configuration config) throws Exception {
-		checkArgument(id >= 0, "Negative ID");
-		this.id = id;
-		this.config = checkNotNull(config, "Configuration");
-
-		ArrayList<String> args = new ArrayList<>();
-
-		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
-			args.add("--" + entry.getKey());
-			args.add(entry.getValue());
-		}
-
-		this.jvmArgs = new String[args.size()];
-		args.toArray(jvmArgs);
-	}
-
-	@Override
-	public String getName() {
-		return "TaskManager " + id;
-	}
-
-	@Override
-	public String[] getJvmArgs() {
-		return jvmArgs;
-	}
-
-	@Override
-	public String getEntryPointClassName() {
-		return TaskManagerProcessEntryPoint.class.getName();
-	}
-
-	public int getId() {
-		return id;
-	}
-
-	@Override
-	public String toString() {
-		return String.format("TaskManagerProcess(id=%d)", id);
-	}
-
-	/**
-	 * Entry point for the TaskManager process.
-	 */
-	public static class TaskManagerProcessEntryPoint {
-
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-		/**
-		 * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum "xyz:123:456"</code>.
-		 */
-		public static void main(String[] args) throws Exception {
-			try {
-				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
-
-				if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-					config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-				}
-
-				if (!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
-					config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-				}
-
-
-				LOG.info("Configuration: {}.", config);
-
-				// Run the TaskManager
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(
-					config,
-					ResourceID.generate(),
-					TaskManager.class);
-
-				// Run forever
-				new CountDownLatch(1).await();
-			}
-			catch (Throwable t) {
-				LOG.error("Failed to start TaskManager process", t);
-				System.exit(1);
-			}
-		}
-	}
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
index 080ecf81e66..654b2bd66d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -62,4 +62,8 @@ protected void after() {
 	public int getBlobServerPort() {
 		return blobServer.getPort();
 	}
+
+	public BlobServer getBlobServer() {
+		return blobServer;
+	}
 }
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index c04e845c2fc..d493495d484 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -139,36 +139,24 @@ object FlinkShell {
     }
   }
 
-  private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
-
   def fetchConnectionInfo(
     configuration: Configuration,
     config: Config
-  ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
+  ): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
-        val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
-          case CoreOptions.LEGACY_MODE => {
-            val cluster = new StandaloneMiniCluster(config)
-
-            (Left(cluster), cluster.getPort)
-          }
-          case CoreOptions.NEW_MODE => {
-            val miniClusterConfig = new MiniClusterConfiguration.Builder()
-              .setConfiguration(config)
-              .build()
-            val cluster = new MiniCluster(miniClusterConfig)
-            cluster.start()
-
-            (Right(cluster), cluster.getRestAddress.getPort)
-          }
-        }
+        val miniClusterConfig = new MiniClusterConfiguration.Builder()
+          .setConfiguration(config)
+          .build()
+        val cluster = new MiniCluster(miniClusterConfig)
+        cluster.start()
+        val port = cluster.getRestAddress.getPort
 
         println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
-        ("localhost", port, Some(Left(miniCluster)))
+        ("localhost", port, Some(Left(cluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -211,8 +199,7 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(_)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -242,8 +229,7 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
+        case Some(Left(miniCluster)) => miniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 54bb16f038c..731bbf6b288 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -319,7 +319,6 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
     // set to different than default so not to interfere with ScalaShellLocalStartupITCase
     configuration.setInteger(RestOptions.PORT, 8082)
     val miniConfig = new MiniClusterConfiguration.Builder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9c36dab75fd..0af6d937294 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -24,10 +24,8 @@
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -206,11 +204,7 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>
 
 		final ClusterClient<?> client;
 		try {
-			if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-				client = new StandaloneClusterClient(configuration);
-			} else {
-				client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
-			}
+			client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
 		}
 		catch (Exception e) {
 			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7259de196f..d4e14f07667 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,7 +47,6 @@
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1653,13 +1652,9 @@ public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
 		final LocalStreamEnvironment currentEnvironment;
 
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-			currentEnvironment = new LocalStreamEnvironment(configuration);
-		} else {
-			currentEnvironment = new LegacyLocalStreamEnvironment(configuration);
-		}
-
+		currentEnvironment = new LocalStreamEnvironment(configuration);
 		currentEnvironment.setParallelism(parallelism);
+
 		return currentEnvironment;
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index c2d63415b2a..451108b8627 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -20,37 +20,27 @@
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
@@ -78,31 +68,22 @@
 	public static void setupCluster() throws Exception {
 		configuration = new Configuration();
 
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-			configuration.setInteger(WebOptions.PORT, 0);
-			final MiniCluster miniCluster = new MiniCluster(
-				new MiniClusterConfiguration.Builder()
-					.setConfiguration(configuration)
-					.setNumSlotsPerTaskManager(TM_SLOTS)
-					.build());
+		configuration.setInteger(WebOptions.PORT, 0);
+		final MiniCluster miniCluster = new MiniCluster(
+			new MiniClusterConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumSlotsPerTaskManager(TM_SLOTS)
+				.build());
 
-			miniCluster.start();
+		miniCluster.start();
 
-			final URI uri = miniCluster.getRestAddress();
-			hostname = uri.getHost();
-			port = uri.getPort();
+		final URI uri = miniCluster.getRestAddress();
+		hostname = uri.getHost();
+		port = uri.getPort();
 
-			configuration.setInteger(WebOptions.PORT, port);
+		configuration.setInteger(WebOptions.PORT, port);
 
-			resource = miniCluster;
-		} else {
-			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
-			final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
-			hostname = standaloneMiniCluster.getHostname();
-			port = standaloneMiniCluster.getPort();
-
-			resource = standaloneMiniCluster;
-		}
+		resource = miniCluster;
 	}
 
 	@AfterClass
@@ -110,32 +91,6 @@ public static void tearDownCluster() throws Exception {
 		resource.close();
 	}
 
-	/**
-	 * Ensure that that Akka configuration parameters can be set.
-	 */
-	@Test(expected = FlinkException.class)
-	public void testInvalidAkkaConfiguration() throws Throwable {
-		assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
-		Configuration config = new Configuration();
-		config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				hostname,
-				port,
-				config
-		);
-		env.getConfig().disableSysoutLogging();
-
-		DataSet<String> result = env.createInput(new TestNonRichInputFormat());
-		result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
-		try {
-			env.execute();
-			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-		} catch (ProgramInvocationException ex) {
-			throw ex.getCause();
-		}
-	}
-
 	/**
 	 * Ensure that the program parallelism can be set even if the configuration is supplied.
 	 */
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 56327adaae0..5d7f26bb886 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -19,28 +19,19 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -51,17 +42,8 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.junit.Assert.assertFalse;
@@ -91,6 +73,9 @@
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testTaskManagerProcessFailure() throws Exception {
 
@@ -98,15 +83,24 @@ public void testTaskManagerProcessFailure() throws Exception {
 		final StringWriter processOutput2 = new StringWriter();
 		final StringWriter processOutput3 = new StringWriter();
 
-		ActorSystem jmActorSystem = null;
-		HighAvailabilityServices highAvailabilityServices = null;
 		Process taskManagerProcess1 = null;
 		Process taskManagerProcess2 = null;
 		Process taskManagerProcess3 = null;
 
 		File coordinateTempDir = null;
 
-		try {
+		final int jobManagerPort = NetUtils.getAvailablePort();
+		final int restPort = NetUtils.getAvailablePort();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+		jmConfig.setInteger(RestOptions.PORT, restPort);
+
+		try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -123,37 +117,7 @@ public void testTaskManagerProcessFailure() throws Exception {
 			// coordination between the processes goes through a directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
-			jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
-				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
-				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+			clusterEntrypoint.startCluster();
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -162,7 +126,7 @@ public void testTaskManagerProcessFailure() throws Exception {
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					TaskManagerProcessEntryPoint.class.getName(),
+					TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -172,10 +136,6 @@ public void testTaskManagerProcessFailure() throws Exception {
 			taskManagerProcess2 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
-
 			// the program will set a marker file in each of its parallel tasks once they are ready, so that
 			// this coordinating code is aware of this.
 			// the program will very slowly consume elements until the marker file (later created by the
@@ -188,7 +148,7 @@ public void testTaskManagerProcessFailure() throws Exception {
 				@Override
 				public void run() {
 					try {
-						testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+						testTaskManagerFailure(restPort, coordinateDirClosure);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -219,10 +179,6 @@ public void run() {
 			taskManagerProcess3 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
 
-			// we wait for the third TaskManager to register
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
-
 			// kill one of the previous TaskManagers, triggering a failure and recovery
 			taskManagerProcess1.destroy();
 			taskManagerProcess1 = null;
@@ -269,13 +225,6 @@ public void run() {
 			if (taskManagerProcess3 != null) {
 				taskManagerProcess3.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
 		}
 	}
 
@@ -289,44 +238,6 @@ public void run() {
 	 */
 	public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
 
-	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis)
-			throws Exception {
-		final long pollInterval = 10_000_000; // 10 ms = 10,000,000 nanos
-		final long deadline = System.nanoTime() + maxDelayMillis * 1_000_000;
-
-		long time;
-
-		while ((time = System.nanoTime()) < deadline) {
-			FiniteDuration timeout = new FiniteDuration(pollInterval, TimeUnit.NANOSECONDS);
-
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-
-				int numTMs = (Integer) Await.result(result, timeout);
-
-				if (numTMs == numExpected) {
-					return;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-
-			long timePassed = System.nanoTime() - time;
-			long remainingMillis = (pollInterval - timePassed) / 1_000_000;
-			if (remainingMillis > 0) {
-				Thread.sleep(remainingMillis);
-			}
-		}
-
-		fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)");
-	}
-
 	protected static void printProcessLog(String processName, String log) {
 		if (log == null || log.length() == 0) {
 			return;
@@ -387,11 +298,11 @@ protected static boolean waitForMarkerFiles(File basedir, String prefix, int num
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+	 * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor.
 	 */
-	public static class TaskManagerProcessEntryPoint {
+	public static class TaskExecutorProcessEntryPoint {
 
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+		private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);
 
 		public static void main(String[] args) {
 			try {
@@ -405,14 +316,7 @@ public static void main(String[] args) {
 				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
 
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
-					ResourceID.generate(), TaskManager.class);
-
-				// wait forever
-				Object lock = new Object();
-				synchronized (lock) {
-					lock.wait();
-				}
+				TaskManagerRunner.runTaskManager(cfg, ResourceID.generate());
 			}
 			catch (Throwable t) {
 				LOG.error("Failed to start TaskManager process", t);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d3accffcbf8..9e9ce076197 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -22,36 +22,38 @@
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,12 +61,15 @@
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -90,23 +95,28 @@
 @RunWith(Parameterized.class)
 public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
-	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+	private static ZooKeeperTestEnvironment zooKeeper;
 
 	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
 
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
-		}
+	@BeforeClass
+	public static void setup() {
+		zooKeeper = new ZooKeeperTestEnvironment(1);
 	}
 
 	@Before
 	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
+		zooKeeper.deleteAll();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (zooKeeper != null) {
+			zooKeeper.shutdown();
+		}
 	}
 
 	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -141,7 +151,6 @@ public JobManagerHAProcessFailureBatchRecoveryITCase(ExecutionMode executionMode
 	 */
 	private void testJobManagerFailure(String zkQuorum, final File coordinateDir, final File zookeeperStoragePath) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, zookeeperStoragePath.getAbsolutePath());
@@ -149,7 +158,7 @@ private void testJobManagerFailure(String zkQuorum, final File coordinateDir, fi
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);
 		env.setParallelism(PARALLELISM);
-		env.setNumberOfExecutionRetries(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
@@ -212,7 +221,8 @@ public void flatMap(Long value, Collector<Long> out) throws Exception {
 	}
 
 	@Test
-	public void testJobManagerProcessFailure() throws Exception {
+	public void testDispatcherProcessFailure() throws Exception {
+		final Time timeout = Time.seconds(30L);
 		final File zookeeperStoragePath = temporaryFolder.newFolder();
 
 		// Config
@@ -222,15 +232,11 @@ public void testJobManagerProcessFailure() throws Exception {
 
 		assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
 
-		// Setup
-		// Test actor system
-		ActorSystem testActorSystem;
-
 		// Job managers
-		final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+		final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];
 
 		// Task managers
-		final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+		TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];
 
 		HighAvailabilityServices highAvailabilityServices = null;
 
@@ -239,24 +245,25 @@ public void testJobManagerProcessFailure() throws Exception {
 		// Coordination between the processes goes through a directory
 		File coordinateTempDir = null;
 
+		// Cluster config
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
+		// Task manager configuration
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
 
 			// Coordination directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// Job Managers
-			Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-					ZooKeeper.getConnectString(), zookeeperStoragePath.getPath());
-
 			// Start first process
-			jmProcess[0] = new JobManagerProcess(0, config);
-			jmProcess[0].startProcess();
-
-			// Task manager configuration
-			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+			dispatcherProcesses[0] = new DispatcherProcess(0, config);
+			dispatcherProcesses[0].startProcess();
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
 				config,
@@ -264,27 +271,13 @@ public void testJobManagerProcessFailure() throws Exception {
 
 			// Start the task manager process
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-				TaskManager.startTaskManagerComponentsAndActor(
-					config,
-					ResourceID.generate(),
-					tmActorSystem[i],
-					highAvailabilityServices,
-					NoOpMetricRegistry.INSTANCE,
-					"localhost",
-					Option.<String>empty(),
-					false,
-					TaskManager.class);
+				taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
+				taskManagerRunners[i].start();
 			}
 
-			// Test actor system
-			testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-
-			jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
-
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+			leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 			leaderRetrievalService.start(leaderListener);
 
 			// Initial submission
@@ -293,13 +286,14 @@ public void testJobManagerProcessFailure() throws Exception {
 			String leaderAddress = leaderListener.getAddress();
 			UUID leaderId = leaderListener.getLeaderSessionID();
 
-			// Get the leader ref
-			ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
-			ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+			final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect(
+				leaderAddress,
+				DispatcherId.fromUuid(leaderId),
+				DispatcherGateway.class);
+			final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();
 
 			// Wait for all task managers to connect to the leading job manager
-			JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
-					deadline.timeLeft());
+			waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());
 
 			final File coordinateDirClosure = coordinateTempDir;
 			final Throwable[] errorRef = new Throwable[1];
@@ -309,7 +303,7 @@ public void testJobManagerProcessFailure() throws Exception {
 				@Override
 				public void run() {
 					try {
-						testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
+						testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -326,12 +320,10 @@ public void run() {
 					READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
 
 			// Kill one of the job managers and trigger recovery
-			jmProcess[0].destroy();
+			dispatcherProcesses[0].destroy();
 
-			jmProcess[1] = new JobManagerProcess(1, config);
-			jmProcess[1].startProcess();
-
-			jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+			dispatcherProcesses[1] = new DispatcherProcess(1, config);
+			dispatcherProcesses[1].startProcess();
 
 			// we create the marker file which signals the program functions tasks that they can complete
 			AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
@@ -358,7 +350,7 @@ public void run() {
 			// for Travis and the root problem is not shown)
 			t.printStackTrace();
 
-			for (JobManagerProcess p : jmProcess) {
+			for (DispatcherProcess p : dispatcherProcesses) {
 				if (p != null) {
 					p.printProcessLog();
 				}
@@ -368,8 +360,8 @@ public void run() {
 		}
 		finally {
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				if (tmActorSystem[i] != null) {
-					tmActorSystem[i].shutdown();
+				if (taskManagerRunners[i] != null) {
+					taskManagerRunners[i].close();
 				}
 			}
 
@@ -377,7 +369,7 @@ public void run() {
 				leaderRetrievalService.stop();
 			}
 
-			for (JobManagerProcess jmProces : jmProcess) {
+			for (DispatcherProcess jmProces : dispatcherProcesses) {
 				if (jmProces != null) {
 					jmProces.destroy();
 				}
@@ -387,6 +379,8 @@ public void run() {
 				highAvailabilityServices.closeAndCleanupAllData();
 			}
 
+			RpcUtils.terminateRpcService(rpcService, timeout);
+
 			// Delete coordination directory
 			if (coordinateTempDir != null) {
 				try {
@@ -398,4 +392,14 @@ public void run() {
 		}
 	}
 
+	private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
+			Time.milliseconds(50L),
+			org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
+			new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
+			.get();
+	}
+
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b85a410e3a6..afca8f12100 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -21,51 +21,58 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.StringWriter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This test makes sure that jobs are canceled properly in cases where
@@ -74,15 +81,36 @@
 @SuppressWarnings("serial")
 public class ProcessFailureCancelingITCase extends TestLogger {
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testCancelingOnProcessFailure() throws Exception {
 		final StringWriter processOutput = new StringWriter();
+		final Time timeout = Time.minutes(2L);
 
-		ActorSystem jmActorSystem = null;
+		RestClusterClient<String> clusterClient = null;
 		Process taskManagerProcess = null;
-		HighAvailabilityServices highAvailabilityServices = null;
+		final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(RestOptions.PORT, 0);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+		final int jobManagerPort = rpcService.getPort();
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+		final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory(
+			StandaloneResourceManagerFactory.INSTANCE);
+		DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null;
+
+		try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				jmConfig,
+				TestingUtils.defaultExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
 
-		try {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -96,36 +124,22 @@ public void testCancelingOnProcessFailure() throws Exception {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
+			dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(
 				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
+				rpcService,
+				haServices,
+				blobServerResource.getBlobServer(),
+				new HeartbeatServices(100L, 1000L),
 				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+				new MemoryArchivedExecutionGraphStore(),
+				fatalErrorHandler);
+
+			// update the rest ports
+			final int restPort = dispatcherResourceManagerComponent
+				.getWebMonitorEndpoint()
+				.getServerAddress()
+				.getPort();
+			jmConfig.setInteger(RestOptions.PORT, restPort);
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -134,7 +148,7 @@ public void testCancelingOnProcessFailure() throws Exception {
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+					AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -142,21 +156,14 @@ public void testCancelingOnProcessFailure() throws Exception {
 			taskManagerProcess = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 1, 120000);
-
 			final Throwable[] errorRef = new Throwable[1];
 
-			final Configuration configuration = new Configuration();
-			configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
-
 			// start the test program, which infinitely blocks
 			Runnable programRunner = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
+						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration());
 						env.setParallelism(2);
 						env.setRestartStrategy(RestartStrategies.noRestart());
 						env.getConfig().disableSysoutLogging();
@@ -187,15 +194,30 @@ public Long map(Long value) throws Exception {
 			Thread programThread = new Thread(programRunner);
 
 			// kill the TaskManager
+			programThread.start();
+
+			final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L));
+
+			final DispatcherGateway dispatcherGateway = rpcService.connect(
+				leaderConnectionInfo.getAddress(),
+				DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
+				DispatcherGateway.class).get();
+
+			waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
+
+			clusterClient = new RestClusterClient<>(jmConfig, "standalone");
+
+			final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout);
+
+			assertThat(jobIds, hasSize(1));
+			final JobID jobId = jobIds.iterator().next();
+
+			// kill the TaskManager after the job started to run
 			taskManagerProcess.destroy();
 			taskManagerProcess = null;
 
-			// immediately submit the job. this should hit the case
-			// where the JobManager still thinks it has the TaskManager and tries to send it tasks
-			programThread.start();
-
 			// try to cancel the job
-			cancelRunningJob(jmActor);
+			clusterClient.cancel(jobId);
 
 			// we should see a failure within reasonable time (10s is the ask timeout).
 			// since the CI environment is often slow, we conservatively give it up to 2 minutes,
@@ -223,88 +245,42 @@ public Long map(Long value) throws Exception {
 			if (taskManagerProcess != null) {
 				taskManagerProcess.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
-		}
-	}
-
-	private void cancelRunningJob(ActorRef jobManager) throws Exception {
-		final FiniteDuration askTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		// try at most for 30 seconds
-		final long deadline = System.currentTimeMillis() + 30000;
-
-		JobID jobId = null;
-
-		do {
-			Future<Object> response = Patterns.ask(jobManager,
-					JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
-
-			Object result;
-			try {
-				result = Await.result(response, askTimeout);
+			if (clusterClient != null) {
+				clusterClient.shutdown();
 			}
-			catch (Exception e) {
-				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+			if (dispatcherResourceManagerComponent != null) {
+				dispatcherResourceManagerComponent.close();
 			}
 
-			if (result instanceof JobManagerMessages.RunningJobsStatus) {
-
-				List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+			fatalErrorHandler.rethrowError();
 
-				if (jobs.size() == 1) {
-					jobId = jobs.get(0).getJobId();
-					break;
-				}
-			}
-		}
-		while (System.currentTimeMillis() < deadline);
-
-		if (jobId == null) {
-			// we never found it running, must have failed already
-			return;
+			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
 		}
-
-		// tell the JobManager to cancel the job
-		jobManager.tell(
-			new JobManagerMessages.LeaderSessionMessage(
-				HighAvailabilityServices.DEFAULT_LEADER_ID,
-				new JobManagerMessages.CancelJob(jobId)),
-			ActorRef.noSender());
 	}
 
-	private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
-			throws Exception {
-		final long deadline = System.currentTimeMillis() + maxDelay;
-		while (true) {
-			long remaining = deadline - System.currentTimeMillis();
-			if (remaining <= 0) {
-				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
-			}
-
-			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+	private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(timeout),
+			Time.milliseconds(50L),
+			Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= 1 &&
+				clusterOverview.getNumSlotsAvailable() == 0 &&
+				clusterOverview.getNumSlotsTotal() == 2,
+			TestingUtils.defaultScheduledExecutor())
+			.get();
+	}
 
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-				Integer numTMs = (Integer) Await.result(result, timeout);
-				if (numTMs == numExpected) {
-					break;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-		}
+	private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException {
+		return FutureUtils.retrySuccesfulWithDelay(
+				CheckedSupplier.unchecked(clusterClient::listJobs),
+				Time.milliseconds(50L),
+				Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+				jobs -> !jobs.isEmpty(),
+				TestingUtils.defaultScheduledExecutor())
+			.get()
+			.stream()
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
 	}
 
 	private void printProcessLog(String processName, String log) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 7dc6f0cf1d6..4815c4938f7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,10 +67,9 @@ public TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode)
 	public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
 		env.setParallelism(PARALLELISM);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 766a7993c45..fbf6b5b71e3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -67,7 +66,6 @@ public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir)
 		final File tempCheckpointDir = tempFolder.newFolder();
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 			"localhost",
 			jobManagerPort,
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index d6a029f21b9..75204d9fa21 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -23,7 +23,6 @@
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
@@ -40,8 +39,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
@@ -53,7 +50,6 @@
  *
  * @see org.apache.flink.client.cli.CliFrontendRunTest
  */
-@RunWith(Parameterized.class)
 public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 
 	@Rule
@@ -74,7 +70,6 @@ public void testRun() throws Exception {
 		String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
 		Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, mode);
 		configuration.setString(JobManagerOptions.ADDRESS, "localhost");
 		configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 65f813e52f7..7ba21502e67 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -41,7 +41,6 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -163,8 +162,6 @@
 
 	private final String yarnPropertiesFileLocation;
 
-	private final boolean isNewMode;
-
 	private final YarnConfiguration yarnConfiguration;
 
 	public FlinkYarnSessionCli(
@@ -185,8 +182,6 @@ public FlinkYarnSessionCli(
 		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 		this.acceptInteractiveInput = acceptInteractiveInput;
 
-		this.isNewMode = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
-
 		// Create the command line options
 
 		query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
@@ -375,10 +370,8 @@ private AbstractYarnClusterDescriptor createDescriptor(
 	}
 
 	private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
-		if (!isNewMode && !cmd.hasOption(container.getOpt())) { // number of containers is required option!
-			LOG.error("Missing required argument {}", container.getOpt());
-			printUsage();
-			throw new IllegalArgumentException("Missing required argument " + container.getOpt());
+		if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
+			LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
 		}
 
 		// TODO: The number of task manager should be deprecated soon
@@ -989,20 +982,11 @@ private AbstractYarnClusterDescriptor getClusterDescriptor(
 		yarnClient.init(yarnConfiguration);
 		yarnClient.start();
 
-		if (isNewMode) {
-			return new YarnClusterDescriptor(
-				configuration,
-				yarnConfiguration,
-				configurationDirectory,
-				yarnClient,
-				false);
-		} else {
-			return new LegacyYarnClusterDescriptor(
-				configuration,
-				yarnConfiguration,
-				configurationDirectory,
-				yarnClient,
-				false);
-		}
+		return new YarnClusterDescriptor(
+			configuration,
+			yarnConfiguration,
+			configurationDirectory,
+			yarnClient,
+			false);
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services