Posted to issues@flink.apache.org by JTaky <gi...@git.apache.org> on 2018/07/09 09:55:16 UTC

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

GitHub user JTaky opened a pull request:

    https://github.com/apache/flink/pull/6284

    [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') wrongly managed on Yarn

    ## What is the purpose of the change
    
    Currently the TMP_DIRS (io.tmp.dirs) option has a bug in its default behaviour on Yarn:
    If no value has been set via a parameter, the env value 'LOCAL_DIRS' is used on the Application Master.
    When a TaskManager is started, this 'LOCAL_DIRS' value is passed on to it, so the TaskManager does not know that it should fall back to its own default behaviour.
    The proposed ~~hack~~ fix is to remove the TMP_DIRS value before starting the TaskManager whenever the default behaviour is required.
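
The failure mode described above can be reproduced in a minimal sketch. It models the configuration as a plain `Map<String, String>` rather than Flink's `Configuration`; the class name, the method names and the directory paths are hypothetical, and only the key `io.tmp.dirs` and the `LOCAL_DIRS` variable come from the description.

```java
import java.util.HashMap;
import java.util.Map;

public class TmpDirsLeakSketch {
    static final String TMP_DIRS_KEY = "io.tmp.dirs";

    /** Application Master: fall back to its own LOCAL_DIRS when the user set nothing. */
    static Map<String, String> applicationMasterConfig(Map<String, String> userConfig, String amLocalDirs) {
        Map<String, String> config = new HashMap<>(userConfig);
        config.putIfAbsent(TMP_DIRS_KEY, amLocalDirs);
        return config;
    }

    /** TaskManager: a value found in the config wins over the container's own LOCAL_DIRS. */
    static String taskManagerTmpDirs(Map<String, String> receivedConfig, String tmLocalDirs) {
        return receivedConfig.getOrDefault(TMP_DIRS_KEY, tmLocalDirs);
    }

    public static void main(String[] args) {
        Map<String, String> amConfig = applicationMasterConfig(new HashMap<>(), "/yarn/am-container/tmp");

        // Bug: the TaskManager sees the AM's directories as if the user had configured them.
        System.out.println(taskManagerTmpDirs(amConfig, "/yarn/tm-container/tmp"));

        // Proposed fix: drop the key before shipping the config when the user did not set it.
        Map<String, String> shipped = new HashMap<>(amConfig);
        shipped.remove(TMP_DIRS_KEY);
        System.out.println(taskManagerTmpDirs(shipped, "/yarn/tm-container/tmp"));
    }
}
```

The first lookup returns the Application Master's directories and the second returns the TaskManager's own, which is the behaviour the fix is after.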
    
    ## Brief change log
    
    * Extract the method which sets the _io.tmp.dirs_ configuration value
    * Fix the configuration passed to the TaskManager (**I am not sure what is the best way to implement this**):
    ** Add a new flag saying that the _io.tmp.dirs_ value has been overridden
    ** If _io.tmp.dirs_ has *not* been overridden, we remove the _io.tmp.dirs_ config value by setting it to the empty value (should we remove? clone without a given key?)
    * Correct the documentation
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as MiniClusterResource for the Mesos part.
    1. I do not see a way to test launching the TaskManager with a correct configuration.
    2. A unit test documenting the specific logic of TMP_DIRS handling has been added: BootstrapToolsTest#testSetTmpDirectoriesConfig
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JTaky/flink tmpDirs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6284
    
----
commit d0c0bf7682843947bd1cfdf27775eb75c2864232
Author: Oleksandr Nitavskyi <o....@...>
Date:   2018-07-02T07:42:17Z

    [FLINK-9762][yarn] Extract method: tmp dirs setting

commit 99bdeb924a2bf91eab39462ac8253ee79dcb1137
Author: Oleksandr Nitavskyi <o....@...>
Date:   2018-07-02T08:06:56Z

    [FLINK-9762][yarn] Fix TMP_DIRS configuration passing
    
    * HACK: Instead of removing set value as empty

commit 2fd5754209508e082e4678e8605a25e3d25f8209
Author: Oleksandr Nitavskyi <o....@...>
Date:   2018-07-09T09:15:14Z

    [FLINK-9762] correct documenatation for 'io.tmp.dirs' config

----


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202084291
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -467,4 +471,22 @@ public static String getStartCommand(String template,
     		}
     		return template;
     	}
    +
    +	/**
    +	 * Set temporary configuration directories if necesary
    +	 *
    +	 * @param configuration flink config to patch
    +	 * @param defaultDirs in case no tmp directories is set, next directories will be applied
    +	 */
    +	public static void setTmpDirectoriesConfig(Configuration configuration, String defaultDirs){
    --- End diff --
    
    Should we maybe call it `updateTmpDirectoriesInConfiguration`?


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202756519
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---
    @@ -181,12 +181,17 @@
     	 * The config parameter defining the directories for temporary files, separated by
     	 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
     	 */
    -	@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
    +	@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn and '_FLINK_TMP_DIR' on Mesos.")
     	public static final ConfigOption<String> TMP_DIRS =
     		key("io.tmp.dirs")
     			.defaultValue(System.getProperty("java.io.tmpdir"))
     			.withDeprecatedKeys("taskmanager.tmp.dirs");
     
    +	/**
    +	 * String key, which says if variable `java.io.tmpdir` has been overridden for the cluster.
    +	 */
    +	public static final String TMP_DIRS_OVERRIDDEN = "io.tmp.dirs.overridden";
    --- End diff --
    
    done


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202756696
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     
     		log.debug("TaskManager configuration: {}", flinkConfig);
     
    +		Configuration taskManagerConfig = flinkConfig.clone();
    +		if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    +			taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");	// HACK: emulate removal for the given key
    +		}
    --- End diff --
    
    same question as before :( didn't get the idea


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202087987
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     
     		log.debug("TaskManager configuration: {}", flinkConfig);
    --- End diff --
    
    log the `flinkConfig` after cloning/resetting partial values.


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202087890
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     
     		log.debug("TaskManager configuration: {}", flinkConfig);
     
    +		Configuration taskManagerConfig = flinkConfig.clone();
    +		if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    +			taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");	// HACK: emulate removal for the given key
    +		}
    --- End diff --
    
    Replace with `BootstrapTools#cloneConfiguration(Configuration)` in order to reduce code duplication.


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202086059
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---
    @@ -181,12 +181,17 @@
     	 * The config parameter defining the directories for temporary files, separated by
     	 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
     	 */
    -	@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
    +	@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn and '_FLINK_TMP_DIR' on Mesos.")
     	public static final ConfigOption<String> TMP_DIRS =
     		key("io.tmp.dirs")
     			.defaultValue(System.getProperty("java.io.tmpdir"))
     			.withDeprecatedKeys("taskmanager.tmp.dirs");
     
    +	/**
    +	 * String key, which says if variable `java.io.tmpdir` has been overridden for the cluster.
    +	 */
    +	public static final String TMP_DIRS_OVERRIDDEN = "io.tmp.dirs.overridden";
    --- End diff --
    
    Let's make this a proper `ConfigOption<Boolean>` with the name `USE_LOCAL_DEFAULT_TMP_DIRS` and the key name `internal.io.tmp.dirs.use-local-default`.
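
As a rough illustration of why a typed boolean option is preferable to a bare string key checked with `containsKey`, here is a self-contained sketch. `Option<T>` and `get` are hypothetical stand-ins, not Flink's `ConfigOption` API; the option name and key are the ones proposed in the comment, while the default of `false` is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LocalDefaultFlagSketch {

    /** Hypothetical stand-in for a typed option: a key, a default value, and a parser. */
    static final class Option<T> {
        final String key;
        final T defaultValue;
        final Function<String, T> parser;

        Option(String key, T defaultValue, Function<String, T> parser) {
            this.key = key;
            this.defaultValue = defaultValue;
            this.parser = parser;
        }
    }

    // Name and key as proposed in the review; the default of false is an assumption.
    static final Option<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS =
        new Option<>("internal.io.tmp.dirs.use-local-default", false, Boolean::parseBoolean);

    /** Typed read: parse the stored string, or fall back to the option's default. */
    static <T> T get(Map<String, String> config, Option<T> option) {
        String raw = config.get(option.key);
        return raw == null ? option.defaultValue : option.parser.apply(raw);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(USE_LOCAL_DEFAULT_TMP_DIRS.key, "false");

        // A presence check treats an explicit "false" as if the flag were set...
        System.out.println(config.containsKey(USE_LOCAL_DEFAULT_TMP_DIRS.key));
        // ...while the typed read returns the value that was actually stored.
        System.out.println(get(config, USE_LOCAL_DEFAULT_TMP_DIRS));
    }
}
```

With a typed option the flag has a defined default and an explicit `false` means what it says, which a presence check on a string key cannot express.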


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202087570
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -250,6 +250,10 @@ public static Configuration generateTaskManagerConfiguration(
     			cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
     		}
     
    +		if (!baseConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    +			cfg.setString(CoreOptions.TMP_DIRS, "");	// HACK: emulate removal for the given key
    +		}
    --- End diff --
    
    I would suggest adding a `BootstrapTools#cloneConfiguration(Configuration)` method which clones the given `Configuration` and resets all node-specific fields. One of these fields is `CoreOptions#TMP_DIRS`. This means that if `USE_LOCAL_DEFAULT_TMP_DIRS` is true, we clear the `TMP_DIRS` option. For that we actually need a `Configuration#clear(ConfigOption)` method which removes the option entry from `Configuration#confData`. Otherwise we risk using `""` instead of the TMP_DIRS default value if there are no local default tmp dirs.
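
The difference between setting the option to `""` and actually removing the entry can be sketched as follows. This is only an illustration under stated assumptions: the configuration is modelled as a `Map<String, String>`, `cloneConfiguration` and `tmpDirs` are hypothetical helpers rather than existing Flink methods, and `/tmp` stands in for whatever default applies on the node.

```java
import java.util.HashMap;
import java.util.Map;

public class CloneConfigurationSketch {
    static final String TMP_DIRS_KEY = "io.tmp.dirs";
    static final String USE_LOCAL_DEFAULT_KEY = "internal.io.tmp.dirs.use-local-default";

    /** Copy the config and drop node-specific entries so each node re-derives them. */
    static Map<String, String> cloneConfiguration(Map<String, String> config) {
        Map<String, String> cloned = new HashMap<>(config);
        // parseBoolean(null) is false, so a missing flag keeps TMP_DIRS untouched.
        if (Boolean.parseBoolean(cloned.get(USE_LOCAL_DEFAULT_KEY))) {
            cloned.remove(TMP_DIRS_KEY);
        }
        return cloned;
    }

    /** Lookup with a default: the default only applies when the entry is absent. */
    static String tmpDirs(Map<String, String> config, String defaultDirs) {
        return config.getOrDefault(TMP_DIRS_KEY, defaultDirs);
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put(TMP_DIRS_KEY, "/yarn/am-container/tmp");
        base.put(USE_LOCAL_DEFAULT_KEY, "true");

        // Emulated removal: the empty string is still a value, so the default is never used.
        Map<String, String> hacked = new HashMap<>(base);
        hacked.put(TMP_DIRS_KEY, "");
        System.out.println("[" + tmpDirs(hacked, "/tmp") + "]");

        // Real removal: the lookup falls through to the default.
        System.out.println("[" + tmpDirs(cloneConfiguration(base), "/tmp") + "]");
    }
}
```

The original map is left unmodified, so the caller can keep using it for its own process while handing the cleaned copy to the new container.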


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202755913
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     
     		log.debug("TaskManager configuration: {}", flinkConfig);
    --- End diff --
    
    done


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6284


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202321585
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -467,4 +471,22 @@ public static String getStartCommand(String template,
     		}
     		return template;
     	}
    +
    +	/**
    +	 * Set temporary configuration directories if necesary
    +	 *
    +	 * @param configuration flink config to patch
    +	 * @param defaultDirs in case no tmp directories is set, next directories will be applied
    +	 */
    +	public static void setTmpDirectoriesConfig(Configuration configuration, String defaultDirs){
    +		if (configuration.contains(CoreOptions.TMP_DIRS) && !configuration.getString(CoreOptions.TMP_DIRS).isEmpty()) {
    +			LOG.info("Overriding Fink's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    --- End diff --
    
    done


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202756536
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -250,6 +250,10 @@ public static Configuration generateTaskManagerConfiguration(
     			cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
     		}
     
    +		if (!baseConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    +			cfg.setString(CoreOptions.TMP_DIRS, "");	// HACK: emulate removal for the given key
    +		}
    --- End diff --
    
    Agreed, magic values are the dirtiest way.
    I will go with a 'clear' method (probably 'remove', in order to mimic the Java collections API).
    
    I didn't get the point of the clone method. Is it meant to factor out these 4 lines, or do you want to make it generic and use it for all custom settings? In that case we would have to extract a list of predicates for each configuration, which looks quite complex as an API.


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202321582
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -467,4 +471,22 @@ public static String getStartCommand(String template,
     		}
     		return template;
     	}
    +
    +	/**
    +	 * Set temporary configuration directories if necesary
    +	 *
    +	 * @param configuration flink config to patch
    +	 * @param defaultDirs in case no tmp directories is set, next directories will be applied
    +	 */
    +	public static void setTmpDirectoriesConfig(Configuration configuration, String defaultDirs){
    --- End diff --
    
    done


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202075077
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---
    @@ -181,12 +181,17 @@
     	 * The config parameter defining the directories for temporary files, separated by
     	 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
     	 */
    -	@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
    +	@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn and '_FLINK_TMP_DIR' on Mesos.")
    --- End diff --
    
    We should also keep saying that in the standalone mode we use `System.getProperty("java.io.tmpdir")`


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202082557
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
    @@ -467,4 +471,22 @@ public static String getStartCommand(String template,
     		}
     		return template;
     	}
    +
    +	/**
    +	 * Set temporary configuration directories if necesary
    +	 *
    +	 * @param configuration flink config to patch
    +	 * @param defaultDirs in case no tmp directories is set, next directories will be applied
    +	 */
    +	public static void setTmpDirectoriesConfig(Configuration configuration, String defaultDirs){
    +		if (configuration.contains(CoreOptions.TMP_DIRS) && !configuration.getString(CoreOptions.TMP_DIRS).isEmpty()) {
    +			LOG.info("Overriding Fink's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    --- End diff --
    
    Should use logging placeholder `{}`


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202321593
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---
    @@ -181,12 +181,17 @@
     	 * The config parameter defining the directories for temporary files, separated by
     	 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
     	 */
    -	@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
    +	@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn and '_FLINK_TMP_DIR' on Mesos.")
    --- End diff --
    
    done


---

[GitHub] flink issue #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') wrongly...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6284
  
    I found a related test error, please recheck~


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202090987
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     
     		log.debug("TaskManager configuration: {}", flinkConfig);
     
    +		Configuration taskManagerConfig = flinkConfig.clone();
    +		if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    +			taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");	// HACK: emulate removal for the given key
    +		}
    +
     		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
    -			flinkConfig,
    --- End diff --
    
    here we should keep the `flinkConfig` and instead replace the 5th argument with `taskManagerConfig` because it is this `Configuration` which is given to the new `TaskManager` container.


---

[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

Posted by JTaky <gi...@git.apache.org>.
Github user JTaky commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6284#discussion_r202755897
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     
     		log.debug("TaskManager configuration: {}", flinkConfig);
     
    +		Configuration taskManagerConfig = flinkConfig.clone();
    +		if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
    +			taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");	// HACK: emulate removal for the given key
    +		}
    +
     		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
    -			flinkConfig,
    --- End diff --
    
    Done. Thank you very much!
    Since you seem fine with this hacky approach, I have tested the latest version of this PR on our Yarn cluster and made it stable.


---