You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/12 12:15:58 UTC

[flink] branch release-1.11 updated (c5e46ca -> 38b822f)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c5e46ca  [FLINK-18141][doc][parquet] Add documentation for Parquet format
     new 4b176f2  [FLINK-18175][conf] Log final memory configuration
     new 09e3daa  [FLINK-17977][runtime] Log FS safety-net lifecycle on DEBUG
     new 32a195e  [FLINK-17977][runtime] Log initiation of savepoint operations
     new 556ef94  [FLINK-17977][runtime] Log leader grant/revocation to shutdown JobManager on DEBUG
     new a2b8d4c  [FLINK-17977][akka] Log target address retrieval on DEBUG
     new 016479a  [FLINK-17977][runtime] Log registration attempts on DEBUG
     new daa5cf7  [FLINK-17977][runtime] Log incompatible security context factories on DEBUG
     new d8eb962  [FLINK-17977][runtime] Log message timeout on DEBUG
     new 9c7a3e3  [FLINK-17977][runtime] Log outdated TaskExecutor registration on DEBUG
     new 38b822f  [FLINK-17977][core] Silence type extractor warnings for built-in Row

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/api/java/typeutils/TypeExtractor.java    |  6 +--
 .../runtime/jobmaster/JobManagerRunnerImpl.java    |  4 +-
 .../apache/flink/runtime/net/ConnectionUtils.java  |  2 +-
 .../runtime/registration/RetryingRegistration.java |  4 +-
 .../runtime/resourcemanager/ResourceManager.java   |  2 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 ++
 .../flink/runtime/security/SecurityUtils.java      |  2 +-
 .../taskexecutor/TaskManagerConfiguration.java     |  2 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  4 +-
 .../flink/runtime/util/bash/BashJavaUtils.java     | 43 ++++++++++++++++++++++
 10 files changed, 60 insertions(+), 13 deletions(-)


[flink] 08/10: [FLINK-17977][runtime] Log message timeout on DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8eb96257fac39b5fa687e101e6ce1a2fb1893c0
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:12:17 2020 +0200

    [FLINK-17977][runtime] Log message timeout on DEBUG
---
 .../org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 2a582b3..00d2617 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -192,7 +192,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 					"'.Use formats like '50 s' or '1 min' to specify the timeout.");
 		}
 
-		LOG.info("Messages have a max timeout of " + timeout);
+		LOG.debug("Messages have a max timeout of " + timeout);
 
 		Time finiteRegistrationDuration;
 		try {


[flink] 09/10: [FLINK-17977][runtime] Log outdated TaskExecutor registration on DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c7a3e3148ad433c74b0afbae57e467e4ef19940
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:13:51 2020 +0200

    [FLINK-17977][runtime] Log outdated TaskExecutor registration on DEBUG
---
 .../java/org/apache/flink/runtime/resourcemanager/ResourceManager.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b4c0133..8c83a38 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -392,7 +392,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 						return registerTaskExecutorInternal(taskExecutorGateway, taskExecutorRegistration);
 					}
 				} else {
-					log.info("Ignoring outdated TaskExecutorGateway connection.");
+					log.debug("Ignoring outdated TaskExecutorGateway connection for {}.", resourceId);
 					return new RegistrationResponse.Decline("Decline outdated task executor registration.");
 				}
 			},


[flink] 06/10: [FLINK-17977][runtime] Log registration attempts on DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 016479a2ba16d52c22ab23ab0c8e0e416a3667ed
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:07:40 2020 +0200

    [FLINK-17977][runtime] Log registration attempts on DEBUG
---
 .../org/apache/flink/runtime/registration/RetryingRegistration.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 828cc26..a681501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -171,7 +171,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 								strippedFailure);
 						} else {
 							log.info(
-								"Could not resolve {} address {}, retrying in {} ms: {}.",
+								"Could not resolve {} address {}, retrying in {} ms: {}",
 								targetName,
 								targetAddress,
 								retryingRegistrationConfiguration.getErrorDelayMillis(),
@@ -201,7 +201,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
 		}
 
 		try {
-			log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
+			log.debug("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
 			CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration(gateway, fencingToken, timeoutMillis);
 
 			// if the registration was successful, let the TaskExecutor know


[flink] 07/10: [FLINK-17977][runtime] Log incompatible security context factories on DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit daa5cf7ee8352cbd3a258dc9c66780cd5919c899
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:11:21 2020 +0200

    [FLINK-17977][runtime] Log incompatible security context factories on DEBUG
    
    Being incompatible is perfectly normal, as it is for example the case if the security context isn't configured in the first place.
---
 .../src/main/java/org/apache/flink/runtime/security/SecurityUtils.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index e187d12..f9c326d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -98,7 +98,7 @@ public class SecurityUtils {
 						LOG.error("Error occur when instantiate security context with: " + contextFactoryClass , le);
 					}
 				} else {
-					LOG.warn("Unable to install incompatible security context factory {}", contextFactoryClass);
+					LOG.debug("Unable to install security context factory {}", contextFactoryClass);
 				}
 			} catch (NoMatchSecurityFactoryException ne) {
 				LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);


[flink] 04/10: [FLINK-17977][runtime] Log leader grant/revocation to shutdown JobManager on DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 556ef942e4d7cb9ee3c221cee51adc6bbd3c1996
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:01:47 2020 +0200

    [FLINK-17977][runtime] Log leader grant/revocation to shutdown JobManager on DEBUG
---
 .../java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
index a6108d8..4d750d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
@@ -272,7 +272,7 @@ public class JobManagerRunnerImpl implements LeaderContender, OnCompletionAction
 	public void grantLeadership(final UUID leaderSessionID) {
 		synchronized (lock) {
 			if (shutdown) {
-				log.info("JobManagerRunner already shutdown.");
+				log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
 				return;
 			}
 
@@ -365,7 +365,7 @@ public class JobManagerRunnerImpl implements LeaderContender, OnCompletionAction
 	public void revokeLeadership() {
 		synchronized (lock) {
 			if (shutdown) {
-				log.info("JobManagerRunner already shutdown.");
+				log.debug("Ignoring revoking leadership because JobManagerRunner is already shut down.");
 				return;
 			}
 


[flink] 10/10: [FLINK-17977][core] Silence type extractor warnings for built-in Row

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38b822f116424166bc5e5729ccbc0faf139e4bff
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:18:59 2020 +0200

    [FLINK-17977][core] Silence type extractor warnings for built-in Row
---
 .../java/org/apache/flink/api/java/typeutils/TypeExtractor.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index acb1ec9..adcfdbb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1816,10 +1816,10 @@ public class TypeExtractor {
 			if(hasGetter && hasSetter) {
 				return true;
 			} else {
-				if(!hasGetter) {
+				if(!hasGetter && clazz != Row.class) {
 					LOG.info(clazz+" does not contain a getter for field "+f.getName() );
 				}
-				if(!hasSetter) {
+				if(!hasSetter && clazz != Row.class) {
 					LOG.info(clazz+" does not contain a setter for field "+f.getName() );
 				}
 				return false;
@@ -1858,7 +1858,7 @@ public class TypeExtractor {
 		List<PojoField> pojoFields = new ArrayList<PojoField>();
 		for (Field field : fields) {
 			Type fieldType = field.getGenericType();
-			if(!isValidPojoField(field, clazz, typeHierarchy)) {
+			if(!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
 				LOG.info("Class " + clazz + " cannot be used as a POJO type because not all fields are valid POJO fields, " +
 					"and must be processed as GenericType. Please read the Flink documentation " +
 					"on \"Data Types & Serialization\" for details of the effect on performance.");


[flink] 02/10: [FLINK-17977][runtime] Log FS safety-net lifecycle on DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09e3daabe5599e314be87ae0eb6a337f37113d64
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 18:59:51 2020 +0200

    [FLINK-17977][runtime] Log FS safety-net lifecycle on DEBUG
---
 .../src/main/java/org/apache/flink/runtime/taskmanager/Task.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bd4671f..f0bff66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -598,7 +598,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 			// ----------------------------
 
 			// activate safety net for task thread
-			LOG.info("Creating FileSystem stream leak safety net for task {}", this);
+			LOG.debug("Creating FileSystem stream leak safety net for task {}", this);
 			FileSystemSafetyNet.initializeSafetyNetForThread();
 
 			// first of all, get a user-code classloader
@@ -836,7 +836,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 				fileCache.releaseJob(jobId, executionId);
 
 				// close and de-activate safety net for task thread
-				LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
+				LOG.debug("Ensuring all FileSystem streams are closed for task {}", this);
 				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
 				notifyFinalState();


[flink] 05/10: [FLINK-17977][akka] Log target address retrieval on DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2b8d4cc607845bb10974f2b4ec786c2a6497060
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:02:31 2020 +0200

    [FLINK-17977][akka] Log target address retrieval on DEBUG
---
 .../src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index cf1a815..bea3de9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -388,7 +388,7 @@ public class ConnectionUtils {
 						} else if (retrievalState == LeaderRetrievalState.NEWLY_RETRIEVED) {
 							targetAddress = AkkaUtils.getInetSocketAddressFromAkkaURL(akkaURL);
 
-							LOG.info("Retrieved new target address {}.", targetAddress);
+							LOG.debug("Retrieved new target address {} for akka URL {}.", targetAddress, akkaURL);
 
 							retrievalState = LeaderRetrievalState.RETRIEVED;
 


[flink] 03/10: [FLINK-17977][runtime] Log initiation of savepoint operations

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32a195e9d50309db654206c7e43dc82906aa2021
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jun 11 19:00:30 2020 +0200

    [FLINK-17977][runtime] Log initiation of savepoint operations
---
 .../main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java   | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1a10d6b..95eec11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -747,6 +747,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
 		}
 
+		log.info("Triggering {}savepoint for job {}.", cancelJob ? "cancel-with-" : "", jobGraph.getJobID());
+
 		if (cancelJob) {
 			checkpointCoordinator.stopCheckpointScheduler();
 		}
@@ -857,6 +859,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
 		}
 
+		log.info("Triggering stop-with-savepoint for job {}.", jobGraph.getJobID());
+
 		// we stop the checkpoint coordinator so that we are guaranteed
 		// to have only the data of the synchronous savepoint committed.
 		// in case of failure, and if the job restarts, the coordinator


[flink] 01/10: [FLINK-18175][conf] Log final memory configuration

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b176f28b5995b65c6bfc4bd0337b7959b0f4e60
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jun 12 14:12:10 2020 +0200

    [FLINK-18175][conf] Log final memory configuration
---
 .../flink/runtime/util/bash/BashJavaUtils.java     | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
index f460906..9d53db0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
@@ -21,14 +21,20 @@ package org.apache.flink.runtime.util.bash;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemory;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
 import org.apache.flink.util.FlinkException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -39,6 +45,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * Utility class for using java utilities in bash scripts.
  */
 public class BashJavaUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(BashJavaUtils.class);
 
 	@VisibleForTesting
 	public static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";
@@ -79,6 +86,9 @@ public class BashJavaUtils {
 			configuration,
 			TaskManagerOptions.TOTAL_FLINK_MEMORY);
 		TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configurationWithFallback);
+
+		logTaskExecutorConfiguration(taskExecutorProcessSpec);
+
 		return Arrays.asList(
 			ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec),
 			TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec));
@@ -92,9 +102,42 @@ public class BashJavaUtils {
 		JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
 			configuration,
 			JobManagerOptions.JVM_HEAP_MEMORY);
+
+		logMasterConfiguration(jobManagerProcessSpec);
+
 		return Collections.singletonList(ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec));
 	}
 
+	private static void logMasterConfiguration(JobManagerProcessSpec spec) {
+		JobManagerFlinkMemory flinkMemory = spec.getFlinkMemory();
+		LOG.info("Final Master Memory configuration:");
+		LOG.info("  Total Process Memory: {}", spec.getTotalProcessMemorySize().toHumanReadableString());
+		LOG.info("    Total Flink Memory: {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString());
+		LOG.info("      JVM Heap:         {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString());
+		LOG.info("      Off-heap:         {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString());
+		LOG.info("    JVM Metaspace:      {}", spec.getJvmMetaspaceSize().toHumanReadableString());
+		LOG.info("    JVM Overhead:       {}", spec.getJvmOverheadSize().toHumanReadableString());
+	}
+
+	private static void logTaskExecutorConfiguration(TaskExecutorProcessSpec spec) {
+		TaskExecutorFlinkMemory flinkMemory = spec.getFlinkMemory();
+		MemorySize totalOffHeapMemory = flinkMemory.getManaged().add(flinkMemory.getJvmDirectMemorySize());
+		LOG.info("Final TaskExecutor Memory configuration:");
+		LOG.info("  Total Process Memory:          {}", spec.getTotalProcessMemorySize().toHumanReadableString());
+		LOG.info("    Total Flink Memory:          {}", flinkMemory.getTotalFlinkMemorySize().toHumanReadableString());
+		LOG.info("      Total JVM Heap Memory:     {}", flinkMemory.getJvmHeapMemorySize().toHumanReadableString());
+		LOG.info("        Framework:               {}", flinkMemory.getFrameworkHeap().toHumanReadableString());
+		LOG.info("        Task:                    {}", flinkMemory.getTaskHeap().toHumanReadableString());
+		LOG.info("      Total Off-heap Memory:     {}", totalOffHeapMemory.toHumanReadableString());
+		LOG.info("        Managed:                 {}", flinkMemory.getManaged().toHumanReadableString());
+		LOG.info("        Total JVM Direct Memory: {}", flinkMemory.getJvmDirectMemorySize().toHumanReadableString());
+		LOG.info("          Framework:             {}", flinkMemory.getFrameworkOffHeap().toHumanReadableString());
+		LOG.info("          Task:                  {}", flinkMemory.getTaskOffHeap().toHumanReadableString());
+		LOG.info("          Network:               {}", flinkMemory.getNetwork().toHumanReadableString());
+		LOG.info("    JVM Metaspace:               {}", spec.getJvmMetaspaceSize().toHumanReadableString());
+		LOG.info("    JVM Overhead:                {}", spec.getJvmOverheadSize().toHumanReadableString());
+	}
+
 	/**
 	 * Commands that BashJavaUtils supports.
 	 */