You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/19 09:43:19 UTC

[3/3] flink git commit: [FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos

[FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos

This closes #6284.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec28f92f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec28f92f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec28f92f

Branch: refs/heads/master
Commit: ec28f92ffd042308494d9661a38ab462738611aa
Parents: 749dd29
Author: Oleksandr Nitavskyi <o....@criteo.com>
Authored: Mon Jul 2 09:42:17 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200

----------------------------------------------------------------------
 .../_includes/generated/core_configuration.html |  7 ++++-
 .../flink/configuration/Configuration.java      | 26 ++++++++++++++++++
 .../apache/flink/configuration/CoreOptions.java |  9 +++++-
 .../configuration/DelegatingConfiguration.java  |  5 ++++
 .../flink/configuration/ConfigurationTest.java  | 29 ++++++++++++++++++++
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 12 ++------
 .../clusterframework/BootstrapTools.java        | 24 +++++++++++++++-
 .../clusterframework/BootstrapToolsTest.java    | 19 +++++++++++++
 .../flink/yarn/YarnApplicationMasterRunner.java | 13 ++-------
 .../apache/flink/yarn/YarnResourceManager.java  | 12 ++++++--
 .../flink/yarn/YarnTaskExecutorRunner.java      | 12 ++------
 .../yarn/YarnTaskManagerRunnerFactory.java      | 12 ++------
 .../yarn/entrypoint/YarnEntrypointUtils.java    | 13 ++-------
 13 files changed, 135 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 91fa1a5..8281adc 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,8 +23,13 @@
             <td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
         </tr>
         <tr>
+            <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
+        </tr>
+        <tr>
             <td><h5>io.tmp.dirs</h5></td>
-            <td style="word-wrap: break-word;">System.getProperty("java.io.tmpdir")</td>
+            <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
             <td></td>
         </tr>
         <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 7d99fbb..00c4c38 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -729,6 +729,32 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		}
 	}
 
+	/**
+	 * Removes the given config option: its current key, or else the first deprecated key found.
+	 *
+	 * @param configOption config option to remove
+	 * @param <T> Type of the config option
+	 * @return true if an entry has been removed, false otherwise
+	 */
+	public <T> boolean removeConfig(ConfigOption<T> configOption){
+		synchronized (this.confData){
+			// try the current key first; deprecated keys are only tried when this yields null
+			Object oldValue = this.confData.remove(configOption.key());
+			if (oldValue == null){
+				for (String deprecatedKey : configOption.deprecatedKeys()){
+					oldValue = this.confData.remove(deprecatedKey);
+					if (oldValue != null){
+						LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+							deprecatedKey, configOption.key());
+						return true;
+					}
+				}
+				return false;
+			}
+			return true;
+		}
+	}
+
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 656943f..ea27e28 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -181,12 +181,19 @@ public class CoreOptions {
 	 * The config parameter defining the directories for temporary files, separated by
 	 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
 	 */
-	@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
+	@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.")
 	public static final ConfigOption<String> TMP_DIRS =
 		key("io.tmp.dirs")
 			.defaultValue(System.getProperty("java.io.tmpdir"))
 			.withDeprecatedKeys("taskmanager.tmp.dirs");
 
+	/**
+	 * Boolean option (default {@code true}) saying if the default value is used for `io.tmp.dirs`.
+	 */
+	public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
+		.defaultValue(true)
+		.withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");
+
 	// ------------------------------------------------------------------------
 	//  program
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 7b75c7a..1a637f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -311,6 +311,11 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public <T> boolean removeConfig(ConfigOption<T> configOption){
+		return backingConfig.removeConfig(configOption); // NOTE(review): 'prefix' is not applied here, unlike containsKey below - confirm this is intended
+	}
+
+	@Override
 	public boolean containsKey(String key) {
 		return backingConfig.containsKey(prefix + key);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 232c829..3b98a44 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -303,4 +304,32 @@ public class ConfigurationTest extends TestLogger {
 		assertEquals(13, cfg.getInteger(matchesThird));
 		assertEquals(-1, cfg.getInteger(notContained));
 	}
+
+	@Test
+	public void testRemove(){
+		Configuration cfg = new Configuration();
+		cfg.setInteger("a", 1);
+		cfg.setInteger("b", 2);
+		// option whose current key "a" is present in the configuration
+		ConfigOption<Integer> validOption = ConfigOptions
+			.key("a")
+			.defaultValue(-1);
+		// option present only through its second deprecated key "b"
+		ConfigOption<Integer> deprecatedOption = ConfigOptions
+			.key("c")
+			.defaultValue(-1)
+			.withDeprecatedKeys("d", "b");
+		// option for which neither the key nor any deprecated key is present
+		ConfigOption<Integer> missingOption = ConfigOptions
+			.key("e")
+			.defaultValue(-1)
+			.withDeprecatedKeys("f", "g", "j");
+		// JUnit order is (message, expected, actual); each successful removal drops exactly one key
+		assertEquals("Wrong expectation about size", 2, cfg.keySet().size());
+		assertTrue("Expected 'validOption' is removed", cfg.removeConfig(validOption));
+		assertEquals("Wrong expectation about size", 1, cfg.keySet().size());
+		assertTrue("Expected 'deprecatedOption' is removed", cfg.removeConfig(deprecatedOption));
+		assertEquals("Wrong expectation about size", 0, cfg.keySet().size());
+		assertFalse("Expected 'missingOption' is not removed", cfg.removeConfig(missingOption));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 498f435..2059c8e 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,13 +19,13 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -173,15 +173,7 @@ public class MesosEntrypointUtils {
 		final Map<String, String> envs = System.getenv();
 		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			log.info("Overriding Mesos' temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else if (tmpDirs != null) {
-			log.info("Setting directories for temporary files to: {}", tmpDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
-		}
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs);
 
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 7a8403a..3b606f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -250,7 +250,11 @@ public class BootstrapTools {
 			cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 		}
 
-		return cfg; 
+		if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+			cfg.removeConfig(CoreOptions.TMP_DIRS);
+		}
+
+		return cfg;
 	}
 
 	/**
@@ -467,4 +471,22 @@ public class BootstrapTools {
 		}
 		return template;
 	}
+
+	/**
+	 * Sets the temporary directories to the given defaults unless the configuration already
+	 * contains them; an explicit setting also sets USE_LOCAL_DEFAULT_TMP_DIRS to false.
+	 * @param configuration flink config to patch
+	 * @param defaultDirs fallback directories; skipped when null (e.g. env variable not set)
+	 */
+	public static void updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs){
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			LOG.info("Overriding Flink's temporary file directories with those " +
+				"specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
+			configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
+		}
+		else if (defaultDirs != null) {
+			LOG.info("Setting directories for temporary files to: {}", defaultDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index cf38fea..7e31c8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -276,4 +276,23 @@ public class BootstrapToolsTest {
 					true, true, true, this.getClass()));
 
 	}
+
+	@Test
+	public void testUpdateTmpDirectoriesInConfiguration(){
+		Configuration config = new Configuration();
+
+		// test that the default value is taken when nothing is configured (expected value first)
+		BootstrapTools.updateTmpDirectoriesInConfiguration(config, "default/directory/path");
+		assertEquals("default/directory/path", config.getString(CoreOptions.TMP_DIRS));
+
+		// test that we ignore the default value if a value is set before
+		BootstrapTools.updateTmpDirectoriesInConfiguration(config, "not/default/directory/path");
+		assertEquals("default/directory/path", config.getString(CoreOptions.TMP_DIRS));
+
+		// test that empty value is not a magic string
+		config.setString(CoreOptions.TMP_DIRS, "");
+		BootstrapTools.updateTmpDirectoriesInConfiguration(config, "some/new/path");
+		assertEquals("", config.getString(CoreOptions.TMP_DIRS));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index eb977bf..497ac87 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -523,16 +522,8 @@ public class YarnApplicationMasterRunner {
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
 			ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			log.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else {
-			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-			log.info("Setting directories for temporary files to: {}", localDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-		}
+		final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0206ffb..b08993a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -472,14 +473,19 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
 
-		log.debug("TaskManager configuration: {}", flinkConfig);
+		Configuration taskManagerConfig = flinkConfig.clone();
+		if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+			taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
+		}
+
+		log.debug("TaskManager configuration: {}", taskManagerConfig);
 
 		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
-			flinkConfig,
+			taskManagerConfig,
 			yarnConfig,
 			env,
 			taskManagerParameters,
-			flinkConfig,
+			taskManagerConfig,
 			currDir,
 			YarnTaskExecutorRunner.class,
 			log);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 94cd5a9..0e70de9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -21,10 +21,10 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,15 +100,7 @@ public class YarnTaskExecutorRunner {
 			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
 			FileSystem.initialize(configuration);
 
-			// configure local directory
-			if (configuration.contains(CoreOptions.TMP_DIRS)) {
-				LOG.info("Overriding YARN's temporary file directories with those " +
-					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-			}
-			else {
-				LOG.info("Setting directories for temporary files to: {}", localDirs);
-				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-			}
+			BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 			// tell akka to die in case of an error
 			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
index d6f2364..9a01075 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
@@ -21,8 +21,8 @@ package org.apache.flink.yarn;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -121,15 +121,7 @@ public class YarnTaskManagerRunnerFactory {
 		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
 		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			LOG.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else {
-			LOG.info("Setting directories for temporary files to: {}", localDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-		}
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 		// tell akka to die in case of an error
 		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 49957e1..5566963 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -130,16 +129,8 @@ public class YarnEntrypointUtils {
 			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 		}
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			log.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else {
-			final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
-			log.info("Setting directories for temporary files to: {}", localDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-		}
+		final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 		return configuration;
 	}