You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/04 08:36:23 UTC
[flink] branch release-1.11 updated: [FLINK-17935] Move set
yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 32c6590 [FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()
32c6590 is described below
commit 32c65903b736f204bfe8b0f1d23c8ccddbd1b2f1
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri May 29 15:08:51 2020 +0200
[FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()
This closes #12455.
---
.../org/apache/flink/client/cli/CliFrontend.java | 2 +-
.../org/apache/flink/client/cli/ExecutorCLI.java | 7 +++-
.../apache/flink/client/cli/ExecutorCLITest.java | 21 +++++++++---
.../configuration/DeploymentOptionsInternal.java | 37 ++++++++++++++++++++++
.../flink/kubernetes/cli/KubernetesSessionCli.java | 13 +++++---
.../decorators/FlinkConfMountDecorator.java | 3 +-
.../parameters/AbstractKubernetesParameters.java | 13 ++++++--
.../parameters/KubernetesParameters.java | 2 ++
.../kubernetes/cli/KubernetesSessionCliTest.java | 33 +++++++++++++++----
.../org/apache/flink/api/scala/FlinkShell.scala | 11 ++-----
.../flink/yarn/YarnClusterClientFactory.java | 7 ++++
.../apache/flink/yarn/YarnClusterDescriptor.java | 4 ---
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 4 +--
.../yarn/configuration/YarnLogConfigUtil.java | 4 +--
14 files changed, 122 insertions(+), 39 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 1e2bae0..a038511 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1045,7 +1045,7 @@ public class CliFrontend {
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
List<CustomCommandLine> customCommandLines = new ArrayList<>();
- customCommandLines.add(new ExecutorCLI(configuration));
+ customCommandLines.add(new ExecutorCLI(configuration, configurationDirectory));
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
index c1a3d0b..e88de9e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.cli;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutor;
@@ -72,8 +73,11 @@ public class ExecutorCLI implements CustomCommandLine {
private final Configuration baseConfiguration;
- public ExecutorCLI(final Configuration configuration) {
+ private final String configurationDir;
+
+ public ExecutorCLI(final Configuration configuration, final String configDir) {
this.baseConfiguration = new UnmodifiableConfiguration(checkNotNull(configuration));
+ this.configurationDir = checkNotNull(configDir);
}
@Override
@@ -115,6 +119,7 @@ public class ExecutorCLI implements CustomCommandLine {
}
encodeDynamicProperties(commandLine, effectiveConfiguration);
+ effectiveConfiguration.set(DeploymentOptionsInternal.CONF_DIR, configurationDir);
if (LOG.isDebugEnabled()) {
LOG.debug("Effective Configuration: {}", effectiveConfiguration);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
index 6eb9a02..daa6cd3 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
@@ -26,7 +26,9 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.util.Arrays;
import java.util.List;
@@ -41,13 +43,18 @@ import static org.junit.Assert.assertFalse;
*/
public class ExecutorCLITest {
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
private Options testOptions;
@Before
public void initOptions() {
testOptions = new Options();
- final ExecutorCLI cliUnderTest = new ExecutorCLI(new Configuration());
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath());
cliUnderTest.addGeneralOptions(testOptions);
}
@@ -57,7 +64,9 @@ public class ExecutorCLITest {
final Configuration loadedConfig = new Configuration();
loadedConfig.set(DeploymentOptions.TARGET, expectedExecutorName);
- final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(
+ loadedConfig,
+ tmp.getRoot().getAbsolutePath());
final CommandLine emptyCommandLine = CliFrontendParser.parse(testOptions, new String[0], true);
final Configuration configuration = cliUnderTest.applyCommandLineOptionsToConfiguration(emptyCommandLine);
@@ -87,7 +96,9 @@ public class ExecutorCLITest {
"-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5"
};
- final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(
+ loadedConfig,
+ tmp.getRoot().getAbsolutePath());
final CommandLine commandLine = CliFrontendParser.parse(testOptions, args, true);
final Configuration configuration = cliUnderTest.applyCommandLineOptionsToConfiguration(commandLine);
@@ -113,7 +124,9 @@ public class ExecutorCLITest {
final ConfigOption<Integer> configOption = key("test.int").intType().noDefaultValue();
final int expectedValue = 42;
- final ExecutorCLI cliUnderTest = new ExecutorCLI(new Configuration());
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath());
final String[] args = {executorOption, expectedExecutorName, "-D" + configOption.key() + "=" + expectedValue};
final CommandLine commandLine = CliFrontendParser.parse(testOptions, args, true);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
new file mode 100644
index 0000000..22c6387
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Internal configuration options used during deployment; marked {@link Internal} and described as "DO NOT USE", i.e. not meant to be set by users.
+ */
+@Internal
+public class DeploymentOptionsInternal {
+ /** Path to the configuration directory, stored as a string option that is declared without a default value. */
+ public static final ConfigOption<String> CONF_DIR =
+ key("$internal.deployment.config-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("**DO NOT USE** The path to the configuration directory.");
+
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
index db1bc8c..36fc950 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.ExecutorCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -68,14 +69,14 @@ public class KubernetesSessionCli {
private final ExecutorCLI cli;
private final ClusterClientServiceLoader clusterClientServiceLoader;
- public KubernetesSessionCli(Configuration configuration) {
- this(configuration, new DefaultClusterClientServiceLoader());
+ public KubernetesSessionCli(Configuration configuration, String configDir) {
+ this(configuration, new DefaultClusterClientServiceLoader(), configDir);
}
- public KubernetesSessionCli(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader) {
+ public KubernetesSessionCli(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader, String configDir) {
this.baseConfiguration = new UnmodifiableConfiguration(checkNotNull(configuration));
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
- this.cli = new ExecutorCLI(baseConfiguration);
+ this.cli = new ExecutorCLI(baseConfiguration, configDir);
}
public Configuration getEffectiveConfiguration(String[] args) throws CliArgsException {
@@ -178,10 +179,12 @@ public class KubernetesSessionCli {
public static void main(String[] args) {
final Configuration configuration = GlobalConfiguration.loadConfiguration();
+ final String configDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
int retCode;
try {
- final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+ final KubernetesSessionCli cli = new KubernetesSessionCli(configuration, configDir);
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
} catch (CliArgsException e) {
retCode = AbstractCustomCommandLine.handleCliArgsException(e, LOG);
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 07cf85e..79eb7aa 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -19,7 +19,6 @@
package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -167,7 +166,7 @@ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
}
private List<File> getLocalLogConfFiles() {
- final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
+ final String confDir = kubernetesComponentConf.getConfigDirectory();
final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index b3bfd6b..c655b63 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -18,8 +18,8 @@
package org.apache.flink.kubernetes.kubeclient.parameters;
-import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
@@ -55,6 +55,13 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
}
@Override
+ public String getConfigDirectory() {
+ final String configDir = flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+ checkNotNull(configDir);
+ return configDir;
+ }
+
+ @Override
public String getClusterId() {
final String clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
@@ -130,14 +137,14 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
@Override
public boolean hasLogback() {
- final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
+ final String confDir = getConfigDirectory();
final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
return logbackFile.exists();
}
@Override
public boolean hasLog4j() {
- final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
+ final String confDir = getConfigDirectory();
final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
return log4jFile.exists();
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index 44443c4..2e5ee0a 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -32,6 +32,8 @@ import java.util.Optional;
*/
public interface KubernetesParameters {
+ String getConfigDirectory();
+
String getClusterId();
String getNamespace();
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
index df7fe6a..79953d7 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -32,7 +33,9 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.util.Map;
@@ -46,9 +49,14 @@ import static org.junit.Assert.assertTrue;
*/
public class KubernetesSessionCliTest {
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
@Test
public void testKubernetesSessionCliSetsDeploymentTargetCorrectly() throws CliArgsException {
- final KubernetesSessionCli cli = new KubernetesSessionCli(new Configuration());
+ final KubernetesSessionCli cli = new KubernetesSessionCli(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath());
final String[] args = {};
final Configuration configuration = cli.getEffectiveConfiguration(args);
@@ -59,7 +67,9 @@ public class KubernetesSessionCliTest {
@Test
public void testDynamicProperties() throws Exception {
- final KubernetesSessionCli cli = new KubernetesSessionCli(new Configuration());
+ final KubernetesSessionCli cli = new KubernetesSessionCli(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath());
final String[] args = new String[] {
"-e", KubernetesSessionClusterExecutor.NAME,
"-Dakka.ask.timeout=5 min",
@@ -72,9 +82,10 @@ public class KubernetesSessionCliTest {
Assert.assertNotNull(clientFactory);
final Map<String, String> executorConfigMap = executorConfig.toMap();
- assertEquals(3, executorConfigMap.size());
+ assertEquals(4, executorConfigMap.size());
assertEquals("5 min", executorConfigMap.get("akka.ask.timeout"));
assertEquals("-DappName=foobar", executorConfigMap.get("env.java.opts"));
+ assertEquals(tmp.getRoot().getAbsolutePath(), executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
}
@@ -131,7 +142,9 @@ public class KubernetesSessionCliTest {
"-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() + "=" + slotsPerTaskManager
};
- final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+ final KubernetesSessionCli cli = new KubernetesSessionCli(
+ configuration,
+ tmp.getRoot().getAbsolutePath());
Configuration executorConfig = cli.getEffectiveConfiguration(args);
ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
@@ -157,7 +170,9 @@ public class KubernetesSessionCliTest {
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
final String[] args = {"-e", KubernetesSessionClusterExecutor.NAME};
- final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+ final KubernetesSessionCli cli = new KubernetesSessionCli(
+ configuration,
+ tmp.getRoot().getAbsolutePath());
Configuration executorConfig = cli.getEffectiveConfiguration(args);
ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
@@ -220,7 +235,9 @@ public class KubernetesSessionCliTest {
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
- final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+ final KubernetesSessionCli cli = new KubernetesSessionCli(
+ configuration,
+ tmp.getRoot().getAbsolutePath());
final Configuration executorConfig = cli.getEffectiveConfiguration(new String[]{});
final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
@@ -258,6 +275,8 @@ public class KubernetesSessionCliTest {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory));
- return new KubernetesSessionCli(configuration);
+ return new KubernetesSessionCli(
+ configuration,
+ tmp.getRoot().getAbsolutePath());
}
}
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index d3f65c2..cfd91a5 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -140,10 +140,7 @@ object FlinkShell {
}
private def getConfigDir(config: Config) = {
- config.configDir match {
- case Some(confDir) => confDir
- case None => CliFrontend.getConfigurationDirectoryFromEnv
- }
+ config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
}
private def getGlobalConfig(config: Config) = {
@@ -232,8 +229,7 @@ object FlinkShell {
val effectiveConfig = new Configuration(flinkConfig)
val args = parseYarnArgList(config, "yarn-cluster")
- val configurationDirectory =
- config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+ val configurationDirectory = getConfigDir(config)
val frontend = new CliFrontend(
effectiveConfig,
@@ -271,8 +267,7 @@ object FlinkShell {
val effectiveConfig = new Configuration(flinkConfig)
val args = parseYarnArgList(config, mode)
- val configurationDirectory =
- config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+ val configurationDirectory = getConfigDir(config)
val frontend = new CliFrontend(
effectiveConfig,
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index b7b8645..776172f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -23,8 +23,10 @@ import org.apache.flink.client.deployment.AbstractContainerizedClusterClientFact
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -51,6 +53,11 @@ public class YarnClusterClientFactory extends AbstractContainerizedClusterClient
@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
checkNotNull(configuration);
+
+ final String configurationDirectory =
+ configuration.get(DeploymentOptionsInternal.CONF_DIR);
+ YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
+
return getClusterDescriptor(configuration);
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2d2103c..25b1009 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -405,9 +404,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
final List<String> pipelineJars = flinkConfiguration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
- final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
- YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
-
try {
return deployInternal(
clusterSpecification,
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 19f1373..9507ad8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
@@ -44,7 +45,6 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
@@ -433,7 +433,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
configuration.setString(YarnConfigOptions.NODE_LABEL, nodeLabelValue);
}
- YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
+ configuration.set(DeploymentOptionsInternal.CONF_DIR, configurationDirectory);
}
private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
index 981cd4d..0a93dce 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
@@ -47,12 +47,12 @@ public class YarnLogConfigUtil {
final Configuration configuration,
final String configurationDirectory) {
- if (configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != null) {
+ if (configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != null) {
return configuration;
}
discoverLogConfigFile(configurationDirectory).ifPresent(file ->
- configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, file.getPath()));
+ configuration.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, file.getPath()));
return configuration;
}