You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:07 UTC
[flink] 08/08: [FLINK-10397] Remove CoreOptions#MODE
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8434d686473a088a876967dc2fe9b5dee0e3169
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 22 23:16:50 2018 +0200
[FLINK-10397] Remove CoreOptions#MODE
Removes the MODE option used to switch between the new and legacy modes.
This closes #6752.
---
docs/_includes/generated/core_configuration.html | 5 --
.../org/apache/flink/client/LocalExecutor.java | 50 ++++++----------
.../org/apache/flink/client/RemoteExecutor.java | 8 +--
.../org/apache/flink/client/cli/CliFrontend.java | 12 +---
.../flink/client/cli/CliFrontendTestBase.java | 26 +-------
.../apache/flink/configuration/CoreOptions.java | 22 -------
.../org/apache/flink/api/scala/FlinkShell.scala | 34 ++++-------
.../apache/flink/api/scala/ScalaShellITCase.scala | 1 -
.../api/environment/RemoteStreamEnvironment.java | 8 +--
.../environment/StreamExecutionEnvironment.java | 9 +--
.../test/operators/RemoteEnvironmentITCase.java | 69 ++++------------------
.../flink/yarn/CliFrontendRunWithYarnTest.java | 5 --
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 32 +++-------
13 files changed, 57 insertions(+), 224 deletions(-)
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 98cca91..4366e8b 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -28,11 +28,6 @@
<td></td>
</tr>
<tr>
- <td><h5>mode</h5></td>
- <td style="word-wrap: break-word;">"new"</td>
- <td>Switch to select the execution mode. Possible values are 'new' and 'legacy'.</td>
- </tr>
- <tr>
<td><h5>parallelism.default</h5></td>
<td style="word-wrap: break-word;">1</td>
<td></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 4e4993a..14d3ee5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -37,7 +37,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -125,39 +124,28 @@ public class LocalExecutor extends PlanExecutor {
}
private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
- final JobExecutorService newJobExecutorService;
- if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+ if (!configuration.contains(RestOptions.PORT)) {
+ configuration.setInteger(RestOptions.PORT, 0);
+ }
- if (!configuration.contains(RestOptions.PORT)) {
- configuration.setInteger(RestOptions.PORT, 0);
- }
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumTaskManagers(
+ configuration.getInteger(
+ ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+ ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
+ .setRpcServiceSharing(RpcServiceSharing.SHARED)
+ .setNumSlotsPerTaskManager(
+ configuration.getInteger(
+ TaskManagerOptions.NUM_TASK_SLOTS, 1))
+ .build();
- final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
- .setConfiguration(configuration)
- .setNumTaskManagers(
- configuration.getInteger(
- ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
- ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
- .setRpcServiceSharing(RpcServiceSharing.SHARED)
- .setNumSlotsPerTaskManager(
- configuration.getInteger(
- TaskManagerOptions.NUM_TASK_SLOTS, 1))
- .build();
-
- final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
- miniCluster.start();
-
- configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
-
- newJobExecutorService = miniCluster;
- } else {
- final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
- localFlinkMiniCluster.start();
-
- newJobExecutorService = localFlinkMiniCluster;
- }
+ final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
+ miniCluster.start();
+
+ configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
- return newJobExecutorService;
+ return miniCluster;
}
@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 0a2f1b4..a4424eb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.DataStatistics;
@@ -151,11 +149,7 @@ public class RemoteExecutor extends PlanExecutor {
public void start() throws Exception {
synchronized (lock) {
if (client == null) {
- if (CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
- client = new StandaloneClusterClient(clientConfiguration);
- } else {
- client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
- }
+ client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
}
else {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c7e6344..c7c664d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -123,8 +123,6 @@ public class CliFrontend {
private final int defaultParallelism;
- private final boolean isNewMode;
-
public CliFrontend(
Configuration configuration,
List<CustomCommandLine<?>> customCommandLines) throws Exception {
@@ -147,8 +145,6 @@ public class CliFrontend {
this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-
- this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
}
// --------------------------------------------------------------------------------------------
@@ -233,7 +229,7 @@ public class CliFrontend {
final ClusterClient<T> client;
// directly deploy the job if the cluster is started in job mode and detached
- if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {
+ if (clusterId == null && runOptions.getDetachedMode()) {
int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
@@ -1200,11 +1196,7 @@ public class CliFrontend {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
- if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
- customCommandLines.add(new DefaultCLI(configuration));
- } else {
- customCommandLines.add(new LegacyCLI(configuration));
- }
+ customCommandLines.add(new DefaultCLI(configuration));
return customCommandLines;
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
index 3c24376..8ff426c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
@@ -19,43 +19,21 @@
package org.apache.flink.client.cli;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.TestLogger;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
/**
- * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode.
+ * Base test class for {@link CliFrontend} tests.
*/
-@RunWith(Parameterized.class)
public abstract class CliFrontendTestBase extends TestLogger {
- @Parameterized.Parameter
- public String mode;
-
- @Parameterized.Parameters(name = "Mode = {0}")
- public static List<String> parameters() {
- return Arrays.asList(CoreOptions.LEGACY_MODE, CoreOptions.NEW_MODE);
- }
protected Configuration getConfiguration() {
final Configuration configuration = GlobalConfiguration
.loadConfiguration(CliFrontendTestUtils.getConfigDir());
- configuration.setString(CoreOptions.MODE, mode);
return configuration;
}
static AbstractCustomCommandLine<?> getCli(Configuration configuration) {
- switch (configuration.getString(CoreOptions.MODE)) {
- case CoreOptions.LEGACY_MODE:
- return new LegacyCLI(configuration);
- case CoreOptions.NEW_MODE:
- return new DefaultCLI(configuration);
- }
- throw new IllegalStateException();
+ return new DefaultCLI(configuration);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 9ae807e..4c928fe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -304,26 +304,4 @@ public class CoreOptions {
public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
}
-
- // ------------------------------------------------------------------------
- // Distributed architecture
- // ------------------------------------------------------------------------
-
- /**
- * Constant value for the new execution mode.
- */
- public static final String NEW_MODE = "new";
-
- /**
- * Constant value for the old execution mode.
- */
- public static final String LEGACY_MODE = "legacy";
-
- /**
- * Switch to select the execution mode. Possible values are {@link CoreOptions#NEW_MODE}
- * and {@link CoreOptions#LEGACY_MODE}.
- */
- public static final ConfigOption<String> MODE = key("mode")
- .defaultValue(NEW_MODE)
- .withDescription("Switch to select the execution mode. Possible values are 'new' and 'legacy'.");
}
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index c04e845..d493495 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -139,36 +139,24 @@ object FlinkShell {
}
}
- private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
-
def fetchConnectionInfo(
configuration: Configuration,
config: Config
- ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
+ ): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = {
config.executionMode match {
case ExecutionMode.LOCAL => // Local mode
val config = configuration
config.setInteger(JobManagerOptions.PORT, 0)
- val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
- case CoreOptions.LEGACY_MODE => {
- val cluster = new StandaloneMiniCluster(config)
-
- (Left(cluster), cluster.getPort)
- }
- case CoreOptions.NEW_MODE => {
- val miniClusterConfig = new MiniClusterConfiguration.Builder()
- .setConfiguration(config)
- .build()
- val cluster = new MiniCluster(miniClusterConfig)
- cluster.start()
-
- (Right(cluster), cluster.getRestAddress.getPort)
- }
- }
+ val miniClusterConfig = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .build()
+ val cluster = new MiniCluster(miniClusterConfig)
+ cluster.start()
+ val port = cluster.getRestAddress.getPort
println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
- ("localhost", port, Some(Left(miniCluster)))
+ ("localhost", port, Some(Left(cluster)))
case ExecutionMode.REMOTE => // Remote mode
if (config.host.isEmpty || config.port.isEmpty) {
@@ -211,8 +199,7 @@ object FlinkShell {
val (repl, cluster) = try {
val (host, port, cluster) = fetchConnectionInfo(configuration, config)
val conf = cluster match {
- case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
- case Some(Left(Right(_))) => configuration
+ case Some(Left(_)) => configuration
case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
case None => configuration
}
@@ -242,8 +229,7 @@ object FlinkShell {
} finally {
repl.closeInterpreter()
cluster match {
- case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
- case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
+ case Some(Left(miniCluster)) => miniCluster.close()
case Some(Right(yarnCluster)) => yarnCluster.shutdown()
case _ =>
}
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 54bb16f..731bbf6 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -319,7 +319,6 @@ object ScalaShellITCase {
@BeforeClass
def beforeAll(): Unit = {
- configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
// set to different than default so not to interfere with ScalaShellLocalStartupITCase
configuration.setInteger(RestOptions.PORT, 8082)
val miniConfig = new MiniClusterConfiguration.Builder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9c36dab..0af6d93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -206,11 +204,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
final ClusterClient<?> client;
try {
- if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
- client = new StandaloneClusterClient(configuration);
- } else {
- client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
- }
+ client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
}
catch (Exception e) {
throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7259de..d4e14f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,7 +47,6 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1653,13 +1652,9 @@ public abstract class StreamExecutionEnvironment {
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
final LocalStreamEnvironment currentEnvironment;
- if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
- currentEnvironment = new LocalStreamEnvironment(configuration);
- } else {
- currentEnvironment = new LegacyLocalStreamEnvironment(configuration);
- }
-
+ currentEnvironment = new LocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);
+
return currentEnvironment;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index c2d6341..451108b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -20,37 +20,27 @@ package org.apache.flink.test.operators;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
/**
* Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
@@ -78,31 +68,22 @@ public class RemoteEnvironmentITCase extends TestLogger {
public static void setupCluster() throws Exception {
configuration = new Configuration();
- if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
- configuration.setInteger(WebOptions.PORT, 0);
- final MiniCluster miniCluster = new MiniCluster(
- new MiniClusterConfiguration.Builder()
- .setConfiguration(configuration)
- .setNumSlotsPerTaskManager(TM_SLOTS)
- .build());
+ configuration.setInteger(WebOptions.PORT, 0);
+ final MiniCluster miniCluster = new MiniCluster(
+ new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumSlotsPerTaskManager(TM_SLOTS)
+ .build());
- miniCluster.start();
+ miniCluster.start();
- final URI uri = miniCluster.getRestAddress();
- hostname = uri.getHost();
- port = uri.getPort();
+ final URI uri = miniCluster.getRestAddress();
+ hostname = uri.getHost();
+ port = uri.getPort();
- configuration.setInteger(WebOptions.PORT, port);
+ configuration.setInteger(WebOptions.PORT, port);
- resource = miniCluster;
- } else {
- configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
- final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
- hostname = standaloneMiniCluster.getHostname();
- port = standaloneMiniCluster.getPort();
-
- resource = standaloneMiniCluster;
- }
+ resource = miniCluster;
}
@AfterClass
@@ -111,32 +92,6 @@ public class RemoteEnvironmentITCase extends TestLogger {
}
/**
- * Ensure that that Akka configuration parameters can be set.
- */
- @Test(expected = FlinkException.class)
- public void testInvalidAkkaConfiguration() throws Throwable {
- assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
- Configuration config = new Configuration();
- config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
-
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
- hostname,
- port,
- config
- );
- env.getConfig().disableSysoutLogging();
-
- DataSet<String> result = env.createInput(new TestNonRichInputFormat());
- result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
- try {
- env.execute();
- Assert.fail("Program should not run successfully, cause of invalid akka settings.");
- } catch (ProgramInvocationException ex) {
- throw ex.getCause();
- }
- }
-
- /**
* Ensure that the program parallelism can be set even if the configuration is supplied.
*/
@Test
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index d6a029f..75204d9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliFrontendTestUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
@@ -40,8 +39,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
@@ -53,7 +50,6 @@ import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
*
* @see org.apache.flink.client.cli.CliFrontendRunTest
*/
-@RunWith(Parameterized.class)
public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
@Rule
@@ -74,7 +70,6 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
Configuration configuration = new Configuration();
- configuration.setString(CoreOptions.MODE, mode);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 8081);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 65f813e..7ba2150 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -41,7 +41,6 @@ import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -163,8 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
private final String yarnPropertiesFileLocation;
- private final boolean isNewMode;
-
private final YarnConfiguration yarnConfiguration;
public FlinkYarnSessionCli(
@@ -185,8 +182,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
this.acceptInteractiveInput = acceptInteractiveInput;
- this.isNewMode = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
-
// Create the command line options
query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
@@ -375,10 +370,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
}
private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
- if (!isNewMode && !cmd.hasOption(container.getOpt())) { // number of containers is required option!
- LOG.error("Missing required argument {}", container.getOpt());
- printUsage();
- throw new IllegalArgumentException("Missing required argument " + container.getOpt());
+ if (cmd.hasOption(container.getOpt())) { // number of containers is deprecated and no longer required
+ LOG.info("The argument {} is deprecated and will be ignored.", container.getOpt());
}
// TODO: The number of task manager should be deprecated soon
@@ -989,20 +982,11 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
yarnClient.init(yarnConfiguration);
yarnClient.start();
- if (isNewMode) {
- return new YarnClusterDescriptor(
- configuration,
- yarnConfiguration,
- configurationDirectory,
- yarnClient,
- false);
- } else {
- return new LegacyYarnClusterDescriptor(
- configuration,
- yarnConfiguration,
- configurationDirectory,
- yarnClient,
- false);
- }
+ return new YarnClusterDescriptor(
+ configuration,
+ yarnConfiguration,
+ configurationDirectory,
+ yarnClient,
+ false);
}
}