Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:55:51 UTC

[flink] branch FLINK-13986-flip49-cleanup-e2e created (now 38b436c)

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git.


      at 38b436c  run e2e

This branch includes the following new commits:

     new 2b9e5d9  [hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing test bash scripts from Java classes.
     new dd72879  [hotfix] Remove leading/trailing/duplicated whitespace from the shell command generated by BootstrapTools.
     new 30ce8c0  [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call Java code for generating TM resource dynamic configurations and JVM parameters.
     new 4088bea  [FLINK-13983][dist] TM startup scripts call Java code to set flip49 TM resource configs and JVM parameters, if the feature option is enabled.
     new 5b3ebcf  [FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs generated from TaskExecutorResourceSpec.
     new a28bdc8  [FLINK-13983][runtime] Use flip49 config options to decide memory size of ShuffleEnvironment.
     new b4b482a  [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.
     new e1daac3  [FLINK-13986][core][config] Change default value of flip49 feature flag to true
     new 1d80b12  [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations.
     new 436f467  [FLINK-13986][test] Fix test cases that fail due to a change of the expected exception type.
     new 4224fd9  Adjust memory configuration for local execution
     new fd73130  Treat legacy TM heap size as total process memory, not flink memory
     new 5b5be51  Add backwards compatibility
     new 1b7df76  Change task off heap default value from 0b to 1m to accommodate small framework allocations (temporary: later add framework off heap)
     new c499e10  Increase memory in YARNHighAvailabilityITCase
     new 71b1c38  fix ClassLoaderITCase
     new dc0ba44  fix LaunchableMesosWorkerTest
     new d123baf  fix YarnConfigurationITCase
     new 1f24495  Fix yarn cut off
     new b8c6a4c  [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)
     new 38b436c  run e2e

The 21 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 13/21: Add backwards compatibility

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b5be514ad75d0343792b95ea5be8f83b0fa25d8
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Oct 31 17:06:58 2019 +0100

    Add backwards compatibility
---
 flink-dist/src/main/resources/flink-conf.yaml                      | 4 ++--
 .../flink/runtime/clusterframework/TaskExecutorResourceUtils.java  | 7 +++++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index a3bc57d..15d7aa9 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -42,9 +42,9 @@ jobmanager.rpc.port: 6123
 jobmanager.heap.size: 1024m
 
 
-# The heap size for the TaskManager JVM
+# Total Flink process size for the TaskExecutor
 
-taskmanager.heap.size: 1024m
+taskmanager.memory.total-process.size: 1024m
 
 
 # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index f2a275f..4b649e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -417,7 +417,9 @@ public class TaskExecutorResourceUtils {
 			return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
 		} else {
 			@SuppressWarnings("deprecation")
-			final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
+			final long legacyHeapMemoryMB = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) ?
+				config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) :
+				MemorySize.parse(config.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getBytes();
 			return new MemorySize(legacyHeapMemoryMB << 20); // megabytes to bytes
 		}
 	}
@@ -427,7 +429,8 @@ public class TaskExecutorResourceUtils {
 	}
 
 	private static boolean isManagedMemorySizeExplicitlyConfigured(final Configuration config) {
-		return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+		return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE) ||
+			config.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE);
 	}
 
 	private static boolean isManagedMemoryOffHeapFractionExplicitlyConfigured(final Configuration config) {
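
A minimal usage sketch of the fallback above (not part of the commit; it assumes the
option constants and the TaskExecutorResourceUtils/TaskExecutorResourceSpec API that
appear elsewhere in this patch series, and that the legacy heap size is treated as
total process memory as in commit 12/21 of this branch):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;
    import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
    import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;

    public class LegacyHeapFallbackSketch {
        public static void main(String[] args) {
            // Only the deprecated key is set; taskmanager.memory.total-process.size is absent.
            Configuration config = new Configuration();
            config.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, "1024m");

            // With the fallback above, the legacy heap size is read and used as the
            // TaskExecutor's total process memory budget.
            TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(config);
            System.out.println(spec.getTotalProcessMemorySize().getMebiBytes() + " MiB total process memory");
        }
    }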


[flink] 21/21: run e2e

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38b436cbd24d54b65bf7479b05f13920255271f9
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 18:15:49 2019 +0100

    run e2e
---
 .travis.yml | 157 ++++++++++++++++++++----------------------------------------
 1 file changed, 52 insertions(+), 105 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 109533f..d6fc195 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -71,7 +71,6 @@ stages:
   - name: compile
   - name: test
   - name: E2E
-    if: type = cron
   - name: cleanup
 
 jdk: "openjdk8"
@@ -130,242 +129,190 @@ jobs:
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11"
       name: cleanup
     # hadoop 2.4.1 profile
-    - if: type = cron
-      stage: compile
+    - stage: compile
       script: ./tools/travis_controller.sh compile
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: compile - hadoop 2.4.1
-    - if: type = cron
-      stage: test
+    - stage: test
       script: ./tools/travis_controller.sh core
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: core - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh libraries
+    - script: ./tools/travis_controller.sh libraries
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: libraries - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh blink_planner
+    - script: ./tools/travis_controller.sh blink_planner
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: blink_planner - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh connectors
+    - script: ./tools/travis_controller.sh connectors
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: connectors - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh kafka/gelly
+    - script: ./tools/travis_controller.sh kafka/gelly
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: kafka/gelly - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh tests
+    - script: ./tools/travis_controller.sh tests
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: tests - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh misc
+    - script: ./tools/travis_controller.sh misc
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: misc - hadoop 2.4.1
-    - if: type = cron
-      stage: cleanup
+    - stage: cleanup
       script: ./tools/travis_controller.sh cleanup
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: cleanup - hadoop 2.4.1
     # scala 2.12 profile
-    - if: type = cron
-      stage: compile
+    - stage: compile
       script: ./tools/travis_controller.sh compile
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: compile - scala 2.12
-    - if: type = cron
-      stage: test
+    - stage: test
       script: ./tools/travis_controller.sh core
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: core - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh libraries
+    - script: ./tools/travis_controller.sh libraries
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: libraries - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh blink_planner
+    - script: ./tools/travis_controller.sh blink_planner
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: blink_planner - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh connectors
+    - script: ./tools/travis_controller.sh connectors
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: connectors - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh kafka/gelly
+    - script: ./tools/travis_controller.sh kafka/gelly
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: kafka/gelly - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh tests
+    - script: ./tools/travis_controller.sh tests
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: tests - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh misc
+    - script: ./tools/travis_controller.sh misc
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: misc - scala 2.12
-    - if: type = cron
-      stage: cleanup
+    - stage: cleanup
       script: ./tools/travis_controller.sh cleanup
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: cleanup - scala 2.12
     # JDK11 profile
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       stage: compile
       script: ./tools/travis_controller.sh compile
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: compile - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       stage: test
       script: ./tools/travis_controller.sh core
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: core - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       script: ./tools/travis_controller.sh python
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: python - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       script: ./tools/travis_controller.sh libraries
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: libraries - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       script: ./tools/travis_controller.sh blink_planner
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: blink_planner - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       script: ./tools/travis_controller.sh connectors
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: connectors - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       script: ./tools/travis_controller.sh kafka/gelly
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: kafka/gelly - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       script: ./tools/travis_controller.sh tests
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: tests - jdk 11
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       script: ./tools/travis_controller.sh misc
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: misc
-    - if: type = cron
-      jdk: "openjdk11"
+    - jdk: "openjdk11"
       stage: cleanup
       script: ./tools/travis_controller.sh cleanup
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11 -Djdk11"
       name: cleanup - jdk 11
     # Documentation 404 check
-    - if: type = cron
-      stage: test
+    - stage: test
       script: ./tools/travis/docs.sh
       language: ruby
       rvm: 2.4.0
       name: Documentation links check
     # E2E profiles - Hadoop 2.8
-    - if: type = cron
-      stage: test
+    - stage: test
       env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -De2e-metrics"
       script: ./tools/travis/nightly.sh split_misc.sh
       name: e2e - misc - hadoop 2.8
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_ha.sh
       name: e2e - ha - hadoop 2.8
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_sticky.sh
       name: e2e - sticky - hadoop 2.8
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_checkpoints.sh
       name: e2e - checkpoints - hadoop 2.8
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_container.sh
       name: e2e - container - hadoop 2.8
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_heavy.sh
       name: e2e - heavy - hadoop 2.8
       # E2E profiles - Scala 2.12
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -De2e-metrics"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -De2e-metrics"
       script: ./tools/travis/nightly.sh split_misc.sh
       name: e2e - misc - scala 2.12
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
       script: ./tools/travis/nightly.sh split_ha.sh
       name: e2e - ha - scala 2.12
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
       script: ./tools/travis/nightly.sh split_sticky.sh
       name: e2e - sticky - scala 2.12
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
       script: ./tools/travis/nightly.sh split_checkpoints.sh
       name: e2e - checkpoints - scala 2.12
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
       script: ./tools/travis/nightly.sh split_container.sh
       name: e2e - container - scala 2.12
-    - if: type = cron
-      env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+    - env: PROFILE="-Dinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
       script: ./tools/travis/nightly.sh split_heavy.sh
       name: e2e - heavy - scala 2.12
       # E2E profiles - Hadoop-free
-    - if: type = cron
-      env: PROFILE="-De2e-metrics"
+    - env: PROFILE="-De2e-metrics"
       script: ./tools/travis/nightly.sh split_misc_hadoopfree.sh
       name: e2e - misc
-    - if: type = cron
-      env: PROFILE=""
+    - env: PROFILE=""
       script: ./tools/travis/nightly.sh split_ha.sh
       name: e2e - ha
-    - if: type = cron
-      env: PROFILE=""
+    - env: PROFILE=""
       script: ./tools/travis/nightly.sh split_sticky.sh
       name: e2e - sticky
-    - if: type = cron
-      env: PROFILE=""
+    - env: PROFILE=""
       script: ./tools/travis/nightly.sh split_checkpoints.sh
       name: e2e - checkpoints
-    - if: type = cron
-      env: PROFILE=""
+    - env: PROFILE=""
       script: ./tools/travis/nightly.sh split_container.sh
       name: e2e - container
-    - if: type = cron
-      env: PROFILE=""
+    - env: PROFILE=""
       script: ./tools/travis/nightly.sh split_heavy.sh
       name: e2e - heavy
     # E2E profiles - Java 11
-    - if: type = cron
-      stage: test
+    - stage: test
       jdk: "openjdk11"
       env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3 -De2e-metrics"
       script: ./tools/travis/nightly.sh split_misc.sh
       name: e2e - misc - jdk11
-    - if: type = cron
-      env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_ha.sh
       name: e2e - ha - jdk11
-    - if: type = cron
-      env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_sticky.sh
       name: e2e - sticky - jdk 11
-    - if: type = cron
-      env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_checkpoints.sh
       name: e2e - checkpoints - jdk 11
-    - if: type = cron
-      env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
+    - env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_heavy.sh
       name: e2e - heavy - jdk 11


[flink] 07/21: [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4b482af4b93ee9fc98de08b7b427514df41d61b
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Oct 14 16:09:23 2019 +0800

    [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.
---
 .../flink/runtime/taskexecutor/TaskManagerServices.java   | 15 +++++++++++++++
 .../taskexecutor/TaskManagerServicesConfiguration.java    | 10 ++++++++++
 2 files changed, 25 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e570799..91f611a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,7 +51,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -330,6 +332,19 @@ public class TaskManagerServices {
 	 */
 	private static MemoryManager createMemoryManager(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration) {
+		if (taskManagerServicesConfiguration.getOnHeapManagedMemorySize() != null &&
+			taskManagerServicesConfiguration.getOffHeapManagedMemorySize() != null) {
+			// flip49 enabled
+
+			final Map<MemoryType, Long> memorySizeByType = new HashMap<>();
+			memorySizeByType.put(MemoryType.HEAP, taskManagerServicesConfiguration.getOnHeapManagedMemorySize().getBytes());
+			memorySizeByType.put(MemoryType.OFF_HEAP, taskManagerServicesConfiguration.getOffHeapManagedMemorySize().getBytes());
+
+			return new MemoryManager(memorySizeByType,
+				taskManagerServicesConfiguration.getNumberOfSlots(),
+				taskManagerServicesConfiguration.getPageSize());
+		}
+
 		// computing the amount of memory to use depends on how much memory is available
 		// it strictly needs to happen AFTER the network stack has been initialized
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c2133c9..bd84107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -222,6 +222,16 @@ public class TaskManagerServicesConfiguration {
 		return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getShuffleMemSize();
 	}
 
+	@Nullable // should only be null when flip49 is disabled
+	public MemorySize getOnHeapManagedMemorySize() {
+		return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOnHeapManagedMemorySize();
+	}
+
+	@Nullable // should only be null when flip49 is disabled
+	public MemorySize getOffHeapManagedMemorySize() {
+		return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOffHeapManagedMemorySize();
+	}
+
 	long getTimerServiceShutdownTimeout() {
 		return timerServiceShutdownTimeout;
 	}
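
For reference, a minimal sketch (not part of this commit) of the flip49 path above,
reusing the MemoryManager constructor shape shown in the diff; the import paths, sizes,
slot count and page size below are assumptions/placeholders:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.core.memory.MemoryType;        // import path assumed
    import org.apache.flink.runtime.memory.MemoryManager;  // import path assumed

    public class Flip49MemoryManagerSketch {
        public static void main(String[] args) {
            Map<MemoryType, Long> memorySizeByType = new HashMap<>();
            memorySizeByType.put(MemoryType.HEAP, 64L << 20);      // 64 MiB on-heap managed memory
            memorySizeByType.put(MemoryType.OFF_HEAP, 64L << 20);  // 64 MiB off-heap managed memory

            // Same constructor shape as in createMemoryManager() above; 4 slots and a
            // 32 KiB page size are placeholder values.
            MemoryManager memoryManager = new MemoryManager(memorySizeByType, 4, 32 * 1024);
        }
    }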


[flink] 02/21: [hotfix] Remove leading/trailing/duplicated whitespace from the shell command generated by BootstrapTools.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd728794937d69b710accea513e4662c9e669949
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Sep 27 18:09:58 2019 +0800

    [hotfix] Remove leading/trailing/duplicated whitespace from the shell command generated by BootstrapTools.
    
    This eliminates the dependency on exact whitespace counts in 'BootstrapToolsTest#testGetTaskManagerShellCommand'.
---
 .../runtime/clusterframework/BootstrapTools.java   |  4 +-
 .../clusterframework/BootstrapToolsTest.java       | 20 ++++-----
 .../flink/yarn/YarnClusterDescriptorTest.java      | 49 +++++++++++-----------
 3 files changed, 37 insertions(+), 36 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 75de581..0b156ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -472,7 +472,9 @@ public class BootstrapTools {
 		for (Map.Entry<String, String> variable : startCommandValues
 			.entrySet()) {
 			template = template
-				.replace("%" + variable.getKey() + "%", variable.getValue());
+				.replace("%" + variable.getKey() + "%", variable.getValue())
+				.replace("  ", " ")
+				.trim();
 		}
 		return template;
 	}
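
A self-contained illustration (not part of this commit) of the substitution loop just
above; the template placeholders and values are made-up examples rather than the exact
ones BootstrapTools uses:

    import java.util.HashMap;
    import java.util.Map;

    public class CommandTemplateSketch {
        public static void main(String[] args) {
            // Empty values (jvmopts, logging) used to leave runs of spaces in the final
            // command, which the tests then had to count exactly.
            String template = "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";
            Map<String, String> values = new HashMap<>();
            values.put("java", "java");
            values.put("jvmmem", "-Xms1024m -Xmx1024m");
            values.put("jvmopts", "");
            values.put("logging", "");
            values.put("class", "org.example.TaskManagerRunner"); // placeholder main class
            values.put("args", "--configDir ./conf");
            values.put("redirects", "1> ./logs/taskmanager.out 2> ./logs/taskmanager.err");

            String command = template;
            for (Map.Entry<String, String> entry : values.entrySet()) {
                command = command
                    .replace("%" + entry.getKey() + "%", entry.getValue())
                    .replace("  ", " ") // collapse duplicated whitespace, as in the patch
                    .trim();            // drop leading/trailing whitespace
            }
            // Prints a single-space-separated command with no leading/trailing spaces.
            System.out.println(command);
        }
    }
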
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 54aadf5..04ad29c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -165,8 +165,8 @@ public class BootstrapToolsTest extends TestLogger {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + // logging
+				"" + // jvmOpts
+				"" + // logging
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
 				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
@@ -175,8 +175,8 @@ public class BootstrapToolsTest extends TestLogger {
 		final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + // logging
+				" " + krb5 + // jvmOpts
+				"" + // logging
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
 				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
@@ -185,7 +185,7 @@ public class BootstrapToolsTest extends TestLogger {
 		// logback only, with/out krb5
 		assertEquals(
 			java + " " + jvmmem +
-				" " + // jvmOpts
+				"" + // jvmOpts
 				" " + logfile + " " + logback +
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
@@ -194,7 +194,7 @@ public class BootstrapToolsTest extends TestLogger {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
+				" " + krb5 + // jvmOpts
 				" " + logfile + " " + logback +
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
@@ -204,7 +204,7 @@ public class BootstrapToolsTest extends TestLogger {
 		// log4j, with/out krb5
 		assertEquals(
 			java + " " + jvmmem +
-				" " + // jvmOpts
+				"" + // jvmOpts
 				" " + logfile + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
@@ -213,7 +213,7 @@ public class BootstrapToolsTest extends TestLogger {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
+				" " + krb5 + // jvmOpts
 				" " + logfile + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
@@ -223,7 +223,7 @@ public class BootstrapToolsTest extends TestLogger {
 		// logback + log4j, with/out krb5
 		assertEquals(
 			java + " " + jvmmem +
-				" " + // jvmOpts
+				"" + // jvmOpts
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
@@ -232,7 +232,7 @@ public class BootstrapToolsTest extends TestLogger {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
+				" " + krb5 + // jvmOpts
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			BootstrapTools
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 69af7c3..ade7eec 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -189,7 +189,6 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		final String log4j =
 			"-Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; // if set
 		final String mainClass = clusterDescriptor.getYarnSessionClusterEntrypoint();
-		final String args = "";
 		final String redirects =
 			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
 			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
@@ -199,9 +198,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			// no logging, with/out krb5
 			assertEquals(
 				java + " " + jvmmem +
-					" " + // jvmOpts
-					" " + // logging
-					" " + mainClass + " " + args + " " + redirects,
+					"" + // jvmOpts
+					"" + // logging
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -213,9 +212,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 			assertEquals(
 				java + " " + jvmmem +
-					" " + " " + krb5 + // jvmOpts
-					" " + // logging
-					" " + mainClass + " " + args + " " + redirects,
+					" " + krb5 + // jvmOpts
+					"" + // logging
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -228,9 +227,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			// logback only, with/out krb5
 			assertEquals(
 				java + " " + jvmmem +
-					" " + // jvmOpts
+					"" + // jvmOpts
 					" " + logfile + " " + logback +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -242,9 +241,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 			assertEquals(
 				java + " " + jvmmem +
-					" " + " " + krb5 + // jvmOpts
+					" " + krb5 + // jvmOpts
 					" " + logfile + " " + logback +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -257,9 +256,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			// log4j, with/out krb5
 			assertEquals(
 				java + " " + jvmmem +
-					" " + // jvmOpts
+					"" + // jvmOpts
 					" " + logfile + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -271,9 +270,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 			assertEquals(
 				java + " " + jvmmem +
-					" " + " " + krb5 + // jvmOpts
+					" " + krb5 + // jvmOpts
 					" " + logfile + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -286,9 +285,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			// logback + log4j, with/out krb5
 			assertEquals(
 				java + " " + jvmmem +
-					" " + // jvmOpts
+					"" + // jvmOpts
 					" " + logfile + " " + logback + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -300,9 +299,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 			assertEquals(
 				java + " " + jvmmem +
-					" " + " " + krb5 + // jvmOpts
+					" " + krb5 + // jvmOpts
 					" " + logfile + " " + logback + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -320,7 +319,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				java + " " + jvmmem +
 					" " + jvmOpts +
 					" " + logfile + " " + logback + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -334,7 +333,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				java + " " + jvmmem +
 					" " + jvmOpts + " " + krb5 + // jvmOpts
 					" " + logfile + " " + logback + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -351,7 +350,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				java + " " + jvmmem +
 					" " + jvmOpts + " " + jmJvmOpts +
 					" " + logfile + " " + logback + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -365,7 +364,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				java + " " + jvmmem +
 					" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
 					" " + logfile + " " + logback + " " + log4j +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -383,7 +382,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				java + " 1 " + jvmmem +
 					" 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
 					" 3 " + logfile + " " + logback + " " + log4j +
-					" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
+					" 4 " + mainClass + " 5 6 " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,
@@ -401,7 +400,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 					" " + logfile + " " + logback + " " + log4j +
 					" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
 					" " + jvmmem +
-					" " + mainClass + " " + args + " " + redirects,
+					" " + mainClass + " " + redirects,
 				clusterDescriptor
 					.setupApplicationMasterContainer(
 						mainClass,


[flink] 16/21: fix ClassLoaderITCase

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71b1c3827b65560f004e067ec66ecf2bd1ae5e1f
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Mon Nov 4 18:27:57 2019 +0100

    fix ClassLoaderITCase
---
 flink-dist/src/main/flink-bin/bin/config.sh        | 26 ++++++++++++----------
 flink-dist/src/main/flink-bin/bin/taskmanager.sh   |  3 ++-
 .../flink/test/classloading/ClassLoaderITCase.java |  2 +-
 3 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 52ce960..8843877 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -798,6 +798,20 @@ runBashJavaUtilsCmd() {
     echo ${output}
 }
 
+verifyTmResourceConfig() {
+  if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
+	    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
+    else
+	    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
+	    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
+    fi
+
+    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+        exit 1
+    fi
+}
+
 getTmResourceDynamicConfigsAndJvmParams() {
     if [[ "`echo ${FLINK_TM_ENABLE_FLIP49} | tr '[:upper:]' '[:lower:]'`" == "true" ]]; then
         echo "$(getTmResourceDynamicConfigsAndJvmParamsFlip49)"
@@ -817,18 +831,6 @@ getTmResourceDynamicConfigsAndJvmParamsFlip49() {
 }
 
 getTmResourceDynamicConfigsAndJvmParamsLegacy() {
-    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
-	    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
-    else
-	    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
-	    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
-    fi
-
-    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
-        exit 1
-    fi
-
     if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
 
         TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e78a1f1..a3d62e2 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -48,6 +48,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
 
     # Startup parameters
+    verifyTmResourceConfig
     dynamic_configs_and_jvm_params=$(getTmResourceDynamicConfigsAndJvmParams)
     IFS=$'\n' lines=(${dynamic_configs_and_jvm_params})
 
@@ -55,7 +56,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
 
     dynamic_configs=${lines[1]}
-    ARGS=(${ARGS[@]} ${dynamic_configs})
+    ARGS+=(${dynamic_configs})
     ARGS+=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index a09142e..f90a5f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -116,7 +116,7 @@ public class ClassLoaderITCase extends TestLogger {
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
 		// required as we otherwise run out of memory
-		config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "80m");
+		config.setString(TaskManagerOptions.TOTAL_FLINK_MEMORY, "512m");
 
 		miniClusterResource = new MiniClusterResource(
 			new MiniClusterResourceConfiguration.Builder()


[flink] 15/21: Increase memory in YARNHighAvailabilityITCase

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c499e102929b5793c9470e4527e6206d866f874a
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Mon Nov 4 15:24:02 2019 +0100

    Increase memory in YARNHighAvailabilityITCase
---
 .../src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a30046e..f082c84 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -288,7 +288,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 	}
 
 	private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException {
-		final int containerMemory = 256;
+		final int containerMemory = 1024;
 		final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor.deploySessionCluster(
 			new ClusterSpecification.ClusterSpecificationBuilder()
 				.setMasterMemoryMB(containerMemory)


[flink] 09/21: [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d80b12834b2f11a64d61ef489ea0c9e38357524
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Oct 21 16:28:57 2019 +0800

    [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations.
    
    FLIP-49 requires at least one of the following to be explicitly configured:
    - Task Heap Memory and Managed Memory
    - Total Flink Memory
    - Total Process Memory
    
    This commit fixes test cases that fail because all three of the above are missing, by setting a reasonable Total Flink Memory size.
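
A rough sketch (not part of this commit) of the three options above, using option
constants that appear elsewhere in this patch series:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class Flip49MemoryOptionsSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();

            // Option 1: task heap memory plus managed memory
            config.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "16m");
            config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");

            // Option 2: total Flink memory
            // config.setString(TaskManagerOptions.TOTAL_FLINK_MEMORY, "512m");

            // Option 3: total process memory
            // config.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, "1024m");
        }
    }
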
---
 .../src/main/java/org/apache/flink/client/LocalExecutor.java     | 2 ++
 .../apache/flink/streaming/connectors/kafka/KafkaTestBase.java   | 3 ++-
 .../runtime/state/TaskExecutorLocalStateStoresManagerTest.java   | 4 +++-
 .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java | 9 ++++++---
 .../apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java | 7 +++++--
 .../test/recovery/JobManagerHAProcessFailureRecoveryITCase.java  | 3 ++-
 6 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index b1d3330..7ff020c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -64,6 +65,7 @@ public class LocalExecutor extends PlanExecutor {
 		if (!configuration.contains(RestOptions.BIND_PORT)) {
 			configuration.setString(RestOptions.BIND_PORT, "0");
 		}
+		TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration);
 
 		int numTaskManagers = configuration.getInteger(
 				ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 94d9133..85f8f2f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -117,7 +117,8 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static Configuration getFlinkConfiguration() {
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
+		flinkConfig.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "16m");
+		flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		return flinkConfig;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index d9eac20..6191d49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -46,6 +47,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;
+	private static final int TOTAL_FLINK_MEMORY_MB = 512;
 
 	/**
 	 * This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory
@@ -205,7 +207,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 	private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
 			Configuration config) throws IOException {
 		return TaskManagerServicesConfiguration.fromConfiguration(
-			config,
+			TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(config),
 			ResourceID.generate(),
 			InetAddress.getLocalHost(),
 			MEM_SIZE_PARAM,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 4f77721..8bc60a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 
+import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -57,6 +58,8 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 
 	private static final String LOCAL_HOST = "localhost";
 
+	private static final int TOTAL_FLINK_MEMORY_MB = 512;
+
 	@Rule
 	public final TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -87,7 +90,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 			nonWritable.setWritable(false, false));
 
 		try {
-			Configuration cfg = new Configuration();
+			Configuration cfg = adjustMemoryConfigurationForLocalExecution(new Configuration());
 			cfg.setString(CoreOptions.TMP_DIRS, nonWritable.getAbsolutePath());
 
 			try {
@@ -117,7 +120,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 	 */
 	@Test
 	public void testMemoryConfigWrong() throws Exception {
-		Configuration cfg = new Configuration();
+		Configuration cfg = adjustMemoryConfigurationForLocalExecution(new Configuration());
 
 		// something invalid
 		cfg.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "-42m");
@@ -142,7 +145,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 		final ServerSocket blocker = new ServerSocket(0, 50, InetAddress.getByName(LOCAL_HOST));
 
 		try {
-			final Configuration cfg = new Configuration();
+			final Configuration cfg = adjustMemoryConfigurationForLocalExecution(new Configuration());
 			cfg.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, blocker.getLocalPort());
 
 			startTaskManager(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
index 4110e5f..9777186 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
 import org.apache.flink.util.TestLogger;
@@ -91,8 +92,10 @@ public class TaskManagerRunnerTest extends TestLogger {
 	}
 
 	private static TaskManagerRunner createTaskManagerRunner(final Configuration configuration) throws Exception {
-		TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, ResourceID.generate());
+		TaskManagerRunner taskManagerRunner = new TaskManagerRunner(
+			TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration),
+			ResourceID.generate());
 		taskManagerRunner.start();
 		return taskManagerRunner;
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index 130d38e..9ef2d82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -249,7 +249,8 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
 		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 			zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
 		// Task manager configuration
-		config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
+		config.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "1m");
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 


[flink] 05/21: [FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs generated from TaskExecutorResourceSpec.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b3ebcf00ac8d925fca99967a772154956abfae5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Sep 27 17:12:10 2019 +0800

    [FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs generated from TaskExecutorResourceSpec.
---
 .../clusterframework/LaunchableMesosWorker.java    | 23 +++++++++++---
 .../MesosTaskManagerParameters.java                |  9 ++++++
 .../clusterframework/MesosResourceManagerTest.java |  2 +-
 .../runtime/clusterframework/BootstrapTools.java   | 12 ++++++--
 .../ContaineredTaskManagerParameters.java          | 19 ++++++++++--
 .../clusterframework/TaskExecutorResourceSpec.java | 16 +++++++++-
 .../runtime/resourcemanager/ResourceManager.java   | 12 ++++++++
 .../clusterframework/BootstrapToolsTest.java       | 35 ++++++++++++++++++++-
 .../ContaineredTaskManagerParametersTest.java      |  6 ++--
 .../test/java/org/apache/flink/yarn/UtilsTest.java |  2 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  1 +
 .../org/apache/flink/yarn/YarnResourceManager.java | 36 ++++++++++++++++++----
 12 files changed, 151 insertions(+), 22 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 895c66a..8a9f93a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -28,6 +28,7 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.mesos.util.MesosResourceAllocation;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.netflix.fenzo.ConstraintEvaluator;
@@ -135,7 +136,11 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 		@Override
 		public double getMemory() {
-			return params.containeredParameters().taskManagerTotalMemoryMB();
+			if (params.containeredParameters().getTaskExecutorResourceSpec() == null) { // flip49 disabled
+				return params.containeredParameters().taskManagerTotalMemoryMB();
+			} else {
+				return params.containeredParameters().getTaskExecutorResourceSpec().getTotalProcessMemorySize().getMebiBytes();
+			}
 		}
 
 		@Override
@@ -275,10 +280,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
 
 		// finalize the memory parameters
-		jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
-		jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
-		if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
-			jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+		if (tmParams.getTaskExecutorResourceSpec() == null) { // flip49 disabled
+			jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
+			jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
+			if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
+				jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append(
+					"m");
+			}
+		} else { // flip49 enabled
+			jvmArgs.append(" ").append(TaskExecutorResourceUtils.generateJvmParametersStr(tmParams.getTaskExecutorResourceSpec()));
 		}
 
 		// pass dynamic system properties
@@ -301,6 +311,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
 			.append(params.command())
 			.append(" ")
 			.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+		if (tmParams.getTaskExecutorResourceSpec() != null) { // flip49 enabled
+			launchCommand.append(" ").append(TaskExecutorResourceUtils.generateDynamicConfigsStr(tmParams.getTaskExecutorResourceSpec()));
+		}
 		cmd.setValue(launchCommand.toString());
 
 		// build the container info
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1d49000..fcad191 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.netflix.fenzo.ConstraintEvaluator;
@@ -333,9 +335,16 @@ public class MesosTaskManagerParameters {
 	public static MesosTaskManagerParameters create(Configuration flinkConfig) {
 
 		List<ConstraintEvaluator> constraints = parseConstraints(flinkConfig.getString(MESOS_CONSTRAINTS_HARD_HOSTATTR));
+		TaskExecutorResourceSpec taskExecutorResourceSpec;
+		if (flinkConfig.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+			taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig);
+		} else {
+			taskExecutorResourceSpec = null;
+		}
 		// parse the common parameters
 		ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create(
 			flinkConfig,
+			taskExecutorResourceSpec,
 			flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB),
 			flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
 
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index b2eb177..177bf2f 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -271,7 +271,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			// TaskExecutor templating
 			ContainerSpecification containerSpecification = new ContainerSpecification();
 			ContaineredTaskManagerParameters containeredParams =
-				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
+				new ContaineredTaskManagerParameters(null, 1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
 				1.0, 1, 0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
 				Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(), false,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 0b156ca..e18edf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -395,7 +395,12 @@ public class BootstrapTools {
 				tmParams.taskManagerDirectMemoryLimitMB()));
 		}
 
-		startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+		final TaskExecutorResourceSpec taskExecutorResourceSpec = tmParams.getTaskExecutorResourceSpec();
+		if (taskExecutorResourceSpec == null) { // FLIP-49 disabled
+			startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+		} else { // FLIP-49 enabled
+			startCommandValues.put("jvmmem", TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
+		}
 
 		String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
 		if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
@@ -427,7 +432,10 @@ public class BootstrapTools {
 		startCommandValues.put("redirects",
 			"1> " + logDirectory + "/taskmanager.out " +
 			"2> " + logDirectory + "/taskmanager.err");
-		startCommandValues.put("args", "--configDir " + configDirectory);
+
+		String argsStr = taskExecutorResourceSpec == null ? "" :
+			TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec) + " ";
+		startCommandValues.put("args", argsStr + "--configDir " + configDirectory);
 
 		final String commandTemplate = flinkConfig
 			.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index a4e7d25..af8b7fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -48,13 +49,19 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 	/** Environment variables to add to the Java process. */
 	private final HashMap<String, String> taskManagerEnv;
 
+	@Nullable // should only be null when flip49 is disabled
+	private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
 	public ContaineredTaskManagerParameters(
+			@Nullable // should only be null when flip49 is disabled
+			TaskExecutorResourceSpec taskExecutorResourceSpec,
 			long totalContainerMemoryMB,
 			long taskManagerHeapSizeMB,
 			long taskManagerDirectMemoryLimitMB,
 			int numSlots,
 			HashMap<String, String> taskManagerEnv) {
 
+		this.taskExecutorResourceSpec = taskExecutorResourceSpec;
 		this.totalContainerMemoryMB = totalContainerMemoryMB;
 		this.taskManagerHeapSizeMB = taskManagerHeapSizeMB;
 		this.taskManagerDirectMemoryLimitMB = taskManagerDirectMemoryLimitMB;
@@ -64,6 +71,11 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 	// ------------------------------------------------------------------------
 
+	@Nullable // should only be null when flip49 is disabled
+	public TaskExecutorResourceSpec getTaskExecutorResourceSpec() {
+		return taskExecutorResourceSpec;
+	}
+
 	public long taskManagerTotalMemoryMB() {
 		return totalContainerMemoryMB;
 	}
@@ -90,7 +102,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 	@Override
 	public String toString() {
 		return "TaskManagerParameters {" +
-			"totalContainerMemory=" + totalContainerMemoryMB +
+			"taskExecutorResourceSpec=" + (taskExecutorResourceSpec == null ? "null" : taskExecutorResourceSpec) +
+			", totalContainerMemory=" + totalContainerMemoryMB +
 			", taskManagerHeapSize=" + taskManagerHeapSizeMB +
 			", taskManagerDirectMemoryLimit=" + taskManagerDirectMemoryLimitMB +
 			", numSlots=" + numSlots +
@@ -151,6 +164,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 	 */
 	public static ContaineredTaskManagerParameters create(
 			Configuration config,
+			@Nullable // should only be null when flip49 is disabled
+			TaskExecutorResourceSpec taskExecutorResourceSpec,
 			long containerMemoryMB,
 			int numSlots) {
 		// (1) try to compute how much memory used by container
@@ -175,6 +190,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		// done
 		return new ContaineredTaskManagerParameters(
-			containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
+			taskExecutorResourceSpec, containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index 0813aea..d6cbe5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -74,7 +74,7 @@ import org.apache.flink.configuration.MemorySize;
  *               └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
  * </pre>
  */
-public class TaskExecutorResourceSpec {
+public class TaskExecutorResourceSpec implements java.io.Serializable {
 
 	private final MemorySize frameworkHeapSize;
 
@@ -155,4 +155,18 @@ public class TaskExecutorResourceSpec {
 	public MemorySize getTotalProcessMemorySize() {
 		return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
 	}
+
+	@Override
+	public String toString() {
+		return "TaskExecutorResourceSpec {"
+			+ "frameworkHeapSize=" + frameworkHeapSize.toString()
+			+ ", taskHeapSize=" + taskHeapSize.toString()
+			+ ", taskOffHeapSize=" + taskOffHeapSize.toString()
+			+ ", shuffleMemSize=" + shuffleMemSize.toString()
+			+ ", onHeapManagedMemorySize=" + onHeapManagedMemorySize.toString()
+			+ ", offHeapManagedMemorySize=" + offHeapManagedMemorySize.toString()
+			+ ", jvmMetaspaceSize=" + jvmMetaspaceSize.toString()
+			+ ", jvmOverheadSize=" + jvmOverheadSize.toString()
+			+ "}";
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c89a88d..a2aa9ac 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -1210,5 +1212,15 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
 		return Collections.nCopies(numSlots, resourceProfile);
 	}
+
+	@Nullable // should only be null when flip49 is disabled
+	public static TaskExecutorResourceSpec createTaskExecutorResourceSpec(Configuration config) {
+		final boolean enableFlip49 = config.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG);
+		if (enableFlip49) {
+			return TaskExecutorResourceUtils.resourceSpecFromConfig(config);
+		} else {
+			return null;
+		}
+	}
 }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 04ad29c..3cc3168 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.clusterframework;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.ExceptionUtils;
@@ -143,8 +144,40 @@ public class BootstrapToolsTest extends TestLogger {
 	@Test
 	public void testGetTaskManagerShellCommand() {
 		final Configuration cfg = new Configuration();
+		final TaskExecutorResourceSpec taskExecutorResourceSpec = new TaskExecutorResourceSpec(
+			new MemorySize(0), // frameworkHeapSize
+			new MemorySize(111), // taskHeapSize
+			new MemorySize(0), // taskOffHeapSize
+			new MemorySize(222), // shuffleMemSize
+			new MemorySize(0), // onHeapManagedMemorySize
+			new MemorySize(0), // offHeapManagedMemorySize
+			new MemorySize(333), // jvmMetaspaceSize
+			new MemorySize(0)); // jvmOverheadSize
+		final ContaineredTaskManagerParameters containeredParams = new ContaineredTaskManagerParameters(
+			taskExecutorResourceSpec, 1024, 768, 256, 4, new HashMap<String, String>());
+
+		// no logging, with/out krb5
+		final String java = "$JAVA_HOME/bin/java";
+		final String jvmmem = "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 -XX:MaxMetaspaceSize=333";
+		final String mainClass =
+			"org.apache.flink.runtime.clusterframework.BootstrapToolsTest";
+		final String dynamicConfigs = TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec).trim();
+		final String args = dynamicConfigs + " --configDir ./conf";
+		final String redirects =
+			"1> ./logs/taskmanager.out 2> ./logs/taskmanager.err";
+
+		assertEquals(
+			java + " " + jvmmem + " " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					false, false, false, this.getClass()));
+	}
+
+	@Test
+	public void testGetTaskManagerShellCommandLegacy() {
+		final Configuration cfg = new Configuration();
 		final ContaineredTaskManagerParameters containeredParams =
-			new ContaineredTaskManagerParameters(1024, 768, 256, 4,
+			new ContaineredTaskManagerParameters(null, 1024, 768, 256, 4,
 				new HashMap<String, String>());
 
 		// no logging, with/out krb5
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
index a1f4cad..7fe09fd 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -45,7 +45,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 		Configuration conf = new Configuration();
 
 		ContaineredTaskManagerParameters params =
-			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+			ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
 
 		final float memoryCutoffRatio = conf.getFloat(
 			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
@@ -80,7 +80,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 		conf.setBoolean(MEMORY_OFF_HEAP, false);
 
 		ContaineredTaskManagerParameters params =
-			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+			ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
 
 		assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
 
@@ -98,7 +98,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 		conf.setBoolean(MEMORY_OFF_HEAP, true);
 
 		ContaineredTaskManagerParameters params =
-			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+			ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
 
 		assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 3a3144e..8f74aa1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -187,7 +187,7 @@ public class UtilsTest extends TestLogger {
 			hdfsDelegationTokenKind, service));
 		amCredentials.writeTokenStorageFile(new org.apache.hadoop.fs.Path(credentialFile.getAbsolutePath()), yarnConf);
 
-		ContaineredTaskManagerParameters tmParams = new ContaineredTaskManagerParameters(64,
+		ContaineredTaskManagerParameters tmParams = new ContaineredTaskManagerParameters(null, 64,
 			64, 16, 1, new HashMap<>(1));
 		Configuration taskManagerConf = new Configuration();
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2aa476a..8350293 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -177,6 +177,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 					final ContaineredTaskManagerParameters containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(
 						configuration,
+						null,
 						taskManagerMemory,
 						slotsPerTaskManager);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 3a70086..366a029 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -129,6 +131,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	private final Resource resource;
 
+	@Nullable // should only be null when flip49 is disabled
+	private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
 	public YarnResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -176,7 +181,14 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 		this.webInterfaceUrl = webInterfaceUrl;
 		this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-		this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+
+		this.taskExecutorResourceSpec = createTaskExecutorResourceSpec(flinkConfig);
+		if (taskExecutorResourceSpec != null) { // FLIP-49 enabled
+			// use the total process memory from the resource spec computed above
+			this.defaultTaskManagerMemoryMB = taskExecutorResourceSpec.getTotalProcessMemorySize().getMebiBytes();
+		} else { // FLIP-49 disabled
+			this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+		}
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
@@ -390,6 +402,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					try {
 						// Context information used to start a TaskExecutor Java process
 						ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+							taskExecutorResourceSpec,
 							container.getResource(),
 							containerIdStr,
 							container.getNodeId().getHost());
@@ -534,20 +547,31 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			RM_REQUEST_PRIORITY);
 	}
 
-	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
-			throws Exception {
+	private ContainerLaunchContext createTaskExecutorLaunchContext(
+		@Nullable // should only be null when flip49 is disabled
+		TaskExecutorResourceSpec tmResourceSpec,
+		Resource resource,
+		String containerId,
+		String host) throws Exception {
+
 		// init the ContainerLaunchContext
 		final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
 
 		final ContaineredTaskManagerParameters taskManagerParameters =
-				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);
+				ContaineredTaskManagerParameters.create(flinkConfig, tmResourceSpec, resource.getMemory(), numberOfTaskSlots);
 
-		log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
-				"JVM direct memory limit {} MB",
+		if (tmResourceSpec == null) { // FLIP-49 disabled
+			log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
+					"JVM direct memory limit {} MB",
 				containerId,
 				taskManagerParameters.taskManagerTotalMemoryMB(),
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
+		} else { // Flip-49 enabled
+			log.debug("TaskExecutor {} will be started with {}.",
+				containerId,
+				tmResourceSpec);
+		}
 
 		Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
 


[flink] 18/21: fix YarnConfigurationITCase

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d123baf24e68ba7058bbb739954a59879bf5265d
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Nov 5 09:14:50 2019 +0100

    fix YarnConfigurationITCase
---
 .../src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java    | 2 +-
 .../src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 8350293..dbad597 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -85,7 +85,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
 			final Configuration configuration = new Configuration(flinkConfiguration);
 
 			final int masterMemory = 64;
-			final int taskManagerMemory = 128;
+			final int taskManagerMemory = 512;
 			final int slotsPerTaskManager = 3;
 
 			// disable heap cutoff min
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f958b70..74207ff 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -835,7 +835,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 				clusterSpecification.getSlotsPerTaskManager());
 
 		configuration.setString(
-				TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
+				TaskManagerOptions.TOTAL_PROCESS_MEMORY,
 				clusterSpecification.getTaskManagerMemoryMB() + "m");
 
 		// Upload the flink configuration


[flink] 10/21: [FLINK-13986][test] Fix test cases that fail due to the changed expected exception type.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 436f46753baa44f2892910e50709bd5cd49d09d6
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Oct 21 18:46:32 2019 +0800

    [FLINK-13986][test] Fix test cases that fail due to the changed expected exception type.
---
 .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java       | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 8bc60a0..bd094c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.BlobCacheService;
@@ -132,7 +131,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 				highAvailabilityServices);
 
 			fail("Should fail synchronously with an exception");
-		} catch (IllegalConfigurationException e) {
+		} catch (Exception e) {
 			// splendid!
 		}
 	}


[flink] 14/21: Change task off heap default value from 0b to 1m to accommodate small framework allocations (temporary: later add framework off heap)

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b7df767ce95db9d40f71c2936aa298ae51cc16f
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Mon Nov 4 14:27:03 2019 +0100

    Change task off heap default value from 0b to 1m to accommodate small framework allocations (temporary: later add framework off heap)
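
    For illustration only, a minimal sketch of overriding this new default per
    deployment, assuming the 'taskmanager.memory.task.off-heap.size' option key
    (TaskManagerOptions.TASK_OFF_HEAP_MEMORY) shown in the diff below:

        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.configuration.TaskManagerOptions;

        Configuration conf = new Configuration();
        // override the new 1m default, e.g. to leave more room for direct/native allocations from user code
        conf.setString(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, "16m");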
---
 docs/_includes/generated/task_manager_memory_configuration.html         | 2 +-
 .../main/java/org/apache/flink/configuration/TaskManagerOptions.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/task_manager_memory_configuration.html b/docs/_includes/generated/task_manager_memory_configuration.html
index 09b9dc6..e95160e 100644
--- a/docs/_includes/generated/task_manager_memory_configuration.html
+++ b/docs/_includes/generated/task_manager_memory_configuration.html
@@ -79,7 +79,7 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.task.off-heap.size</h5></td>
-            <td style="word-wrap: break-word;">"0b"</td>
+            <td style="word-wrap: break-word;">"1m"</td>
            <td>Task Off-Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct memory or native memory) reserved for user code.</td>
         </tr>
         <tr>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index d717ad5..234d16a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -293,7 +293,7 @@ public class TaskManagerOptions {
 	 */
 	public static final ConfigOption<String> TASK_OFF_HEAP_MEMORY =
 		key("taskmanager.memory.task.off-heap.size")
-			.defaultValue("0b")
+			.defaultValue("1m")
 			.withDescription("Task Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct"
 				+ " memory or native memory) reserved for user code.");
 


[flink] 20/21: [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8c6a4cdfab29447dc1cfc74d37c1a4c34217717
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 17:24:28 2019 +0100

    [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)
    
    Currently, after FLINK-13982, when we calculate the JVM direct memory limit we only account for memory segment network buffers, but not for the direct allocations made from the netty arenas in org.apache.flink.runtime.io.network.netty.NettyBufferPool. We should include the netty arenas in the shuffle memory calculations.
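
    A rough sketch of the intended accounting, using names from the hunks below
    (ARENA_SIZE is PAGE_SIZE << MAX_ORDER, i.e. 16Mb per arena; the helper name
    computeShuffleBytes is only illustrative):

        // direct memory reserved for shuffle must cover memory segments AND netty arenas
        static long computeShuffleBytes(long numOfBuffers, long pageSize, long numberOfArenas) {
            long arenaBytes = numberOfArenas * NettyBufferPool.ARENA_SIZE; // e.g. 4 * 16Mb = 64Mb
            return numOfBuffers * pageSize + arenaBytes;
        }
        // inversely, when deriving the buffer count from a configured shuffle size:
        //   numberOfNetworkBuffers = (shuffleMemorySizeBytes - arenaBytes) / pageSize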
---
 .../test-scripts/test_batch_allround.sh            |  4 +--
 .../TaskExecutorResourceUtils.java                 |  6 +++-
 .../runtime/io/network/netty/NettyBufferPool.java  |  2 ++
 .../runtime/io/network/netty/NettyConfig.java      | 10 ++++--
 .../NettyShuffleEnvironmentConfiguration.java      | 39 ++++++++++++++++++----
 .../TaskExecutorResourceUtilsTest.java             |  6 +++-
 .../NettyShuffleEnvironmentConfigurationTest.java  | 16 +++++++++
 .../example/failing/JobSubmissionFailsITCase.java  |  6 +++-
 .../test/streaming/runtime/BackPressureITCase.java |  4 ++-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  4 +--
 10 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 5143ef4..dc6f753 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -26,8 +26,8 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-set_config_key "taskmanager.network.memory.min" "10485760"
-set_config_key "taskmanager.network.memory.max" "10485760"
+set_config_key "taskmanager.network.memory.min" "27262976"
+set_config_key "taskmanager.network.memory.max" "27262976"
 
 set_conf_ssl "server"
 start_cluster
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 9c69b62..f98670b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -375,7 +377,9 @@ public class TaskExecutorResourceUtils {
 		@SuppressWarnings("deprecation")
 		final long numOfBuffers = config.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
 		final long pageSize =  ConfigurationParserUtils.getPageSize(config);
-		return new MemorySize(numOfBuffers * pageSize);
+		final int numberOfSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+		final long numberOfArenas = NettyConfig.getNumberOfArenas(config, numberOfSlots);
+		return new MemorySize(numOfBuffers * pageSize + numberOfArenas * NettyBufferPool.ARENA_SIZE);
 	}
 
 	private static RangeFraction getShuffleMemoryRangeFraction(final Configuration config) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
index 6d2a6c8..fc49712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
@@ -67,6 +67,8 @@ public class NettyBufferPool extends PooledByteBufAllocator {
 	 */
 	private static final int MAX_ORDER = 11;
 
+	public static final long ARENA_SIZE = PAGE_SIZE << MAX_ORDER;
+
 	/**
 	 * Creates Netty's buffer pool with the specified number of direct arenas.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 9730907..43708f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -102,9 +102,7 @@ public class NettyConfig {
 	}
 
 	public int getNumberOfArenas() {
-		// default: number of slots
-		final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS);
-		return configValue == -1 ? numberOfSlots : configValue;
+		return getNumberOfArenas(config, numberOfSlots);
 	}
 
 	public int getServerNumThreads() {
@@ -188,4 +186,10 @@ public class NettyConfig {
 				getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
 				getSendAndReceiveBufferSize() == 0 ? def : man);
 	}
+
+	public static int getNumberOfArenas(Configuration config, int numberOfSlots) {
+		// default: number of slots
+		final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS);
+		return configValue == -1 ? numberOfSlots : configValue;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 7bb8a8e..a8d3926 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
@@ -184,10 +185,16 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
 
-		final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory, shuffleMemorySize, pageSize);
-
 		final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
 
+		final int numberOfNettyArenas = nettyConfig != null ? nettyConfig.getNumberOfArenas() : 0;
+		final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(
+			configuration,
+			maxJvmHeapMemory,
+			shuffleMemorySize,
+			pageSize,
+			numberOfNettyArenas);
+
 		int initialRequestBackoff = configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 		int maxRequestBackoff = configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX);
 
@@ -463,11 +470,12 @@ public class NettyShuffleEnvironmentConfiguration {
 		long maxJvmHeapMemory,
 		@Nullable // should only be null when flip49 is disabled
 		MemorySize shuffleMemorySize,
-		int pageSize) {
+		int pageSize,
+		int numberOfNettyArenas) {
 
 		final int numberOfNetworkBuffers;
 		if (shuffleMemorySize != null) { // flip49 enabled
-			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(shuffleMemorySize.getBytes(), pageSize);
+			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(shuffleMemorySize.getBytes(), pageSize, numberOfNettyArenas);
 			logIfIgnoringOldConfigs(configuration);
 		} else if (!hasNewNetworkConfig(configuration)) {
 			// fallback: number of network buffers
@@ -478,7 +486,7 @@ public class NettyShuffleEnvironmentConfiguration {
 			logIfIgnoringOldConfigs(configuration);
 
 			final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
-			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(networkMemorySize, pageSize);
+			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(networkMemorySize, pageSize, 0);
 		}
 
 		return numberOfNetworkBuffers;
@@ -491,9 +499,26 @@ public class NettyShuffleEnvironmentConfiguration {
 		}
 	}
 
-	private static int calculateNumberOfNetworkBuffers(long networkMemorySizeByte, int pageSizeByte) {
+	private static int calculateNumberOfNetworkBuffers(
+		long networkMemorySizeByte,
+		int pageSizeByte,
+		int numberOfNettyArenas) {
+
+		long nettyArenasSizeBytes = numberOfNettyArenas * NettyBufferPool.ARENA_SIZE;
+		Preconditions.checkArgument(
+			networkMemorySizeByte >= nettyArenasSizeBytes,
+			String.format(
+				"Provided shuffle memory %d bytes is not enough for direct allocation of %d netty arenas " +
+					"('taskmanager.network.netty.num-arenas', by default 'taskmanager.numberOfTaskSlots'). " +
+					"Each arena size is %d bytes. Total configured arenas size is %d bytes. " +
+					"Try to increase shuffle memory size by adjusting 'taskmanager.memory.shuffle.*' Flink configuration options.",
+				networkMemorySizeByte,
+				numberOfNettyArenas,
+				NettyBufferPool.ARENA_SIZE,
+				nettyArenasSizeBytes));
+
 		// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
-		long numberOfNetworkBuffersLong = networkMemorySizeByte / pageSizeByte;
+		long numberOfNetworkBuffersLong = (networkMemorySizeByte - nettyArenasSizeBytes) / pageSizeByte;
 		if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
 			throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySizeByte
 				+ ") corresponds to more than MAX_INT pages.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 79ea059..26528a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -201,13 +202,16 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	public void testConfigShuffleMemoryLegacyNumOfBuffers() {
 		final MemorySize pageSize = MemorySize.parse("32k");
 		final int numOfBuffers = 1024;
-		final MemorySize shuffleSize = pageSize.multiply(numOfBuffers);
+		final MemorySize arenaSize = MemorySize.parse(NettyBufferPool.ARENA_SIZE + "b");
+		final int numberOfNettyArenas = 10;
+		final MemorySize shuffleSize = pageSize.multiply(numOfBuffers).add(arenaSize.multiply(numberOfNettyArenas));
 
 		@SuppressWarnings("deprecation")
 		final ConfigOption<Integer> legacyOption = NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS;
 
 		Configuration conf = new Configuration();
 		conf.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, pageSize.getKibiBytes() + "k");
+		conf.setInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS, numberOfNettyArenas);
 		conf.setInteger(legacyOption, numOfBuffers);
 
 		// validate in configurations without explicit total flink/process memory, otherwise explicit configured
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index c4c4c0b..fc0035c 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -30,8 +30,10 @@ import org.junit.Test;
 import java.net.InetAddress;
 import java.util.Random;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,6 +44,20 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
 
 	private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;
 
+	@Test
+	public void testNetworkBufferNumberCalculation() {
+		final Configuration config = new Configuration();
+		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "1m");
+		config.setInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS, 4); // 4 x 16Mb = 64Mb
+		final int numNetworkBuffers = NettyShuffleEnvironmentConfiguration.fromConfiguration(
+			config,
+			MEM_SIZE_PARAM,
+			null,
+			true,
+			InetAddress.getLoopbackAddress()).numNetworkBuffers();
+		assertThat(numNetworkBuffers, is(64)); // (128Mb total - 64Mb arenas) / 1Mb page = 64
+	}
+
 	/**
 	 * Verifies that {@link  NettyShuffleEnvironmentConfiguration#fromConfiguration(Configuration, long, MemorySize, boolean, InetAddress)}
 	 * returns the correct result for new configurations via
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 22d291d..36050e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -68,7 +68,11 @@ public class JobSubmissionFailsITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+
+		// to accommodate 10 netty arenas (NUM_SLOTS / NUM_TM) x 16Mb (NettyBufferPool.ARENA_SIZE)
+		config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, "256m");
+
 		return config;
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
index e7c2072..970a74a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -94,7 +95,8 @@ public class BackPressureITCase extends TestLogger {
 		final Configuration configuration = new Configuration();
 
 		final int memorySegmentSizeKb = 32;
-		final String networkBuffersMemory = (memorySegmentSizeKb * NUM_TASKS) + "kb";
+		final long nettyArenaSizeKb = NettyBufferPool.ARENA_SIZE >> 10;
+		final String networkBuffersMemory = ((memorySegmentSizeKb + nettyArenaSizeKb) * NUM_TASKS) + "kb";
 
 		configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, memorySegmentSizeKb + "kb");
 		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, networkBuffersMemory);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 36f90e4..35ab654 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -91,8 +91,8 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 			// disable heap cutoff min
 			configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
-			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
-			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
+			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(49L << 20));
+			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(52L << 20));
 
 			final YarnConfiguration yarnConfiguration = getYarnConfiguration();
 			final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(


[flink] 03/21: [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call Java code for generating TM resource dynamic configurations and JVM parameters.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 30ce8c0059a03b494ad7cae93487f9b8c8117749
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Sep 26 19:36:20 2019 +0800

    [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call Java code for generating TM resource dynamic configurations and JVM parameters.
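
    For illustration, each command prints a single line that the startup scripts
    splice into the java command line; a sketch of the expected shapes and of the
    parsing helper added below (the numeric values are only examples):

        import java.util.Map;
        import org.apache.flink.configuration.ConfigurationUtils;

        // GET_TM_RESOURCE_JVM_PARAMS produces e.g.
        String jvmParams = "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 -XX:MaxMetaspaceSize=333";
        Map<String, String> parsed = ConfigurationUtils.parseTmResourceJvmParams(jvmParams);
        // parsed.get("-Xmx") -> "111", parsed.get("-XX:MaxMetaspaceSize=") -> "333"

        // GET_TM_RESOURCE_DYNAMIC_CONFIGS produces space-separated "-D <key>=<value>" pairs,
        // e.g. "-D taskmanager.memory.task.off-heap.size=0b -D <next memory option>=<value> ..."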
---
 .../flink/configuration/ConfigurationUtils.java    | 58 ++++++++++++++++
 flink-dist/src/main/flink-bin/bin/config.sh        | 14 ++++
 flink-dist/src/test/bin/runBashJavaUtilsCmd.sh     | 38 +++++++++++
 .../org/apache/flink/dist/BashJavaUtilsTest.java   | 54 +++++++++++++++
 .../runtime/taskexecutor/TaskManagerRunner.java    |  4 +-
 .../apache/flink/runtime/util/BashJavaUtils.java   | 77 ++++++++++++++++++++++
 .../TaskExecutorResourceUtilsTest.java             | 44 +++----------
 7 files changed, 250 insertions(+), 39 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index b94865c..b6d817c 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 
 import javax.annotation.Nonnull;
@@ -31,6 +32,7 @@ import java.util.Set;
 
 import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Utility class for {@link Configuration} related helper functions.
@@ -174,6 +176,62 @@ public class ConfigurationUtils {
 		return separatedPaths.length() > 0 ? separatedPaths.split(",|" + File.pathSeparator) : EMPTY;
 	}
 
+	@VisibleForTesting
+	public static Map<String, String> parseTmResourceDynamicConfigs(String dynamicConfigsStr) {
+		Map<String, String> configs = new HashMap<>();
+		String[] configStrs = dynamicConfigsStr.split(" ");
+
+		checkArgument(configStrs.length % 2 == 0);
+		for (int i = 0; i < configStrs.length; ++i) {
+			String configStr = configStrs[i];
+			if (i % 2 == 0) {
+				checkArgument(configStr.equals("-D"));
+			} else {
+				String[] configKV = configStr.split("=");
+				checkArgument(configKV.length == 2);
+				configs.put(configKV[0], configKV[1]);
+			}
+		}
+
+		checkArgument(configs.containsKey(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key()));
+		checkArgument(configs.containsKey(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
+		checkArgument(configs.containsKey(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
+		checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key()));
+		checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key()));
+		checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_SIZE.key()));
+		checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key()));
+
+		return configs;
+	}
+
+	@VisibleForTesting
+	public static Map<String, String> parseTmResourceJvmParams(String jvmParamsStr) {
+		final String xmx = "-Xmx";
+		final String xms = "-Xms";
+		final String maxDirect = "-XX:MaxDirectMemorySize=";
+		final String maxMetadata = "-XX:MaxMetaspaceSize=";
+
+		Map<String, String> configs = new HashMap<>();
+		for (String paramStr : jvmParamsStr.split(" ")) {
+			if (paramStr.startsWith(xmx)) {
+				configs.put(xmx, paramStr.substring(xmx.length()));
+			} else if (paramStr.startsWith(xms)) {
+				configs.put(xms, paramStr.substring(xms.length()));
+			} else if (paramStr.startsWith(maxDirect)) {
+				configs.put(maxDirect, paramStr.substring(maxDirect.length()));
+			} else if (paramStr.startsWith(maxMetadata)) {
+				configs.put(maxMetadata, paramStr.substring(maxMetadata.length()));
+			}
+		}
+
+		checkArgument(configs.containsKey(xmx));
+		checkArgument(configs.containsKey(xms));
+		checkArgument(configs.containsKey(maxDirect));
+		checkArgument(configs.containsKey(maxMetadata));
+
+		return configs;
+	}
+
 	// Make sure that we cannot instantiate this class
 	private ConfigurationUtils() {
 	}
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index b799214..090c1fb 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -776,3 +776,17 @@ calculateTaskManagerHeapSizeMB() {
 
     echo ${tm_heap_size_mb}
 }
+
+runBashJavaUtilsCmd() {
+    local cmd=$1
+    local class_path=$2
+    local conf_dir=$3
+
+    local output="`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} 2> /dev/null`"
+    if [[ $? -ne 0 ]]; then
+        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}."
+        exit 1
+    fi
+
+    echo ${output}
+}
diff --git a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
new file mode 100755
index 0000000..e623760
--- /dev/null
+++ b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Wrapper script to run BashJavaUtils commands via config.sh for testing purposes
+USAGE="Usage: runBashJavaUtilsCmd.sh <command>"
+
+COMMAND=$1
+
+if [[ -z "${COMMAND}" ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+FLINK_CLASSPATH=`find . -name 'flink-dist*.jar' | grep lib`
+FLINK_CONF_DIR=${bin}/../../main/resources
+
+. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
+
+runBashJavaUtilsCmd ${COMMAND} ${FLINK_CLASSPATH} ${FLINK_CONF_DIR}
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java
new file mode 100644
index 0000000..6b6a216
--- /dev/null
+++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.runtime.util.BashJavaUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for BashJavaUtils.
+ */
+public class BashJavaUtilsTest extends JavaBashTestBase {
+
+	private static final String RUN_BASH_JAVA_UTILS_CMD_SCRIPT = "src/test/bin/runBashJavaUtilsCmd.sh";
+
+	@Test
+	public void testGetTmResourceDynamicConfigs() throws Exception {
+		String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+			BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString()};
+		String result = executeScript(command);
+
+		assertNotNull(result);
+		ConfigurationUtils.parseTmResourceDynamicConfigs(result);
+	}
+
+	@Test
+	public void testGetTmResourceJvmParams() throws Exception {
+		String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+			BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString()};
+		String result = executeScript(command);
+
+		assertNotNull(result);
+		ConfigurationUtils.parseTmResourceJvmParams(result);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 6493dec..924ed0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -305,8 +304,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 		}
 	}
 
-	@VisibleForTesting
-	static Configuration loadConfiguration(String[] args) throws FlinkParseException {
+	public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
 		final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
 
 		final ClusterConfiguration clusterConfiguration;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
new file mode 100644
index 0000000..70618b1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for using java utilities in bash scripts.
+ */
+public class BashJavaUtils {
+
+	public static void main(String[] args) throws Exception {
+		checkArgument(args.length > 0, "Command not specified.");
+
+		switch (Command.valueOf(args[0])) {
+			case GET_TM_RESOURCE_DYNAMIC_CONFIGS:
+				getTmResourceDynamicConfigs(args);
+				break;
+			case GET_TM_RESOURCE_JVM_PARAMS:
+				getTmResourceJvmParams(args);
+				break;
+			default:
+				// unexpected, Command#valueOf should fail if an unknown command is passed in
+				throw new RuntimeException("Unexpected, something is wrong.");
+		}
+	}
+
+	private static void getTmResourceDynamicConfigs(String[] args) throws Exception {
+		Configuration configuration = TaskManagerRunner.loadConfiguration(Arrays.copyOfRange(args, 1, args.length));
+		TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+		System.out.println(TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec));
+	}
+
+	private static void getTmResourceJvmParams(String[] args) throws Exception {
+		Configuration configuration = TaskManagerRunner.loadConfiguration(Arrays.copyOfRange(args, 1, args.length));
+		TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+		System.out.println(TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
+	}
+
+	/**
+	 * Commands that BashJavaUtils supports.
+	 */
+	public enum Command {
+		/**
+		 * Get dynamic configs of task executor resources.
+		 */
+		GET_TM_RESOURCE_DYNAMIC_CONFIGS,
+
+		/**
+		 * Get JVM parameters of task executor resources.
+		 */
+		GET_TM_RESOURCE_JVM_PARAMS
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 8577d8f..432c1af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -27,7 +28,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -60,19 +60,7 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	@Test
 	public void testGenerateDynamicConfigurations() {
 		String dynamicConfigsStr = TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
-		Map<String, String> configs = new HashMap<>();
-		String[] configStrs = dynamicConfigsStr.split(" ");
-		assertThat(configStrs.length % 2, is(0));
-		for (int i = 0; i < configStrs.length; ++i) {
-			String configStr = configStrs[i];
-			if (i % 2 == 0) {
-				assertThat(configStr, is("-D"));
-			} else {
-				String[] configKV = configStr.split("=");
-				assertThat(configKV.length, is(2));
-				configs.put(configKV[0], configKV[1]);
-			}
-		}
+		Map<String, String> configs = ConfigurationUtils.parseTmResourceDynamicConfigs(dynamicConfigsStr);
 
 		assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())), is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
 		assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())), is(TM_RESOURCE_SPEC.getTaskHeapSize()));
@@ -84,30 +72,14 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	}
 
 	@Test
-	public void testGenerateJvmParameters() throws Exception {
+	public void testGenerateJvmParameters() {
 		String jvmParamsStr = TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
-		MemorySize heapSizeMax = null;
-		MemorySize heapSizeMin = null;
-		MemorySize directSize = null;
-		MemorySize metaspaceSize = null;
-		for (String paramStr : jvmParamsStr.split(" ")) {
-			if (paramStr.startsWith("-Xmx")) {
-				heapSizeMax = MemorySize.parse(paramStr.substring("-Xmx".length()));
-			} else if (paramStr.startsWith("-Xms")) {
-				heapSizeMin = MemorySize.parse(paramStr.substring("-Xms".length()));
-			} else if (paramStr.startsWith("-XX:MaxDirectMemorySize=")) {
-				directSize = MemorySize.parse(paramStr.substring("-XX:MaxDirectMemorySize=".length()));
-			} else if (paramStr.startsWith("-XX:MaxMetaspaceSize=")) {
-				metaspaceSize = MemorySize.parse(paramStr.substring("-XX:MaxMetaspaceSize=".length()));
-			} else {
-				throw new Exception("Unknown JVM parameter: " + paramStr);
-			}
-		}
+		Map<String, String> configs = ConfigurationUtils.parseTmResourceJvmParams(jvmParamsStr);
 
-		assertThat(heapSizeMax, is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize())));
-		assertThat(heapSizeMin, is(heapSizeMax));
-		assertThat(directSize, is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize())));
-		assertThat(metaspaceSize, is(TM_RESOURCE_SPEC.getJvmMetaspaceSize()));
+		assertThat(MemorySize.parse(configs.get("-Xmx")), is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize())));
+		assertThat(MemorySize.parse(configs.get("-Xms")), is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize())));
+		assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")), is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize())));
+		assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), is(TM_RESOURCE_SPEC.getJvmMetaspaceSize()));
 	}
 
 	@Test public void testConfigFrameworkHeapMemory() {
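
For reference, a minimal sketch (not part of the commit) of the round trip that the reworked tests above rely on: generate the TaskManager JVM parameters and dynamic configs from a Configuration, then read them back with the new ConfigurationUtils helpers. It assumes that setting TaskManagerOptions.TOTAL_PROCESS_MEMORY alone is sufficient input for TaskExecutorResourceUtils to derive the remaining memory components.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;

import java.util.Map;

public class TmResourceRoundTripSketch {

	public static void main(String[] args) {
		Configuration conf = new Configuration();
		// Assumption: total process memory alone is enough input for the derivation.
		conf.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, "1g");
		TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);

		// What BashJavaUtils prints for the startup scripts ...
		String jvmParams = TaskExecutorResourceUtils.generateJvmParametersStr(spec);
		String dynamicConfigs = TaskExecutorResourceUtils.generateDynamicConfigsStr(spec);

		// ... and how the new ConfigurationUtils helpers read it back.
		Map<String, String> jvm = ConfigurationUtils.parseTmResourceJvmParams(jvmParams);
		Map<String, String> dyn = ConfigurationUtils.parseTmResourceDynamicConfigs(dynamicConfigs);

		System.out.println(MemorySize.parse(jvm.get("-Xmx")));
		System.out.println(dyn.get(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
	}
}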


[flink] 01/21: [hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing testing bash scripts from java classes.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b9e5d9f545a136cede63bc924c03ea013029fe3
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Sep 26 19:38:55 2019 +0800

    [hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing testing bash scripts from java classes.
---
 .../org/apache/flink/dist/JavaBashTestBase.java    | 60 ++++++++++++++++++++++
 ...TaskManagerHeapSizeCalculationJavaBashTest.java | 34 +-----------
 2 files changed, 61 insertions(+), 33 deletions(-)

diff --git a/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java
new file mode 100644
index 0000000..63faaa2
--- /dev/null
+++ b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Abstract test class for executing bash scripts.
+ */
+public abstract class JavaBashTestBase extends TestLogger {
+	@BeforeClass
+	public static void checkOperatingSystem() {
+		Assume.assumeTrue("This test checks shell scripts which are not available on Windows.",
+			!OperatingSystem.isWindows());
+	}
+
+	/**
+	 * Executes the given shell script wrapper and returns its output.
+	 *
+	 * @param command  command to run
+	 *
+	 * @return raw script output
+	 */
+	protected String executeScript(final String[] command) throws IOException {
+		ProcessBuilder pb = new ProcessBuilder(command);
+		pb.redirectErrorStream(true);
+		Process process = pb.start();
+		BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+		StringBuilder sb = new StringBuilder();
+		String s;
+		while ((s = reader.readLine()) != null) {
+			sb.append(s);
+		}
+		return sb.toString();
+	}
+}
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 0b5f5be..4f172d9 100755
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -24,16 +24,10 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.Random;
 
 import static org.hamcrest.CoreMatchers.allOf;
@@ -51,7 +45,7 @@ import static org.junit.Assert.assertThat;
  * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
  * not need high precision.
  */
-public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+public class TaskManagerHeapSizeCalculationJavaBashTest extends JavaBashTestBase {
 
 	/** Key that is used by <tt>config.sh</tt>. */
 	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.size";
@@ -64,12 +58,6 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 	 */
 	private static final int NUM_RANDOM_TESTS = 20;
 
-	@Before
-	public void checkOperatingSystem() {
-		Assume.assumeTrue("This test checks shell scripts which are not available on Windows.",
-			!OperatingSystem.isWindows());
-	}
-
 	/**
 	 * Tests that {@link NettyShuffleEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)}
 	 * has the same result as the shell script.
@@ -310,24 +298,4 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 				lessThanOrEqualTo(javaHeapSizeMB + absoluteTolerance)));
 	}
 
-	/**
-	 * Executes the given shell script wrapper and returns its output.
-	 *
-	 * @param command  command to run
-	 *
-	 * @return raw script output
-	 */
-	private String executeScript(final String[] command) throws IOException {
-		ProcessBuilder pb = new ProcessBuilder(command);
-		pb.redirectErrorStream(true);
-		Process process = pb.start();
-		BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
-		StringBuilder sb = new StringBuilder();
-		String s;
-		while ((s = reader.readLine()) != null) {
-			sb.append(s);
-		}
-		return sb.toString();
-	}
-
 }
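
As an illustration of the intent behind this refactoring, a hypothetical subclass (not part of the commit; the class name, command and assertion are made up) could reuse the shared helper like this:

package org.apache.flink.dist;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Hypothetical example only: reuses JavaBashTestBase#executeScript for an arbitrary command.
 */
public class EchoBashExampleTest extends JavaBashTestBase {

	@Test
	public void testEcho() throws Exception {
		// executeScript concatenates the command's output lines without separators.
		String output = executeScript(new String[] {"bash", "-c", "echo hello"});
		assertEquals("hello", output);
	}
}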


[flink] 19/21: Fix yarn cut off

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f24495c0df56473c850605d9e34baf2f7464be1
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 09:59:32 2019 +0100

    Fix yarn cut off
---
 .../client/deployment/ClusterSpecification.java    |   6 +-
 .../clusterframework/TaskExecutorResourceSpec.java |   4 +
 .../TaskExecutorResourceUtils.java                 |   4 +-
 .../ActiveResourceManagerFactory.java              |  23 +---
 .../ActiveResourceManagerFactoryTest.java          |  97 --------------
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |   3 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  28 +---
 .../flink/yarn/YarnClusterClientFactory.java       |   6 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  18 +--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |   2 +-
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 142 +++++++--------------
 11 files changed, 73 insertions(+), 260 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 72975d8..0d8d105 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 
 /**
  * Description of the cluster to start by the {@link ClusterDescriptor}.
@@ -68,7 +69,10 @@ public final class ClusterSpecification {
 		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
 		int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
-		int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+		int taskManagerMemoryMb = TaskExecutorResourceUtils
+			.resourceSpecFromConfig(configuration)
+			.getTotalProcessMemorySize()
+			.getMebiBytes();
 
 		return new ClusterSpecificationBuilder()
 			.setMasterMemoryMB(jobManagerMemoryMb)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index d6cbe5b..d73e7b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -156,6 +156,10 @@ public class TaskExecutorResourceSpec implements java.io.Serializable {
 		return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
 	}
 
+	public MemorySize getHeapSize() {
+		return frameworkHeapSize.add(taskHeapSize).add(onHeapManagedMemorySize);
+	}
+
 	@Override
 	public String toString() {
 		return "TaskExecutorResourceSpec {"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 4b649e4..9c69b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -46,9 +46,7 @@ public class TaskExecutorResourceUtils {
 	// ------------------------------------------------------------------------
 
 	public static String generateJvmParametersStr(final TaskExecutorResourceSpec taskExecutorResourceSpec) {
-		final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize()
-			.add(taskExecutorResourceSpec.getTaskHeapSize())
-			.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+		final MemorySize jvmHeapSize = taskExecutorResourceSpec.getHeapSize();
 		final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize()
 			.add(taskExecutorResourceSpec.getShuffleMemSize());
 		final MemorySize jvmMetaspaceSize = taskExecutorResourceSpec.getJvmMetaspaceSize();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
index d292e5a..7444e23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -30,17 +27,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import javax.annotation.Nullable;
 
 /**
  * Resource manager factory which creates active {@link ResourceManager} implementations.
- *
- * <p>The default implementation will call {@link #createActiveResourceManagerConfiguration}
- * to create a new configuration which is configured with active resource manager relevant
- * configuration options.
- *
  * @param <T> type of the {@link ResourceIDRetrievable}
  */
 public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> implements ResourceManagerFactory<T> {
@@ -57,7 +48,7 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
 			@Nullable String webInterfaceUrl,
 			ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
 		return createActiveResourceManager(
-			createActiveResourceManagerConfiguration(configuration),
+			configuration,
 			resourceId,
 			rpcService,
 			highAvailabilityServices,
@@ -68,18 +59,6 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
 			resourceManagerMetricGroup);
 	}
 
-	public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
-		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
-		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, taskManagerMemoryMB);
-		final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
-		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes);
-
-		final Configuration resourceManagerConfig = new Configuration(originalConfiguration);
-		resourceManagerConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
-
-		return resourceManagerConfig;
-	}
-
 	protected abstract ResourceManager<T> createActiveResourceManager(
 		Configuration configuration,
 		ResourceID resourceId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
deleted file mode 100644
index ff61e65..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Tests for the {@link ActiveResourceManagerFactory}.
- */
-public class ActiveResourceManagerFactoryTest extends TestLogger {
-
-	/**
-	 * Test which ensures that the {@link ActiveResourceManagerFactory} sets the correct managed
-	 * memory when creating a resource manager.
-	 */
-	@Test
-	public void createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws Exception {
-		final Configuration configuration = new Configuration();
-
-		final TestingActiveResourceManagerFactory resourceManagerFactory = new TestingActiveResourceManagerFactory();
-
-		final TestingRpcService rpcService = new TestingRpcService();
-
-		try {
-			final ResourceManager<ResourceID> ignored = resourceManagerFactory.createResourceManager(
-				configuration,
-				ResourceID.generate(),
-				rpcService,
-				new TestingHighAvailabilityServices(),
-				new TestingHeartbeatServices(),
-				new TestingFatalErrorHandler(),
-				new ClusterInformation("foobar", 1234),
-				null,
-				UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
-		} finally {
-			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
-		}
-	}
-
-	private static final class TestingActiveResourceManagerFactory extends ActiveResourceManagerFactory<ResourceID> {
-
-		@Override
-		protected ResourceManager<ResourceID> createActiveResourceManager(
-				Configuration configuration,
-				ResourceID resourceId,
-				RpcService rpcService,
-				HighAvailabilityServices highAvailabilityServices,
-				HeartbeatServices heartbeatServices,
-				FatalErrorHandler fatalErrorHandler,
-				ClusterInformation clusterInformation,
-				@Nullable String webInterfaceUrl,
-				ResourceManagerMetricGroup resourceManagerMetricGroup) {
-			assertThat(configuration.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE), is(true));
-
-			return null;
-		}
-	}
-}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index 4e7ece3..4c83115 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
 
@@ -66,7 +67,7 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 	public void testRun() throws Exception {
 		String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
-		Configuration configuration = new Configuration();
+		Configuration configuration = adjustMemoryConfigurationForLocalExecution(new Configuration());
 		configuration.setString(JobManagerOptions.ADDRESS, "localhost");
 		configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index dbad597..36f90e4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -24,13 +24,11 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -84,8 +82,11 @@ public class YarnConfigurationITCase extends YarnTestBase {
 			final YarnClient yarnClient = getYarnClient();
 			final Configuration configuration = new Configuration(flinkConfiguration);
 
+			final TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
 			final int masterMemory = 64;
-			final int taskManagerMemory = 512;
+			final int taskManagerMemory = spec.getTotalProcessMemorySize().getMebiBytes();
+			final long expectedHeapBytes = spec.getHeapSize().getBytes();
+			final int expectedManagedMemoryMB = spec.getManagedMemorySize().getMebiBytes();
 			final int slotsPerTaskManager = 3;
 
 			// disable heap cutoff min
@@ -175,23 +176,13 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 					assertThat(taskManagerInfo.getNumberSlots(), is(slotsPerTaskManager));
 
-					final ContaineredTaskManagerParameters containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(
-						configuration,
-						null,
-						taskManagerMemory,
-						slotsPerTaskManager);
-
-					final long expectedHeadSize = containeredTaskManagerParameters.taskManagerHeapSizeMB() << 20L;
-
 					// We compare here physical memory assigned to a container with the heap memory that we should pass to
 					// jvm as Xmx parameter. Those value might differ significantly due to system page size or jvm
 					// implementation therefore we use 15% threshold here.
 					assertThat(
-						(double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) expectedHeadSize,
+						(double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / expectedHeapBytes,
 						is(closeTo(1.0, 0.15)));
 
-					final int expectedManagedMemoryMB = calculateManagedMemorySizeMB(configuration);
-
 					assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB));
 				} finally {
 					restClient.shutdown(TIMEOUT);
@@ -214,9 +205,4 @@ public class YarnConfigurationITCase extends YarnTestBase {
 			return taskManagerInfo.getNumberSlots() > 0;
 		}
 	}
-
-	private static int calculateManagedMemorySizeMB(Configuration configuration) {
-		Configuration resourceManagerConfig = ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
-		return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getMebiBytes();
-	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index aa138a7..6ea7d63 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -70,7 +71,10 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
 		final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
 
 		// Task Managers memory
-		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+		final int taskManagerMemoryMB = TaskExecutorResourceUtils
+			.resourceSpecFromConfig(configuration)
+			.getTotalProcessMemorySize()
+			.getMebiBytes();
 
 		int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 74207ff..14005f4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -40,11 +40,10 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginConfig;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -458,16 +457,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 	 * @throws FlinkException if the cluster cannot be started with the provided {@link ClusterSpecification}
 	 */
 	private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
+		flinkConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, clusterSpecification.getTaskManagerMemoryMB() + "m");
 		try {
-			final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
-			// We do the validation by calling the calculation methods here
-			// Internally these methods will check whether the cluster can be started with the provided
-			// ClusterSpecification and the configured memory requirements
-			final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
-			TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
+			TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfiguration);
 		} catch (IllegalArgumentException iae) {
-			throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
-					"cluster specification. Please increase the memory of the cluster.", iae);
+			throw new FlinkException("Inconsistent cluster specification.", iae);
 		}
 	}
 
@@ -834,10 +828,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 				TaskManagerOptions.NUM_TASK_SLOTS,
 				clusterSpecification.getSlotsPerTaskManager());
 
-		configuration.setString(
-				TaskManagerOptions.TOTAL_PROCESS_MEMORY,
-				clusterSpecification.getTaskManagerMemoryMB() + "m");
-
 		// Upload the flink configuration
 		// write out configuration file
 		File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 0efa610..0e7df5b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -374,7 +374,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 			if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
 				tmMemoryVal += "m";
 			}
-			effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal);
+			effectiveConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, tmMemoryVal);
 		}
 
 		if (commandLine.hasOption(slots.getOpt())) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a357126..9376533 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -107,11 +107,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		String[] params =
 			new String[] {"-ys", "3"};
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCliWithTmTotalMemory(1024);
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -129,11 +125,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		String[] params =
 			new String[] {"-yd"};
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -151,11 +143,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		String[] params = new String[] {"-yz", zkNamespaceCliInput};
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -172,11 +160,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		String[] params = new String[] {"-ynl", nodeLabelCliInput };
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -198,11 +182,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final Configuration configuration = new Configuration();
 		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
 
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
 
@@ -225,21 +205,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final Configuration configuration = new Configuration();
 		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
 
-		new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		createFlinkYarnSessionCli(configuration);
 	}
 
 	@Test
 	public void testResumeFromYarnID() throws Exception {
-		final Configuration configuration = new Configuration();
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
 
@@ -252,12 +223,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 	@Test
 	public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
-		final Configuration configuration = new Configuration();
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
 
@@ -273,12 +239,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 	@Test
 	public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
-		final Configuration configuration = new Configuration();
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final String overrideZkNamespace = "my_cluster";
 
@@ -300,11 +261,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final Configuration configuration = new Configuration();
 		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
 
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true);
 
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -326,15 +283,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final int slotsPerTaskManager = 30;
 
 		configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
-		configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
+		configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m");
 		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
 
 		final String[] args = {"-yjm", String.valueOf(jobManagerMemory) + "m", "-ytm", String.valueOf(taskManagerMemory) + "m", "-ys", String.valueOf(slotsPerTaskManager)};
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -357,16 +310,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final int jobManagerMemory = 1337;
 		configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
 		final int taskManagerMemory = 7331;
-		configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
+		configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m");
 		final int slotsPerTaskManager = 42;
 		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
 
 		final String[] args = {};
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -385,11 +334,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	@Test
 	public void testHeapMemoryPropertyWithoutUnit() throws Exception {
 		final String[] args = new String[] { "-yjm", "1024", "-ytm", "2048" };
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -407,11 +352,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	@Test
 	public void testHeapMemoryPropertyWithUnitMB() throws Exception {
 		final String[] args = new String[] { "-yjm", "1024m", "-ytm", "2048m" };
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -428,11 +369,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	@Test
 	public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
 		final String[] args = new String[] { "-yjm", "1g", "-ytm", "2g" };
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -452,11 +389,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
 		configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
 
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
 
@@ -469,15 +402,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the specifying heap memory with config default value for job manager and task manager.
+	 * Tests that the job manager heap memory falls back to the config default value while the task manager total memory is set explicitly.
 	 */
 	@Test
-	public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+	public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
+		int totalMemory = 1024;
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCliWithTmTotalMemory(totalMemory);
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
 
@@ -485,18 +415,14 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
 		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
-		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
-		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
+		assertThat(clusterSpecification.getMasterMemoryMB(), is(totalMemory));
+		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(totalMemory));
 	}
 
 	@Test
 	public void testMultipleYarnShipOptions() throws Exception {
 		final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -527,4 +453,22 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		return tmpFolder.getAbsoluteFile();
 	}
+
+	private FlinkYarnSessionCli createFlinkYarnSessionCli() throws FlinkException {
+		return createFlinkYarnSessionCli(new Configuration());
+	}
+
+	private FlinkYarnSessionCli createFlinkYarnSessionCliWithTmTotalMemory(int totalMemory) throws FlinkException {
+		Configuration configuration = new Configuration();
+		configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory + "m");
+		return createFlinkYarnSessionCli(configuration);
+	}
+
+	private FlinkYarnSessionCli createFlinkYarnSessionCli(Configuration configuration) throws FlinkException {
+		return new FlinkYarnSessionCli(
+			configuration,
+			tmp.getRoot().getAbsolutePath(),
+			"y",
+			"yarn");
+	}
 }
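
To make the memory figures above concrete, here is a small sketch (not part of the commit; the 1g value is arbitrary) of the two accessors this change routes the YARN sizing through: getTotalProcessMemorySize() is what the container is requested with, and getHeapSize() is what ends up as -Xmx/-Xms.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;

public class YarnSizingSketch {

	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, "1g"); // e.g. what "-ytm 1g" now sets
		TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);

		// The YARN container is sized on the full process footprint ...
		int containerMemoryMB = spec.getTotalProcessMemorySize().getMebiBytes();
		// ... while the JVM heap covers framework heap + task heap + on-heap managed memory.
		long heapBytes = spec.getHeapSize().getBytes();

		System.out.println(containerMemoryMB + " MB container, " + heapBytes + " bytes heap");
	}
}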


[flink] 06/21: [FLINK-13983][runtime] Use flip49 config options to decide memory size of ShuffleEnvironment.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a28bdc854df6e2c52dcb03e36de0ba3915f7866c
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sun Sep 29 11:56:23 2019 +0800

    [FLINK-13983][runtime] Use flip49 config options to decide memory size of ShuffleEnvironment.
---
 .../io/network/NettyShuffleServiceFactory.java     |  1 +
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 13 ++++++
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 +
 .../TaskManagerServicesConfiguration.java          | 40 +++++++++++++++++
 .../NettyShuffleEnvironmentConfiguration.java      | 51 +++++++++++++++-------
 .../NettyShuffleEnvironmentConfigurationTest.java  |  3 +-
 6 files changed, 93 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index f266c77..477aca0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -59,6 +59,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 		NettyShuffleEnvironmentConfiguration networkConfig = NettyShuffleEnvironmentConfiguration.fromConfiguration(
 			shuffleEnvironmentContext.getConfiguration(),
 			shuffleEnvironmentContext.getMaxJvmHeapMemory(),
+			shuffleEnvironmentContext.getShuffleMemorySize(),
 			shuffleEnvironmentContext.isLocalCommunicationOnly(),
 			shuffleEnvironmentContext.getHostAddress());
 		return createNettyShuffleEnvironment(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index 7c1abc3..9d7e9eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.shuffle;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -34,6 +37,8 @@ public class ShuffleEnvironmentContext {
 	private final Configuration configuration;
 	private final ResourceID taskExecutorResourceId;
 	private final long maxJvmHeapMemory;
+	@Nullable // should only be null when flip49 is disabled
+	private final MemorySize shuffleMemorySize;
 	private final boolean localCommunicationOnly;
 	private final InetAddress hostAddress;
 	private final TaskEventPublisher eventPublisher;
@@ -43,6 +48,8 @@ public class ShuffleEnvironmentContext {
 			Configuration configuration,
 			ResourceID taskExecutorResourceId,
 			long maxJvmHeapMemory,
+			@Nullable // should only be null when flip49 is disabled
+			MemorySize shuffleMemorySize,
 			boolean localCommunicationOnly,
 			InetAddress hostAddress,
 			TaskEventPublisher eventPublisher,
@@ -50,6 +57,7 @@ public class ShuffleEnvironmentContext {
 		this.configuration = checkNotNull(configuration);
 		this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
 		this.maxJvmHeapMemory = maxJvmHeapMemory;
+		this.shuffleMemorySize = shuffleMemorySize;
 		this.localCommunicationOnly = localCommunicationOnly;
 		this.hostAddress = checkNotNull(hostAddress);
 		this.eventPublisher = checkNotNull(eventPublisher);
@@ -68,6 +76,11 @@ public class ShuffleEnvironmentContext {
 		return maxJvmHeapMemory;
 	}
 
+	@Nullable // should only be null when flip49 is disabled
+	public MemorySize getShuffleMemorySize() {
+		return shuffleMemorySize;
+	}
+
 	public boolean isLocalCommunicationOnly() {
 		return localCommunicationOnly;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index d914311..e570799 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -311,6 +311,7 @@ public class TaskManagerServices {
 			taskManagerServicesConfiguration.getConfiguration(),
 			taskManagerServicesConfiguration.getResourceID(),
 			taskManagerServicesConfiguration.getMaxJvmHeapMemory(),
+			taskManagerServicesConfiguration.getShuffleMemorySize(),
 			taskManagerServicesConfiguration.isLocalCommunicationOnly(),
 			taskManagerServicesConfiguration.getTaskManagerAddress(),
 			taskEventDispatcher,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 42aaabf..c2133c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -22,9 +22,12 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
@@ -85,6 +88,9 @@ public class TaskManagerServicesConfiguration {
 
 	private Optional<Time> systemResourceMetricsProbingInterval;
 
+	@Nullable // should only be null when flip49 is disabled
+	private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
 	public TaskManagerServicesConfiguration(
 			Configuration configuration,
 			ResourceID resourceID,
@@ -101,6 +107,8 @@ public class TaskManagerServicesConfiguration {
 			MemoryType memoryType,
 			float memoryFraction,
 			int pageSize,
+			@Nullable // should only be null when flip49 is disabled
+			TaskExecutorResourceSpec taskExecutorResourceSpec,
 			long timerServiceShutdownTimeout,
 			RetryingRegistrationConfiguration retryingRegistrationConfiguration,
 			Optional<Time> systemResourceMetricsProbingInterval) {
@@ -122,6 +130,8 @@ public class TaskManagerServicesConfiguration {
 		this.memoryFraction = memoryFraction;
 		this.pageSize = pageSize;
 
+		this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+
 		checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
 			"service shutdown timeout must be greater or equal to 0.");
 		this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
@@ -207,6 +217,11 @@ public class TaskManagerServicesConfiguration {
 		return pageSize;
 	}
 
+	@Nullable // should only be null when flip49 is disabled
+	public MemorySize getShuffleMemorySize() {
+		return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getShuffleMemSize();
+	}
+
 	long getTimerServiceShutdownTimeout() {
 		return timerServiceShutdownTimeout;
 	}
@@ -259,6 +274,30 @@ public class TaskManagerServicesConfiguration {
 
 		final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
+		if (configuration.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+			final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+			return new TaskManagerServicesConfiguration(
+				configuration,
+				resourceID,
+				remoteAddress,
+				localCommunicationOnly,
+				tmpDirs,
+				localStateRootDir,
+				freeHeapMemoryWithDefrag,
+				maxJvmHeapMemory,
+				localRecoveryMode,
+				queryableStateConfig,
+				ConfigurationParserUtils.getSlot(configuration),
+				ConfigurationParserUtils.getManagedMemorySize(configuration),
+				ConfigurationParserUtils.getMemoryType(configuration),
+				ConfigurationParserUtils.getManagedMemoryFraction(configuration),
+				ConfigurationParserUtils.getPageSize(configuration),
+				taskExecutorResourceSpec,
+				timerServiceShutdownTimeout,
+				retryingRegistrationConfiguration,
+				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+		}
+
 		return new TaskManagerServicesConfiguration(
 			configuration,
 			resourceID,
@@ -275,6 +314,7 @@ public class TaskManagerServicesConfiguration {
 			ConfigurationParserUtils.getMemoryType(configuration),
 			ConfigurationParserUtils.getManagedMemoryFraction(configuration),
 			ConfigurationParserUtils.getPageSize(configuration),
+			null,
 			timerServiceShutdownTimeout,
 			retryingRegistrationConfiguration,
 			ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index a6eb90f..7bb8a8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -167,6 +167,7 @@ public class NettyShuffleEnvironmentConfiguration {
 	 *
 	 * @param configuration configuration object
 	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @param shuffleMemorySize the size of memory reserved for shuffle environment, null if flip49 is disabled
 	 * @param localTaskManagerCommunication true, to skip initializing the network stack
 	 * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
 	 * @return NettyShuffleEnvironmentConfiguration
@@ -174,6 +175,8 @@ public class NettyShuffleEnvironmentConfiguration {
 	public static NettyShuffleEnvironmentConfiguration fromConfiguration(
 		Configuration configuration,
 		long maxJvmHeapMemory,
+		@Nullable // should only be null when flip49 is disabled
+		MemorySize shuffleMemorySize,
 		boolean localTaskManagerCommunication,
 		InetAddress taskManagerAddress) {
 
@@ -181,7 +184,7 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
 
-		final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+		final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory, shuffleMemorySize, pageSize);
 
 		final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
 
@@ -450,36 +453,54 @@ public class NettyShuffleEnvironmentConfiguration {
 	 *
 	 * @param configuration configuration object
 	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @param shuffleMemorySize the size of memory reserved for shuffle environment, null if flip49 is disabled
+	 * @param pageSize size of memory segment
 	 * @return the number of network buffers
 	 */
 	@SuppressWarnings("deprecation")
-	private static int calculateNumberOfNetworkBuffers(Configuration configuration, long maxJvmHeapMemory) {
+	private static int calculateNumberOfNetworkBuffers(
+		Configuration configuration,
+		long maxJvmHeapMemory,
+		@Nullable // should only be null when flip49 is disabled
+		MemorySize shuffleMemorySize,
+		int pageSize) {
+
 		final int numberOfNetworkBuffers;
-		if (!hasNewNetworkConfig(configuration)) {
+		if (shuffleMemorySize != null) { // flip49 enabled
+			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(shuffleMemorySize.getBytes(), pageSize);
+			logIfIgnoringOldConfigs(configuration);
+		} else if (!hasNewNetworkConfig(configuration)) {
 			// fallback: number of network buffers
 			numberOfNetworkBuffers = configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
 
 			checkOldNetworkConfig(numberOfNetworkBuffers);
 		} else {
-			if (configuration.contains(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS)) {
-				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
-					NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS.key());
-			}
+			logIfIgnoringOldConfigs(configuration);
 
 			final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
-
-			// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
-			long numberOfNetworkBuffersLong = networkMemorySize / ConfigurationParserUtils.getPageSize(configuration);
-			if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
-				throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
-					+ ") corresponds to more than MAX_INT pages.");
-			}
-			numberOfNetworkBuffers = (int) numberOfNetworkBuffersLong;
+			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(networkMemorySize, pageSize);
 		}
 
 		return numberOfNetworkBuffers;
 	}
 
+	private static void logIfIgnoringOldConfigs(Configuration configuration) {
+		if (configuration.contains(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS)) {
+			LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+				NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS.key());
+		}
+	}
+
+	private static int calculateNumberOfNetworkBuffers(long networkMemorySizeByte, int pageSizeByte) {
+		// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
+		long numberOfNetworkBuffersLong = networkMemorySizeByte / pageSizeByte;
+		if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
+			throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySizeByte
+				+ ") corresponds to more than MAX_INT pages.");
+		}
+		return (int) numberOfNetworkBuffersLong;
+	}
+
 	/**
 	 * Generates {@link NettyConfig} from Flink {@link Configuration}.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index a3e295f..c4c4c0b 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -43,7 +43,7 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
 	private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;
 
 	/**
-	 * Verifies that {@link  NettyShuffleEnvironmentConfiguration#fromConfiguration(Configuration, long, boolean, InetAddress)}
+	 * Verifies that {@link  NettyShuffleEnvironmentConfiguration#fromConfiguration(Configuration, long, MemorySize, boolean, InetAddress)}
 	 * returns the correct result for new configurations via
 	 * {@link NettyShuffleEnvironmentOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
 	 * {@link NettyShuffleEnvironmentOptions#NETWORK_REQUEST_BACKOFF_MAX},
@@ -63,6 +63,7 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
 		final  NettyShuffleEnvironmentConfiguration networkConfig =  NettyShuffleEnvironmentConfiguration.fromConfiguration(
 			config,
 			MEM_SIZE_PARAM,
+			null,
 			true,
 			InetAddress.getLoopbackAddress());
 


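With FLIP-49 enabled, the shuffle memory budget is fixed up front by the TaskExecutorResourceSpec,
so the commit above reduces the buffer count to that budget divided by the configured page size,
with an overflow guard. A minimal standalone sketch of that calculation (illustrative names, not
the actual Flink class):

    // Sketch: derive the number of network buffers from a fixed shuffle memory budget.
    public final class ShuffleBufferMathSketch {

        static int numberOfNetworkBuffers(long shuffleMemoryBytes, int pageSizeBytes) {
            // Offcuts due to segmentation remain available to user-space memory.
            long buffers = shuffleMemoryBytes / pageSizeBytes;
            if (buffers > Integer.MAX_VALUE) {
                throw new IllegalArgumentException(
                    shuffleMemoryBytes + " bytes correspond to more than MAX_INT pages.");
            }
            return (int) buffers;
        }

        public static void main(String[] args) {
            // E.g. 64 MB of shuffle memory with 32 KB pages -> 2048 buffers.
            System.out.println(numberOfNetworkBuffers(64L << 20, 32 << 10));
        }
    }
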
[flink] 04/21: [FLINK-13983][dist] TM startup scripts calls java codes to set flip49 TM resource configs and JVM parameters, if feature option is enabled.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4088bea93ad89940909e1c0d3bb92054315f795d
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Sep 27 11:19:55 2019 +0800

    [FLINK-13983][dist] TM startup scripts calls java codes to set flip49 TM resource configs and JVM parameters, if feature option is enabled.
---
 flink-dist/src/main/flink-bin/bin/config.sh      | 49 ++++++++++++++++++++++++
 flink-dist/src/main/flink-bin/bin/taskmanager.sh | 30 ++++-----------
 2 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 090c1fb..52ce960 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -117,6 +117,8 @@ KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
 KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
 KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback
 
+KEY_TASKM_ENABLE_FLIP49="taskmanager.enable-flip-49" # temporary feature option for flip-49
+
 KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
 
 KEY_ENV_PID_DIR="env.pid.dir"
@@ -429,6 +431,11 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
     FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
 fi
 
+# Define FLINK_TM_ENABLE_FLIP49 if it is not already set
+# temporary feature option for flip-49
+if [ -z "${FLINK_TM_ENABLE_FLIP49}" ]; then
+    FLINK_TM_ENABLE_FLIP49=$(readFromConfig ${KEY_TASKM_ENABLE_FLIP49} "false" "${YAML_CONF}")
+fi
 
 # Verify that NUMA tooling is available
 command -v numactl >/dev/null 2>&1
@@ -790,3 +797,45 @@ runBashJavaUtilsCmd() {
 
     echo ${output}
 }
+
+getTmResourceDynamicConfigsAndJvmParams() {
+    if [[ "`echo ${FLINK_TM_ENABLE_FLIP49} | tr '[:upper:]' '[:lower:]'`" == "true" ]]; then
+        echo "$(getTmResourceDynamicConfigsAndJvmParamsFlip49)"
+    else
+        echo "$(getTmResourceDynamicConfigsAndJvmParamsLegacy)"
+    fi
+}
+
+getTmResourceDynamicConfigsAndJvmParamsFlip49() {
+    local class_path=`constructFlinkClassPath`
+    class_path=`manglePathList ${class_path}`
+
+    local dynamic_configs=$(runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR})
+    local jvm_params=$(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR})
+
+    echo ${jvm_params} $'\n' ${dynamic_configs}
+}
+
+getTmResourceDynamicConfigsAndJvmParamsLegacy() {
+    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
+	    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
+    else
+	    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
+	    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
+    fi
+
+    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+        exit 1
+    fi
+
+    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
+        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
+        TM_MAX_OFFHEAP_SIZE="8388607T"
+
+        local jvm_params="-Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
+        echo ${jvm_params} # no dynamic configs
+    fi
+}
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index f12f9d6..e78a1f1 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -44,32 +44,18 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
         export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
     fi
 
-    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
-	    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
-    else
-	    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
-	    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
-    fi
-
-    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
-        exit 1
-    fi
-
-    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
-
-        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
-        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
-        TM_MAX_OFFHEAP_SIZE="8388607T"
-
-        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
-
-    fi
-
     # Add TaskManager-specific JVM options
     export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
 
     # Startup parameters
+    dynamic_configs_and_jvm_params=$(getTmResourceDynamicConfigsAndJvmParams)
+    IFS=$'\n' lines=(${dynamic_configs_and_jvm_params})
+
+    jvm_params=${lines[0]}
+    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+    dynamic_configs=${lines[1]}
+    ARGS=(${ARGS[@]} ${dynamic_configs})
     ARGS+=("--configDir" "${FLINK_CONF_DIR}")
 fi
 


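In the FLIP-49 path above, taskmanager.sh splits the helper output on newlines: the first line
carries the JVM parameters appended to JVM_ARGS, the second line the dynamic '-D' configs appended
to ARGS. The command names GET_TM_RESOURCE_JVM_PARAMS and GET_TM_RESOURCE_DYNAMIC_CONFIGS come from
config.sh; the sketch below only illustrates the one-line-per-command contract, and the printed
values and dispatch are assumptions, not the actual BashJavaUtils implementation:

    // Sketch only: a BashJavaUtils-style entry point printing exactly one line per command,
    // so the startup script can capture it via command substitution (values are illustrative).
    public final class TmResourceCliSketch {

        public static void main(String[] args) {
            String command = args.length > 0 ? args[0] : "";
            switch (command) {
                case "GET_TM_RESOURCE_JVM_PARAMS":
                    // one line of JVM parameters, e.g. heap and direct memory limits
                    System.out.println("-Xmx536870912 -Xms536870912 -XX:MaxDirectMemorySize=268435456");
                    break;
                case "GET_TM_RESOURCE_DYNAMIC_CONFIGS":
                    // one line of dynamic configuration overrides
                    System.out.println("-D taskmanager.memory.framework.heap.size=134217728b");
                    break;
                default:
                    System.err.println("Unknown command: " + command);
                    System.exit(1);
            }
        }
    }
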
[flink] 08/21: [FLINK-13986][core][config] Change default value of flip49 feature flag to true

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1daac36f0ac6e5e2039823d0a48ef576839f437
Author: Xintong Song <to...@gmail.com>
AuthorDate: Wed Oct 16 21:00:09 2019 +0800

    [FLINK-13986][core][config] Change default value of flip49 feature flag to true
---
 .../main/java/org/apache/flink/configuration/TaskManagerOptions.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b190ee8..7d1492c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -484,7 +484,7 @@ public class TaskManagerOptions {
 	@Documentation.ExcludeFromDocumentation("FLIP-49 is still in development.")
 	public static final ConfigOption<Boolean> ENABLE_FLIP_49_CONFIG =
 			key("taskmanager.enable-flip-49")
-			.defaultValue(false)
+			.defaultValue(true)
 			.withDescription("Toggle to switch between FLIP-49 and current task manager memory configurations.");
 
 	/** Not intended to be instantiated. */


[flink] 12/21: Treat legacy TM heap size as total process memory, not flink memory

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd73130c0bf036bf4d04c9b2102e2b6fd1f908ce
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 16:24:08 2019 +0100

    Treat legacy TM heap size as total process memory, not flink memory
---
 .../flink/configuration/TaskManagerOptions.java    |  2 +-
 .../TaskExecutorResourceUtils.java                 | 27 +++++++++++-----------
 .../TaskExecutorResourceUtilsTest.java             | 16 ++++++-------
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7d1492c..d717ad5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -253,6 +253,7 @@ public class TaskManagerOptions {
 	public static final ConfigOption<String> TOTAL_PROCESS_MEMORY =
 		key("taskmanager.memory.total-process.size")
 			.noDefaultValue()
+			.withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
 			.withDescription("Total Process Memory size for the TaskExecutors. This includes all the memory that a"
 				+ " TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On"
 				+ " containerized setups, this should be set to the container memory.");
@@ -264,7 +265,6 @@ public class TaskManagerOptions {
 	public static final ConfigOption<String> TOTAL_FLINK_MEMORY =
 		key("taskmanager.memory.total-flink.size")
 		.noDefaultValue()
-		.withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
 		.withDescription("Total Flink Memory size for the TaskExecutors. This includes all the memory that a"
 			+ " TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory,"
 			+ " Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 83d2d7d..f2a275f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -408,8 +408,13 @@ public class TaskExecutorResourceUtils {
 
 	private static MemorySize getTotalFlinkMemorySize(final Configuration config) {
 		checkArgument(isTotalFlinkMemorySizeExplicitlyConfigured(config));
-		if (config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) {
-			return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+		return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+	}
+
+	private static MemorySize getTotalProcessMemorySize(final Configuration config) {
+		checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
+		if (config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+			return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
 		} else {
 			@SuppressWarnings("deprecation")
 			final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
@@ -417,11 +422,6 @@ public class TaskExecutorResourceUtils {
 		}
 	}
 
-	private static MemorySize getTotalProcessMemorySize(final Configuration config) {
-		checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
-		return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-	}
-
 	private static boolean isTaskHeapMemorySizeExplicitlyConfigured(final Configuration config) {
 		return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY);
 	}
@@ -454,15 +454,16 @@ public class TaskExecutorResourceUtils {
 	}
 
 	private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final Configuration config) {
-		// backward compatible with the deprecated config option TASK_MANAGER_HEAP_MEMORY_MB only when it's explicitly
-		// configured by the user
-		@SuppressWarnings("deprecation")
-		final boolean legacyConfigured = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
-		return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY) || legacyConfigured;
+		return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY);
 	}
 
 	private static boolean isTotalProcessMemorySizeExplicitlyConfigured(final Configuration config) {
-		return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
+		// backward compatible with the deprecated config options TASK_MANAGER_HEAP_MEMORY and TASK_MANAGER_HEAP_MEMORY_MB
+		// only when they are explicitly configured by the user
+		@SuppressWarnings("deprecation")
+		final boolean legacyConfigured =
+			config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) || config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+		return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY) || legacyConfigured;
 	}
 
 	private static void sanityCheckTotalFlinkMemory(final Configuration config, final FlinkInternalMemory flinkInternalMemory) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 432c1af..79ea059 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -457,17 +457,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	}
 
 	@Test
-	public void testConfigTotalFlinkMemoryLegacyMB() {
-		final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+	public void testConfigTotalProcessMemoryLegacyMB() {
+		final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
 
 		@SuppressWarnings("deprecation")
 		final ConfigOption<Integer> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB;
 
 		Configuration conf = new Configuration();
-		conf.setInteger(legacyOption, totalFlinkMemorySize.getMebiBytes());
+		conf.setInteger(legacyOption, totalProcessMemorySize.getMebiBytes());
 
 		TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
-		assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+		assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
 	}
 
 	@Test
@@ -495,17 +495,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	}
 
 	@Test
-	public void testConfigTotalFlinkMemoryLegacySize() {
-		final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+	public void testConfigTotalProcessMemoryLegacySize() {
+		final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
 
 		@SuppressWarnings("deprecation")
 		final ConfigOption<String> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
 
 		Configuration conf = new Configuration();
-		conf.setString(legacyOption, totalFlinkMemorySize.getMebiBytes() + "m");
+		conf.setString(legacyOption, totalProcessMemorySize.getMebiBytes() + "m");
 
 		TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
-		assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+		assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
 	}
 
 	@Test


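The commit above moves the deprecated 'taskmanager.heap.size' key from TOTAL_FLINK_MEMORY to
TOTAL_PROCESS_MEMORY, so a legacy heap setting is now read as the total process size. A small
sketch mirroring the updated tests (class name is illustrative; the options and utility methods
are the ones shown in the diff):

    // Sketch: a legacy 'taskmanager.heap.size' value is now interpreted as Total Process Memory.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.configuration.TaskManagerOptions;
    import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
    import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;

    public final class LegacyHeapCompatSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, "1g"); // deprecated key

            TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
            // With this change the legacy value maps to the process size, not the Flink size.
            System.out.println(spec.getTotalProcessMemorySize().equals(MemorySize.parse("1g")));
        }
    }
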
[flink] 17/21: fix LaunchableMesosWorkerTest

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc0ba44b40fd86c1fb019f37383b849956125874
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Mon Nov 4 18:31:06 2019 +0100

    fix LaunchableMesosWorkerTest
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 3d53160..42095b8 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosResourceAllocation;
 import org.apache.flink.mesos.util.MesosUtils;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.mesos.Protos;
@@ -86,6 +87,7 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		configuration.setString(MesosOptions.MASTER_URL, "foobar");
 		final MemorySize memorySize = new MemorySize(1337L);
 		configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, memorySize.toString());
+		TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration);
 
 		final LaunchableTask launchableTask = new LaunchableMesosWorker(
 			ignored -> Option.empty(),


[flink] 11/21: Adjust memory configuration for local execution

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4224fd9dd21a6fd6d96aa389676660a522c92e21
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Oct 31 11:53:43 2019 +0100

    Adjust memory configuration for local execution
---
 .../MesosTaskManagerParametersTest.java            | 32 ++++++++++++----------
 .../TaskExecutorResourceUtils.java                 | 24 ++++++++++++++++
 .../minicluster/MiniClusterConfiguration.java      |  3 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java |  2 ++
 .../apache/flink/yarn/YarnResourceManagerTest.java |  2 ++
 5 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index ead7a12..73a36d0 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 
 import scala.Option;
 
+import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -78,7 +79,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		Configuration config = new Configuration();
 		config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_VOLUMES, "/host/path:/container/path:ro");
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(1, params.containerVolumes().size());
 		assertEquals("/container/path", params.containerVolumes().get(0).getContainerPath());
 		assertEquals("/host/path", params.containerVolumes().get(0).getHostPath());
@@ -90,7 +91,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		Configuration config = new Configuration();
 		config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "testKey=testValue");
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(params.dockerParameters().size(), 1);
 		assertEquals(params.dockerParameters().get(0).getKey(), "testKey");
 		assertEquals(params.dockerParameters().get(0).getValue(), "testValue");
@@ -102,7 +103,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
 				"testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\"");
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(params.dockerParameters().size(), 4);
 		assertEquals(params.dockerParameters().get(0).getKey(), "testKey1");
 		assertEquals(params.dockerParameters().get(0).getValue(), "testValue1");
@@ -118,7 +119,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	public void testContainerDockerParametersMalformed() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "badParam");
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 	}
 
 	@Test
@@ -127,7 +128,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		config.setString(MesosTaskManagerParameters.MESOS_TM_URIS,
 				"file:///dev/null,http://localhost/test,  test_url ");
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(params.uris().size(), 3);
 		assertEquals(params.uris().get(0), "file:///dev/null");
 		assertEquals(params.uris().get(1), "http://localhost/test");
@@ -138,7 +139,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	public void testUriParametersDefault() throws Exception {
 		Configuration config = new Configuration();
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(params.uris().size(), 0);
 	}
 
@@ -146,7 +147,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		Configuration config = new Configuration();
 		config.setBoolean(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_FORCE_PULL_IMAGE, true);
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(params.dockerForcePullImage(), true);
 	}
 
@@ -155,7 +156,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		Configuration config = new Configuration();
 		config.setBoolean(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_FORCE_PULL_IMAGE, false);
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(params.dockerForcePullImage(), false);
 	}
 
@@ -163,14 +164,14 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	public void testForcePullImageDefault() {
 		Configuration config = new Configuration();
 
-		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
 		assertEquals(params.dockerForcePullImage(), false);
 	}
 
 	@Test
 	public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
 
-		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));
+		MesosTaskManagerParameters mesosTaskManagerParameters = createMesosTaskManagerParameters(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));
 		assertThat(mesosTaskManagerParameters.constraints().size(), is(2));
 		ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
 			@Override
@@ -192,7 +193,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	@Test
 	public void givenOneConstraintInConfigShouldBeParsed() throws Exception {
 
-		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo"));
+		MesosTaskManagerParameters mesosTaskManagerParameters = createMesosTaskManagerParameters(withHardHostAttrConstraintConfiguration("cluster:foo"));
 		assertThat(mesosTaskManagerParameters.constraints().size(), is(1));
 		ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1<String, String>() {
 			@Override
@@ -206,20 +207,20 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	@Test
 	public void givenEmptyConstraintInConfigShouldBeParsed() throws Exception {
 
-		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(""));
+		MesosTaskManagerParameters mesosTaskManagerParameters = createMesosTaskManagerParameters(withHardHostAttrConstraintConfiguration(""));
 		assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
 	}
 
 	@Test
 	public void givenInvalidConstraintInConfigShouldBeParsed() throws Exception {
 
-		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration(",:,"));
+		MesosTaskManagerParameters mesosTaskManagerParameters = createMesosTaskManagerParameters(withHardHostAttrConstraintConfiguration(",:,"));
 		assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
 	}
 
 	@Test(expected = IllegalConfigurationException.class)
 	public void testNegativeNumberOfGPUs() throws Exception {
-		MesosTaskManagerParameters.create(withGPUConfiguration(-1));
+		createMesosTaskManagerParameters(withGPUConfiguration(-1));
 	}
 
 	private static Configuration withGPUConfiguration(int gpus) {
@@ -238,4 +239,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		};
 	}
 
+	private static MesosTaskManagerParameters createMesosTaskManagerParameters(Configuration configuration) {
+		return MesosTaskManagerParameters.create(adjustMemoryConfigurationForLocalExecution(configuration));
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index ee13943..83d2d7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -607,4 +608,27 @@ public class TaskExecutorResourceUtils {
 			return metaspace.add(overhead);
 		}
 	}
+
+	/**
+	 * Adjusts Flink TM memory configuration for local execution.
+	 *
+	 * <p>After FLIP-49, Flink normally requires at least one explicit memory setting.
+	 * If the TaskManager JVM process is started without one, e.g. without an explicit
+	 * {@link TaskManagerOptions#TOTAL_FLINK_MEMORY}, it fails. For ad-hoc local execution with
+	 * {@link org.apache.flink.runtime.minicluster.MiniCluster} or in tests, the default flink-conf.yaml,
+	 * which contains an explicit {@link TaskManagerOptions#TOTAL_FLINK_MEMORY}, is not loaded.
+	 * In this case the managed memory size is derived from the available JVM heap size.
+	 */
+	public static Configuration adjustMemoryConfigurationForLocalExecution(Configuration configuration) {
+		try {
+			resourceSpecFromConfig(configuration);
+		} catch (Throwable t) {
+			configuration.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "0");
+			long freeHeapMemoryWithDefrag = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+			float managedMemoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+			long managedMemorySize = (long) (freeHeapMemoryWithDefrag * managedMemoryFraction);
+			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, Long.toString(managedMemorySize));
+			configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_FRACTION, 0.0f);
+		}
+		return configuration;
+	}
 }
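
The MiniClusterConfiguration and test changes below call this helper before a local TaskManager is
started. A minimal usage sketch under the same assumptions as the diff (class name is illustrative;
the utility methods are the ones added above):

    // Sketch: make an empty configuration usable for local execution by letting Flink
    // derive managed memory from the available JVM heap instead of failing.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;

    public final class LocalMemoryAdjustmentSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration(); // no explicit FLIP-49 memory options set
            TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(conf);

            // After the adjustment, deriving the resource spec should no longer fail:
            System.out.println(
                TaskExecutorResourceUtils.resourceSpecFromConfig(conf).getTotalProcessMemorySize());
        }
    }
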
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 6518aa0..9c89d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -30,6 +30,7 @@ import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
+import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED;
 
 /**
@@ -74,7 +75,7 @@ public class MiniClusterConfiguration {
 			configuration.setString(JobManagerOptions.SCHEDULER, schedulerType);
 		}
 
-		return new UnmodifiableConfiguration(configuration);
+		return new UnmodifiableConfiguration(adjustMemoryConfigurationForLocalExecution(configuration));
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index c8f44e8..b4beaca 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -317,6 +318,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			try {
 				final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 				Configuration cfg = parameterTool.getConfiguration();
+				TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(cfg);
 
 				TaskManagerRunner.runTaskManager(cfg, ResourceID.generate());
 			}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index a4e6812..0e528ff 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -132,6 +133,7 @@ public class YarnResourceManagerTest extends TestLogger {
 
 		flinkConfig = new Configuration();
 		flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
+		TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(flinkConfig);
 
 		File root = folder.getRoot();
 		File home = new File(root, "home");