You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:07 UTC

[flink] 08/08: [FLINK-10397] Remove CoreOptions#MODE

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8434d686473a088a876967dc2fe9b5dee0e3169
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 22 23:16:50 2018 +0200

    [FLINK-10397] Remove CoreOptions#MODE
    
    Removes the MODE option used to switch between the new and legacy modes.
    
    This closes #6752.
---
 docs/_includes/generated/core_configuration.html   |  5 --
 .../org/apache/flink/client/LocalExecutor.java     | 50 ++++++----------
 .../org/apache/flink/client/RemoteExecutor.java    |  8 +--
 .../org/apache/flink/client/cli/CliFrontend.java   | 12 +---
 .../flink/client/cli/CliFrontendTestBase.java      | 26 +-------
 .../apache/flink/configuration/CoreOptions.java    | 22 -------
 .../org/apache/flink/api/scala/FlinkShell.scala    | 34 ++++-------
 .../apache/flink/api/scala/ScalaShellITCase.scala  |  1 -
 .../api/environment/RemoteStreamEnvironment.java   |  8 +--
 .../environment/StreamExecutionEnvironment.java    |  9 +--
 .../test/operators/RemoteEnvironmentITCase.java    | 69 ++++------------------
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |  5 --
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 32 +++-------
 13 files changed, 57 insertions(+), 224 deletions(-)

diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 98cca91..4366e8b 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -28,11 +28,6 @@
             <td></td>
         </tr>
         <tr>
-            <td><h5>mode</h5></td>
-            <td style="word-wrap: break-word;">"new"</td>
-            <td>Switch to select the execution mode. Possible values are 'new' and 'legacy'.</td>
-        </tr>
-        <tr>
             <td><h5>parallelism.default</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 4e4993a..14d3ee5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -37,7 +37,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -125,39 +124,28 @@ public class LocalExecutor extends PlanExecutor {
 	}
 
 	private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
-		final JobExecutorService newJobExecutorService;
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+		if (!configuration.contains(RestOptions.PORT)) {
+			configuration.setInteger(RestOptions.PORT, 0);
+		}
 
-			if (!configuration.contains(RestOptions.PORT)) {
-				configuration.setInteger(RestOptions.PORT, 0);
-			}
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(
+				configuration.getInteger(
+					ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+					ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
+			.setRpcServiceSharing(RpcServiceSharing.SHARED)
+			.setNumSlotsPerTaskManager(
+				configuration.getInteger(
+					TaskManagerOptions.NUM_TASK_SLOTS, 1))
+			.build();
 
-			final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-				.setConfiguration(configuration)
-				.setNumTaskManagers(
-					configuration.getInteger(
-						ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
-						ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-				.setRpcServiceSharing(RpcServiceSharing.SHARED)
-				.setNumSlotsPerTaskManager(
-					configuration.getInteger(
-						TaskManagerOptions.NUM_TASK_SLOTS, 1))
-				.build();
-
-			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
-			miniCluster.start();
-
-			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
-
-			newJobExecutorService = miniCluster;
-		} else {
-			final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
-			localFlinkMiniCluster.start();
-
-			newJobExecutorService = localFlinkMiniCluster;
-		}
+		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
+		miniCluster.start();
+
+		configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 
-		return newJobExecutorService;
+		return miniCluster;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 0a2f1b4..a4424eb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.optimizer.DataStatistics;
@@ -151,11 +149,7 @@ public class RemoteExecutor extends PlanExecutor {
 	public void start() throws Exception {
 		synchronized (lock) {
 			if (client == null) {
-				if (CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
-					client = new StandaloneClusterClient(clientConfiguration);
-				} else {
-					client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
-				}
+				client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
 				client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 			}
 			else {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c7e6344..c7c664d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -123,8 +123,6 @@ public class CliFrontend {
 
 	private final int defaultParallelism;
 
-	private final boolean isNewMode;
-
 	public CliFrontend(
 			Configuration configuration,
 			List<CustomCommandLine<?>> customCommandLines) throws Exception {
@@ -147,8 +145,6 @@ public class CliFrontend {
 
 		this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
 		this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-
-		this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -233,7 +229,7 @@ public class CliFrontend {
 			final ClusterClient<T> client;
 
 			// directly deploy the job if the cluster is started in job mode and detached
-			if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {
+			if (clusterId == null && runOptions.getDetachedMode()) {
 				int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
@@ -1200,11 +1196,7 @@ public class CliFrontend {
 			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
 		}
 
-		if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
-			customCommandLines.add(new DefaultCLI(configuration));
-		} else {
-			customCommandLines.add(new LegacyCLI(configuration));
-		}
+		customCommandLines.add(new DefaultCLI(configuration));
 
 		return customCommandLines;
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
index 3c24376..8ff426c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
@@ -19,43 +19,21 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
 /**
- * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode.
+ * Base test class for {@link CliFrontend} tests.
  */
-@RunWith(Parameterized.class)
 public abstract class CliFrontendTestBase extends TestLogger {
-	@Parameterized.Parameter
-	public String mode;
-
-	@Parameterized.Parameters(name = "Mode = {0}")
-	public static List<String> parameters() {
-		return Arrays.asList(CoreOptions.LEGACY_MODE, CoreOptions.NEW_MODE);
-	}
 
 	protected Configuration getConfiguration() {
 		final Configuration configuration = GlobalConfiguration
 			.loadConfiguration(CliFrontendTestUtils.getConfigDir());
-		configuration.setString(CoreOptions.MODE, mode);
 		return configuration;
 	}
 
 	static AbstractCustomCommandLine<?> getCli(Configuration configuration) {
-		switch (configuration.getString(CoreOptions.MODE)) {
-			case CoreOptions.LEGACY_MODE:
-				return new LegacyCLI(configuration);
-			case CoreOptions.NEW_MODE:
-				return new DefaultCLI(configuration);
-		}
-		throw new IllegalStateException();
+		return new DefaultCLI(configuration);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 9ae807e..4c928fe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -304,26 +304,4 @@ public class CoreOptions {
 	public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
 		return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
 	}
-
-	// ------------------------------------------------------------------------
-	//  Distributed architecture
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Constant value for the new execution mode.
-	 */
-	public static final String NEW_MODE = "new";
-
-	/**
-	 * Constant value for the old execution mode.
-	 */
-	public static final String LEGACY_MODE = "legacy";
-
-	/**
-	 * Switch to select the execution mode. Possible values are {@link CoreOptions#NEW_MODE}
-	 * and {@link CoreOptions#LEGACY_MODE}.
-	 */
-	public static final ConfigOption<String> MODE = key("mode")
-		.defaultValue(NEW_MODE)
-		.withDescription("Switch to select the execution mode. Possible values are 'new' and 'legacy'.");
 }
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index c04e845..d493495 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -139,36 +139,24 @@ object FlinkShell {
     }
   }
 
-  private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
-
   def fetchConnectionInfo(
     configuration: Configuration,
     config: Config
-  ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
+  ): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
-        val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
-          case CoreOptions.LEGACY_MODE => {
-            val cluster = new StandaloneMiniCluster(config)
-
-            (Left(cluster), cluster.getPort)
-          }
-          case CoreOptions.NEW_MODE => {
-            val miniClusterConfig = new MiniClusterConfiguration.Builder()
-              .setConfiguration(config)
-              .build()
-            val cluster = new MiniCluster(miniClusterConfig)
-            cluster.start()
-
-            (Right(cluster), cluster.getRestAddress.getPort)
-          }
-        }
+        val miniClusterConfig = new MiniClusterConfiguration.Builder()
+          .setConfiguration(config)
+          .build()
+        val cluster = new MiniCluster(miniClusterConfig)
+        cluster.start()
+        val port = cluster.getRestAddress.getPort
 
         println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
-        ("localhost", port, Some(Left(miniCluster)))
+        ("localhost", port, Some(Left(cluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -211,8 +199,7 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(_)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -242,8 +229,7 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
+        case Some(Left(miniCluster)) => miniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 54bb16f..731bbf6 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -319,7 +319,6 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
     // set to different than default so not to interfere with ScalaShellLocalStartupITCase
     configuration.setInteger(RestOptions.PORT, 8082)
     val miniConfig = new MiniClusterConfiguration.Builder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9c36dab..0af6d93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -206,11 +204,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 		final ClusterClient<?> client;
 		try {
-			if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-				client = new StandaloneClusterClient(configuration);
-			} else {
-				client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
-			}
+			client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
 		}
 		catch (Exception e) {
 			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7259de..d4e14f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,7 +47,6 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1653,13 +1652,9 @@ public abstract class StreamExecutionEnvironment {
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
 		final LocalStreamEnvironment currentEnvironment;
 
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-			currentEnvironment = new LocalStreamEnvironment(configuration);
-		} else {
-			currentEnvironment = new LegacyLocalStreamEnvironment(configuration);
-		}
-
+		currentEnvironment = new LocalStreamEnvironment(configuration);
 		currentEnvironment.setParallelism(parallelism);
+
 		return currentEnvironment;
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index c2d6341..451108b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -20,37 +20,27 @@ package org.apache.flink.test.operators;
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
@@ -78,31 +68,22 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	public static void setupCluster() throws Exception {
 		configuration = new Configuration();
 
-		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-			configuration.setInteger(WebOptions.PORT, 0);
-			final MiniCluster miniCluster = new MiniCluster(
-				new MiniClusterConfiguration.Builder()
-					.setConfiguration(configuration)
-					.setNumSlotsPerTaskManager(TM_SLOTS)
-					.build());
+		configuration.setInteger(WebOptions.PORT, 0);
+		final MiniCluster miniCluster = new MiniCluster(
+			new MiniClusterConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumSlotsPerTaskManager(TM_SLOTS)
+				.build());
 
-			miniCluster.start();
+		miniCluster.start();
 
-			final URI uri = miniCluster.getRestAddress();
-			hostname = uri.getHost();
-			port = uri.getPort();
+		final URI uri = miniCluster.getRestAddress();
+		hostname = uri.getHost();
+		port = uri.getPort();
 
-			configuration.setInteger(WebOptions.PORT, port);
+		configuration.setInteger(WebOptions.PORT, port);
 
-			resource = miniCluster;
-		} else {
-			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
-			final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
-			hostname = standaloneMiniCluster.getHostname();
-			port = standaloneMiniCluster.getPort();
-
-			resource = standaloneMiniCluster;
-		}
+		resource = miniCluster;
 	}
 
 	@AfterClass
@@ -111,32 +92,6 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	}
 
 	/**
-	 * Ensure that that Akka configuration parameters can be set.
-	 */
-	@Test(expected = FlinkException.class)
-	public void testInvalidAkkaConfiguration() throws Throwable {
-		assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
-		Configuration config = new Configuration();
-		config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				hostname,
-				port,
-				config
-		);
-		env.getConfig().disableSysoutLogging();
-
-		DataSet<String> result = env.createInput(new TestNonRichInputFormat());
-		result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
-		try {
-			env.execute();
-			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-		} catch (ProgramInvocationException ex) {
-			throw ex.getCause();
-		}
-	}
-
-	/**
 	 * Ensure that the program parallelism can be set even if the configuration is supplied.
 	 */
 	@Test
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index d6a029f..75204d9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliFrontendTestUtils;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
@@ -40,8 +39,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
@@ -53,7 +50,6 @@ import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
  *
  * @see org.apache.flink.client.cli.CliFrontendRunTest
  */
-@RunWith(Parameterized.class)
 public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 
 	@Rule
@@ -74,7 +70,6 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 		String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
 		Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, mode);
 		configuration.setString(JobManagerOptions.ADDRESS, "localhost");
 		configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 65f813e..7ba2150 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -41,7 +41,6 @@ import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -163,8 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private final String yarnPropertiesFileLocation;
 
-	private final boolean isNewMode;
-
 	private final YarnConfiguration yarnConfiguration;
 
 	public FlinkYarnSessionCli(
@@ -185,8 +182,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 		this.acceptInteractiveInput = acceptInteractiveInput;
 
-		this.isNewMode = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
-
 		// Create the command line options
 
 		query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
@@ -375,10 +370,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	}
 
 	private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
-		if (!isNewMode && !cmd.hasOption(container.getOpt())) { // number of containers is required option!
-			LOG.error("Missing required argument {}", container.getOpt());
-			printUsage();
-			throw new IllegalArgumentException("Missing required argument " + container.getOpt());
+		if (cmd.hasOption(container.getOpt())) { // the number-of-containers option is no longer required; only log when it is supplied
+			LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
 		}
 
 		// TODO: The number of task manager should be deprecated soon
@@ -989,20 +982,11 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		yarnClient.init(yarnConfiguration);
 		yarnClient.start();
 
-		if (isNewMode) {
-			return new YarnClusterDescriptor(
-				configuration,
-				yarnConfiguration,
-				configurationDirectory,
-				yarnClient,
-				false);
-		} else {
-			return new LegacyYarnClusterDescriptor(
-				configuration,
-				yarnConfiguration,
-				configurationDirectory,
-				yarnClient,
-				false);
-		}
+		return new YarnClusterDescriptor(
+			configuration,
+			yarnConfiguration,
+			configurationDirectory,
+			yarnClient,
+			false);
 	}
 }