You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/23 07:12:19 UTC

[flink] 01/03: [FLINK-18353] Make enabling of the JVM Direct Memory limit configurable for JM

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 246a8c5dbfbf652f56bad312a6e90362c3ffe9d7
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Mon Jun 22 19:09:00 2020 +0300

    [FLINK-18353] Make enabling of the JVM Direct Memory limit configurable for JM
    
    A JVM Direct Memory leak is unlikely in the JM. Therefore, we can
    disable its limit by default. This way direct memory usage can spill over
    into e.g. JVM Overhead without failing, which restores the user experience
    from before FLIP-116. If a user needs the limit, e.g. to investigate container
    OOMs, it can be enabled by setting the
    'jobmanager.memory.enable-jvm-direct-memory-limit' option.
    
    This closes #12745.
---
 .../_includes/generated/common_memory_section.html |  8 ++++-
 .../generated/job_manager_configuration.html       |  8 ++++-
 docs/ops/memory/mem_migration.md                   | 13 +++++++
 docs/ops/memory/mem_setup.md                       | 15 ++++----
 docs/ops/memory/mem_setup_jobmanager.md            |  6 ++--
 docs/ops/memory/mem_trouble.md                     |  8 +++--
 .../flink/configuration/ConfigurationUtils.java    |  1 -
 .../flink/configuration/JobManagerOptions.java     | 27 +++++++++++++--
 .../decorators/JavaCmdJobManagerDecorator.java     |  6 ++--
 .../decorators/JavaCmdJobManagerDecoratorTest.java |  6 ++--
 .../runtime/jobmanager/JobManagerProcessUtils.java |  6 ++++
 .../flink/runtime/util/bash/BashJavaUtils.java     |  2 +-
 .../util/config/memory/ProcessMemoryUtils.java     | 20 ++++++++---
 .../config/memory/ProcessMemoryUtilsTestBase.java  | 40 +++++++++++++++-------
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  3 +-
 .../flink/yarn/YarnClusterDescriptorTest.java      |  4 +--
 16 files changed, 129 insertions(+), 44 deletions(-)

diff --git a/docs/_includes/generated/common_memory_section.html b/docs/_includes/generated/common_memory_section.html
index ce0878c..3b1df1b 100644
--- a/docs/_includes/generated/common_memory_section.html
+++ b/docs/_includes/generated/common_memory_section.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>jobmanager.memory.enable-jvm-direct-memory-limit</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable the JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize). The limit will be set to the value of 'jobmanager.memory.off-heap.size' option. </td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.memory.flink.size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>MemorySize</td>
@@ -48,7 +54,7 @@
             <td><h5>jobmanager.memory.off-heap.size</h5></td>
             <td style="word-wrap: break-word;">128 mb</td>
             <td>MemorySize</td>
-            <td>Off-heap Memory size for JobManager. The JVM direct memory limit of the Job Manager process (-XX:MaxDirectMemorySize) will be set to this value. This option covers all off-heap memory usage including direct and native memory allocation.</td>
+            <td>Off-heap Memory size for JobManager. This option covers all off-heap memory usage including direct and native memory allocation. The JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be set to this value if the limit is enabled by 'jobmanager.memory.enable-jvm-direct-memory-limit'. </td>
         </tr>
         <tr>
             <td><h5>jobmanager.memory.process.size</h5></td>
diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 87341aa..988bcc6 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -33,6 +33,12 @@
             <td>This option specifies how the job computation recovers from task failures. Accepted values are:<ul><li>'full': Restarts all tasks to recover the job.</li><li>'region': Restarts all tasks that could be affected by the task failure. More details can be found <a href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
         </tr>
         <tr>
+            <td><h5>jobmanager.memory.enable-jvm-direct-memory-limit</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable the JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize). The limit will be set to the value of 'jobmanager.memory.off-heap.size' option. </td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.memory.flink.size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>MemorySize</td>
@@ -72,7 +78,7 @@
             <td><h5>jobmanager.memory.off-heap.size</h5></td>
             <td style="word-wrap: break-word;">128 mb</td>
             <td>MemorySize</td>
-            <td>Off-heap Memory size for JobManager. The JVM direct memory limit of the Job Manager process (-XX:MaxDirectMemorySize) will be set to this value. This option covers all off-heap memory usage including direct and native memory allocation.</td>
+            <td>Off-heap Memory size for JobManager. This option covers all off-heap memory usage including direct and native memory allocation. The JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be set to this value if the limit is enabled by 'jobmanager.memory.enable-jvm-direct-memory-limit'. </td>
         </tr>
         <tr>
             <td><h5>jobmanager.memory.process.size</h5></td>
diff --git a/docs/ops/memory/mem_migration.md b/docs/ops/memory/mem_migration.md
index 64427de..0282579 100644
--- a/docs/ops/memory/mem_migration.md
+++ b/docs/ops/memory/mem_migration.md
@@ -239,6 +239,19 @@ is also derived as the rest of what is left after subtracting all other componen
 control over the [JVM Heap]({% link ops/memory/mem_setup_jobmanager.md %}#configure-jvm-heap) by adjusting the
 [`jobmanager.memory.heap.size`](../config.html#jobmanager-memory-heap-size) option.
 
+## Flink JVM process memory limits
+
+Since the *1.10* release, Flink sets the *JVM Metaspace* and *JVM Direct Memory* limits for the TaskManager process
+by adding the corresponding JVM arguments. Since the *1.11* release, Flink also sets the *JVM Metaspace* limit for the JobManager process.
+You can enable the *JVM Direct Memory* limit for the JobManager process by setting the
+[`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit) option.
+See also [JVM parameters](mem_setup.html#jvm-parameters).
+
+Flink sets the mentioned JVM memory limits to simplify debugging of the corresponding memory leaks and avoid
+[the container out-of-memory errors](mem_trouble.html#container-memory-exceeded).
+See also the troubleshooting guide for details about the [JVM Metaspace](mem_trouble.html#outofmemoryerror-metaspace)
+and [JVM Direct Memory](mem_trouble.html#outofmemoryerror-direct-buffer-memory) *OutOfMemoryErrors*.
+
 ## Container Cut-Off Memory
 
 For containerized deployments, you could previously specify a cut-off memory. This memory could accommodate for unaccounted memory allocations.
diff --git a/docs/ops/memory/mem_setup.md b/docs/ops/memory/mem_setup.md
index cd4ee2d..b39cdab 100644
--- a/docs/ops/memory/mem_setup.md
+++ b/docs/ops/memory/mem_setup.md
@@ -94,13 +94,16 @@ Configuring other memory components also requires caution as it can produce furt
 Flink explicitly adds the following memory related JVM arguments while starting its processes, based on the configured
 or derived memory component sizes:
 
-| &nbsp;&nbsp;**JVM Arguments**&nbsp;&nbsp; | &nbsp;&nbsp;**Value for TaskManager**&nbsp;&nbsp; | &nbsp;&nbsp;**Value for JobManager**&nbsp;&nbsp; |
-| :---------------------------------------- | :------------------------------------------------- | :------------------------------------------------ |
-| *-Xmx* and *-Xms*                         | Framework + Task Heap Memory                       | JVM Heap Memory                                   |
-| *-XX:MaxDirectMemorySize*                 | Framework + Task Off-heap (*) + Network Memory     | Off-heap Memory (*)                               |
-| *-XX:MaxMetaspaceSize*                    | JVM Metaspace                                      | JVM Metaspace                                     |
+| &nbsp;&nbsp;**JVM Arguments**&nbsp;&nbsp;                                              | &nbsp;&nbsp;**Value for TaskManager**&nbsp;&nbsp;  | &nbsp;&nbsp;**Value for JobManager**&nbsp;&nbsp;  |
+| :------------------------------------------------------------------------------------- | :------------------------------------------------- | :------------------------------------------------ |
+| *-Xmx* and *-Xms*                                                                      | Framework + Task Heap Memory                       | JVM Heap Memory                                   |
+| *-XX:MaxDirectMemorySize*<br/>(always added for TaskManager; for JobManager, see note (\*\*)) | Framework + Task Off-heap (\*) + Network Memory     | Off-heap Memory (\*),(\*\*)                          |
+| *-XX:MaxMetaspaceSize*                                                                 | JVM Metaspace                                      | JVM Metaspace                                     |
 {:.table-bordered}
-(*) Notice, that the native non-direct usage of memory in user code can be also accounted for as a part of the off-heap memory.
+(\*) Note that native non-direct memory usage in user code can also be accounted for as part of the off-heap memory.
+<br/>
+(\*\*) The *JVM Direct Memory* limit is added for the JobManager process only if the corresponding option
+[`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit) is set.
 <br/><br/>
 
 Check also the detailed memory model for [TaskManager](mem_setup_tm.html#detailed-memory-model) and
diff --git a/docs/ops/memory/mem_setup_jobmanager.md b/docs/ops/memory/mem_setup_jobmanager.md
index c564da9..f0125d1 100644
--- a/docs/ops/memory/mem_setup_jobmanager.md
+++ b/docs/ops/memory/mem_setup_jobmanager.md
@@ -74,8 +74,10 @@ The Flink scripts and CLI set the *JVM Heap* size via the JVM parameters *-Xms*
 
 ### Configure Off-heap Memory
 
-The *Off-heap* memory component accounts for any type of *JVM direct memory* and *native memory* usage. Therefore, it
-is also set via the corresponding JVM argument: *-XX:MaxDirectMemorySize*, see also [JVM parameters](mem_setup.html#jvm-parameters).
+The *Off-heap* memory component accounts for any type of *JVM direct memory* and *native memory* usage. Therefore,
+you can also enable the *JVM Direct Memory* limit by setting the [`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit) option.
+If this option is enabled, Flink will set the limit to the *Off-heap* memory size via the corresponding JVM argument: *-XX:MaxDirectMemorySize*.
+See also [JVM parameters](mem_setup.html#jvm-parameters).
 
 The size of this component can be configured by [`jobmanager.memory.off-heap.size`](../config.html#jobmanager-memory-off-heap-size)
 option. This option can be tuned e.g. if the JobManager process throws ‘OutOfMemoryError: Direct buffer memory’, see
diff --git a/docs/ops/memory/mem_trouble.md b/docs/ops/memory/mem_trouble.md
index 8cf940b..64c479b 100644
--- a/docs/ops/memory/mem_trouble.md
+++ b/docs/ops/memory/mem_trouble.md
@@ -37,7 +37,7 @@ greater than 1, etc.) or configuration conflicts. Check the documentation chapte
 The exception usually indicates that the *JVM Heap* is too small. You can try to increase the JVM Heap size
 by increasing [total memory](mem_setup.html#configure-total-memory). You can also directly increase
 [task heap memory](mem_setup_tm.html#task-operator-heap-memory) for TaskManagers or
-[JVM Heap memory]({% link ops/memory/mem_setup_jobmanager.md %}#configure-jvm-heap) for Masters.
+[JVM Heap memory]({% link ops/memory/mem_setup_jobmanager.md %}#configure-jvm-heap) for JobManagers.
 
 <span class="label label-info">Note</span> You can also increase the [framework heap memory](mem_setup_tm.html#framework-memory)
 for TaskManagers, but you should only change this option if you are sure the Flink framework itself needs more memory.
@@ -54,7 +54,7 @@ See also how to configure off-heap memory for [TaskManagers](mem_setup_tm.html#c
 
 The exception usually indicates that [JVM metaspace limit](mem_setup.html#jvm-parameters) is configured too small.
 You can try to increase the JVM metaspace option for [TaskManagers](../config.html#taskmanager-memory-jvm-metaspace-size)
-or [Masters](../config.html#jobmanager-memory-jvm-metaspace-size).
+or [JobManagers](../config.html#jobmanager-memory-jvm-metaspace-size).
 
 ## IOException: Insufficient number of network buffers
 
@@ -72,6 +72,10 @@ If a Flink container tries to allocate memory beyond its requested size (Yarn, M
 this usually indicates that Flink has not reserved enough native memory. You can observe this either by using an external
 monitoring system or from the error messages when a container gets killed by the deployment environment.
 
+If you encounter this problem in the *JobManager* process, you can also enable the *JVM Direct Memory* limit by setting the
+[`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit) option
+to rule out a possible *JVM Direct Memory* leak.
+
 If [RocksDBStateBackend](../state/state_backends.html#the-rocksdbstatebackend) is used, and the memory controlling is disabled,
 you can try to increase the TaskManager's [managed memory](mem_setup.html#managed-memory).
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 4fbc512..d4ff3b4 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -194,7 +194,6 @@ public class ConfigurationUtils {
 
 		checkArgument(configs.containsKey(xmx));
 		checkArgument(configs.containsKey(xms));
-		checkArgument(configs.containsKey(maxDirect));
 		checkArgument(configs.containsKey(maxMetadata));
 
 		return configs;
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index dad1497..aa3dc9a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -169,9 +169,30 @@ public class JobManagerOptions {
 		key("jobmanager.memory.off-heap.size")
 			.memoryType()
 			.defaultValue(MemorySize.ofMebiBytes(128))
-			.withDescription("Off-heap Memory size for JobManager. The JVM direct memory limit of the Job Manager " +
-				"process (-XX:MaxDirectMemorySize) will be set to this value. This option covers all off-heap memory " +
-				"usage including direct and native memory allocation.");
+			.withDescription(Description
+				.builder()
+				.text(
+					"Off-heap Memory size for JobManager. This option covers all off-heap memory usage including " +
+						"direct and native memory allocation. The JVM direct memory limit of the JobManager process " +
+						"(-XX:MaxDirectMemorySize) will be set to this value if the limit is enabled by " +
+						"'jobmanager.memory.enable-jvm-direct-memory-limit'. ")
+				.build());
+
+	/**
+	 * Whether to enable the JVM direct memory limit (-XX:MaxDirectMemorySize) for the JobManager.
+	 */
+	@Documentation.Section(Documentation.Sections.COMMON_MEMORY)
+	public static final ConfigOption<Boolean> JVM_DIRECT_MEMORY_LIMIT_ENABLED =
+		key("jobmanager.memory.enable-jvm-direct-memory-limit")
+			.booleanType()
+			.defaultValue(false)
+			.withDescription(Description
+				.builder()
+				.text(
+					"Whether to enable the JVM direct memory limit of the JobManager process " +
+						"(-XX:MaxDirectMemorySize). The limit will be set to the value of '%s' option. ",
+					text(OFF_HEAP_MEMORY.key()))
+				.build());
 
 	/**
 	 * JVM Metaspace Size for the JobManager.
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
index 134bfee..8102c6b 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
@@ -25,8 +25,6 @@ import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerPar
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
-import org.apache.flink.runtime.util.config.memory.ProcessMemorySpec;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerBuilder;
@@ -84,13 +82,13 @@ public class JavaCmdJobManagerDecorator extends AbstractKubernetesStepDecorator
 	 */
 	private static String getJobManagerStartCommand(
 			Configuration flinkConfig,
-			ProcessMemorySpec jobManagerProcessSpec,
+			JobManagerProcessSpec jobManagerProcessSpec,
 			String configDirectory,
 			String logDirectory,
 			boolean hasLogback,
 			boolean hasLog4j,
 			String mainClass) {
-		final String jvmMemOpts = ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec);
+		final String jvmMemOpts = JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, flinkConfig);
 		return KubernetesUtils.getCommonStartCommand(
 			flinkConfig,
 			KubernetesUtils.ClusterComponent.JOB_MANAGER,
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
index f236d47..32a12a1 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
 import org.junit.Test;
@@ -64,8 +63,9 @@ public class JavaCmdJobManagerDecoratorTest extends KubernetesJobManagerTestBase
 					FLINK_LOG_DIR_IN_POD, FLINK_LOG_DIR_IN_POD);
 
 	// Memory variables
-	private static final String jmJvmMem = ProcessMemoryUtils.generateJvmParametersStr(
-		JobManagerProcessUtils.createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY));
+	private final String jmJvmMem = JobManagerProcessUtils.generateJvmParametersStr(
+		JobManagerProcessUtils.createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY),
+		flinkConfig);
 
 	private JavaCmdJobManagerDecorator javaCmdJobManagerDecorator;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
index d34b3bc..d5b1f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
@@ -93,4 +93,10 @@ public class JobManagerProcessUtils {
 		configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalProcessMemoryMb));
 		return processSpecFromConfig(configuration);
 	}
+
+	public static String generateJvmParametersStr(JobManagerProcessSpec processSpec, Configuration configuration) {
+		return ProcessMemoryUtils.generateJvmParametersStr(
+			processSpec,
+			configuration.getBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED));
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
index 9d53db0..079dde2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
@@ -105,7 +105,7 @@ public class BashJavaUtils {
 
 		logMasterConfiguration(jobManagerProcessSpec);
 
-		return Collections.singletonList(ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec));
+		return Collections.singletonList(JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, configuration));
 	}
 
 	private static void logMasterConfiguration(JobManagerProcessSpec spec) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
index b154156..479d484 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
@@ -290,9 +290,21 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
 	}
 
 	public static String generateJvmParametersStr(ProcessMemorySpec processSpec) {
-		return "-Xmx" + processSpec.getJvmHeapMemorySize().getBytes()
-			+ " -Xms" + processSpec.getJvmHeapMemorySize().getBytes()
-			+ " -XX:MaxDirectMemorySize=" + processSpec.getJvmDirectMemorySize().getBytes()
-			+ " -XX:MaxMetaspaceSize=" + processSpec.getJvmMetaspaceSize().getBytes();
+		return generateJvmParametersStr(processSpec, true);
+	}
+
+	public static String generateJvmParametersStr(ProcessMemorySpec processSpec, boolean enableDirectMemoryLimit) {
+		final StringBuilder jvmArgStr = new StringBuilder();
+
+		jvmArgStr.append("-Xmx").append(processSpec.getJvmHeapMemorySize().getBytes());
+		jvmArgStr.append(" -Xms").append(processSpec.getJvmHeapMemorySize().getBytes());
+
+		if (enableDirectMemoryLimit) {
+			jvmArgStr.append(" -XX:MaxDirectMemorySize=").append(processSpec.getJvmDirectMemorySize().getBytes());
+		}
+
+		jvmArgStr.append(" -XX:MaxMetaspaceSize=").append(processSpec.getJvmMetaspaceSize().getBytes());
+
+		return jvmArgStr.toString();
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
index 301e2b6..b787df2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
@@ -76,20 +76,28 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex
 
 	@Test
 	public void testGenerateJvmParameters() {
-		MemorySize heap = MemorySize.ofMebiBytes(1);
-		MemorySize directMemory = MemorySize.ofMebiBytes(2);
-		MemorySize metaspace = MemorySize.ofMebiBytes(3);
-		String jvmParamsStr = ProcessMemoryUtils.generateJvmParametersStr(new JvmArgTestingProcessMemorySpec(
-			heap,
-			directMemory,
-			metaspace
-		));
+		ProcessMemorySpec spec = JvmArgTestingProcessMemorySpec.generate();
+		String jvmParamsStr = ProcessMemoryUtils.generateJvmParametersStr(spec, true);
 		Map<String, String> configs = ConfigurationUtils.parseJvmArgString(jvmParamsStr);
 
-		assertThat(MemorySize.parse(configs.get("-Xmx")), is(heap));
-		assertThat(MemorySize.parse(configs.get("-Xms")), is(heap));
-		assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")), is(directMemory));
-		assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), is(metaspace));
+		assertThat(configs.size(), is(4));
+		assertThat(MemorySize.parse(configs.get("-Xmx")), is(spec.getJvmHeapMemorySize()));
+		assertThat(MemorySize.parse(configs.get("-Xms")), is(spec.getJvmHeapMemorySize()));
+		assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), is(spec.getJvmMetaspaceSize()));
+		assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")), is(spec.getJvmDirectMemorySize()));
+	}
+
+	@Test
+	public void testGenerateJvmParametersWithoutDirectMemoryLimit() {
+		ProcessMemorySpec spec = JvmArgTestingProcessMemorySpec.generate();
+		String jvmParamsStr = ProcessMemoryUtils.generateJvmParametersStr(spec, false);
+		Map<String, String> configs = ConfigurationUtils.parseJvmArgString(jvmParamsStr);
+
+		assertThat(configs.size(), is(3));
+		assertThat(MemorySize.parse(configs.get("-Xmx")), is(spec.getJvmHeapMemorySize()));
+		assertThat(MemorySize.parse(configs.get("-Xms")), is(spec.getJvmHeapMemorySize()));
+		assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), is(spec.getJvmMetaspaceSize()));
+		assertThat(configs.containsKey("-XX:MaxDirectMemorySize="), is(false));
 	}
 
 	@Test
@@ -380,5 +388,13 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex
 		public MemorySize getTotalProcessMemorySize() {
 			throw new UnsupportedOperationException();
 		}
+
+		public static JvmArgTestingProcessMemorySpec generate() {
+			return new JvmArgTestingProcessMemorySpec(
+				MemorySize.ofMebiBytes(1),
+				MemorySize.ofMebiBytes(2),
+				MemorySize.ofMebiBytes(3)
+			);
+		}
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 52ac1b1..c98139c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.runtime.util.HadoopUtils;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -1446,7 +1445,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		final  Map<String, String> startCommandValues = new HashMap<>();
 		startCommandValues.put("java", "$JAVA_HOME/bin/java");
 
-		String jvmHeapMem = ProcessMemoryUtils.generateJvmParametersStr(processSpec);
+		String jvmHeapMem = JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
 		startCommandValues.put("jvmmem", jvmHeapMem);
 
 		startCommandValues.put("jvmopts", javaOpts);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 9f845ee..83784fb 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
@@ -166,7 +166,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 		final JobManagerProcessSpec jobManagerProcessSpec = createDefaultJobManagerProcessSpec(1024);
 		final String java = "$JAVA_HOME/bin/java";
-		final String jvmmem = ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec);
+		final String jvmmem = JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, cfg);
 		final String jvmOpts = "-Djvm"; // if set
 		final String jmJvmOpts = "-DjmJvm"; // if set
 		final String krb5 = "-Djava.security.krb5.conf=krb5.conf";