You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/19 09:43:17 UTC

[1/3] flink git commit: [hotfix] Fix checkstyle violations in BootstrapTool

Repository: flink
Updated Branches:
  refs/heads/master 749dd2993 -> 44ed5ef0f


[hotfix] Fix checkstyle violations in BootstrapTool


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44ed5ef0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44ed5ef0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44ed5ef0

Branch: refs/heads/master
Commit: 44ed5ef0fc1c221f3916ab5126f1bc8ee5dfb45d
Parents: 6bdec86
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 18 15:49:55 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/BootstrapTools.java        | 23 +++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44ed5ef0/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ebe7140..b43946c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -270,8 +270,7 @@ public class BootstrapTools {
 	 */
 	public static void writeConfiguration(Configuration cfg, File file) throws IOException {
 		try (FileWriter fwrt = new FileWriter(file);
-			PrintWriter out = new PrintWriter(fwrt))
-		{
+			PrintWriter out = new PrintWriter(fwrt)) {
 			for (String key : cfg.keySet()) {
 				String value = cfg.getString(key, null);
 				out.print(key);
@@ -331,7 +330,7 @@ public class BootstrapTools {
 	/**
 	 * Get an instance of the dynamic properties option.
 	 *
-	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
+	 * <p>Dynamic properties allow the user to specify additional configuration values with -D, such as
 	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
      */
 	public static Option newDynamicPropertiesOption() {
@@ -345,13 +344,13 @@ public class BootstrapTools {
 		final Configuration config = new Configuration();
 
 		String[] values = cmd.getOptionValues(DYNAMIC_PROPERTIES_OPT);
-		if(values != null) {
-			for(String value : values) {
+		if (values != null) {
+			for (String value : values) {
 				String[] pair = value.split("=", 2);
-				if(pair.length == 1) {
+				if (pair.length == 1) {
 					config.setString(pair[0], Boolean.TRUE.toString());
 				}
-				else if(pair.length == 2) {
+				else if (pair.length == 2) {
 					config.setString(pair[0], pair[1]);
 				}
 			}
@@ -401,7 +400,7 @@ public class BootstrapTools {
 		}
 		//applicable only for YarnMiniCluster secure test run
 		//krb5.conf file will be available as local resource in JM/TM container
-		if(hasKrb5) {
+		if (hasKrb5) {
 			javaOpts += " -Djava.security.krb5.conf=krb5.conf";
 		}
 		startCommandValues.put("jvmopts", javaOpts);
@@ -439,12 +438,11 @@ public class BootstrapTools {
 
 	// ------------------------------------------------------------------------
 
-	/** Private constructor to prevent instantiation */
+	/** Private constructor to prevent instantiation. */
 	private BootstrapTools() {}
 
 	/**
-	 * Replaces placeholders in the template start command with values from
-	 * <tt>startCommandValues</tt>.
+	 * Replaces placeholders in the template start command with values from startCommandValues.
 	 *
 	 * <p>If the default template {@link ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}
 	 * is used, the following keys must be present in the map or the resulting
@@ -458,7 +456,6 @@ public class BootstrapTools {
 	 * <li><tt>args</tt> = arguments for the main class</li>
 	 * <li><tt>redirects</tt> = output redirects</li>
 	 * </ul>
-	 * </p>
 	 *
 	 * @param template
 	 * 		a template start command with placeholders
@@ -478,7 +475,7 @@ public class BootstrapTools {
 	}
 
 	/**
-	 * Set temporary configuration directories if necessary
+	 * Set temporary configuration directories if necessary.
 	 *
 	 * @param configuration flink config to patch
 	 * @param defaultDirs in case no tmp directories is set, next directories will be applied


[2/3] flink git commit: [FLINK-9762] Consolidate configuration cloning in BootstrapTools

Posted by tr...@apache.org.
[FLINK-9762] Consolidate configuration cloning in BootstrapTools


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bdec86e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bdec86e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bdec86e

Branch: refs/heads/master
Commit: 6bdec86e31d82d7c38d2509a039a1a03ab9f246e
Parents: ec28f92
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 18 15:45:15 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200

----------------------------------------------------------------------
 .../_includes/generated/core_configuration.html |  5 ---
 .../apache/flink/configuration/CoreOptions.java |  7 ----
 .../UnmodifiableConfiguration.java              |  6 ++++
 .../clusterframework/BootstrapTools.java        | 34 ++++++++++++++++----
 .../apache/flink/yarn/YarnResourceManager.java  |  9 ++----
 5 files changed, 37 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 8281adc..98cca91 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,11 +23,6 @@
             <td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
         </tr>
         <tr>
-            <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
-        </tr>
-        <tr>
             <td><h5>io.tmp.dirs</h5></td>
             <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
             <td></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index ea27e28..16c9b54 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -187,13 +187,6 @@ public class CoreOptions {
 			.defaultValue(System.getProperty("java.io.tmpdir"))
 			.withDeprecatedKeys("taskmanager.tmp.dirs");
 
-	/**
-	 * String key, which says if default value is used for `io.tmp.dirs` config variable.
-	 */
-	public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
-		.defaultValue(true)
-		.withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");
-
 	// ------------------------------------------------------------------------
 	//  program
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
index f92de1c..0a1bcc4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -65,6 +65,12 @@ public class UnmodifiableConfiguration extends Configuration {
 		error();
 	}
 
+	@Override
+	public <T> boolean removeConfig(ConfigOption<T> configOption) {
+		error();
+		return false;
+	}
+
 	private void error(){
 		throw new UnsupportedOperationException("The configuration is unmodifiable; its contents cannot be changed.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 3b606f7..ebe7140 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -60,11 +61,19 @@ import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /**
  * Tools for starting JobManager and TaskManager processes, including the
  * Actor Systems used to run the JobManager and TaskManager actors.
  */
 public class BootstrapTools {
+	/**
+	 * Internal option which says if default value is used for {@link CoreOptions#TMP_DIRS}.
+	 */
+	private static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmpdirs.use-local-default")
+		.defaultValue(false);
+
 	private static final Logger LOG = LoggerFactory.getLogger(BootstrapTools.class);
 
 	/**
@@ -235,7 +244,7 @@ public class BootstrapTools {
 				int numSlots,
 				FiniteDuration registrationTimeout) {
 
-		Configuration cfg = baseConfig.clone();
+		Configuration cfg = cloneConfiguration(baseConfig);
 
 		if (jobManagerHostname != null && !jobManagerHostname.isEmpty()) {
 			cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
@@ -250,10 +259,6 @@ public class BootstrapTools {
 			cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 		}
 
-		if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
-			cfg.removeConfig(CoreOptions.TMP_DIRS);
-		}
-
 		return cfg;
 	}
 
@@ -482,11 +487,28 @@ public class BootstrapTools {
 		if (configuration.contains(CoreOptions.TMP_DIRS)) {
 			LOG.info("Overriding Fink's temporary file directories with those " +
 				"specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
-			configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
 		}
 		else {
 			LOG.info("Setting directories for temporary files to: {}", defaultDirs);
 			configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
+			configuration.setBoolean(USE_LOCAL_DEFAULT_TMP_DIRS, true);
+		}
+	}
+
+	/**
+	 * Clones the given configuration and resets instance specific config options.
+	 *
+	 * @param configuration to clone
+	 * @return Cloned configuration with reset instance specific config options
+	 */
+	public static Configuration cloneConfiguration(Configuration configuration) {
+		final Configuration clonedConfiguration = new Configuration(configuration);
+
+		if (clonedConfiguration.getBoolean(USE_LOCAL_DEFAULT_TMP_DIRS)){
+			clonedConfiguration.removeConfig(CoreOptions.TMP_DIRS);
+			clonedConfiguration.removeConfig(USE_LOCAL_DEFAULT_TMP_DIRS);
 		}
+
+		return clonedConfiguration;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index b08993a..49385e5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -21,9 +21,9 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -473,15 +473,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
 
-		Configuration taskManagerConfig = flinkConfig.clone();
-		if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
-			taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
-		}
+		Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
 
 		log.debug("TaskManager configuration: {}", taskManagerConfig);
 
 		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
-			taskManagerConfig,
+			flinkConfig,
 			yarnConfig,
 			env,
 			taskManagerParameters,


[3/3] flink git commit: [FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos

Posted by tr...@apache.org.
[FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos

This closes #6284.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec28f92f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec28f92f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec28f92f

Branch: refs/heads/master
Commit: ec28f92ffd042308494d9661a38ab462738611aa
Parents: 749dd29
Author: Oleksandr Nitavskyi <o....@criteo.com>
Authored: Mon Jul 2 09:42:17 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200

----------------------------------------------------------------------
 .../_includes/generated/core_configuration.html |  7 ++++-
 .../flink/configuration/Configuration.java      | 26 ++++++++++++++++++
 .../apache/flink/configuration/CoreOptions.java |  9 +++++-
 .../configuration/DelegatingConfiguration.java  |  5 ++++
 .../flink/configuration/ConfigurationTest.java  | 29 ++++++++++++++++++++
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 12 ++------
 .../clusterframework/BootstrapTools.java        | 24 +++++++++++++++-
 .../clusterframework/BootstrapToolsTest.java    | 19 +++++++++++++
 .../flink/yarn/YarnApplicationMasterRunner.java | 13 ++-------
 .../apache/flink/yarn/YarnResourceManager.java  | 12 ++++++--
 .../flink/yarn/YarnTaskExecutorRunner.java      | 12 ++------
 .../yarn/YarnTaskManagerRunnerFactory.java      | 12 ++------
 .../yarn/entrypoint/YarnEntrypointUtils.java    | 13 ++-------
 13 files changed, 135 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 91fa1a5..8281adc 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,8 +23,13 @@
             <td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
         </tr>
         <tr>
+            <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
+        </tr>
+        <tr>
             <td><h5>io.tmp.dirs</h5></td>
-            <td style="word-wrap: break-word;">System.getProperty("java.io.tmpdir")</td>
+            <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
             <td></td>
         </tr>
         <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 7d99fbb..00c4c38 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -729,6 +729,32 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		}
 	}
 
+	/**
+	 * Removes given config option from the configuration.
+	 *
+	 * @param configOption config option to remove
+	 * @param <T> Type of the config option
+	 * @return true is config has been removed, false otherwise
+	 */
+	public <T> boolean removeConfig(ConfigOption<T> configOption){
+		synchronized (this.confData){
+			// try the current key
+			Object oldValue = this.confData.remove(configOption.key());
+			if (oldValue == null){
+				for (String deprecatedKey : configOption.deprecatedKeys()){
+					oldValue = this.confData.remove(deprecatedKey);
+					if (oldValue != null){
+						LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+							deprecatedKey, configOption.key());
+						return true;
+					}
+				}
+				return false;
+			}
+			return true;
+		}
+	}
+
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 656943f..ea27e28 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -181,12 +181,19 @@ public class CoreOptions {
 	 * The config parameter defining the directories for temporary files, separated by
 	 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
 	 */
-	@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
+	@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.")
 	public static final ConfigOption<String> TMP_DIRS =
 		key("io.tmp.dirs")
 			.defaultValue(System.getProperty("java.io.tmpdir"))
 			.withDeprecatedKeys("taskmanager.tmp.dirs");
 
+	/**
+	 * String key, which says if default value is used for `io.tmp.dirs` config variable.
+	 */
+	public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
+		.defaultValue(true)
+		.withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");
+
 	// ------------------------------------------------------------------------
 	//  program
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 7b75c7a..1a637f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -311,6 +311,11 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public <T> boolean removeConfig(ConfigOption<T> configOption){
+		return backingConfig.removeConfig(configOption);
+	}
+
+	@Override
 	public boolean containsKey(String key) {
 		return backingConfig.containsKey(prefix + key);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 232c829..3b98a44 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -303,4 +304,32 @@ public class ConfigurationTest extends TestLogger {
 		assertEquals(13, cfg.getInteger(matchesThird));
 		assertEquals(-1, cfg.getInteger(notContained));
 	}
+
+	@Test
+	public void testRemove(){
+		Configuration cfg = new Configuration();
+		cfg.setInteger("a", 1);
+		cfg.setInteger("b", 2);
+
+		ConfigOption<Integer> validOption = ConfigOptions
+			.key("a")
+			.defaultValue(-1);
+
+		ConfigOption<Integer> deprecatedOption = ConfigOptions
+			.key("c")
+			.defaultValue(-1)
+			.withDeprecatedKeys("d", "b");
+
+		ConfigOption<Integer> unexistedOption = ConfigOptions
+			.key("e")
+			.defaultValue(-1)
+			.withDeprecatedKeys("f", "g", "j");
+
+		assertEquals("Wrong expectation about size", cfg.keySet().size(), 2);
+		assertTrue("Expected 'validOption' is removed", cfg.removeConfig(validOption));
+		assertEquals("Wrong expectation about size", cfg.keySet().size(), 1);
+		assertTrue("Expected 'existedOption' is removed", cfg.removeConfig(deprecatedOption));
+		assertEquals("Wrong expectation about size", cfg.keySet().size(), 0);
+		assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 498f435..2059c8e 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,13 +19,13 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -173,15 +173,7 @@ public class MesosEntrypointUtils {
 		final Map<String, String> envs = System.getenv();
 		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			log.info("Overriding Mesos' temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else if (tmpDirs != null) {
-			log.info("Setting directories for temporary files to: {}", tmpDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
-		}
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs);
 
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 7a8403a..3b606f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -250,7 +250,11 @@ public class BootstrapTools {
 			cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 		}
 
-		return cfg; 
+		if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+			cfg.removeConfig(CoreOptions.TMP_DIRS);
+		}
+
+		return cfg;
 	}
 
 	/**
@@ -467,4 +471,22 @@ public class BootstrapTools {
 		}
 		return template;
 	}
+
+	/**
+	 * Set temporary configuration directories if necessary
+	 *
+	 * @param configuration flink config to patch
+	 * @param defaultDirs in case no tmp directories is set, next directories will be applied
+	 */
+	public static void updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs){
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			LOG.info("Overriding Fink's temporary file directories with those " +
+				"specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
+			configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
+		}
+		else {
+			LOG.info("Setting directories for temporary files to: {}", defaultDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index cf38fea..7e31c8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -276,4 +276,23 @@ public class BootstrapToolsTest {
 					true, true, true, this.getClass()));
 
 	}
+
+	@Test
+	public void testUpdateTmpDirectoriesInConfiguration(){
+		Configuration config = new Configuration();
+
+		// test that default value is taken
+		BootstrapTools.updateTmpDirectoriesInConfiguration(config, "default/directory/path");
+		assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");
+
+		// test that we ignore default value is value is set before
+		BootstrapTools.updateTmpDirectoriesInConfiguration(config, "not/default/directory/path");
+		assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");
+
+		//test that empty value is not a magic string
+		config.setString(CoreOptions.TMP_DIRS, "");
+		BootstrapTools.updateTmpDirectoriesInConfiguration(config, "some/new/path");
+		assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index eb977bf..497ac87 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -523,16 +522,8 @@ public class YarnApplicationMasterRunner {
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
 			ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			log.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else {
-			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-			log.info("Setting directories for temporary files to: {}", localDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-		}
+		final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0206ffb..b08993a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -472,14 +473,19 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
 
-		log.debug("TaskManager configuration: {}", flinkConfig);
+		Configuration taskManagerConfig = flinkConfig.clone();
+		if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+			taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
+		}
+
+		log.debug("TaskManager configuration: {}", taskManagerConfig);
 
 		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
-			flinkConfig,
+			taskManagerConfig,
 			yarnConfig,
 			env,
 			taskManagerParameters,
-			flinkConfig,
+			taskManagerConfig,
 			currDir,
 			YarnTaskExecutorRunner.class,
 			log);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 94cd5a9..0e70de9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -21,10 +21,10 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,15 +100,7 @@ public class YarnTaskExecutorRunner {
 			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
 			FileSystem.initialize(configuration);
 
-			// configure local directory
-			if (configuration.contains(CoreOptions.TMP_DIRS)) {
-				LOG.info("Overriding YARN's temporary file directories with those " +
-					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-			}
-			else {
-				LOG.info("Setting directories for temporary files to: {}", localDirs);
-				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-			}
+			BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 			// tell akka to die in case of an error
 			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
index d6f2364..9a01075 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
@@ -21,8 +21,8 @@ package org.apache.flink.yarn;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -121,15 +121,7 @@ public class YarnTaskManagerRunnerFactory {
 		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
 		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			LOG.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else {
-			LOG.info("Setting directories for temporary files to: {}", localDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-		}
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 		// tell akka to die in case of an error
 		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 49957e1..5566963 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -130,16 +129,8 @@ public class YarnEntrypointUtils {
 			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 		}
 
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			log.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else {
-			final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
-			log.info("Setting directories for temporary files to: {}", localDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-		}
+		final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
 
 		return configuration;
 	}