Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/02/10 07:10:23 UTC

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/5448

    [FLINK-6469] Configure Memory Sizes with units

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-6469

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5448.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5448
    
----
commit 1e109fd21783eada731df4552e154302d1bd9255
Author: vinoyang <vi...@...>
Date:   2018-02-10T06:52:13Z

    [FLINK-6469] Configure Memory Sizes with units

----


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197736638
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
    @@ -67,8 +68,8 @@ public String toString() {
     	public static ClusterSpecification fromConfiguration(Configuration configuration) {
     		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
     
    -		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    -		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    +		int jobManagerMemoryMb = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    --- End diff --
    
    Hi @dawidwys, this belongs to suggestion 2: "change the return value of getMebiBytes() to int or have a getMebiBytesAsInt() method that uses a MathUtils.checkedDownCast() to avoid unnoticed overflow errors, as Stephan commented".
    
    "Have a getMebiBytesAsInt()" is the other choice. If I pick the first choice and change the `(int)` cast to `MathUtils.checkedDownCast()`, is it still necessary to provide the `getMebiBytesAsInt()` method?


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    @StephanEwen regarding the open question: this PR already introduces new keys and deprecates the old ones, e.g. `jobmanager.heap.mb -> jobmanager.heap.size` and `taskmanager.heap.mb -> taskmanager.heap.size`. The problem is how to deal with `taskmanager.memory.segment-size` and `taskmanager.memory.size`: their names already contain the word `size`, so there is no natural new key to replace them with.


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    @StephanEwen 
    For `taskmanager.memory.segment-size`, the default unit is already bytes, so the behavior is the same whether or not a unit is given.
    
    So we only need to handle `taskmanager.memory.size`.
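    A minimal sketch of why the segment size is unaffected, assuming a parser that reads a bare number as bytes. `BytesDefaultParser` is a hypothetical, simplified stand-in that supports only the `b`, `k`, `m` and `g` suffixes; it is not Flink's parser.

```java
/** Hypothetical simplified memory-size parser: a bare number means bytes. */
final class BytesDefaultParser {

    /** Parses "32768", "32k", "1m", ... into bytes (overflow is not checked here). */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(java.util.Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long number = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        switch (unit) {
            case "":          // no unit given: interpret as bytes
            case "b":
                return number;
            case "k":
                return number << 10;
            case "m":
                return number << 20;
            case "g":
                return number << 30;
            default:
                throw new IllegalArgumentException("Unknown unit: " + unit);
        }
    }
}
```

    With a bytes default, an old `taskmanager.memory.segment-size: 32768` and a new-style `32k` both resolve to 32768 bytes, so existing configs keep their meaning.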


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197708483
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -482,14 +484,15 @@ private static NetworkEnvironment createNetworkEnvironment(
     	public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
     		Preconditions.checkArgument(totalJavaMemorySize > 0);
     
    -		int segmentSize = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
    +		int segmentSize = (int) MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes();
    --- End diff --
    
    Use `checkedDownCast`


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197707246
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
    @@ -67,8 +68,8 @@ public String toString() {
     	public static ClusterSpecification fromConfiguration(Configuration configuration) {
     		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
     
    -		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    -		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    +		int jobManagerMemoryMb = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    +		int taskManagerMemoryMb = (int) MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
    --- End diff --
    
    use `getMebiBytesAsInt`


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    Okay, after taking a look, I think we need to add a few changes:
    
      - We need to add an additional `MemoryUnit.parse()` method that takes the "default" unit, so that we parse the old heap sizes such that they are in MB if nothing else is specified.
    
      - We should either change the return value of `getMebiBytes()` to `int` or have a `getMebiBytesAsInt()` method that uses a `MathUtils.checkedDownCast()` to avoid unnoticed overflow errors.
    
    Open question: As we are changing the value type of the heap size config options, should we deprecate the current config keys and introduce new ones (like `jobmanager.heap-size`)?



---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191198882
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration
     		final long heapSizeMB;
     		if (useOffHeap) {
     
    -			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
    +			String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
    +			long offHeapSize = 0;
    +			try {
    +				offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
    +				if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    +					offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
    +				}
    +			} catch (IllegalArgumentException e) {
    +
    --- End diff --
    
    I found the flow with default value somewhat counterintuitive. How about we structure this code like this:
    
    	long offHeapSize;
    	String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
    	if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    		try {
    			offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
    				.getMebiBytes();
    		} catch (IllegalArgumentException e) {
    			throw new IllegalConfigurationException(
    				"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
    		}
    	} else {
    		offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
    	}


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    Hi @StephanEwen, with the help of your `MemorySize` (a subtask of FLINK-6469), I finished the remaining work: I replaced the old memory configs (in code, config files and shell scripts) with memory sizes.
    
    For the config items in `conf/flink-conf.yaml`, I retained but deprecated the config keys `jobmanager.heap.mb` and `taskmanager.heap.mb`, and introduced `jobmanager.heap.size` and `taskmanager.heap.size`, whose values carry the unit `m`.
    
    Would you please review this PR, thanks!


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191190775
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
    @@ -176,19 +188,19 @@
     	/**
     	 * Size of memory buffers used by the network stack and the memory manager (in bytes).
     	 */
    -	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
    +	public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
     			key("taskmanager.memory.segment-size")
    -			.defaultValue(32768)
    -			.withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes).");
    +			.defaultValue("32768")
    +			.withDescription("Size of memory buffers used by the network stack and the memory manager.");
     
     	/**
     	 * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
     	 * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     	 */
    -	public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
    +	public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
    --- End diff --
    
    This option requires special handling (maybe similar to JOB_MANAGER_HEAP_MEMORY/TASK_MANAGER_HEAP_MEMORY). Right now, if an old config file is used with the new version, the megabytes will be treated as bytes, so the value will be 2^20 (1024 * 1024) times smaller.
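    To make the size of the discrepancy concrete: an old value such as `512` (meant as 512 MiB) parsed as bytes yields 512 bytes, a factor of 2^20 = 1,048,576 smaller, and converting it back with `getMebiBytes()` (i.e. `bytes >> 20`) gives 0. The sketch below only does that arithmetic; the class name and the value 512 are arbitrary examples.

```java
/** Arithmetic sketch: an old megabyte value that gets read as bytes. */
final class UnitMismatchDemo {

    static final long BYTES_PER_MEBIBYTE = 1L << 20;   // 1024 * 1024

    /** Bytes the user meant when writing a bare number for an MB-based option. */
    static long intendedBytes(long configuredNumber) {
        return configuredNumber * BYTES_PER_MEBIBYTE;
    }

    /** Bytes obtained when the same bare number is interpreted as bytes. */
    static long parsedAsBytes(long configuredNumber) {
        return configuredNumber;
    }

    /** Mebibytes reported after parsing as bytes and shifting right by 20. */
    static long mebiBytesAfterMisparse(long configuredNumber) {
        return parsedAsBytes(configuredNumber) >> 20;
    }

    public static void main(String[] args) {
        long configured = 512;   // e.g. taskmanager.memory.size: 512
        System.out.println(intendedBytes(configured));                              // 536870912
        System.out.println(parsedAsBytes(configured));                              // 512
        System.out.println(intendedBytes(configured) / parsedAsBytes(configured));  // 1048576
        System.out.println(mebiBytesAfterMisparse(configured));                     // 0
    }
}
```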


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    Hi @yanghua,
    
    I am afraid Stephan won't be able to reply any time soon. I would suggest the following:
    
    - add the default unit to the parse method of `MemorySize` and use MB for `MANAGED_MEMORY_SIZE`. 
    - change the return value of `getMebiBytes()` to `int` or have a `getMebiBytesAsInt()` method that uses `MathUtils.checkedDownCast()` to avoid unnoticed overflow errors, as Stephan commented
    - change the default value of `MANAGED_MEMORY_SIZE` to 0, as suggested by @zentol 
    
    After that I think this PR will be ready to be merged.


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197711813
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -161,7 +162,7 @@ public YarnResourceManager(
     		numPendingContainerRequests = 0;
     
     		this.webInterfaceUrl = webInterfaceUrl;
    -		this.defaultTaskManagerMemoryMB = flinkConfig.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    +		this.defaultTaskManagerMemoryMB = (int) MemorySize.parse(flinkConfig.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
    --- End diff --
    
    unnecessary cast


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191190916
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
    @@ -176,19 +188,19 @@
     	/**
     	 * Size of memory buffers used by the network stack and the memory manager (in bytes).
     	 */
    -	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
    +	public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
     			key("taskmanager.memory.segment-size")
    -			.defaultValue(32768)
    -			.withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes).");
    +			.defaultValue("32768")
    +			.withDescription("Size of memory buffers used by the network stack and the memory manager.");
     
     	/**
     	 * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
     	 * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     	 */
    -	public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
    +	public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
    --- End diff --
    
    Also, I think we should add tests that explicitly use an old-style configuration.


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197711430
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ---
    @@ -66,7 +67,7 @@
      */
     public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
     
    -	private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
    +	private static final int BUFFER_SIZE = (int) MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes();
    --- End diff --
    
    use `checkedDownCast`


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191197602
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration
     		final long heapSizeMB;
     		if (useOffHeap) {
     
    -			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
    +			String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
    +			long offHeapSize = 0;
    +			try {
    +				offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
    +				if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    +					offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
    +				}
    +			} catch (IllegalArgumentException e) {
    +
    --- End diff --
    
    Rethrow as an `IllegalConfigurationException` that points out that `MANAGED_MEMORY_SIZE` was misconfigured.


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197711881
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -385,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    +		final int jobManagerMemoryMB = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    --- End diff --
    
    unnecessary cast


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    There is no problem reusing old keys, if their default unit was "bytes", because the `MemorySize.parse(...)` interprets a number as bytes, if there is no unit attached to it.
    
    I did not realize that you had already switched the config keys; in that case we need to keep supporting the old keys for backwards compatibility as well. We also need to update all the shell scripts (`config.sh`, `jobmanager.sh`, `taskmanager.sh` and so on) to be consistent with the new config keys.
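    One way to support the old keys alongside the new ones is to look up the new key first and fall back to the deprecated key, reading the deprecated value in its original unit (MB). The sketch below is hypothetical: it uses a plain `java.util.Map` in place of Flink's `Configuration`, and its tiny parser handles only `<n>`, `<n>m` and `<n>g`. Only the key names and the `1024m` default come from this PR.

```java
/** Hypothetical fallback from a new unit-aware key to a deprecated MB-only key. */
final class HeapSizeKeys {

    static final String NEW_KEY = "taskmanager.heap.size";
    static final String DEPRECATED_KEY = "taskmanager.heap.mb";
    static final String DEFAULT_VALUE = "1024m";

    /** Returns the configured heap size in mebibytes; the new key wins if both are set. */
    static long heapSizeMebiBytes(java.util.Map<String, String> conf) {
        String value = conf.get(NEW_KEY);
        if (value != null) {
            return parseMebiBytes(value);
        }
        String legacy = conf.get(DEPRECATED_KEY);
        if (legacy != null) {
            // Deprecated key: always a plain number of megabytes.
            return Long.parseLong(legacy.trim());
        }
        return parseMebiBytes(DEFAULT_VALUE);
    }

    /** Minimal parser for "<n>", "<n>m" or "<n>g"; a bare number is read as MB. */
    static long parseMebiBytes(String text) {
        String s = text.trim().toLowerCase(java.util.Locale.ROOT);
        if (s.endsWith("g")) {
            return Long.parseLong(s.substring(0, s.length() - 1).trim()) * 1024;
        }
        if (s.endsWith("m")) {
            return Long.parseLong(s.substring(0, s.length() - 1).trim());
        }
        return Long.parseLong(s);
    }
}
```

    Preferring the new key when both are present means a user can migrate a config file gradually without the deprecated entry silently overriding the new one.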
    



---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197707216
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
    @@ -67,8 +68,8 @@ public String toString() {
     	public static ClusterSpecification fromConfiguration(Configuration configuration) {
     		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
     
    -		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    -		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    +		int jobManagerMemoryMb = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
    --- End diff --
    
    use `getMebiBytesAsInt`


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    cc @dawidwys the third suggestion is done; the others were already addressed earlier. Can you review this?


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    @StephanEwen any opinion about this PR?


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    cc @dawidwys and @StephanEwen, I have resolved the conflicts and fixed some issues in the new files. Please review it.


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191147297
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---
    @@ -72,10 +72,19 @@
     			" leader-election service (like ZooKeeper) is used to elect and discover the JobManager" +
     			" leader from potentially multiple standby JobManagers.");
     
    +	/**
    +	 * JVM heap size for the JobManager with memory size.
    +	 */
    +	public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
    +		key("jobmanager.heap.size")
    +		.defaultValue("1024m")
    +		.withDescription("JVM heap size for the JobManager.");
    +
     	/**
     	 * JVM heap size (in megabytes) for the JobManager.
    +	 * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
     	 */
    -	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
    +	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
    --- End diff --
    
    Missing annotation `@Deprecated`. That way we can remove that option from docs (which I think we should do).


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197754848
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
    @@ -148,6 +148,27 @@ public static MemorySize parse(String text) throws IllegalArgumentException {
     		return new MemorySize(parseBytes(text));
     	}
     
    +	/**
    +	 * Parses the given string as as MemorySize.
    +	 * The supported expressions are listed under {@link MemorySize}.
    +	 *
    +	 * <p>
    +	 * Note : this method is compatible with the old memory config key, like {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
    +	 * </p>
    +	 *
    +	 * @param text The string to parse.
    +	 * @return The parsed MemorySize.
    +	 *
    +	 * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
    +	 */
    +	public static MemorySize parseAsMebiBytesIfNoUnit(String text) throws IllegalArgumentException {
    --- End diff --
    
    @dawidwys there is no `MemoryUnit` type yet, and `parseAsMebiBytesIfNoUnit` is only needed for mebibytes (`taskmanager.memory.size`), so I think we should not make it more complex.


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    Thanks, I think this is a really nice change!
    
    Given that we are approaching feature freeze for 1.5 and already have a very big backlog, I would try and get to this for the 1.6 release. Hope that works for you.


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    @yanghua Could you rebase this PR on top of the master? I could have a look then. Thanks!


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197708797
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java ---
    @@ -86,7 +87,7 @@ private static TaskManagerServicesConfiguration getTmConfig(
     			networkBufFraction,
     			networkBufMin,
     			networkBufMax,
    -			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
    +			(int) MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
    --- End diff --
    
    use `checkedDownCast`


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    Hi @yanghua 
    I think those changes look good overall. My biggest concern is the handling of `MANAGED_MEMORY_SIZE`: I think that if it is used with an old-style configuration file, the resulting value will differ.


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    cc @dawidwys I introduced `MemoryUnit` and refactored `MemorySize`, please review.


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197707880
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
    @@ -148,6 +148,27 @@ public static MemorySize parse(String text) throws IllegalArgumentException {
     		return new MemorySize(parseBytes(text));
     	}
     
    +	/**
    +	 * Parses the given string as as MemorySize.
    +	 * The supported expressions are listed under {@link MemorySize}.
    +	 *
    +	 * <p>
    +	 * Note : this method is compatible with the old memory config key, like {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
    +	 * </p>
    +	 *
    +	 * @param text The string to parse.
    +	 * @return The parsed MemorySize.
    +	 *
    +	 * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
    +	 */
    +	public static MemorySize parseAsMebiBytesIfNoUnit(String text) throws IllegalArgumentException {
    --- End diff --
    
    I think a better idea would be to have `parse(String text, MemoryUnit defaultUnit)`
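    A rough sketch of what such an overload could look like, assuming a hypothetical `MemoryUnit` enum that carries a suffix and a byte multiplier. When the text has no unit suffix, the caller-supplied default applies, so `MANAGED_MEMORY_SIZE` could pass mebibytes while other options keep bytes. The names and suffixes are illustrative, not Flink's final API.

```java
/** Hypothetical sketch of parse(text, defaultUnit); not Flink's implementation. */
final class DefaultUnitParser {

    /** Hypothetical unit enum: each constant knows its suffix and byte multiplier. */
    enum MemoryUnit {
        BYTES("b", 1L),
        KILO_BYTES("k", 1L << 10),
        MEGA_BYTES("m", 1L << 20),
        GIGA_BYTES("g", 1L << 30);

        final String suffix;
        final long multiplier;

        MemoryUnit(String suffix, long multiplier) {
            this.suffix = suffix;
            this.multiplier = multiplier;
        }
    }

    /** Parses "512", "512m", "2g", ...; a bare number is read in defaultUnit. */
    static long parseBytes(String text, MemoryUnit defaultUnit) {
        String s = text.trim().toLowerCase(java.util.Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long number = Long.parseLong(s.substring(0, i));
        String suffix = s.substring(i).trim();

        MemoryUnit unit = null;
        if (suffix.isEmpty()) {
            unit = defaultUnit;                 // old-style value without a unit
        } else {
            for (MemoryUnit candidate : MemoryUnit.values()) {
                if (candidate.suffix.equals(suffix)) {
                    unit = candidate;
                    break;
                }
            }
        }
        if (unit == null) {
            throw new IllegalArgumentException("Unknown unit: " + suffix);
        }
        // multiplyExact throws ArithmeticException instead of overflowing silently.
        return Math.multiplyExact(number, unit.multiplier);
    }

    /** Convenience overload keeping the bytes default for all other options. */
    static long parseBytes(String text) {
        return parseBytes(text, MemoryUnit.BYTES);
    }
}
```

    With this shape, an old `taskmanager.memory.size: 512` parsed with `MEGA_BYTES` as default still means 512 MiB, and `512m` and `536870912b` are equivalent to it.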


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5448


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191191358
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---
    @@ -72,10 +72,19 @@
     			" leader-election service (like ZooKeeper) is used to elect and discover the JobManager" +
     			" leader from potentially multiple standby JobManagers.");
     
    +	/**
    +	 * JVM heap size for the JobManager with memory size.
    +	 */
    +	public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
    +		key("jobmanager.heap.size")
    +		.defaultValue("1024m")
    +		.withDescription("JVM heap size for the JobManager.");
    +
     	/**
     	 * JVM heap size (in megabytes) for the JobManager.
    +	 * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
     	 */
    -	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
    +	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
    --- End diff --
    
    I also think we should add a note to the release notes advising users to migrate these parameters.


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197711900
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -385,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
     		}
     
     		// JobManager Memory
    -		final int jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    +		final int jobManagerMemoryMB = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
     
     		// Task Managers memory
    -		final int taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    +		final int taskManagerMemoryMB = (int) MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
    --- End diff --
    
    unnecessary cast


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191147488
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
    @@ -32,10 +32,22 @@
     	//  General TaskManager Options
     	// ------------------------------------------------------------------------
     
    +	/**
    +	 * JVM heap size for the TaskManagers with memory size.
    +	 */
    +	public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
    +			key("taskmanager.heap.size")
    +			.defaultValue("1024m")
    +			.withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
    +					" the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
    +					" YARN container, minus a certain tolerance value.");
    +
     	/**
     	 * JVM heap size (in megabytes) for the TaskManagers.
    +	 *
    +	 * @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
     	 */
    -	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
    +	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
    --- End diff --
    
    Add `@Deprecated`


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r193277880
  
    --- Diff: docs/_includes/generated/task_manager_configuration.html ---
    @@ -84,13 +84,13 @@
             </tr>
             <tr>
                 <td><h5>taskmanager.memory.segment-size</h5></td>
    -            <td style="word-wrap: break-word;">32768</td>
    -            <td>Size of memory buffers used by the network stack and the memory manager (in bytes).</td>
    +            <td style="word-wrap: break-word;">"32768"</td>
    +            <td>Size of memory buffers used by the network stack and the memory manager.</td>
             </tr>
             <tr>
                 <td><h5>taskmanager.memory.size</h5></td>
    -            <td style="word-wrap: break-word;">-1</td>
    -            <td>Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not set, a relative fraction will be allocated.</td>
    +            <td style="word-wrap: break-word;">"-1"</td>
    --- End diff --
    
    Currently, every place that uses the value of the config item `taskmanager.memory.size` first compares it with the default value "-1"; if it matches, the parser is not used, so there is no problem right now. But I think the suggestion from @zentol is good. What's your opinion, @StephanEwen?


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    Will try and take a look at this soon... Sorry for the delay.
    
    What I would consider very important is that users who don't change their configuration do not suddenly get different behavior. That means that, in the absence of a unit, we should not always interpret the value as bytes, but in whatever unit the option was measured in before (such as MB).


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    Hi @dawidwys, thanks for your review suggestions. I have refactored the PR code except for `MANAGED_MEMORY_SIZE`. The problem you raised does exist: to handle it properly in this PR, it seems we should introduce a new key and mark the old one as **@deprecated**; otherwise, we cannot prevent users from using an old config value without a unit.
    
    But that is just one possibility; alternatively, we can highlight a **Note** message and give guidance.
    At worst, a user configures the value in **megabytes** while it is actually interpreted as **bytes** (a `1024 * 1024` difference). When the TaskManager starts, this would cause a failure or a disagreement with the Flink web UI, and the user would then recheck this configuration item.
    
    What's your opinion, @StephanEwen?
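The `1024 * 1024` difference mentioned in this comment can be checked with a few lines of arithmetic. This is a worked example, not Flink code; the class and method names are hypothetical.

```java
// Worked example of the mismatch: the same unitless value read in the
// legacy unit (mebibytes) versus read as bytes.
final class UnitMismatchExample {
    static final long MEBIBYTE = 1024L * 1024L;

    static long readAsMegabytes(long configured) {
        return configured * MEBIBYTE; // old interpretation of taskmanager.memory.size
    }

    static long readAsBytes(long configured) {
        return configured; // interpretation if unitless values default to bytes
    }

    public static void main(String[] args) {
        long configured = 1024; // e.g. "taskmanager.memory.size: 1024", meant as 1024 MB
        System.out.println(readAsMegabytes(configured)); // 1073741824 (1 GiB)
        System.out.println(readAsBytes(configured));     // 1024 (1 KiB)
        System.out.println(readAsMegabytes(configured) / readAsBytes(configured)); // 1048576
    }
}
```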


---

[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5448
  
    cc @dawidwys 


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r197707411
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
    @@ -95,8 +95,8 @@ public long getKibiBytes() {
     	/**
     	 * Gets the memory size in Mebibytes (= 1024 Kibibytes).
     	 */
    -	public long getMebiBytes() {
    -		return bytes >> 20;
    +	public int getMebiBytes() {
    +		return (int) (bytes >> 20);
    --- End diff --
    
    use `checkedDownCast` here
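A self-contained sketch of what the reviewer asks for. `checkedDownCast` below is a local stand-in written for illustration; in Flink the corresponding helper is `MathUtils.checkedDownCast`, and the JDK's `Math.toIntExact` performs the same check but throws `ArithmeticException`. The class name `MebiBytesSketch` is hypothetical.

```java
// Hypothetical sketch: narrow the long result with an overflow check
// instead of a bare (int) cast that silently wraps around.
final class MebiBytesSketch {
    private final long bytes;

    MebiBytesSketch(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be >= 0");
        }
        this.bytes = bytes;
    }

    // Local stand-in for a checked long -> int narrowing.
    static int checkedDownCast(long value) {
        int downCast = (int) value;
        if (downCast != value) {
            throw new IllegalArgumentException("Cannot downcast long value " + value + " to int");
        }
        return downCast;
    }

    /** Size in mebibytes; fails loudly instead of returning a wrapped value. */
    int getMebiBytes() {
        return checkedDownCast(bytes >> 20);
    }
}
```

With the bare `(int)` cast from the diff, 2^52 bytes (4 PiB) would report 0 MiB, because 2^32 wraps to 0; the checked version throws instead.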


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191199370
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---
    @@ -223,9 +224,17 @@ public static TaskManagerServicesConfiguration fromConfiguration(
     				parseQueryableStateConfiguration(configuration);
     
     		// extract memory settings
    -		long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
    +		long configuredMemory = Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
    +		if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())) {
    +			try {
    +				configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
    +			} catch (IllegalArgumentException e) {
    +
    +			}
    +		}
    --- End diff --
    
    Same comments as before.  How about code like this:
    
    	long configuredMemory;
    	String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
    	if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
    		try {
    			configuredMemory = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
    				.getMebiBytes();
    		} catch (IllegalArgumentException e) {
    			throw new IllegalConfigurationException(
    				"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
    		}
    	} else {
    		configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
    	}


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r193221496
  
    --- Diff: docs/_includes/generated/task_manager_configuration.html ---
    @@ -84,13 +84,13 @@
             </tr>
             <tr>
                 <td><h5>taskmanager.memory.segment-size</h5></td>
    -            <td style="word-wrap: break-word;">32768</td>
    -            <td>Size of memory buffers used by the network stack and the memory manager (in bytes).</td>
    +            <td style="word-wrap: break-word;">"32768"</td>
    +            <td>Size of memory buffers used by the network stack and the memory manager.</td>
             </tr>
             <tr>
                 <td><h5>taskmanager.memory.size</h5></td>
    -            <td style="word-wrap: break-word;">-1</td>
    -            <td>Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not set, a relative fraction will be allocated.</td>
    +            <td style="word-wrap: break-word;">"-1"</td>
    --- End diff --
    
    Looking at the [parser](https://github.com/yanghua/flink/blob/39fd2efb66bd6fcac3f86e953729831a49bc7709/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java#L191), it appears that `MemorySize` does not support negative values (rightfully so). We may have to change the default to 0.
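A minimal sketch of this alternative, assuming "0" becomes the default and a size of 0 means "not set": the default is then itself valid input, so no sentinel string comparison is needed before parsing. The names are hypothetical, and the parser only accepts an optional `m` (mebibytes) suffix to keep the example short.

```java
import java.util.Locale;

// Hypothetical sketch of the "default to 0" idea: 0 doubles as "not set",
// so the default is valid input for a parser that rejects negatives.
final class ZeroDefaultSketch {
    static final String DEFAULT_VALUE = "0";

    // Simplified stand-in for a unit parser: a non-negative integer with an
    // optional "m" (mebibytes) suffix; negative numbers are rejected.
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        boolean mebibytes = s.endsWith("m");
        String digits = mebibytes ? s.substring(0, s.length() - 1).trim() : s;
        if (digits.isEmpty() || !digits.chars().allMatch(Character::isDigit)) {
            throw new IllegalArgumentException("Not a non-negative size: " + text);
        }
        long value = Long.parseLong(digits);
        return mebibytes ? Math.multiplyExact(value, 1L << 20) : value;
    }

    /** True if the user configured an explicit managed memory size. */
    static boolean isExplicitlyConfigured(String raw) {
        return parseBytes(raw) > 0; // 0 (the default) means: use a relative fraction
    }
}
```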


---

[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5448#discussion_r191186380
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
    @@ -176,19 +188,19 @@
     	/**
     	 * Size of memory buffers used by the network stack and the memory manager (in bytes).
     	 */
    -	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
    +	public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
     			key("taskmanager.memory.segment-size")
    -			.defaultValue(32768)
    -			.withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes).");
    +			.defaultValue("32768")
    +			.withDescription("Size of memory buffers used by the network stack and the memory manager.");
     
     	/**
     	 * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
    --- End diff --
    
    Remove "(in megabytes)" here.


---