You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/18 08:46:23 UTC

[3/3] flink git commit: [FLINK-8350][config] replace "taskmanager.tmp.dirs" with "io.tmp.dirs"

[FLINK-8350][config] replace "taskmanager.tmp.dirs" with "io.tmp.dirs"

This replaces "taskmanager.tmp.dirs" with the new "io.tmp.dirs"
configuration parameter to define temporary directories in (cluster)
environments for all components, e.g. JobManager, JobMaster, Dispatcher, ...

Please note that this (kind of internal and thus undocumented) configuration
parameter is set by our YARN and Mesos integrations.

[FLINK-8350][cluster] initialise "io.tmp.dirs" for JobManager as well

In YARN and Mesos environments, this initialises Flink's temporary directory
configuration with YARN/Mesos application-specific paths for the JobManager,
JobMaster, Dispatcher, etc. components as well (the Mesos integration does not
yet provide these paths properly, but once it does, the new hooks will fall
into place).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76abcaa5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76abcaa5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76abcaa5

Branch: refs/heads/master
Commit: 76abcaa55d0d6ab704b7ab8164718e8e2dcae2c4
Parents: 46ed5e3
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 2 16:52:16 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 18 09:45:59 2018 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  6 +++
 .../flink/configuration/ConfigurationUtils.java | 41 ++++++++++++++++++++
 .../apache/flink/configuration/CoreOptions.java | 15 +++++++
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 32 +++++++++++++++
 .../entrypoint/MesosJobClusterEntrypoint.java   |  3 +-
 .../MesosSessionClusterEntrypoint.java          |  4 +-
 .../entrypoint/MesosTaskExecutorRunner.java     | 17 +-------
 .../MesosApplicationMasterRunner.java           |  3 +-
 .../MesosTaskManagerRunner.java                 | 18 +--------
 .../apache/flink/runtime/blob/BlobUtils.java    | 12 +++---
 .../taskexecutor/TaskManagerConfiguration.java  |  3 +-
 .../TaskManagerServicesConfiguration.java       | 17 +-------
 .../flink/runtime/blob/BlobUtilsTest.java       | 23 ++++++-----
 ...askManagerComponentsStartupShutdownTest.java |  5 ++-
 .../taskmanager/TaskManagerStartupTest.java     |  3 +-
 .../flink/yarn/YarnApplicationMasterRunner.java | 17 +++++++-
 .../flink/yarn/YarnTaskExecutorRunner.java      | 12 +++---
 .../flink/yarn/YarnTaskManagerRunner.java       | 13 +++----
 .../yarn/entrypoint/YarnEntrypointUtils.java    | 14 ++++++-
 .../entrypoint/YarnJobClusterEntrypoint.java    |  2 +-
 .../YarnSessionClusterEntrypoint.java           |  2 +-
 21 files changed, 168 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 510f3b1..d0dd499 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -209,7 +209,10 @@ public final class ConfigConstants {
 	/**
 	 * The config parameter defining the directories for temporary files, separated by
 	 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
+	 *
+	 * @deprecated Use {@link CoreOptions#TMP_DIRS} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dirs";
 
 	/**
@@ -1338,7 +1341,10 @@ public final class ConfigConstants {
 
 	/**
 	 * The default directory for temporary files of the task manager.
+	 *
+	 * @deprecated {@link CoreOptions#TMP_DIRS} provides the default value now
 	 */
+	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
new file mode 100644
index 0000000..e697e6f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import java.io.File;
+
+/**
+ * Utility class for {@link Configuration} related helper functions.
+ */
+public class ConfigurationUtils {
+
+	/**
+	 * Extracts the directories for temporary files (shared by all components) as defined by
+	 * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
+	 *
+	 * @param configuration configuration object
+	 * @return array of configured directories (in order)
+	 */
+	public static String[] parseTempDirectories(Configuration configuration) {
+		return configuration.getString(CoreOptions.TMP_DIRS).split(",|" + File.pathSeparator);
+	}
+
+	// Make sure that we cannot instantiate this class
+	private ConfigurationUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index f93c4f1..4592608 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -20,6 +20,8 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /**
  * The set of configuration options for core parameters.
  */
@@ -97,6 +99,19 @@ public class CoreOptions {
 		.defaultValue("");
 
 	// ------------------------------------------------------------------------
+	//  generic io
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The config parameter defining the directories for temporary files, separated by
+	 * "," or the system's {@link java.io.File#pathSeparator}.
+	 */
+	public static final ConfigOption<String> TMP_DIRS =
+		key("io.tmp.dirs")
+			.defaultValue(System.getProperty("java.io.tmpdir"))
+			.withDeprecatedKeys("taskmanager.tmp.dirs");
+
+	// ------------------------------------------------------------------------
 	//  program
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index af72c96..6b8d820 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,8 +19,11 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
@@ -36,6 +39,7 @@ import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.Duration;
@@ -152,4 +156,32 @@ public class MesosEntrypointUtils {
 		overlay.configure(containerSpec);
 	}
 
+	/**
+	 * Loads the global configuration, adds the given dynamic properties configuration, and sets
+	 * the temp directories from {@link MesosConfigKeys#ENV_FLINK_TMP_DIR} if not configured.
+	 *
+	 * @param dynamicProperties dynamic properties to integrate
+	 * @param log logger instance
+	 * @return the loaded and adapted global configuration
+	 */
+	public static Configuration loadConfiguration(Configuration dynamicProperties, Logger log) {
+		Configuration configuration =
+			GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
+
+		// read the environment variables
+		final Map<String, String> envs = System.getenv();
+		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+		// configure local directory
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			log.info("Overriding Mesos' temporary file directories with those " +
+				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
+		}
+		else if (tmpDirs != null) {
+			log.info("Setting directories for temporary files to: {}", tmpDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
+		}
+
+		return configuration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 874ee14..d38e734 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -19,7 +19,6 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
@@ -218,7 +217,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 		}
 
 		Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-		Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
+		Configuration configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
 
 		MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 039e51d..f52532a 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -19,7 +19,6 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
@@ -192,10 +191,11 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 		}
 
 		Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-		Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
+		Configuration configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
 
 		MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);
 
 		clusterEntrypoint.startCluster();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index 41449be..11a4130 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -19,9 +19,7 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -76,7 +74,7 @@ public class MesosTaskExecutorRunner {
 			Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
 			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
 
-			configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
+			configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
 		}
 		catch (Throwable t) {
 			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
@@ -84,20 +82,7 @@ public class MesosTaskExecutorRunner {
 			return;
 		}
 
-		// read the environment variables
 		final Map<String, String> envs = System.getenv();
-		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
-
-		// configure local directory
-		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
-		if (flinkTempDirs != null) {
-			LOG.info("Overriding Mesos temporary file directories with those " +
-				"specified in the Flink config: {}", flinkTempDirs);
-		}
-		else if (tmpDirs != null) {
-			LOG.info("Setting directories for temporary files to: {}", tmpDirs);
-			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
-		}
 
 		// configure the filesystems
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 544150b..94804ac 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -20,7 +20,6 @@ package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -157,7 +156,7 @@ public class MesosApplicationMasterRunner {
 			CommandLine cmd = parser.parse(ALL_OPTIONS, args);
 
 			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-			final Configuration config = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
+			final Configuration config = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
 
 			// configure the filesystems
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 4549970..74201d9 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -19,10 +19,9 @@
 package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -76,7 +75,7 @@ public class MesosTaskManagerRunner {
 			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
 			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
 
-			configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
+			configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
 		}
 		catch (Throwable t) {
 			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
@@ -84,20 +83,7 @@ public class MesosTaskManagerRunner {
 			return;
 		}
 
-		// read the environment variables
 		final Map<String, String> envs = System.getenv();
-		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
-
-		// configure local directory
-		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
-		if (flinkTempDirs != null) {
-			LOG.info("Overriding Mesos temporary file directories with those " +
-				"specified in the Flink config: {}", flinkTempDirs);
-		}
-		else if (tmpDirs != null) {
-			LOG.info("Setting directories for temporary files to: {}", tmpDirs);
-			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
-		}
 
 		// configure the filesystems
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 29f3c67..04d3366 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -20,14 +20,13 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
@@ -135,10 +134,9 @@ public class BlobUtils {
 	/**
 	 * Creates a local storage directory for a blob service under the configuration parameter given
 	 * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will
-	 * fall back to the TaskManager temp directories (given by
-	 * {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}; which in turn falls back to
-	 * {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH} currently set to
-	 * <tt>java.io.tmpdir</tt>) and choose one among them at random.
+	 * fall back to Flink's temp directories (given by
+	 * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at
+	 * random.
 	 *
 	 * @param config
 	 * 		Flink configuration
@@ -154,7 +152,7 @@ public class BlobUtils {
 
 		File baseDir;
 		if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
-			final String[] tmpDirPaths = TaskManagerServicesConfiguration.parseTempDirectories(config);
+			final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);
 			baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index bbd605f..449f665 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -147,7 +148,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			numberSlots = 1;
 		}
 
-		final String[] tmpDirPaths = TaskManagerServicesConfiguration.parseTempDirectories(configuration);
+		final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);
 
 		final Time timeout;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index ab636e9..61e83d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -35,7 +36,6 @@ import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -184,7 +184,7 @@ public class TaskManagerServicesConfiguration {
 			slots = 1;
 		}
 
-		final String[] tmpDirs = parseTempDirectories(configuration);
+		final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
 
 		final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
 			configuration,
@@ -435,19 +435,6 @@ public class TaskManagerServicesConfiguration {
 	}
 
 	/**
-	 * Extracts the task manager directories for temporary files as defined by
-	 * {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}.
-	 *
-	 * @param configuration configuration object
-	 * @return array of configured directories (in order)
-	 */
-	public static String[] parseTempDirectories(Configuration configuration) {
-		return configuration.getString(
-			ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
-	}
-
-	/**
 	 * Validates a condition for a config parameter and displays a standard exception, if the
 	 * the condition does not hold.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 757823d..c6d8e9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -49,8 +49,7 @@ public class BlobUtilsTest {
 		Configuration config = new Configuration();
 		String blobStorageDir = temporaryFolder.newFolder().getAbsolutePath();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobStorageDir);
-		config.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-			temporaryFolder.newFolder().getAbsolutePath());
+		config.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
 
 		File dir = BlobUtils.initLocalStorageDirectory(config);
 		assertThat(dir.getAbsolutePath(), startsWith(blobStorageDir));
@@ -58,13 +57,13 @@ public class BlobUtilsTest {
 
 	/**
 	 * Tests {@link BlobUtils#initLocalStorageDirectory}'s fallback to
-	 * {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY} with a single temp directory.
+	 * {@link CoreOptions#TMP_DIRS} with a single temp directory.
 	 */
 	@Test
 	public void testTaskManagerFallbackBlobStorageDirectory1() throws IOException {
 		Configuration config = new Configuration();
 		String blobStorageDir = temporaryFolder.getRoot().getAbsolutePath();
-		config.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, blobStorageDir);
+		config.setString(CoreOptions.TMP_DIRS, blobStorageDir);
 
 		File dir = BlobUtils.initLocalStorageDirectory(config);
 		assertThat(dir.getAbsolutePath(), startsWith(blobStorageDir));
@@ -72,14 +71,14 @@ public class BlobUtilsTest {
 
 	/**
 	 * Tests {@link BlobUtils#initLocalStorageDirectory}'s fallback to
-	 * {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY} having multiple temp directories.
+	 * {@link CoreOptions#TMP_DIRS} having multiple temp directories.
 	 */
 	@Test
 	public void testTaskManagerFallbackBlobStorageDirectory2a() throws IOException {
 		Configuration config = new Configuration();
 		String blobStorageDirs = temporaryFolder.newFolder().getAbsolutePath() + "," +
 			temporaryFolder.newFolder().getAbsolutePath();
-		config.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, blobStorageDirs);
+		config.setString(CoreOptions.TMP_DIRS, blobStorageDirs);
 
 		File dir = BlobUtils.initLocalStorageDirectory(config);
 		assertThat(dir.getAbsolutePath(), startsWith(temporaryFolder.getRoot().getAbsolutePath()));
@@ -87,22 +86,22 @@ public class BlobUtilsTest {
 
 	/**
 	 * Tests {@link BlobUtils#initLocalStorageDirectory}'s fallback to
-	 * {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY} having multiple temp directories.
+	 * {@link CoreOptions#TMP_DIRS} having multiple temp directories.
 	 */
 	@Test
 	public void testTaskManagerFallbackBlobStorageDirectory2b() throws IOException {
 		Configuration config = new Configuration();
 		String blobStorageDirs = temporaryFolder.newFolder().getAbsolutePath() + File.pathSeparator +
 			temporaryFolder.newFolder().getAbsolutePath();
-		config.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, blobStorageDirs);
+		config.setString(CoreOptions.TMP_DIRS, blobStorageDirs);
 
 		File dir = BlobUtils.initLocalStorageDirectory(config);
 		assertThat(dir.getAbsolutePath(), startsWith(temporaryFolder.getRoot().getAbsolutePath()));
 	}
 
 	/**
-	 * Tests {@link BlobUtils#initLocalStorageDirectory}'s fallback to
-	 * {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH}.
+	 * Tests {@link BlobUtils#initLocalStorageDirectory}'s fallback to the default value of
+	 * {@link CoreOptions#TMP_DIRS}.
 	 */
 	@Test
 	public void testTaskManagerFallbackFallbackBlobStorageDirectory1() throws IOException {
@@ -110,6 +109,6 @@ public class BlobUtilsTest {
 
 		File dir = BlobUtils.initLocalStorageDirectory(config);
 		assertThat(dir.getAbsolutePath(),
-			startsWith(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH));
+			startsWith(CoreOptions.TMP_DIRS.defaultValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index a3c41c5..5edd796 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -73,7 +73,6 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 	@Test
 	public void testComponentsStartupShutdown() throws Exception {
 
-		final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
 		final Time timeout = Time.seconds(100);
 		final int BUFFER_SIZE = 32 * 1024;
 
@@ -82,6 +81,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 		config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "1 s");
 		config.setInteger(AkkaOptions.WATCH_THRESHOLD, 1);
 
+		final String[] TMP_DIR = ConfigurationUtils.parseTempDirectories(config);
+
 		ActorSystem actorSystem = null;
 
 		HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 27df4c1..ade2f3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -142,7 +143,7 @@ public class TaskManagerStartupTest extends TestLogger {
 
 		try {
 			Configuration cfg = new Configuration();
-			cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath());
+			cfg.setString(CoreOptions.TMP_DIRS, nonWritable.getAbsolutePath());
 			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			cfg.setString(JobManagerOptions.ADDRESS, "localhost");
 			cfg.setInteger(JobManagerOptions.PORT, 21656);

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 279981a..e97fac3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -168,7 +169,7 @@ public class YarnApplicationMasterRunner {
 				FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
 			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
 
-			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties, LOG);
 
 			// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
 			if (keytabPath != null && remoteKeytabPrincipal != null) {
@@ -512,11 +513,12 @@ public class YarnApplicationMasterRunner {
 	 *
 	 * @param baseDirectory directory to load the configuration from
 	 * @param additional additional parameters to be included in the configuration
+	 * @param log logger instance
 	 *
 	 * @return The configuration to be used by the TaskManagers.
 	 */
 	@SuppressWarnings("deprecation")
-	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
+	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional, Logger log) {
 		LOG.info("Loading config from directory " + baseDirectory);
 
 		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
@@ -549,6 +551,17 @@ public class YarnApplicationMasterRunner {
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
 			ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
+		// configure local directory
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			log.info("Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
+		}
+		else {
+			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+			log.info("Setting directories for temporary files to: {}", localDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
+		}
+
 		return configuration;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index cd0053b..8b54d87 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -103,14 +104,13 @@ public class YarnTaskExecutorRunner {
 			FileSystem.initialize(configuration);
 
 			// configure local directory
-			String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
-			if (flinkTempDirs == null) {
-				LOG.info("Setting directories for temporary file " + localDirs);
-				configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
+			if (configuration.contains(CoreOptions.TMP_DIRS)) {
+				LOG.info("Overriding YARN's temporary file directories with those " +
+					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
 			}
 			else {
-				LOG.info("Overriding YARN's temporary file directories with those " +
-						"specified in the Flink config: " + flinkTempDirs);
+				LOG.info("Setting directories for temporary files to: {}", localDirs);
+				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
 			}
 
 			// tell akka to die in case of an error

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index a3b964f..a6e34c5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -19,8 +19,8 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -83,14 +83,13 @@ public class YarnTaskManagerRunner {
 		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
 		// configure local directory
-		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
-		if (flinkTempDirs == null) {
-			LOG.info("Setting directories for temporary file " + localDirs);
-			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			LOG.info("Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
 		}
 		else {
-			LOG.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + flinkTempDirs);
+			LOG.info("Setting directories for temporary files to: {}", localDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
 		}
 
 		// tell akka to die in case of an error

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index a03ed08..c50c043 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -77,7 +78,7 @@ public class YarnEntrypointUtils {
 		return SecurityUtils.getInstalledContext();
 	}
 
-	public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env) {
+	public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env, Logger log) {
 		Configuration configuration = GlobalConfiguration.loadConfiguration(workingDirectory);
 
 		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
@@ -145,6 +146,17 @@ public class YarnEntrypointUtils {
 			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 		}
 
+		// configure local directory
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			log.info("Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
+		}
+		else {
+			final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
+			log.info("Setting directories for temporary files to: {}", localDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
+		}
+
 		return configuration;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 78013f7..d27e207 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -142,7 +142,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 			LOG.warn("Could not log YARN environment information.", e);
 		}
 
-		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);
+		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
 
 		YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
 			configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/76abcaa5/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 96fba34..81a9b6b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -116,7 +116,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			LOG.warn("Could not log YARN environment information.", e);
 		}
 
-		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);
+		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
 
 		YarnSessionClusterEntrypoint yarnSessionClusterEntrypoint = new YarnSessionClusterEntrypoint(
 			configuration,