You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/07/31 12:48:15 UTC

[flink] branch master updated: [FLINK-9972][flip6] Handle debug memory logging in flip6

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e31f7a  [FLINK-9972][flip6] Handle debug memory logging in flip6
7e31f7a is described below

commit 7e31f7a75899cf84cbbd3a513c0ce3f53dc33ce5
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 26 15:42:34 2018 +0200

    [FLINK-9972][flip6] Handle debug memory logging in flip6
    
    Debug memory logging was being ignored in flip6 code and was
    only supported in legacy code/mode.
    
    This closes #6431.
---
 .../flink/runtime/taskexecutor/TaskManagerRunner.java |  3 +++
 .../flink/runtime/taskmanager/MemoryLogger.java       | 19 +++++++++++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala       | 14 +-------------
 3 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 91c5704..c1f0cc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
@@ -157,6 +158,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 
 		this.terminationFuture = new CompletableFuture<>();
 		this.shutdown = false;
+
+		MemoryLogger.startIfConfigured(LOG, configuration, actorSystem);
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 8bb3576..366e5fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
+
 import javax.management.MBeanServer;
 
 import java.lang.management.BufferPoolMXBean;
@@ -56,6 +60,21 @@ public class MemoryLogger extends Thread {
 	private final ActorSystem monitored;
 	
 	private volatile boolean running = true;
+
+	public static void startIfConfigured(
+			Logger logger,
+			Configuration configuration,
+			ActorSystem taskManagerSystem) {
+		if (!logger.isInfoEnabled() || !configuration.getBoolean(TaskManagerOptions.DEBUG_MEMORY_LOG)) {
+			return;
+		}
+		logger.info("Starting periodic memory usage logger");
+
+		new MemoryLogger(
+			logger,
+			configuration.getLong(TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS),
+			taskManagerSystem).start();
+	}
 	
 	/**
 	 * Creates a new memory logger that logs in the given interval and lives as long as the
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9a057ab..62fe862 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1882,19 +1882,7 @@ object TaskManager {
         )
       }
 
-      // if desired, start the logging daemon that periodically logs the
-      // memory usage information
-      if (LOG.isInfoEnabled && configuration.getBoolean(
-        TaskManagerOptions.DEBUG_MEMORY_LOG))
-      {
-        LOG.info("Starting periodic memory usage logger")
-
-        val interval = configuration.getLong(
-          TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
-
-        val logger = new MemoryLogger(LOG.logger, interval, taskManagerSystem)
-        logger.start()
-      }
+      MemoryLogger.startIfConfigured(LOG.logger, configuration, taskManagerSystem)
 
       // block until everything is done
       taskManagerSystem.awaitTermination()