You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/19 09:43:17 UTC
[1/3] flink git commit: [hotfix] Fix checkstyle violations in
BootstrapTool
Repository: flink
Updated Branches:
refs/heads/master 749dd2993 -> 44ed5ef0f
[hotfix] Fix checkstyle violations in BootstrapTool
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44ed5ef0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44ed5ef0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44ed5ef0
Branch: refs/heads/master
Commit: 44ed5ef0fc1c221f3916ab5126f1bc8ee5dfb45d
Parents: 6bdec86
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 18 15:49:55 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200
----------------------------------------------------------------------
.../clusterframework/BootstrapTools.java | 23 +++++++++-----------
1 file changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44ed5ef0/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ebe7140..b43946c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -270,8 +270,7 @@ public class BootstrapTools {
*/
public static void writeConfiguration(Configuration cfg, File file) throws IOException {
try (FileWriter fwrt = new FileWriter(file);
- PrintWriter out = new PrintWriter(fwrt))
- {
+ PrintWriter out = new PrintWriter(fwrt)) {
for (String key : cfg.keySet()) {
String value = cfg.getString(key, null);
out.print(key);
@@ -331,7 +330,7 @@ public class BootstrapTools {
/**
* Get an instance of the dynamic properties option.
*
- * Dynamic properties allow the user to specify additional configuration values with -D, such as
+ * <p>Dynamic properties allow the user to specify additional configuration values with -D, such as
* <tt> -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624</tt>
*/
public static Option newDynamicPropertiesOption() {
@@ -345,13 +344,13 @@ public class BootstrapTools {
final Configuration config = new Configuration();
String[] values = cmd.getOptionValues(DYNAMIC_PROPERTIES_OPT);
- if(values != null) {
- for(String value : values) {
+ if (values != null) {
+ for (String value : values) {
String[] pair = value.split("=", 2);
- if(pair.length == 1) {
+ if (pair.length == 1) {
config.setString(pair[0], Boolean.TRUE.toString());
}
- else if(pair.length == 2) {
+ else if (pair.length == 2) {
config.setString(pair[0], pair[1]);
}
}
@@ -401,7 +400,7 @@ public class BootstrapTools {
}
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
- if(hasKrb5) {
+ if (hasKrb5) {
javaOpts += " -Djava.security.krb5.conf=krb5.conf";
}
startCommandValues.put("jvmopts", javaOpts);
@@ -439,12 +438,11 @@ public class BootstrapTools {
// ------------------------------------------------------------------------
- /** Private constructor to prevent instantiation */
+ /** Private constructor to prevent instantiation. */
private BootstrapTools() {}
/**
- * Replaces placeholders in the template start command with values from
- * <tt>startCommandValues</tt>.
+ * Replaces placeholders in the template start command with values from startCommandValues.
*
* <p>If the default template {@link ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}
* is used, the following keys must be present in the map or the resulting
@@ -458,7 +456,6 @@ public class BootstrapTools {
* <li><tt>args</tt> = arguments for the main class</li>
* <li><tt>redirects</tt> = output redirects</li>
* </ul>
- * </p>
*
* @param template
* a template start command with placeholders
@@ -478,7 +475,7 @@ public class BootstrapTools {
}
/**
- * Set temporary configuration directories if necessary
+ * Set temporary configuration directories if necessary.
*
* @param configuration flink config to patch
* @param defaultDirs in case no tmp directories is set, next directories will be applied
[2/3] flink git commit: [FLINK-9762] Consolidate configuration
cloning in BootstrapTools
Posted by tr...@apache.org.
[FLINK-9762] Consolidate configuration cloning in BootstrapTools
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bdec86e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bdec86e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bdec86e
Branch: refs/heads/master
Commit: 6bdec86e31d82d7c38d2509a039a1a03ab9f246e
Parents: ec28f92
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 18 15:45:15 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200
----------------------------------------------------------------------
.../_includes/generated/core_configuration.html | 5 ---
.../apache/flink/configuration/CoreOptions.java | 7 ----
.../UnmodifiableConfiguration.java | 6 ++++
.../clusterframework/BootstrapTools.java | 34 ++++++++++++++++----
.../apache/flink/yarn/YarnResourceManager.java | 9 ++----
5 files changed, 37 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 8281adc..98cca91 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,11 +23,6 @@
<td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
</tr>
<tr>
- <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
- <td style="word-wrap: break-word;">true</td>
- <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
- </tr>
- <tr>
<td><h5>io.tmp.dirs</h5></td>
<td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
<td></td>
http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index ea27e28..16c9b54 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -187,13 +187,6 @@ public class CoreOptions {
.defaultValue(System.getProperty("java.io.tmpdir"))
.withDeprecatedKeys("taskmanager.tmp.dirs");
- /**
- * String key, which says if default value is used for `io.tmp.dirs` config variable.
- */
- public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
- .defaultValue(true)
- .withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");
-
// ------------------------------------------------------------------------
// program
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
index f92de1c..0a1bcc4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -65,6 +65,12 @@ public class UnmodifiableConfiguration extends Configuration {
error();
}
+ @Override
+ public <T> boolean removeConfig(ConfigOption<T> configOption) {
+ error();
+ return false;
+ }
+
private void error(){
throw new UnsupportedOperationException("The configuration is unmodifiable; its contents cannot be changed.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 3b606f7..ebe7140 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -60,11 +61,19 @@ import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
+import static org.apache.flink.configuration.ConfigOptions.key;
+
/**
* Tools for starting JobManager and TaskManager processes, including the
* Actor Systems used to run the JobManager and TaskManager actors.
*/
public class BootstrapTools {
+ /**
+ * Internal option which says if default value is used for {@link CoreOptions#TMP_DIRS}.
+ */
+ private static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmpdirs.use-local-default")
+ .defaultValue(false);
+
private static final Logger LOG = LoggerFactory.getLogger(BootstrapTools.class);
/**
@@ -235,7 +244,7 @@ public class BootstrapTools {
int numSlots,
FiniteDuration registrationTimeout) {
- Configuration cfg = baseConfig.clone();
+ Configuration cfg = cloneConfiguration(baseConfig);
if (jobManagerHostname != null && !jobManagerHostname.isEmpty()) {
cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
@@ -250,10 +259,6 @@ public class BootstrapTools {
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}
- if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
- cfg.removeConfig(CoreOptions.TMP_DIRS);
- }
-
return cfg;
}
@@ -482,11 +487,28 @@ public class BootstrapTools {
if (configuration.contains(CoreOptions.TMP_DIRS)) {
LOG.info("Overriding Fink's temporary file directories with those " +
"specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
- configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
}
else {
LOG.info("Setting directories for temporary files to: {}", defaultDirs);
configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
+ configuration.setBoolean(USE_LOCAL_DEFAULT_TMP_DIRS, true);
+ }
+ }
+
+ /**
+ * Clones the given configuration and resets instance specific config options.
+ *
+ * @param configuration to clone
+ * @return Cloned configuration with reset instance specific config options
+ */
+ public static Configuration cloneConfiguration(Configuration configuration) {
+ final Configuration clonedConfiguration = new Configuration(configuration);
+
+ if (clonedConfiguration.getBoolean(USE_LOCAL_DEFAULT_TMP_DIRS)){
+ clonedConfiguration.removeConfig(CoreOptions.TMP_DIRS);
+ clonedConfiguration.removeConfig(USE_LOCAL_DEFAULT_TMP_DIRS);
}
+
+ return clonedConfiguration;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bdec86e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index b08993a..49385e5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -21,9 +21,9 @@ package org.apache.flink.yarn;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -473,15 +473,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());
- Configuration taskManagerConfig = flinkConfig.clone();
- if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
- taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
- }
+ Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
log.debug("TaskManager configuration: {}", taskManagerConfig);
ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
- taskManagerConfig,
+ flinkConfig,
yarnConfig,
env,
taskManagerParameters,
[3/3] flink git commit: [FLINK-9762][yarn] Use local default tmp
directories on Yarn and Mesos
Posted by tr...@apache.org.
[FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos
This closes #6284.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec28f92f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec28f92f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec28f92f
Branch: refs/heads/master
Commit: ec28f92ffd042308494d9661a38ab462738611aa
Parents: 749dd29
Author: Oleksandr Nitavskyi <o....@criteo.com>
Authored: Mon Jul 2 09:42:17 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 19 11:43:03 2018 +0200
----------------------------------------------------------------------
.../_includes/generated/core_configuration.html | 7 ++++-
.../flink/configuration/Configuration.java | 26 ++++++++++++++++++
.../apache/flink/configuration/CoreOptions.java | 9 +++++-
.../configuration/DelegatingConfiguration.java | 5 ++++
.../flink/configuration/ConfigurationTest.java | 29 ++++++++++++++++++++
.../mesos/entrypoint/MesosEntrypointUtils.java | 12 ++------
.../clusterframework/BootstrapTools.java | 24 +++++++++++++++-
.../clusterframework/BootstrapToolsTest.java | 19 +++++++++++++
.../flink/yarn/YarnApplicationMasterRunner.java | 13 ++-------
.../apache/flink/yarn/YarnResourceManager.java | 12 ++++++--
.../flink/yarn/YarnTaskExecutorRunner.java | 12 ++------
.../yarn/YarnTaskManagerRunnerFactory.java | 12 ++------
.../yarn/entrypoint/YarnEntrypointUtils.java | 13 ++-------
13 files changed, 135 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/docs/_includes/generated/core_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 91fa1a5..8281adc 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -23,8 +23,13 @@
<td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
</tr>
<tr>
+ <td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
+ </tr>
+ <tr>
<td><h5>io.tmp.dirs</h5></td>
- <td style="word-wrap: break-word;">System.getProperty("java.io.tmpdir")</td>
+ <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
<td></td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 7d99fbb..00c4c38 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -729,6 +729,32 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
}
+ /**
+ * Removes given config option from the configuration.
+ *
+ * @param configOption config option to remove
+ * @param <T> Type of the config option
+ * @return true if config has been removed, false otherwise
+ */
+ public <T> boolean removeConfig(ConfigOption<T> configOption){
+ synchronized (this.confData){
+ // try the current key
+ Object oldValue = this.confData.remove(configOption.key());
+ if (oldValue == null){
+ for (String deprecatedKey : configOption.deprecatedKeys()){
+ oldValue = this.confData.remove(deprecatedKey);
+ if (oldValue != null){
+ LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+ deprecatedKey, configOption.key());
+ return true;
+ }
+ }
+ return false;
+ }
+ return true;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 656943f..ea27e28 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -181,12 +181,19 @@ public class CoreOptions {
* The config parameter defining the directories for temporary files, separated by
* ",", "|", or the system's {@link java.io.File#pathSeparator}.
*/
- @Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
+ @Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.")
public static final ConfigOption<String> TMP_DIRS =
key("io.tmp.dirs")
.defaultValue(System.getProperty("java.io.tmpdir"))
.withDeprecatedKeys("taskmanager.tmp.dirs");
+ /**
+ * String key, which says if default value is used for `io.tmp.dirs` config variable.
+ */
+ public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
+ .defaultValue(true)
+ .withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");
+
// ------------------------------------------------------------------------
// program
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 7b75c7a..1a637f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -311,6 +311,11 @@ public final class DelegatingConfiguration extends Configuration {
}
@Override
+ public <T> boolean removeConfig(ConfigOption<T> configOption){
+ return backingConfig.removeConfig(configOption);
+ }
+
+ @Override
public boolean containsKey(String key) {
return backingConfig.containsKey(prefix + key);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 232c829..3b98a44 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -303,4 +304,32 @@ public class ConfigurationTest extends TestLogger {
assertEquals(13, cfg.getInteger(matchesThird));
assertEquals(-1, cfg.getInteger(notContained));
}
+
+ @Test
+ public void testRemove(){
+ Configuration cfg = new Configuration();
+ cfg.setInteger("a", 1);
+ cfg.setInteger("b", 2);
+
+ ConfigOption<Integer> validOption = ConfigOptions
+ .key("a")
+ .defaultValue(-1);
+
+ ConfigOption<Integer> deprecatedOption = ConfigOptions
+ .key("c")
+ .defaultValue(-1)
+ .withDeprecatedKeys("d", "b");
+
+ ConfigOption<Integer> unexistedOption = ConfigOptions
+ .key("e")
+ .defaultValue(-1)
+ .withDeprecatedKeys("f", "g", "j");
+
+ assertEquals("Wrong expectation about size", cfg.keySet().size(), 2);
+ assertTrue("Expected 'validOption' is removed", cfg.removeConfig(validOption));
+ assertEquals("Wrong expectation about size", cfg.keySet().size(), 1);
+ assertTrue("Expected 'existedOption' is removed", cfg.removeConfig(deprecatedOption));
+ assertEquals("Wrong expectation about size", cfg.keySet().size(), 0);
+ assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 498f435..2059c8e 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,13 +19,13 @@
package org.apache.flink.mesos.entrypoint;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -173,15 +173,7 @@ public class MesosEntrypointUtils {
final Map<String, String> envs = System.getenv();
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- log.info("Overriding Mesos' temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else if (tmpDirs != null) {
- log.info("Setting directories for temporary files to: {}", tmpDirs);
- configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
- }
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 7a8403a..3b606f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -250,7 +250,11 @@ public class BootstrapTools {
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}
- return cfg;
+ if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+ cfg.removeConfig(CoreOptions.TMP_DIRS);
+ }
+
+ return cfg;
}
/**
@@ -467,4 +471,22 @@ public class BootstrapTools {
}
return template;
}
+
+ /**
+ * Set temporary configuration directories if necessary
+ *
+ * @param configuration flink config to patch
+ * @param defaultDirs in case no tmp directories is set, next directories will be applied
+ */
+ public static void updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs){
+ if (configuration.contains(CoreOptions.TMP_DIRS)) {
+ LOG.info("Overriding Fink's temporary file directories with those " +
+ "specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
+ configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
+ }
+ else {
+ LOG.info("Setting directories for temporary files to: {}", defaultDirs);
+ configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index cf38fea..7e31c8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -276,4 +276,23 @@ public class BootstrapToolsTest {
true, true, true, this.getClass()));
}
+
+ @Test
+ public void testUpdateTmpDirectoriesInConfiguration(){
+ Configuration config = new Configuration();
+
+ // test that default value is taken
+ BootstrapTools.updateTmpDirectoriesInConfiguration(config, "default/directory/path");
+ assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");
+
+ // test that we ignore default value if value is set before
+ BootstrapTools.updateTmpDirectoriesInConfiguration(config, "not/default/directory/path");
+ assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");
+
+ //test that empty value is not a magic string
+ config.setString(CoreOptions.TMP_DIRS, "");
+ BootstrapTools.updateTmpDirectoriesInConfiguration(config, "some/new/path");
+ assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index eb977bf..497ac87 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -523,16 +522,8 @@ public class YarnApplicationMasterRunner {
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- log.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
- log.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0206ffb..b08993a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -472,14 +473,19 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());
- log.debug("TaskManager configuration: {}", flinkConfig);
+ Configuration taskManagerConfig = flinkConfig.clone();
+ if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
+ taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
+ }
+
+ log.debug("TaskManager configuration: {}", taskManagerConfig);
ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
- flinkConfig,
+ taskManagerConfig,
yarnConfig,
env,
taskManagerParameters,
- flinkConfig,
+ taskManagerConfig,
currDir,
YarnTaskExecutorRunner.class,
log);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 94cd5a9..0e70de9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -21,10 +21,10 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,15 +100,7 @@ public class YarnTaskExecutorRunner {
final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- LOG.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- LOG.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
index d6f2364..9a01075 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
@@ -21,8 +21,8 @@ package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -121,15 +121,7 @@ public class YarnTaskManagerRunnerFactory {
final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- LOG.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- LOG.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec28f92f/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 49957e1..5566963 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.yarn.entrypoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -130,16 +129,8 @@ public class YarnEntrypointUtils {
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- log.info("Overriding YARN's temporary file directories with those " +
- "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
- log.info("Setting directories for temporary files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS, localDirs);
- }
+ final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
return configuration;
}