Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/07/10 15:20:10 UTC

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/6297

    [FLINK-9777] YARN: JM and TM Memory must be specified with Units

    ## What is the purpose of the change
    
    *This pull request specifies the unit for JM and TM memory in YARN mode*
    
    
    ## Brief change log
    
      - *parse the JM and TM memory values with MB as the default unit*
      - *update the related documentation*
    
    ## Verifying this change
    
    This change is already covered by existing tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-9777

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6297.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6297
    
----
commit 0ba28e996e5dc01040b9dd4cc9d3d86f6cb9dacd
Author: yanghua <ya...@...>
Date:   2018-07-10T15:16:12Z

    [FLINK-9777] YARN: JM and TM Memory must be specified with Units

----


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r203000441
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
    @@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
     		assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     	}
     
    +	/**
    +	 * Tests the specifying heap memory for job manager and task manager.
    +	 */
    +	@Test
    +	public void testHeapMemoryProperty() throws Exception {
    +		//without unit
    +		String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
    +
    +		FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    +			new Configuration(),
    +			tmp.getRoot().getAbsolutePath(),
    +			"y",
    +			"yarn");
    +
    +		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +
    +		ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//with unit "m"
    --- End diff --
    
    This should be a separate test


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6297


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202338957
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -500,11 +501,11 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
     		}
     
     		if (commandLine.hasOption(jmMemory.getOpt())) {
    -			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
    +			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()) + "m");
    --- End diff --
    
    I think we have to be a bit smarter here. The unit might be already provided from cli, right?


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    LGTM, thanks for your contribution @yanghua.
    
    @GJL would you like to have a last look, or can I merge it?


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202075216
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    The new config key was introduced so that `jobmanager.heap.mb` stays backwards compatible in the Flink config file (`config.sh` can calculate it accurately), while users can specify a unit for the value of the key `jobmanager.heap.size`.
    
    So if we remove every use of `JOB_MANAGER_HEAP_MEMORY_MB` from the Java and Scala code, is there any problem?


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r201980407
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    Was there a reason against making `JOB_MANAGER_HEAP_MEMORY_MB` a deprecated key for `JOB_MANAGER_HEAP_MEMORY` when you worked on FLINK-6469? That is:
    ```
    public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
    		key("jobmanager.heap.size")
    			.defaultValue("1024m")
    			.withDeprecatedKeys("jobmanager.heap.mb")
    		    ...
    ```
    I am not sure what happens if someone only configures `jobmanager.heap.mb` in `flink-conf.yaml`. Will the value be respected for all deployment modes?
    
    Same holds for the TaskManager config key.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r203000672
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
    @@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
     		assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     	}
     
    +	/**
    +	 * Tests the specifying heap memory for job manager and task manager.
    +	 */
    +	@Test
    +	public void testHeapMemoryProperty() throws Exception {
    +		//without unit
    +		String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
    +
    +		FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    +			new Configuration(),
    +			tmp.getRoot().getAbsolutePath(),
    +			"y",
    +			"yarn");
    +
    +		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +
    +		ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//with unit "m"
    +		args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };
    +		commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +		clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//with unit non "m"
    +		args = new String[] { "-yn", "2", "-yjm", "1g", "-ytm", "2g" };
    +		commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +		clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//specify old config key
    --- End diff --
    
    This should be a separate test


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    I will take a look later.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202018873
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    I think the problem here would be that we would need to get back to MB as the default unit for this option. Or am I wrong here? The whole idea of introducing the new option was to switch to bytes as the default unit if none is provided.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r203000456
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
    @@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
     		assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     	}
     
    +	/**
    +	 * Tests the specifying heap memory for job manager and task manager.
    +	 */
    +	@Test
    +	public void testHeapMemoryProperty() throws Exception {
    +		//without unit
    +		String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
    +
    +		FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    +			new Configuration(),
    +			tmp.getRoot().getAbsolutePath(),
    +			"y",
    +			"yarn");
    +
    +		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +
    +		ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//with unit "m"
    +		args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };
    +		commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +		clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//with unit non "m"
    --- End diff --
    
    This should be a separate test


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202020619
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    @dawidwys As I understand, backwards compatibility should be prioritized. See https://github.com/apache/flink/pull/5448#issuecomment-394296226
    
    > We need to add an additional MemoryUnit.parse() method that takes the "default" unit, so that we parse the old heap sizes such that they are in MB if nothing else is specified.
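
The quoted idea, a parse method that takes a default unit, can be sketched as follows. This is a minimal stand-alone illustration, not Flink's `MemorySize`: the class `MemoryParseSketch`, its `Unit` enum, and the binary multipliers (1 k = 1024 bytes) are assumptions made for the example.

```java
import java.util.Locale;

/** Hypothetical stand-in for a memory-size parser; not Flink's MemorySize. */
final class MemoryParseSketch {

    /** Units with binary multipliers (an assumption of this sketch). */
    enum Unit {
        BYTES(1L), KILO(1L << 10), MEGA(1L << 20), GIGA(1L << 30);

        final long multiplier;

        Unit(long multiplier) {
            this.multiplier = multiplier;
        }
    }

    /** Parses "1024", "1024m" or "1g" into bytes; a bare number uses defaultUnit. */
    static long parseBytes(String text, Unit defaultUnit) {
        String trimmed = text.trim().toLowerCase(Locale.ROOT);

        // Split the leading ASCII digits from the optional unit suffix.
        int pos = 0;
        while (pos < trimmed.length() && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        if (pos == 0) {
            throw new IllegalArgumentException("No number found in: " + text);
        }
        long value = Long.parseLong(trimmed.substring(0, pos));
        String suffix = trimmed.substring(pos).trim();

        Unit unit;
        switch (suffix) {
            case "":  unit = defaultUnit; break; // no unit given: fall back to the default
            case "b": unit = Unit.BYTES;  break;
            case "k": unit = Unit.KILO;   break;
            case "m": unit = Unit.MEGA;   break;
            case "g": unit = Unit.GIGA;   break;
            default:  throw new IllegalArgumentException("Unknown unit: " + suffix);
        }
        // multiplyExact throws instead of silently overflowing.
        return Math.multiplyExact(value, unit.multiplier);
    }
}
```

With `Unit.MEGA` as the default, `parseBytes("1024", Unit.MEGA)` and `parseBytes("1g", Unit.MEGA)` give the same number of bytes, while `parseBytes("1024", Unit.BYTES)` gives only 1024 bytes. That difference is the compatibility concern discussed in this thread.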


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202339002
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -500,11 +501,11 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
     		}
     
     		if (commandLine.hasOption(jmMemory.getOpt())) {
    -			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
    +			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()) + "m");
     		}
     
     		if (commandLine.hasOption(tmMemory.getOpt())) {
    -			effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()));
    +			effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()) + "m");
    --- End diff --
    
    Same as above.


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    @zentol can you review this?


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    @GJL please review~


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202071052
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    @GJL We cannot use `.withDeprecatedKeys("jobmanager.heap.mb")` because `jobmanager.heap.size` and `jobmanager.heap.mb` have different meanings. The former accepts different units, such as **1g**, but the latter is always measured in **MB**.
    
    @dawidwys and @GJL: `jobmanager.heap.mb` is now used only in the config file, where it can be calculated accurately; it is kept for backwards compatibility. Inside the project it is no longer needed, since every use can be replaced with `jobmanager.heap.size`. So the key in the code would not need to be exposed to the user?


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202059575
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    Imagine a case when previously we had in configuration:
    
        jobmanager.heap.mb: 1024
    
    which meant 1024 MB, in the new code we will pass 1024 as value for `jobmanager.heap.size`, which was introduced solely to treat numbers without units as bytes. Therefore it will be parsed as 1024 bytes rather than 1024 MB.
    
    If we switch the default unit to MB, there is no point in having a new `ConfigOption`.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202338296
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ---
    @@ -29,6 +29,46 @@
     
     	private static final String[] EMPTY = new String[0];
     
    +	/**
    +	 * Get job manager's heap memory.
    +	 *
    +	 * This method will check the new key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
    +	 * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
    +	 *
    +	 * @param configuration the configuration object
    +	 * @return the memory size of job manager's heap memory.
    +	 */
    +	public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
    +		if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
    +			return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
    +		} else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
    +			return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
    +		} else {
    +			throw new RuntimeException("Can not find config key : " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()
    --- End diff --
    
    Use `FlinkRuntimeException`.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202623960
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -500,11 +500,19 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
     		}
     
     		if (commandLine.hasOption(jmMemory.getOpt())) {
    -			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
    +			String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
    +			if (!jmMemoryVal.toLowerCase().contains("m")) {
    --- End diff --
    
    This won't cover other units than "m". How about checking it with `MemoryUnit.hasUnit`?
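
A unit-aware check along the lines suggested here could look like the sketch below. The helper names `hasUnit` and `withDefaultMegabytes` are hypothetical and written for this example only; the sketch does not call Flink's own API. It assumes that a value carries a unit exactly when its last character is not an ASCII digit.

```java
/** Hypothetical helper for normalizing a CLI memory argument; not part of Flink. */
final class CliMemoryArgSketch {

    /** Returns true if the trimmed text ends in something other than an ASCII digit. */
    static boolean hasUnit(String text) {
        String trimmed = text.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("Empty memory value");
        }
        char last = trimmed.charAt(trimmed.length() - 1);
        return !(last >= '0' && last <= '9');
    }

    /** Appends "m" only when the user did not supply a unit. */
    static String withDefaultMegabytes(String cliValue) {
        String trimmed = cliValue.trim();
        return hasUnit(trimmed) ? trimmed : trimmed + "m";
    }
}
```

Unlike a `contains("m")` test, this leaves a value such as `1g` untouched, because it only asks whether any unit is present.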


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202017985
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    Would it hurt to add `.withDeprecatedKeys("jobmanager.heap.mb")`? In case someone does not use the scripts to start the cluster.


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    cc: @dawidwys @StephanEwen 


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202023148
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    Agreed, missed it previously. Unfortunately `withDeprecatedKeys` won't solve it though. Either we should revert the option (and use the mb versions), or anywhere we use the new option, we should also check for the old one. 


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r201981340
  
    --- Diff: docs/ops/deployment/yarn_setup.md ---
    @@ -101,12 +101,12 @@ Usage:
        Optional
          -D <arg>                        Dynamic properties
          -d,--detached                   Start detached
    -     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
    +     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [with unit, if not, use MB]
    --- End diff --
    
    nit: I would reword this to: 
    > *Memory for JobManager container with optional unit (default: MB).*


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r203000806
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
    @@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
     		assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     	}
     
    +	/**
    +	 * Tests the specifying heap memory for job manager and task manager.
    +	 */
    +	@Test
    +	public void testHeapMemoryProperty() throws Exception {
    +		//without unit
    +		String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
    +
    +		FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    +			new Configuration(),
    +			tmp.getRoot().getAbsolutePath(),
    +			"y",
    +			"yarn");
    +
    +		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +
    +		ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//with unit "m"
    +		args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };
    +		commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +		clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//with unit non "m"
    +		args = new String[] { "-yn", "2", "-yjm", "1g", "-ytm", "2g" };
    +		commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
    +		clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
    +
    +		//specify old config key
    +		Configuration configuration = new Configuration();
    +		configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
    +		configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
    +
    +		flinkYarnSessionCli = new FlinkYarnSessionCli(
    +			configuration,
    +			tmp.getRoot().getAbsolutePath(),
    +			"y",
    +			"yarn");
    +
    +		commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
    +
    +		clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
    +
    +		assertThat(clusterSpecification.getMasterMemoryMB(), is(2048));
    +		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(4096));
    +
    +		//set nothing use default value
    --- End diff --
    
    This should be a separate test


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202057663
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    @dawidwys Why is it not enough?
    
    > When obtaining a value from the configuration via Configuration.getValue(ConfigOption), the deprecated keys will be checked in the order provided to this method. The first key for which a value is found will be used - that value will be returned.


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    Changes look good now. I think we are still missing tests for the added behaviour. We should check:
    
    - if yarn cli parameters are properly parsed without units
    - if yarn cli parameters are properly parsed with units other than m
    - if the logic for `getJobManagerHeapMemory/getTaskManagerHeapMemory` works properly (whether the old parameter is parsed when the new one is not provided, and whether the default value is used when no option is set)


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202085772
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    I think we cannot do this as users might not use the `config.sh`. At the same time we think we really should keep backwards compatibility, as otherwise there might be complaints. We've discussed this with @GJL offline and we think the way to go right now would be, everywhere we use the new option, to also check for the old one if the new one was not set.
    
    That is:
    * `org.apache.flink.client.deployment.ClusterSpecification#fromConfiguration`
    * `org.apache.flink.yarn.YarnResourceManager#YarnResourceManager`
    * `org.apache.flink.yarn.cli.FlinkYarnSessionCli#createClusterSpecification`
    
    Probably we should wrap it in some utility method.
    
    Also for the yarn command line we should keep the old behaviour. This means we should add the "m" suffix in `org.apache.flink.yarn.cli.FlinkYarnSessionCli#applyCommandLineOptionsToConfiguration`
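
The utility method proposed above might be sketched like this. A plain `Map<String, String>` stands in for Flink's `Configuration`, and the class `HeapConfigFallbackSketch` is hypothetical. The key names and the `1024m` default are taken from this thread. Returning the default when neither key is set is an assumption of the sketch.

```java
import java.util.Map;

/** Hypothetical fallback lookup over a plain map; not Flink's ConfigurationUtils. */
final class HeapConfigFallbackSketch {

    static final String NEW_KEY = "jobmanager.heap.size"; // value may carry a unit
    static final String OLD_KEY = "jobmanager.heap.mb";   // value is always in MB
    static final String DEFAULT_VALUE = "1024m";

    /** Resolves the JobManager heap setting as a string that always carries a unit. */
    static String resolveJobManagerHeap(Map<String, String> config) {
        if (config.containsKey(NEW_KEY)) {
            // New key wins; its value is passed through unchanged.
            return config.get(NEW_KEY);
        }
        if (config.containsKey(OLD_KEY)) {
            // Old key is a bare MB count, so the unit is made explicit.
            return config.get(OLD_KEY).trim() + "m";
        }
        return DEFAULT_VALUE;
    }
}
```

Keeping the lookup in one place means each caller listed above does not have to repeat the old-key check.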


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r201997464
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
    --- End diff --
    
    In FLINK-6469, in order to configure the JM's memory with a unit, I introduced a new key and deprecated `jobmanager.heap.mb`.
    
    * In the Flink codebase (except the shell scripts) I have removed every use of `JOB_MANAGER_HEAP_MEMORY_MB` and `jobmanager.heap.mb`, so the old key is no longer read there.
    * In the shell script (`config.sh`) the old key `jobmanager.heap.mb` is still read if the new key `jobmanager.heap.size` cannot be read, so it is still supported.
    
     


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202338312
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ---
    @@ -29,6 +29,46 @@
     
     	private static final String[] EMPTY = new String[0];
     
    +	/**
    +	 * Get job manager's heap memory.
    +	 *
    +	 * This method will check the new key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
    +	 * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
    +	 *
    +	 * @param configuration the configuration object
    +	 * @return the memory size of job manager's heap memory.
    +	 */
    +	public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
    +		if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
    +			return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
    +		} else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
    +			return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
    +		} else {
    +			throw new RuntimeException("Can not find config key : " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()
    +			+ " or " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB);
    +		}
    +	}
    +
    +	/**
    +	 * Get task manager's heap memory.
    +	 *
    +	 * This method will check the new key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY} and
    +	 * the old key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
    +	 *
    +	 * @param configuration the configuration object
    +	 * @return the memory size of task manager's heap memory.
    +	 */
    +	public static MemorySize getTaskManagerHeapMemory(Configuration configuration) {
    +		if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) {
    +			return MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY));
    +		} else if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB.key())) {
    +			return MemorySize.parse(configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) + "m");
    +		} else {
    +			throw new RuntimeException("Can not find config key : " + TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key()
    --- End diff --
    
    Use `FlinkRuntimeException`.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202624004
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -500,11 +500,19 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
     		}
     
     		if (commandLine.hasOption(jmMemory.getOpt())) {
    -			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
    +			String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
    +			if (!jmMemoryVal.toLowerCase().contains("m")) {
    +				jmMemoryVal += "m";
    +			}
    +			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jmMemoryVal);
     		}
     
     		if (commandLine.hasOption(tmMemory.getOpt())) {
    -			effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()));
    +			String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());
    +			if (!tmMemoryVal.toLowerCase().contains("m")) {
    --- End diff --
    
    Same as above
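    
    For illustration, one way the suffix handling in the diff could be narrowed is to append "m" only when the value is purely numeric, so that a value such as `2g` is passed through unchanged. This is a sketch under that assumption; the class `MemoryArgNormalizer` and the method `appendMbIfUnitless` are hypothetical names and are not part of the PR.

```java
class MemoryArgNormalizer {

    /** Appends "m" to a purely numeric value; anything else is returned trimmed but otherwise unchanged. */
    static String appendMbIfUnitless(String value) {
        String trimmed = value.trim();
        return trimmed.matches("\\d+") ? trimmed + "m" : trimmed;
    }

    public static void main(String[] args) {
        System.out.println(appendMbIfUnitless("1024")); // unit-less: "m" is appended
        System.out.println(appendMbIfUnitless("2g"));   // already has a unit: left as is
        System.out.println(appendMbIfUnitless("512M")); // already has a unit: left as is
    }
}
```

    A `contains("m")` test, by comparison, would also append "m" to a value like `2g`.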


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    @dawidwys I added a test case, please review.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r202381490
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -500,11 +501,11 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
     		}
     
     		if (commandLine.hasOption(jmMemory.getOpt())) {
    -			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
    +			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()) + "m");
    --- End diff --
    
    You are right. I have updated the PR; please review. Thanks.


---

[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/6297
  
    @dawidwys Please merge.


---

[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6297#discussion_r201997596
  
    --- Diff: docs/ops/deployment/yarn_setup.md ---
    @@ -101,12 +101,12 @@ Usage:
        Optional
          -D <arg>                        Dynamic properties
          -d,--detached                   Start detached
    -     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
    +     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [with unit, if not, use MB]
    --- End diff --
    
    I will change this soon.


---