You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/07/31 12:48:15 UTC
[flink] branch master updated: [FLINK-9972][flip6] Handle debug
memory logging in flip6
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e31f7a [FLINK-9972][flip6] Handle debug memory logging in flip6
7e31f7a is described below
commit 7e31f7a75899cf84cbbd3a513c0ce3f53dc33ce5
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 26 15:42:34 2018 +0200
[FLINK-9972][flip6] Handle debug memory logging in flip6
Debug memory logging was being ignored in flip6 code and was
only supported in legacy code/mode.
This closes #6431.
---
.../flink/runtime/taskexecutor/TaskManagerRunner.java | 3 +++
.../flink/runtime/taskmanager/MemoryLogger.java | 19 +++++++++++++++++++
.../flink/runtime/taskmanager/TaskManager.scala | 14 +-------------
3 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 91c5704..c1f0cc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
@@ -157,6 +158,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
+
+ MemoryLogger.startIfConfigured(LOG, configuration, actorSystem);
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 8bb3576..366e5fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -18,8 +18,12 @@
package org.apache.flink.runtime.taskmanager;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+
import akka.actor.ActorSystem;
import org.slf4j.Logger;
+
import javax.management.MBeanServer;
import java.lang.management.BufferPoolMXBean;
@@ -56,6 +60,21 @@ public class MemoryLogger extends Thread {
private final ActorSystem monitored;
private volatile boolean running = true;
+
+ public static void startIfConfigured(
+ Logger logger,
+ Configuration configuration,
+ ActorSystem taskManagerSystem) {
+ if (!logger.isInfoEnabled() || !configuration.getBoolean(TaskManagerOptions.DEBUG_MEMORY_LOG)) {
+ return;
+ }
+ logger.info("Starting periodic memory usage logger");
+
+ new MemoryLogger(
+ logger,
+ configuration.getLong(TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS),
+ taskManagerSystem).start();
+ }
/**
* Creates a new memory logger that logs in the given interval and lives as long as the
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9a057ab..62fe862 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1882,19 +1882,7 @@ object TaskManager {
)
}
- // if desired, start the logging daemon that periodically logs the
- // memory usage information
- if (LOG.isInfoEnabled && configuration.getBoolean(
- TaskManagerOptions.DEBUG_MEMORY_LOG))
- {
- LOG.info("Starting periodic memory usage logger")
-
- val interval = configuration.getLong(
- TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
-
- val logger = new MemoryLogger(LOG.logger, interval, taskManagerSystem)
- logger.start()
- }
+ MemoryLogger.startIfConfigured(LOG.logger, configuration, taskManagerSystem)
// block until everything is done
taskManagerSystem.awaitTermination()