You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 15:53:31 UTC
[flink] 11/16: [FLINK-XXXXX] Update the DeploymentTarget setting
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b35e990962d1c1284a0b321caae41c20c46ff60
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sat Nov 16 22:24:26 2019 +0100
[FLINK-XXXXX] Update the DeploymentTarget setting
---
.../flink/client/cli/AbstractCustomCommandLine.java | 3 ++-
.../flink/client/deployment/StandaloneClientFactory.java | 3 ++-
.../client/deployment/ClusterClientServiceLoaderTest.java | 3 ++-
.../org/apache/flink/yarn/YarnClusterClientFactory.java | 6 +++++-
.../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 6 +++++-
.../apache/flink/yarn/YarnClusterClientFactoryTest.java | 15 +++++++++++++--
6 files changed, 29 insertions(+), 7 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index f32d4f8..b8431cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.cli;
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -73,7 +74,7 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine {
@Override
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
final Configuration resultingConfiguration = new Configuration(configuration);
- resultingConfiguration.setString(DeploymentOptions.TARGET, getId());
+ resultingConfiguration.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
if (commandLine.hasOption(addressOption.getOpt())) {
String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index b10204b..9597ff8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.deployment;
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
@@ -35,7 +36,7 @@ public class StandaloneClientFactory implements ClusterClientFactory<StandaloneC
@Override
public boolean isCompatibleWith(Configuration configuration) {
checkNotNull(configuration);
- return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+ return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
}
@Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index a084021..b7a9953 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.deployment;
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
@@ -57,7 +58,7 @@ public class ClusterClientServiceLoaderTest {
@Test
public void testStandaloneClusterClientFactoryDiscovery() {
final Configuration config = new Configuration();
- config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID);
+ config.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
assertTrue(factory instanceof StandaloneClientFactory);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 7605470..ad391d8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -45,7 +47,9 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
@Override
public boolean isCompatibleWith(Configuration configuration) {
checkNotNull(configuration);
- return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+ final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET);
+ return YarnJobClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget) ||
+ YarnSessionClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget);
}
@Override
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 3fb8dfa..8c0a391 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -47,6 +47,8 @@ import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -346,7 +348,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
// we ignore the addressOption because it can only contain "yarn-cluster"
final Configuration effectiveConfiguration = new Configuration(configuration);
- effectiveConfiguration.setString(DeploymentOptions.TARGET, getId());
applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
@@ -361,6 +362,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
+ effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
+ } else {
+ effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
}
if (commandLine.hasOption(jmMemory.getOpt())) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
index 931313a..508c11e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Test;
@@ -35,9 +37,18 @@ import static org.junit.Assert.assertTrue;
public class YarnClusterClientFactoryTest {
@Test
- public void testYarnClusterClientFactoryDiscovery() {
+ public void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() {
+ testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME);
+ }
+
+ @Test
+ public void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() {
+ testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME);
+ }
+
+ private void testYarnClusterClientFactoryDiscoveryHelper(final String targetName) {
final Configuration configuration = new Configuration();
- configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID);
+ configuration.setString(DeploymentOptions.TARGET, targetName);
final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);