You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/12 12:15:59 UTC
[flink] 01/10: [FLINK-18175][conf] Log final memory configuration
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b176f28b5995b65c6bfc4bd0337b7959b0f4e60
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jun 12 14:12:10 2020 +0200
[FLINK-18175][conf] Log final memory configuration
---
.../flink/runtime/util/bash/BashJavaUtils.java | 43 ++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
index f460906..9d53db0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
@@ -21,14 +21,20 @@ package org.apache.flink.runtime.util.bash;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemory;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -39,6 +45,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* Utility class for using java utilities in bash scripts.
*/
public class BashJavaUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(BashJavaUtils.class);
@VisibleForTesting
public static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";
@@ -79,6 +86,9 @@ public class BashJavaUtils {
configuration,
TaskManagerOptions.TOTAL_FLINK_MEMORY);
TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configurationWithFallback);
+
+ logTaskExecutorConfiguration(taskExecutorProcessSpec);
+
return Arrays.asList(
ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec),
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec));
@@ -92,9 +102,42 @@ public class BashJavaUtils {
JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
configuration,
JobManagerOptions.JVM_HEAP_MEMORY);
+
+ logMasterConfiguration(jobManagerProcessSpec);
+
return Collections.singletonList(ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec));
}
+ private static void logMasterConfiguration(JobManagerProcessSpec spec) {
+ JobManagerFlinkMemory flinkMemory = spec.getFlinkMemory();
+ LOG.info("Final Master Memory configuration:");
+ LOG.info(" Total Process Memory: {}", spec.getTotalProcessMemorySize().toHumanReadableString());
+ LOG.info(" Total Flink Memory: {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString());
+ LOG.info(" JVM Heap: {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString());
+ LOG.info(" Off-heap: {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString());
+ LOG.info(" JVM Metaspace: {}", spec.getJvmMetaspaceSize().toHumanReadableString());
+ LOG.info(" JVM Overhead: {}", spec.getJvmOverheadSize().toHumanReadableString());
+ }
+
+ private static void logTaskExecutorConfiguration(TaskExecutorProcessSpec spec) {
+ TaskExecutorFlinkMemory flinkMemory = spec.getFlinkMemory();
+ MemorySize totalOffHeapMemory = flinkMemory.getManaged().add(flinkMemory.getJvmDirectMemorySize());
+ LOG.info("Final TaskExecutor Memory configuration:");
+ LOG.info(" Total Process Memory: {}", spec.getTotalProcessMemorySize().toHumanReadableString());
+ LOG.info(" Total Flink Memory: {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString());
+ LOG.info(" Total JVM Heap Memory: {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString());
+ LOG.info(" Framework: {}", flinkMemory.getFrameworkHeap().toHumanReadableString());
+ LOG.info(" Task: {}", flinkMemory.getTaskHeap().toHumanReadableString());
+ LOG.info(" Total Off-heap Memory: {}", totalOffHeapMemory.toHumanReadableString());
+ LOG.info(" Managed: {}", flinkMemory.getManaged().toHumanReadableString());
+ LOG.info(" Total JVM Direct Memory: {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString());
+ LOG.info(" Framework: {}", flinkMemory.getFrameworkOffHeap().toHumanReadableString());
+ LOG.info(" Task: {}", flinkMemory.getTaskOffHeap().toHumanReadableString());
+ LOG.info(" Network: {}", flinkMemory.getNetwork().toHumanReadableString());
+ LOG.info(" JVM Metaspace: {}", spec.getJvmMetaspaceSize().toHumanReadableString());
+ LOG.info(" JVM Overhead: {}", spec.getJvmOverheadSize().toHumanReadableString());
+ }
+
/**
* Commands that BashJavaUtils supports.
*/