You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 06:15:03 UTC
[flink] 14/16: [FLINK-XXXXX] Fix ClusterClientFactryDiscovery
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e156e3942e033403ad4b64d97c7289a98e55a081
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 19 06:25:28 2019 +0100
[FLINK-XXXXX] Fix ClusterClientFactryDiscovery
---
.../main/java/org/apache/flink/client/cli/DefaultCLI.java | 5 +++--
.../flink/client/deployment/StandaloneClientFactory.java | 5 ++---
.../client/deployment/ClusterClientServiceLoaderTest.java | 3 ++-
.../org/apache/flink/yarn/YarnClusterClientFactory.java | 8 +++++---
.../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 8 +++++---
.../apache/flink/yarn/YarnClusterClientFactoryTest.java | 15 +++++++++++++--
6 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 397d5dd..1245688 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -18,7 +18,6 @@
package org.apache.flink.client.cli;
-import org.apache.flink.client.deployment.StandaloneClientFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.commons.cli.CommandLine;
@@ -29,6 +28,8 @@ import org.apache.commons.cli.Options;
*/
public class DefaultCLI extends AbstractCustomCommandLine {
+ public static final String ID = "default";
+
public DefaultCLI(Configuration configuration) {
super(configuration);
}
@@ -41,7 +42,7 @@ public class DefaultCLI extends AbstractCustomCommandLine {
@Override
public String getId() {
- return StandaloneClientFactory.ID;
+ return ID;
}
@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index 647f14f..e5ec6f4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.client.deployment;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
@@ -32,12 +33,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
- public static final String ID = "default";
-
@Override
public boolean isCompatibleWith(Configuration configuration) {
checkNotNull(configuration);
- return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+ return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
}
@Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index a8e34ab..b7a9953 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.deployment;
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
@@ -57,7 +58,7 @@ public class ClusterClientServiceLoaderTest {
@Test
public void testStandaloneClusterClientFactoryDiscovery() {
final Configuration config = new Configuration();
- config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID.toUpperCase());
+ config.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
assertTrue(factory instanceof StandaloneClientFactory);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index fd23699..1b1d9de 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -42,12 +44,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> {
- public static final String ID = "yarn-cluster";
-
@Override
public boolean isCompatibleWith(Configuration configuration) {
checkNotNull(configuration);
- return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+ final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET);
+ return YarnJobClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget) ||
+ YarnSessionClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget);
}
@Override
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 388dea0..7ef70dc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -43,7 +43,6 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.ShutdownHookUtil;
-import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
@@ -105,6 +104,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;
+ /** The id for the CommandLine interface. */
+ private static final String ID = "yarn-cluster";
+
// YARN-session related constants
private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
private static final String YARN_APPLICATION_ID_KEY = "applicationID";
@@ -319,14 +321,14 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
@Override
public boolean isActive(CommandLine commandLine) {
String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
- boolean yarnJobManager = YarnClusterClientFactory.ID.equals(jobManagerOption);
+ boolean yarnJobManager = ID.equals(jobManagerOption);
boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}
@Override
public String getId() {
- return YarnClusterClientFactory.ID;
+ return ID;
}
@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
index 6bdd920..508c11e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Test;
@@ -35,9 +37,18 @@ import static org.junit.Assert.assertTrue;
public class YarnClusterClientFactoryTest {
@Test
- public void testYarnClusterClientFactoryDiscovery() {
+ public void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() {
+ testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME);
+ }
+
+ @Test
+ public void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() {
+ testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME);
+ }
+
+ private void testYarnClusterClientFactoryDiscoveryHelper(final String targetName) {
final Configuration configuration = new Configuration();
- configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID.toUpperCase());
+ configuration.setString(DeploymentOptions.TARGET, targetName);
final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);