You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/19 09:43:19 UTC
[3/3] flink git commit: [FLINK-9762][yarn] Use local default tmp
directories on Yarn and Mesos
[FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos
This closes #6284.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec28f92f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec28f92f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec28f92f
Branch: refs/heads/master
Commit: ec28f92ffd042308494d9661a38ab462738611aa
Parents: 749dd29
Author: Oleksandr Nitavskyi <o....@criteo.com>
Authored: Mon Jul 2 09:42:17 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200
----------------------------------------------------------------------
.../_includes/generated/core_configuration.html | 7 ++++-
.../flink/configuration/Configuration.java | 26 ++++++++++++++++++
.../apache/flink/configuration/CoreOptions.java | 9 +++++-
.../configuration/DelegatingConfiguration.java | 5 ++++
.../flink/configuration/ConfigurationTest.java | 29 ++++++++++++++++++++
.../mesos/entrypoint/MesosEntrypointUtils.java | 12 ++------
.../clusterframework/BootstrapTools.java | 24 +++++++++++++++-
.../clusterframework/BootstrapToolsTest.java | 19 +++++++++++++
.../flink/yarn/YarnApplicationMasterRunner.java | 13 ++-------
.../apache/flink/yarn/YarnResourceManager.java | 12 ++++++--
.../flink/yarn/YarnTaskExecutorRunner.java | 12 ++------
.../yarn/YarnTaskManagerRunnerFactory.java | 12 ++------
.../yarn/entrypoint/YarnEntrypointUtils.java | 13 ++-------
13 files changed, 135 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 91fa1a5..8281adc 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,8 +23,13 @@
<td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
</tr>
<tr>
+ <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
+ </tr>
+ <tr>
<td><h5>io.tmp.dirs</h5></td>
- <td style="word-wrap: break-word;">System.getProperty("java.io.tmpdir")</td>
+ <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
<td></td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 7d99fbb..00c4c38 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -729,6 +729,32 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
}
+ /**
+ * Removes given config option from the configuration.
+ *
+ * @param configOption config option to remove
+ * @param <T> Type of the config option
+ * @return true is config has been removed, false otherwise
+ */
+ public <T> boolean removeConfig(ConfigOption<T> configOption){
+ synchronized (this.confData){
+ // try the current key
+ Object oldValue = this.confData.remove(configOption.key());
+ if (oldValue == null){
+ for (String deprecatedKey : configOption.deprecatedKeys()){
+ oldValue = this.confData.remove(deprecatedKey);
+ if (oldValue != null){
+ LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+ deprecatedKey, configOption.key());
+ return true;
+ }
+ }
+ return false;
+ }
+ return true;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 656943f..ea27e28 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -181,12 +181,19 @@ public class CoreOptions {
* The config parameter defining the directories for temporary files, separated by
* ",", "|", or the system's {@link java.io.File#pathSeparator}.
*/
- @Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
+ @Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.")
public static final ConfigOption<String> TMP_DIRS =
key("io.tmp.dirs")
.defaultValue(System.getProperty("java.io.tmpdir"))
.withDeprecatedKeys("taskmanager.tmp.dirs");
+ /**
+ * String key, which says if default value is used for `io.tmp.dirs` config variable.
+ */
+ public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
+ .defaultValue(true)
+ .withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");
+
// ------------------------------------------------------------------------
// program
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 7b75c7a..1a637f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -311,6 +311,11 @@ public final class DelegatingConfiguration extends Configuration {
}
@Override
+ public <T> boolean removeConfig(ConfigOption<T> configOption){
+ return backingConfig.removeConfig(configOption);
+ }
+
+ @Override
public boolean containsKey(String key) {
return backingConfig.containsKey(prefix + key);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 232c829..3b98a44 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -303,4 +304,32 @@ public class ConfigurationTest extends TestLogger {
assertEquals(13, cfg.getInteger(matchesThird));
assertEquals(-1, cfg.getInteger(notContained));
}
+
+ @Test
+ public void testRemove(){
+ Configuration cfg = new Configuration();
+ cfg.setInteger("a", 1);
+ cfg.setInteger("b", 2);
+
+ ConfigOption<Integer> validOption = ConfigOptions
+ .key("a")
+ .defaultValue(-1);
+
+ ConfigOption<Integer> deprecatedOption = ConfigOptions
+ .key("c")
+ .defaultValue(-1)
+ .withDeprecatedKeys("d", "b");
+
+ ConfigOption<Integer> unexistedOption = ConfigOptions
+ .key("e")
+ .defaultValue(-1)
+ .withDeprecatedKeys("f", "g", "j");
+
+ assertEquals("Wrong expectation about size", cfg.keySet().size(), 2);
+ assertTrue("Expected 'validOption' is removed", cfg.removeConfig(validOption));
+ assertEquals("Wrong expectation about size", cfg.keySet().size(), 1);
+ assertTrue("Expected 'existedOption' is removed", cfg.removeConfig(deprecatedOption));
+ assertEquals("Wrong expectation about size", cfg.keySet().size(), 0);
+ assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 498f435..2059c8e 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,13 +19,13 @@
package org.apache.flink.mesos.entrypoint;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -173,15 +173,7 @@ public class MesosEntrypointUtils {
final Map<String, String> envs = System.getenv();
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- log.info("Overriding Mesos' temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else if (tmpDirs != null) {
- log.info("Setting directories for temporary files to: {}", tmpDirs);
- configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
- }
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 7a8403a..3b606f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -250,7 +250,11 @@ public class BootstrapTools {
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}
- return cfg;
+ if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+ cfg.removeConfig(CoreOptions.TMP_DIRS);
+ }
+
+ return cfg;
}
/**
@@ -467,4 +471,22 @@ public class BootstrapTools {
}
return template;
}
+
+ /**
+ * Set temporary configuration directories if necessary
+ *
+ * @param configuration flink config to patch
+ * @param defaultDirs in case no tmp directories is set, next directories will be applied
+ */
+ public static void updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs){
+ if (configuration.contains(CoreOptions.TMP_DIRS)) {
+ LOG.info("Overriding Fink's temporary file directories with those " +
+ "specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
+ configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
+ }
+ else {
+ LOG.info("Setting directories for temporary files to: {}", defaultDirs);
+ configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index cf38fea..7e31c8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -276,4 +276,23 @@ public class BootstrapToolsTest {
true, true, true, this.getClass()));
}
+
+ @Test
+ public void testUpdateTmpDirectoriesInConfiguration(){
+ Configuration config = new Configuration();
+
+ // test that default value is taken
+ BootstrapTools.updateTmpDirectoriesInConfiguration(config, "default/directory/path");
+ assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");
+
+ // test that we ignore default value is value is set before
+ BootstrapTools.updateTmpDirectoriesInConfiguration(config, "not/default/directory/path");
+ assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");
+
+ //test that empty value is not a magic string
+ config.setString(CoreOptions.TMP_DIRS, "");
+ BootstrapTools.updateTmpDirectoriesInConfiguration(config, "some/new/path");
+ assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index eb977bf..497ac87 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -523,16 +522,8 @@ public class YarnApplicationMasterRunner {
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- log.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
- log.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0206ffb..b08993a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -472,14 +473,19 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());
- log.debug("TaskManager configuration: {}", flinkConfig);
+ Configuration taskManagerConfig = flinkConfig.clone();
+ if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+ taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
+ }
+
+ log.debug("TaskManager configuration: {}", taskManagerConfig);
ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
- flinkConfig,
+ taskManagerConfig,
yarnConfig,
env,
taskManagerParameters,
- flinkConfig,
+ taskManagerConfig,
currDir,
YarnTaskExecutorRunner.class,
log);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 94cd5a9..0e70de9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -21,10 +21,10 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,15 +100,7 @@ public class YarnTaskExecutorRunner {
final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- LOG.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- LOG.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
index d6f2364..9a01075 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
@@ -21,8 +21,8 @@ package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -121,15 +121,7 @@ public class YarnTaskManagerRunnerFactory {
final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- LOG.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- LOG.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 49957e1..5566963 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.yarn.entrypoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -130,16 +129,8 @@ public class YarnEntrypointUtils {
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- log.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
- log.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
return configuration;
}