You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/04 08:36:23 UTC

[flink] branch release-1.11 updated: [FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 32c6590  [FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()
32c6590 is described below

commit 32c65903b736f204bfe8b0f1d23c8ccddbd1b2f1
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri May 29 15:08:51 2020 +0200

    [FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()
    
    This closes #12455.
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  2 +-
 .../org/apache/flink/client/cli/ExecutorCLI.java   |  7 +++-
 .../apache/flink/client/cli/ExecutorCLITest.java   | 21 +++++++++---
 .../configuration/DeploymentOptionsInternal.java   | 37 ++++++++++++++++++++++
 .../flink/kubernetes/cli/KubernetesSessionCli.java | 13 +++++---
 .../decorators/FlinkConfMountDecorator.java        |  3 +-
 .../parameters/AbstractKubernetesParameters.java   | 13 ++++++--
 .../parameters/KubernetesParameters.java           |  2 ++
 .../kubernetes/cli/KubernetesSessionCliTest.java   | 33 +++++++++++++++----
 .../org/apache/flink/api/scala/FlinkShell.scala    | 11 ++-----
 .../flink/yarn/YarnClusterClientFactory.java       |  7 ++++
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  4 ---
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  4 +--
 .../yarn/configuration/YarnLogConfigUtil.java      |  4 +--
 14 files changed, 122 insertions(+), 39 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 1e2bae0..a038511 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1045,7 +1045,7 @@ public class CliFrontend {
 
 	public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
 		List<CustomCommandLine> customCommandLines = new ArrayList<>();
-		customCommandLines.add(new ExecutorCLI(configuration));
+		customCommandLines.add(new ExecutorCLI(configuration, configurationDirectory));
 
 		//	Command line interface of the YARN session, with a special initialization here
 		//	to prefix all options with y/yarn.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
index c1a3d0b..e88de9e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.cli;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.PipelineExecutor;
@@ -72,8 +73,11 @@ public class ExecutorCLI implements CustomCommandLine {
 
 	private final Configuration baseConfiguration;
 
-	public ExecutorCLI(final Configuration configuration) {
+	private final String configurationDir;
+
+	public ExecutorCLI(final Configuration configuration, final String configDir) {
 		this.baseConfiguration = new UnmodifiableConfiguration(checkNotNull(configuration));
+		this.configurationDir =  checkNotNull(configDir);
 	}
 
 	@Override
@@ -115,6 +119,7 @@ public class ExecutorCLI implements CustomCommandLine {
 		}
 
 		encodeDynamicProperties(commandLine, effectiveConfiguration);
+		effectiveConfiguration.set(DeploymentOptionsInternal.CONF_DIR, configurationDir);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Effective Configuration: {}", effectiveConfiguration);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
index 6eb9a02..daa6cd3 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
@@ -26,7 +26,9 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Arrays;
 import java.util.List;
@@ -41,13 +43,18 @@ import static org.junit.Assert.assertFalse;
  */
 public class ExecutorCLITest {
 
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
 	private Options testOptions;
 
 	@Before
 	public void initOptions() {
 		testOptions = new Options();
 
-		final ExecutorCLI cliUnderTest = new ExecutorCLI(new Configuration());
+		final ExecutorCLI cliUnderTest = new ExecutorCLI(
+				new Configuration(),
+				tmp.getRoot().getAbsolutePath());
 		cliUnderTest.addGeneralOptions(testOptions);
 	}
 
@@ -57,7 +64,9 @@ public class ExecutorCLITest {
 		final Configuration loadedConfig = new Configuration();
 		loadedConfig.set(DeploymentOptions.TARGET, expectedExecutorName);
 
-		final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
+		final ExecutorCLI cliUnderTest = new ExecutorCLI(
+				loadedConfig,
+				tmp.getRoot().getAbsolutePath());
 		final CommandLine emptyCommandLine = CliFrontendParser.parse(testOptions, new String[0], true);
 
 		final Configuration configuration = cliUnderTest.applyCommandLineOptionsToConfiguration(emptyCommandLine);
@@ -87,7 +96,9 @@ public class ExecutorCLITest {
 				"-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5"
 		};
 
-		final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
+		final ExecutorCLI cliUnderTest = new ExecutorCLI(
+				loadedConfig,
+				tmp.getRoot().getAbsolutePath());
 		final CommandLine commandLine = CliFrontendParser.parse(testOptions, args, true);
 
 		final Configuration configuration = cliUnderTest.applyCommandLineOptionsToConfiguration(commandLine);
@@ -113,7 +124,9 @@ public class ExecutorCLITest {
 		final ConfigOption<Integer> configOption = key("test.int").intType().noDefaultValue();
 		final int expectedValue = 42;
 
-		final ExecutorCLI cliUnderTest = new ExecutorCLI(new Configuration());
+		final ExecutorCLI cliUnderTest = new ExecutorCLI(
+				new Configuration(),
+				tmp.getRoot().getAbsolutePath());
 
 		final String[] args = {executorOption, expectedExecutorName, "-D" + configOption.key() + "=" + expectedValue};
 		final CommandLine commandLine = CliFrontendParser.parse(testOptions, args, true);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
new file mode 100644
index 0000000..22c6387
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Internal options used during deployment.
+ */
+@Internal
+public class DeploymentOptionsInternal {
+
+	public static final ConfigOption<String> CONF_DIR =
+			key("$internal.deployment.config-dir")
+					.stringType()
+					.noDefaultValue()
+					.withDescription("**DO NOT USE** The path to the configuration directory.");
+
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
index db1bc8c..36fc950 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.AbstractCustomCommandLine;
 import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.ExecutorCLI;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -68,14 +69,14 @@ public class KubernetesSessionCli {
 	private final ExecutorCLI cli;
 	private final ClusterClientServiceLoader clusterClientServiceLoader;
 
-	public KubernetesSessionCli(Configuration configuration) {
-		this(configuration, new DefaultClusterClientServiceLoader());
+	public KubernetesSessionCli(Configuration configuration, String configDir) {
+		this(configuration, new DefaultClusterClientServiceLoader(), configDir);
 	}
 
-	public KubernetesSessionCli(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader) {
+	public KubernetesSessionCli(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader, String configDir) {
 		this.baseConfiguration = new UnmodifiableConfiguration(checkNotNull(configuration));
 		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-		this.cli = new ExecutorCLI(baseConfiguration);
+		this.cli = new ExecutorCLI(baseConfiguration, configDir);
 	}
 
 	public Configuration getEffectiveConfiguration(String[] args) throws CliArgsException {
@@ -178,10 +179,12 @@ public class KubernetesSessionCli {
 	public static void main(String[] args) {
 		final Configuration configuration = GlobalConfiguration.loadConfiguration();
 
+		final String configDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
 		int retCode;
 
 		try {
-			final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+			final KubernetesSessionCli cli = new KubernetesSessionCli(configuration, configDir);
 			retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
 		} catch (CliArgsException e) {
 			retCode = AbstractCustomCommandLine.handleCliArgsException(e, LOG);
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 07cf85e..79eb7aa 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -167,7 +166,7 @@ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
 	}
 
 	private List<File> getLocalLogConfFiles() {
-		final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
+		final String confDir = kubernetesComponentConf.getConfigDirectory();
 		final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
 		final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
 
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index b3bfd6b..c655b63 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.parameters;
 
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.Constants;
 
@@ -55,6 +55,13 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
 	}
 
 	@Override
+	public String getConfigDirectory() {
+		final String configDir = flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+		checkNotNull(configDir);
+		return configDir;
+	}
+
+	@Override
 	public String getClusterId() {
 		final String clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
 
@@ -130,14 +137,14 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
 
 	@Override
 	public boolean hasLogback() {
-		final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
+		final String confDir = getConfigDirectory();
 		final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
 		return logbackFile.exists();
 	}
 
 	@Override
 	public boolean hasLog4j() {
-		final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
+		final String confDir = getConfigDirectory();
 		final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
 		return log4jFile.exists();
 	}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index 44443c4..2e5ee0a 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -32,6 +32,8 @@ import java.util.Optional;
  */
 public interface KubernetesParameters {
 
+	String getConfigDirectory();
+
 	String getClusterId();
 
 	String getNamespace();
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
index df7fe6a..79953d7 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -32,7 +33,9 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Map;
 
@@ -46,9 +49,14 @@ import static org.junit.Assert.assertTrue;
  */
 public class KubernetesSessionCliTest {
 
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
 	@Test
 	public void testKubernetesSessionCliSetsDeploymentTargetCorrectly() throws CliArgsException {
-		final KubernetesSessionCli cli = new KubernetesSessionCli(new Configuration());
+		final KubernetesSessionCli cli = new KubernetesSessionCli(
+				new Configuration(),
+				tmp.getRoot().getAbsolutePath());
 
 		final String[] args = {};
 		final Configuration configuration = cli.getEffectiveConfiguration(args);
@@ -59,7 +67,9 @@ public class KubernetesSessionCliTest {
 	@Test
 	public void testDynamicProperties() throws Exception {
 
-		final KubernetesSessionCli cli = new KubernetesSessionCli(new Configuration());
+		final KubernetesSessionCli cli = new KubernetesSessionCli(
+				new Configuration(),
+				tmp.getRoot().getAbsolutePath());
 		final String[] args = new String[] {
 			"-e", KubernetesSessionClusterExecutor.NAME,
 			"-Dakka.ask.timeout=5 min",
@@ -72,9 +82,10 @@ public class KubernetesSessionCliTest {
 		Assert.assertNotNull(clientFactory);
 
 		final Map<String, String> executorConfigMap = executorConfig.toMap();
-		assertEquals(3, executorConfigMap.size());
+		assertEquals(4, executorConfigMap.size());
 		assertEquals("5 min", executorConfigMap.get("akka.ask.timeout"));
 		assertEquals("-DappName=foobar", executorConfigMap.get("env.java.opts"));
+		assertEquals(tmp.getRoot().getAbsolutePath(), executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
 		assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
 	}
 
@@ -131,7 +142,9 @@ public class KubernetesSessionCliTest {
 				"-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() + "=" + slotsPerTaskManager
 		};
 
-		final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+		final KubernetesSessionCli cli = new KubernetesSessionCli(
+				configuration,
+				tmp.getRoot().getAbsolutePath());
 
 		Configuration executorConfig = cli.getEffectiveConfiguration(args);
 		ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
@@ -157,7 +170,9 @@ public class KubernetesSessionCliTest {
 		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
 
 		final String[] args = {"-e", KubernetesSessionClusterExecutor.NAME};
-		final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+		final KubernetesSessionCli cli = new KubernetesSessionCli(
+				configuration,
+				tmp.getRoot().getAbsolutePath());
 
 		Configuration executorConfig = cli.getEffectiveConfiguration(args);
 		ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
@@ -220,7 +235,9 @@ public class KubernetesSessionCliTest {
 		configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
 		configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
 
-		final KubernetesSessionCli cli = new KubernetesSessionCli(configuration);
+		final KubernetesSessionCli cli = new KubernetesSessionCli(
+				configuration,
+				tmp.getRoot().getAbsolutePath());
 
 		final Configuration executorConfig = cli.getEffectiveConfiguration(new String[]{});
 		final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
@@ -258,6 +275,8 @@ public class KubernetesSessionCliTest {
 		Configuration configuration = new Configuration();
 		configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory));
 		configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory));
-		return new KubernetesSessionCli(configuration);
+		return new KubernetesSessionCli(
+				configuration,
+				tmp.getRoot().getAbsolutePath());
 	}
 }
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index d3f65c2..cfd91a5 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -140,10 +140,7 @@ object FlinkShell {
   }
 
   private def getConfigDir(config: Config) = {
-    config.configDir match {
-      case Some(confDir) => confDir
-      case None => CliFrontend.getConfigurationDirectoryFromEnv
-    }
+    config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
   }
 
   private def getGlobalConfig(config: Config) = {
@@ -232,8 +229,7 @@ object FlinkShell {
     val effectiveConfig = new Configuration(flinkConfig)
     val args = parseYarnArgList(config, "yarn-cluster")
 
-    val configurationDirectory =
-      config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+    val configurationDirectory = getConfigDir(config)
 
     val frontend = new CliFrontend(
       effectiveConfig,
@@ -271,8 +267,7 @@ object FlinkShell {
     val effectiveConfig = new Configuration(flinkConfig)
     val args = parseYarnArgList(config, mode)
 
-    val configurationDirectory =
-      config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+    val configurationDirectory = getConfigDir(config)
 
     val frontend = new CliFrontend(
       effectiveConfig,
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index b7b8645..776172f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -23,8 +23,10 @@ import org.apache.flink.client.deployment.AbstractContainerizedClusterClientFact
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -51,6 +53,11 @@ public class YarnClusterClientFactory extends AbstractContainerizedClusterClient
 	@Override
 	public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
 		checkNotNull(configuration);
+
+		final String configurationDirectory =
+				configuration.get(DeploymentOptionsInternal.CONF_DIR);
+		YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
+
 		return getClusterDescriptor(configuration);
 	}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2d2103c..25b1009 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -405,9 +404,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		final List<String> pipelineJars = flinkConfiguration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
 		Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
 
-		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
-		YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
-
 		try {
 			return deployInternal(
 					clusterSpecification,
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 19f1373..9507ad8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -44,7 +45,6 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
 import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
@@ -433,7 +433,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 			configuration.setString(YarnConfigOptions.NODE_LABEL, nodeLabelValue);
 		}
 
-		YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
+		configuration.set(DeploymentOptionsInternal.CONF_DIR, configurationDirectory);
 	}
 
 	private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
index 981cd4d..0a93dce 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
@@ -47,12 +47,12 @@ public class YarnLogConfigUtil {
 			final Configuration configuration,
 			final String configurationDirectory) {
 
-		if (configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != null) {
+		if (configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != null) {
 			return configuration;
 		}
 
 		discoverLogConfigFile(configurationDirectory).ifPresent(file ->
-				configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, file.getPath()));
+				configuration.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, file.getPath()));
 		return configuration;
 	}