You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/12 12:15:59 UTC

[flink] 01/10: [FLINK-18175][conf] Log final memory configuration

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b176f28b5995b65c6bfc4bd0337b7959b0f4e60
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jun 12 14:12:10 2020 +0200

    [FLINK-18175][conf] Log final memory configuration
---
 .../flink/runtime/util/bash/BashJavaUtils.java     | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
index f460906..9d53db0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
@@ -21,14 +21,20 @@ package org.apache.flink.runtime.util.bash;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemory;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
 import org.apache.flink.util.FlinkException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -39,6 +45,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * Utility class for using java utilities in bash scripts.
  */
 public class BashJavaUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(BashJavaUtils.class);
 
 	@VisibleForTesting
 	public static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";
@@ -79,6 +86,9 @@ public class BashJavaUtils {
 			configuration,
 			TaskManagerOptions.TOTAL_FLINK_MEMORY);
 		TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configurationWithFallback);
+
+		logTaskExecutorConfiguration(taskExecutorProcessSpec);
+
 		return Arrays.asList(
 			ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec),
 			TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec));
@@ -92,9 +102,42 @@ public class BashJavaUtils {
 		JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
 			configuration,
 			JobManagerOptions.JVM_HEAP_MEMORY);
+
+		logMasterConfiguration(jobManagerProcessSpec);
+
 		return Collections.singletonList(ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec));
 	}
 
+	private static void logMasterConfiguration(JobManagerProcessSpec spec) { // writes the memory sizes held by the given spec to LOG at INFO level, one component per line
+		JobManagerFlinkMemory flinkMemory = spec.getFlinkMemory(); // supplies the total-Flink, heap and off-heap figures logged below
+		LOG.info("Final Master Memory configuration:");
+		LOG.info("  Total Process Memory: {}", spec.getTotalProcessMemorySize().toHumanReadableString());
+		LOG.info("    Total Flink Memory: {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString()); // deeper indentation inside the message text groups the entries visually
+		LOG.info("      JVM Heap:         {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString());
+		LOG.info("      Off-heap:         {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString()); // the "Off-heap" label is fed from the JVM direct memory size
+		LOG.info("    JVM Metaspace:      {}", spec.getJvmMetaspaceSize().toHumanReadableString()); // metaspace and overhead are read from the spec itself, not from flinkMemory
+		LOG.info("    JVM Overhead:       {}", spec.getJvmOverheadSize().toHumanReadableString());
+	}
+
+	private static void logTaskExecutorConfiguration(TaskExecutorProcessSpec spec) { // writes the memory sizes held by the given spec to LOG at INFO level, one component per line
+		TaskExecutorFlinkMemory flinkMemory = spec.getFlinkMemory(); // supplies every figure logged below except process total, metaspace and overhead
+		MemorySize totalOffHeapMemory = flinkMemory.getManaged().add(flinkMemory.getJvmDirectMemorySize()); // computed here as managed + JVM direct; logged as "Total Off-heap Memory"
+		LOG.info("Final TaskExecutor Memory configuration:");
+		LOG.info("  Total Process Memory:          {}", spec.getTotalProcessMemorySize().toHumanReadableString());
+		LOG.info("    Total Flink Memory:          {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString()); // deeper indentation inside the message text groups the entries visually
+		LOG.info("      Total JVM Heap Memory:     {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString());
+		LOG.info("        Framework:               {}", flinkMemory.getFrameworkHeap().toHumanReadableString()); // heap variant of the framework figure
+		LOG.info("        Task:                    {}", flinkMemory.getTaskHeap().toHumanReadableString()); // heap variant of the task figure
+		LOG.info("      Total Off-heap Memory:     {}", totalOffHeapMemory.toHumanReadableString()); // the sum computed at the top of this method
+		LOG.info("        Managed:                 {}", flinkMemory.getManaged().toHumanReadableString());
+		LOG.info("        Total JVM Direct Memory: {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString());
+		LOG.info("          Framework:             {}", flinkMemory.getFrameworkOffHeap().toHumanReadableString()); // off-heap variant; same label text as the heap entry above
+		LOG.info("          Task:                  {}", flinkMemory.getTaskOffHeap().toHumanReadableString()); // off-heap variant; same label text as the heap entry above
+		LOG.info("          Network:               {}", flinkMemory.getNetwork().toHumanReadableString());
+		LOG.info("    JVM Metaspace:               {}", spec.getJvmMetaspaceSize().toHumanReadableString()); // metaspace and overhead are read from the spec itself, not from flinkMemory
+		LOG.info("    JVM Overhead:                {}", spec.getJvmOverheadSize().toHumanReadableString());
+	}
+
 	/**
 	 * Commands that BashJavaUtils supports.
 	 */