Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/02/10 07:10:23 UTC
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
GitHub user yanghua opened a pull request:
https://github.com/apache/flink/pull/5448
[FLINK-6469] Configure Memory Sizes with units
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yanghua/flink FLINK-6469
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5448.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5448
----
commit 1e109fd21783eada731df4552e154302d1bd9255
Author: vinoyang <vi...@...>
Date: 2018-02-10T06:52:13Z
[FLINK-6469] Configure Memory Sizes with units
----
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197736638
--- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
@@ -67,8 +68,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
- int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
- int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ int jobManagerMemoryMb = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --
Hi @dawidwys, this relates to suggestion 2: "change the return value of getMebiBytes() to int or have a getMebiBytesAsInt() method that uses a MathUtils.checkedDownCast() to avoid unnoticed overflow errors, as Stephan commented".
Adding a `getMebiBytesAsInt()` method is the alternative option. If I pick the first option and replace the `(int)` cast with `MathUtils.checkedDownCast()`, is it still necessary to provide a `getMebiBytesAsInt()` method?
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
@StephanEwen Regarding the open question: this PR introduces new keys and deprecates the old ones, e.g. `jobmanager.heap.mb -> jobmanager.heap.size` and `taskmanager.heap.mb -> taskmanager.heap.size`. The problem is how to handle `taskmanager.memory.segment-size` and `taskmanager.memory.size`: their names already contain the word `size`, so they do not lend themselves to being replaced by a new key.
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
@StephanEwen
For `taskmanager.memory.segment-size`, the default unit is bytes, so the behavior is the same whether or not a unit is given.
So we only need to handle `taskmanager.memory.size`.
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197708483
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
@@ -482,14 +484,15 @@ private static NetworkEnvironment createNetworkEnvironment(
public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
Preconditions.checkArgument(totalJavaMemorySize > 0);
- int segmentSize = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
+ int segmentSize = (int) MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes();
--- End diff --
Use `checkedDownCast`
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197707246
--- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
@@ -67,8 +68,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
- int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
- int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ int jobManagerMemoryMb = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+ int taskManagerMemoryMb = (int) MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --
use `getMebiBytesAsInt`
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5448
Okay, after taking a look, I think we need to add a few changes:
- We need to add an additional `MemoryUnit.parse()` method that takes the "default" unit, so that we parse the old heap sizes such that they are in MB if nothing else is specified.
- We should either change the return value of `getMebiBytes()` to `int` or have a `getMebiBytesAsInt()` method that uses a `MathUtils.checkedDownCast()` to avoid unnoticed overflow errors.
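A minimal sketch of the second point, for illustration only: a plain `(int)` cast silently wraps around once the mebibyte count exceeds `Integer.MAX_VALUE`, whereas a checked down-cast fails loudly. The class `MemorySizeSketch` is hypothetical and is not Flink's `MemorySize`; `checkedDownCast` here is a local re-implementation of the idea behind the `MathUtils.checkedDownCast()` mentioned above.

```java
/**
 * Hypothetical, simplified memory-size value object.
 * NOT Flink's MemorySize; it only illustrates the checked down-cast idea.
 */
final class MemorySizeSketch {

    private final long bytes;

    MemorySizeSketch(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be >= 0");
        }
        this.bytes = bytes;
    }

    /** Size in mebibytes (= 1024 kibibytes) as a long; cannot overflow. */
    long getMebiBytes() {
        return bytes >> 20;
    }

    /** Size in mebibytes as an int; throws instead of silently overflowing. */
    int getMebiBytesAsInt() {
        return checkedDownCast(getMebiBytes());
    }

    /** Casts a long to an int, throwing if the numeric value would change. */
    static int checkedDownCast(long value) {
        int downCast = (int) value;
        if (downCast != value) {
            throw new IllegalArgumentException(
                "Cannot downcast long value " + value + " to integer.");
        }
        return downCast;
    }
}
```

Call sites that need an `int` (for example the JobManager/TaskManager memory values in `ClusterSpecification`) could then call `getMebiBytesAsInt()` instead of casting.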
Open question: As we are changing the value type of the heap size config options, should we deprecate the current config keys and introduce new ones (like `jobmanager.heap-size`)?
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191198882
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
- long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+ String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+ long offHeapSize = 0;
+ try {
+ offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
+ if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+ offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+ }
+ } catch (IllegalArgumentException e) {
+
--- End diff --
I found the flow with default value somewhat counterintuitive. How about we structure this code like this:
    long offHeapSize;
    String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
    // Only run the unit parser if the user changed the default value.
    if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
        try {
            offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
                .getMebiBytes();
        } catch (IllegalArgumentException e) {
            // Point the user at the misconfigured key.
            throw new IllegalConfigurationException(
                "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
        }
    } else {
        offHeapSize = Long.parseLong(managedMemorySizeDefaultVal);
    }
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
Hi @StephanEwen, building on your `MemorySize` class (a subtask of FLINK-6469), I finished the remaining work: I replaced the old memory settings in the code, the config file, and the shell scripts with memory sizes.
In `conf/flink-conf.yaml`, I kept but deprecated the keys `jobmanager.heap.mb` and `taskmanager.heap.mb`, and introduced `jobmanager.heap.size` and `taskmanager.heap.size`, whose values carry the unit `m`.
Could you please review this PR? Thanks!
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191190775
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
@@ -176,19 +188,19 @@
/**
* Size of memory buffers used by the network stack and the memory manager (in bytes).
*/
- public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+ public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
- .defaultValue(32768)
- .withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes).");
+ .defaultValue("32768")
+ .withDescription("Size of memory buffers used by the network stack and the memory manager.");
/**
* Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
* set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
*/
- public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+ public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
--- End diff --
This option requires special handling (maybe similar to JOB_MANAGER_HEAP_MEMORY/TASK_MANAGER_HEAP_MEMORY). Right now, if an old config file is used with the new version, a value given in megabytes will be interpreted as bytes, so the effective value will be 2^20 (1,048,576) times smaller.
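A quick arithmetic check of that factor (plain Java, hypothetical helper names, no Flink classes): with `taskmanager.memory.size: 512` written under the old megabyte semantics, the intended size is 512 × 2^20 = 536,870,912 bytes, but a parser that treats unit-less numbers as bytes yields just 512 bytes.

```java
/** Hypothetical check of what happens when a legacy megabyte value is parsed as bytes. */
final class LegacyValueMisreadSketch {

    private static final long BYTES_PER_MEBIBYTE = 1L << 20;

    /** Bytes the user meant when writing a unit-less value under the old (megabyte) semantics. */
    static long intendedBytes(long legacyValue) {
        return Math.multiplyExact(legacyValue, BYTES_PER_MEBIBYTE);
    }

    /** Bytes the new code would use if it treats the same unit-less value as bytes. */
    static long misreadBytes(long legacyValue) {
        return legacyValue;
    }

    /** How many times smaller the misread value is; always 2^20 for positive inputs. */
    static long shrinkFactor(long legacyValue) {
        if (legacyValue <= 0) {
            throw new IllegalArgumentException("legacyValue must be positive");
        }
        return intendedBytes(legacyValue) / misreadBytes(legacyValue);
    }
}
```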
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/5448
Hi @yanghua,
I am afraid Stephan won't be able to reply any time soon. I would suggest the following:
- add the default unit to the parse method of `MemorySize` and use MB for `MANAGED_MEMORY_SIZE`.
- change the return value of `getMebiBytes()` to `int` or have a `getMebiBytesAsInt()` method that uses `MathUtils.checkedDownCast()` to avoid unnoticed overflow errors, as Stephan commented
- change the default value of `MANAGED_MEMORY_SIZE` to 0, as suggested by @zentol
After that I think this PR will be ready to be merged.
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197711813
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -161,7 +162,7 @@ public YarnResourceManager(
numPendingContainerRequests = 0;
this.webInterfaceUrl = webInterfaceUrl;
- this.defaultTaskManagerMemoryMB = flinkConfig.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ this.defaultTaskManagerMemoryMB = (int) MemorySize.parse(flinkConfig.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --
unnecessary cast
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191190916
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
@@ -176,19 +188,19 @@
/**
* Size of memory buffers used by the network stack and the memory manager (in bytes).
*/
- public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+ public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
- .defaultValue(32768)
- .withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes).");
+ .defaultValue("32768")
+ .withDescription("Size of memory buffers used by the network stack and the memory manager.");
/**
* Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
* set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
*/
- public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+ public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
--- End diff --
Also I think we should add tests that explicitly test using old style configuration.
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197711430
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ---
@@ -66,7 +67,7 @@
*/
public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
- private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+ private static final int BUFFER_SIZE = (int) MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes();
--- End diff --
use `checkedDownCast`
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191197602
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
- long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+ String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+ long offHeapSize = 0;
+ try {
+ offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
+ if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+ offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+ }
+ } catch (IllegalArgumentException e) {
+
--- End diff --
Rethrow as an `IllegalConfigurationException` with a message indicating that `MANAGED_MEMORY_SIZE` is misconfigured.
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197711881
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -385,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
}
// JobManager Memory
- final int jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+ final int jobManagerMemoryMB = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --
unnecessary cast
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5448
There is no problem reusing old keys whose unit was bytes, because `MemorySize.parse(...)` interprets a number without a unit as bytes.
I did not realize that you had already switched the config keys. In that case, we need to keep supporting the old keys for backward compatibility as well. Also, we need to update all the shell scripts (`config.sh`, `jobmanager.sh`, `taskmanager.sh` and so on) to be consistent with the new config keys.
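A minimal sketch of backward-compatible key handling, assuming a plain `Map` stands in for Flink's `Configuration`: the new key `jobmanager.heap.size` is read first (bare numbers are bytes, matching the `MemorySize.parse(...)` behavior described above), and the deprecated `jobmanager.heap.mb` is only consulted as a fallback, keeping its historical megabyte meaning so that unchanged config files behave as before. Class and method names are hypothetical, and the small parser only accepts `b`/`k`/`m`/`g` suffixes.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: prefer the new key, fall back to the deprecated one. */
final class HeapSizeFallbackSketch {

    static final String NEW_KEY = "jobmanager.heap.size"; // value may carry a unit, e.g. "1024m"
    static final String OLD_KEY = "jobmanager.heap.mb";   // deprecated, value is plain megabytes
    static final String DEFAULT_VALUE = "1024m";

    /** Resolves the JobManager heap size in mebibytes, preferring the new key. */
    static long heapSizeMebiBytes(Map<String, String> config) {
        if (config.containsKey(NEW_KEY)) {
            // New key: unit-less numbers are bytes, consistent with the generic parser.
            return toBytes(config.get(NEW_KEY), 1L) >> 20;
        }
        if (config.containsKey(OLD_KEY)) {
            // Deprecated key: keep its historical meaning (megabytes) so old files still work.
            return toBytes(config.get(OLD_KEY), 1L << 20) >> 20;
        }
        return toBytes(DEFAULT_VALUE, 1L) >> 20;
    }

    /** Minimal parser: digits plus an optional b/k/kb/m/mb/g/gb suffix. */
    static long toBytes(String text, long defaultMultiplier) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        int end = 0;
        while (end < t.length() && Character.isDigit(t.charAt(end))) {
            end++;
        }
        if (end == 0) {
            throw new IllegalArgumentException("Not a memory size: '" + text + "'");
        }
        long number = Long.parseLong(t.substring(0, end));
        String unit = t.substring(end).trim();
        long multiplier;
        switch (unit) {
            case "": multiplier = defaultMultiplier; break;
            case "b": multiplier = 1L; break;
            case "k": case "kb": multiplier = 1L << 10; break;
            case "m": case "mb": multiplier = 1L << 20; break;
            case "g": case "gb": multiplier = 1L << 30; break;
            default: throw new IllegalArgumentException("Unknown unit in '" + text + "'");
        }
        // Throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(number, multiplier);
    }
}
```

A plain alias that copies the deprecated key's value verbatim would make a bare `2048` read as bytes under the new semantics; that is why the fallback above applies its own default unit.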
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197707216
--- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
@@ -67,8 +68,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
- int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
- int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ int jobManagerMemoryMb = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --
use `getMebiBytesAsInt`
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
cc @dawidwys I have implemented the third suggestion; the others were already addressed. Could you review this?
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
@StephanEwen any opinion about this PR?
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
cc @dawidwys and @StephanEwen, I have resolved the conflicts and fixed some issues in the new files. Please review.
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191147297
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to elect and discover the JobManager" +
" leader from potentially multiple standby JobManagers.");
+ /**
+ * JVM heap size for the JobManager with memory size.
+ */
+ public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
+ key("jobmanager.heap.size")
+ .defaultValue("1024m")
+ .withDescription("JVM heap size for the JobManager.");
+
/**
* JVM heap size (in megabytes) for the JobManager.
+ * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
*/
- public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+ public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --
The `@Deprecated` annotation is missing. With it, we can exclude this option from the generated docs (which I think we should do).
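Why the annotation matters in addition to the Javadoc tag: `@deprecated` in Javadoc is not visible at runtime, whereas `java.lang.Deprecated` is retained at runtime and can be queried via reflection, so tooling such as a docs generator can skip deprecated options. The sketch below is hypothetical: it is not Flink's docs generator, and it uses plain `String` constants instead of `ConfigOption` fields.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical options holder; the real classes use ConfigOption fields, not Strings. */
final class HeapOptionsSketch {

    /** JVM heap size for the JobManager, with a unit such as "1024m". */
    public static final String JOB_MANAGER_HEAP_MEMORY = "jobmanager.heap.size";

    /**
     * JVM heap size (in megabytes) for the JobManager.
     *
     * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
     */
    @Deprecated
    public static final String JOB_MANAGER_HEAP_MEMORY_MB = "jobmanager.heap.mb";
}

/** Hypothetical docs-generator step that skips options annotated with @Deprecated. */
final class DocFilterSketch {

    static List<String> documentedKeys(Class<?> optionsClass) {
        List<String> keys = new ArrayList<>();
        for (Field field : optionsClass.getDeclaredFields()) {
            boolean isStaticString = Modifier.isStatic(field.getModifiers())
                && field.getType() == String.class;
            // The Javadoc @deprecated tag is gone at runtime; the annotation is retained.
            if (isStaticString && !field.isAnnotationPresent(Deprecated.class)) {
                try {
                    keys.add((String) field.get(null));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return keys;
    }
}
```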
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197754848
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
@@ -148,6 +148,27 @@ public static MemorySize parse(String text) throws IllegalArgumentException {
return new MemorySize(parseBytes(text));
}
+ /**
+ * Parses the given string as as MemorySize.
+ * The supported expressions are listed under {@link MemorySize}.
+ *
+ * <p>
+ * Note : this method is compatible with the old memory config key, like {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
+ * </p>
+ *
+ * @param text The string to parse.
+ * @return The parsed MemorySize.
+ *
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static MemorySize parseAsMebiBytesIfNoUnit(String text) throws IllegalArgumentException {
--- End diff --
@dawidwys There is no `MemoryUnit` type yet, and `parseAsMebiBytesIfNoUnit` is needed only for mebibytes (`taskmanager.memory.size`), so I think we should not make it more complex.
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5448
Thanks, I think this is a really nice change!
Given that we are approaching feature freeze for 1.5 and already have a very big backlog, I would try and get to this for the 1.6 release. Hope that works for you.
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/5448
@yanghua Could you rebase this PR on top of the master? I could have a look then. Thanks!
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197708797
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java ---
@@ -86,7 +87,7 @@ private static TaskManagerServicesConfiguration getTmConfig(
networkBufFraction,
networkBufMin,
networkBufMax,
- TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
+ (int) MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
--- End diff --
use `checkedDownCast`
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/5448
Hi @yanghua
I think these changes look good overall. My biggest concern is the handling of `MANAGED_MEMORY_SIZE`: if it is used with an old-style configuration file, I think the resulting value will differ.
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
cc @dawidwys I introduced `MemoryUnit` and refactored `MemorySize`. Please review.
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197707880
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
@@ -148,6 +148,27 @@ public static MemorySize parse(String text) throws IllegalArgumentException {
return new MemorySize(parseBytes(text));
}
+ /**
+ * Parses the given string as as MemorySize.
+ * The supported expressions are listed under {@link MemorySize}.
+ *
+ * <p>
+ * Note : this method is compatible with the old memory config key, like {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
+ * </p>
+ *
+ * @param text The string to parse.
+ * @return The parsed MemorySize.
+ *
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static MemorySize parseAsMebiBytesIfNoUnit(String text) throws IllegalArgumentException {
--- End diff --
I think a better idea would be to have `parse(String text, MemoryUnit defaultUnit)`
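A minimal sketch of what `parse(String text, MemoryUnit defaultUnit)` could look like, assuming a unit enum along the lines discussed in this thread. `MemoryUnitSketch` and `MemorySizeParseSketch` are hypothetical names, the accepted suffixes are an assumption, and the real `MemorySize` API may differ. The one-argument overload keeps the existing behavior (bare numbers are bytes), while callers reading a formerly megabyte-valued key pass `MEGA_BYTES`:

```java
import java.util.Locale;

/** Hypothetical unit enum; the multiplier converts one unit into bytes. */
enum MemoryUnitSketch {
    BYTES(1L, "b", "bytes"),
    KILO_BYTES(1L << 10, "k", "kb"),
    MEGA_BYTES(1L << 20, "m", "mb"),
    GIGA_BYTES(1L << 30, "g", "gb");

    final long multiplier;
    final String[] suffixes;

    MemoryUnitSketch(long multiplier, String... suffixes) {
        this.multiplier = multiplier;
        this.suffixes = suffixes;
    }

    static MemoryUnitSketch fromSuffix(String suffix) {
        for (MemoryUnitSketch unit : values()) {
            for (String s : unit.suffixes) {
                if (s.equals(suffix)) {
                    return unit;
                }
            }
        }
        throw new IllegalArgumentException("Unknown memory unit: '" + suffix + "'");
    }
}

final class MemorySizeParseSketch {

    /** Parses text into bytes; a bare number is interpreted in {@code defaultUnit}. */
    static long parseBytes(String text, MemoryUnitSketch defaultUnit) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        int end = 0;
        while (end < t.length() && Character.isDigit(t.charAt(end))) {
            end++;
        }
        if (end == 0) {
            // Also rejects negative values such as "-1".
            throw new IllegalArgumentException("Expected a non-negative number: '" + text + "'");
        }
        long number = Long.parseLong(t.substring(0, end));
        String suffix = t.substring(end).trim();
        MemoryUnitSketch unit = suffix.isEmpty() ? defaultUnit : MemoryUnitSketch.fromSuffix(suffix);
        try {
            return Math.multiplyExact(number, unit.multiplier);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("Memory size overflows a long: '" + text + "'", e);
        }
    }

    /** Existing behavior: a bare number means bytes. */
    static long parseBytes(String text) {
        return parseBytes(text, MemoryUnitSketch.BYTES);
    }
}
```

The design choice here is that the caller decides the default unit per config key, instead of adding a special-purpose method such as `parseAsMebiBytesIfNoUnit` for each legacy unit.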
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5448
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191191358
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to elect and discover the JobManager" +
" leader from potentially multiple standby JobManagers.");
+ /**
+ * JVM heap size for the JobManager with memory size.
+ */
+ public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
+ key("jobmanager.heap.size")
+ .defaultValue("1024m")
+ .withDescription("JVM heap size for the JobManager.");
+
/**
* JVM heap size (in megabytes) for the JobManager.
+ * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
*/
- public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+ public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --
Also, I think we should add a note to the release notes advising users to migrate these parameters.
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197711900
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -385,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
}
// JobManager Memory
- final int jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+ final int jobManagerMemoryMB = (int) MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
// Task Managers memory
- final int taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ final int taskManagerMemoryMB = (int) MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --
unnecessary cast
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191147488
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
@@ -32,10 +32,22 @@
// General TaskManager Options
// ------------------------------------------------------------------------
+ /**
+ * JVM heap size for the TaskManagers with memory size.
+ */
+ public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
+ key("taskmanager.heap.size")
+ .defaultValue("1024m")
+ .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
+ " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
+ " YARN container, minus a certain tolerance value.");
+
/**
* JVM heap size (in megabytes) for the TaskManagers.
+ *
+ * @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
*/
- public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
+ public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
--- End diff --
Add `@Deprecated`
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r193277880
--- Diff: docs/_includes/generated/task_manager_configuration.html ---
@@ -84,13 +84,13 @@
</tr>
<tr>
<td><h5>taskmanager.memory.segment-size</h5></td>
- <td style="word-wrap: break-word;">32768</td>
- <td>Size of memory buffers used by the network stack and the memory manager (in bytes).</td>
+ <td style="word-wrap: break-word;">"32768"</td>
+ <td>Size of memory buffers used by the network stack and the memory manager.</td>
</tr>
<tr>
<td><h5>taskmanager.memory.size</h5></td>
- <td style="word-wrap: break-word;">-1</td>
- <td>Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not set, a relative fraction will be allocated.</td>
+ <td style="word-wrap: break-word;">"-1"</td>
--- End diff --
Currently, every place that reads `taskmanager.memory.size` first compares the value with the default "-1" and skips the parser if they match, so there is no problem right now. Still, I think the suggestion from @zentol is good. What's your opinion, @StephanEwen?
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5448
Will try and take a look at this soon... Sorry for the delay.
What I would consider very important is that users who don't change their configuration do not suddenly get different behavior. That means that, in the absence of a unit, we should not always interpret the value as bytes, but in whatever unit the config value was measured in before (such as MB).
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
Hi @dawidwys, thanks for your review suggestions. I have refactored the PR code except for `MANAGED_MEMORY_SIZE`. The problem you raised does exist. The key name itself fits this PR, but it seems we would need to introduce a new key and mark this one as **@deprecated**; otherwise, we cannot prevent users from using old config values without a unit.
But that is only one option; alternatively, we can highlight a **Note** message and give guidance.
In the worst case, a user configures the value in **megabytes** but it is interpreted as **bytes** (a `1024 * 1024` difference). When the TaskManager starts, it would then fail or show values that disagree with the Flink web UI, and the user would recheck this configuration item.
What's your opinion, @StephanEwen?
---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5448
cc @dawidwys
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r197707411
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
@@ -95,8 +95,8 @@ public long getKibiBytes() {
/**
* Gets the memory size in Mebibytes (= 1024 Kibibytes).
*/
- public long getMebiBytes() {
- return bytes >> 20;
+ public int getMebiBytes() {
+ return (int) (bytes >> 20);
--- End diff --
use `checkedDownCast` here
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191199370
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---
@@ -223,9 +224,17 @@ public static TaskManagerServicesConfiguration fromConfiguration(
parseQueryableStateConfiguration(configuration);
// extract memory settings
- long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+ long configuredMemory = Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
+ if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())) {
+ try {
+ configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+ } catch (IllegalArgumentException e) {
+
+ }
+ }
--- End diff --
Same comments as before. How about code like this:
    long configuredMemory;
    String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
    // Only run the unit parser if the user changed the default value.
    if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
        try {
            configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
                .getMebiBytes();
        } catch (IllegalArgumentException e) {
            // Point the user at the misconfigured key.
            throw new IllegalConfigurationException(
                "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
        }
    } else {
        configuredMemory = Long.parseLong(managedMemorySizeDefaultVal);
    }
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r193221496
--- Diff: docs/_includes/generated/task_manager_configuration.html ---
@@ -84,13 +84,13 @@
</tr>
<tr>
<td><h5>taskmanager.memory.segment-size</h5></td>
- <td style="word-wrap: break-word;">32768</td>
- <td>Size of memory buffers used by the network stack and the memory manager (in bytes).</td>
+ <td style="word-wrap: break-word;">"32768"</td>
+ <td>Size of memory buffers used by the network stack and the memory manager.</td>
</tr>
<tr>
<td><h5>taskmanager.memory.size</h5></td>
- <td style="word-wrap: break-word;">-1</td>
- <td>Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not set, a relative fraction will be allocated.</td>
+ <td style="word-wrap: break-word;">"-1"</td>
--- End diff --
Looking at the [parser](https://github.com/yanghua/flink/blob/39fd2efb66bd6fcac3f86e953729831a49bc7709/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java#L191), it appears that `MemorySize` does not support negative values (rightfully so). We may have to change the default to 0.
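A sketch of the "0 means not configured" convention, assuming the configured value has already been parsed into bytes and that, when nothing is configured, a fraction of the available memory is used (as the `MANAGED_MEMORY_FRACTION` Javadoc describes). The class and method names are hypothetical and this is not how Flink actually computes managed memory; it only shows why a non-negative sentinel fits a parser that rejects negative values.

```java
/**
 * Hypothetical sketch: with a parser that rejects negative sizes, "0" (not "-1")
 * is a default that can pass through the parser and still mean "not configured".
 */
final class ManagedMemorySentinelSketch {

    /** Sentinel meaning "no explicit size configured" (assumed, per the suggestion above). */
    static final long NOT_CONFIGURED = 0L;

    /**
     * Returns the managed memory in bytes: the explicit size if one is configured,
     * otherwise the given fraction of the available memory.
     */
    static long managedMemoryBytes(long configuredBytes, long availableBytes, double fraction) {
        if (configuredBytes < 0) {
            throw new IllegalArgumentException("Memory size must not be negative: " + configuredBytes);
        }
        if (fraction <= 0.0 || fraction >= 1.0) {
            throw new IllegalArgumentException("Fraction must be in (0, 1): " + fraction);
        }
        if (configuredBytes == NOT_CONFIGURED) {
            return (long) (availableBytes * fraction);
        }
        return configuredBytes;
    }
}
```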
---
[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191186380
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
@@ -176,19 +188,19 @@
/**
* Size of memory buffers used by the network stack and the memory manager (in bytes).
*/
- public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+ public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
- .defaultValue(32768)
- .withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes).");
+ .defaultValue("32768")
+ .withDescription("Size of memory buffers used by the network stack and the memory manager.");
/**
* Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
--- End diff --
Remove "(in megabytes)" from the Javadoc.
---