You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 21:46:50 UTC
flink git commit: [runtime] Extend memory and GC monitor logging.
Repository: flink
Updated Branches:
refs/heads/master 8b904ae21 -> 35ea6505c
[runtime] Extend memory and GC monitor logging.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35ea6505
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35ea6505
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35ea6505
Branch: refs/heads/master
Commit: 35ea6505cd66f29d9e9d382fc4133dc4aac75923
Parents: 8b904ae
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 20:38:31 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 20:38:31 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/taskmanager/MemoryLogger.java | 183 +++++++++++++++++++
.../flink/runtime/taskmanager/TaskManager.scala | 61 +------
.../test/classloading/ClassLoaderITCase.java | 6 +-
3 files changed, 188 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/35ea6505/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
new file mode 100644
index 0000000..5c821e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -0,0 +1,183 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorSystem;
+
+import org.slf4j.Logger;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.management.MemoryUsage;
+import java.util.List;
+
+/**
+ * A thread that periodically logs statistics about:
+ * <ul>
+ * <li>Heap and non-heap memory usage</li>
+ * <li>Memory pools and pool usage</li>
+ * <li>Garbage collection times and counts</li>
+ * </ul>
+ */
+public class MemoryLogger extends Thread {
+
+ private final Logger logger; // destination of all statistics lines and of the termination error
+
+ private final long interval; // pause between two logging rounds, handed to Thread.sleep() (milliseconds)
+
+ private final MemoryMXBean memoryBean;
+
+ private final List<MemoryPoolMXBean> poolBeans;
+
+ private final List<GarbageCollectorMXBean> gcBeans;
+
+ private final ActorSystem monitored; // may be null; when set, logging stops once it reports isTerminated()
+
+ private volatile boolean running = true; // cleared by shutdown(), read by the logging loop in run()
+
+
+ public MemoryLogger(Logger logger, long interval) {
+ this(logger, interval, null); // no actor system to watch: the loop only ends via shutdown() or an error
+ }
+
+ public MemoryLogger(Logger logger, long interval, ActorSystem monitored) {
+ super("Memory Logger");
+ setDaemon(true);
+ setPriority(Thread.MIN_PRIORITY);
+
+ this.logger = logger;
+ this.interval = interval;
+ this.monitored = monitored;
+
+ this.memoryBean = ManagementFactory.getMemoryMXBean();
+ this.poolBeans = ManagementFactory.getMemoryPoolMXBeans();
+ this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ }
+
+ public void shutdown() {
+ this.running = false; // must be cleared before interrupting so run() treats the interrupt as a stop request
+ interrupt(); // wakes the thread if it is currently sleeping in run()
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void run() {
+ try {
+ while (running && (monitored == null || !monitored.isTerminated())) {
+ logger.info(getMemoryUsageStatsAsString(memoryBean));
+ logger.info(getMemoryPoolStatsAsString(poolBeans));
+ logger.info(getGarbageCollectorStatsAsString(gcBeans));
+
+ try {
+ Thread.sleep(interval);
+ }
+ catch (InterruptedException e) {
+ if (running) { // interrupt did not come from shutdown(): propagate to the outer catch and log it
+ throw e;
+ }
+ } // otherwise the interrupt is dropped and the loop condition ends the thread
+ }
+ }
+ catch (Throwable t) {
+ logger.error("Memory logger terminated with exception", t);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the memory footprint of the JVM in a string representation.
+ * @param memoryMXBean The memory management bean used to access the memory statistics.
+ * @return A string describing the used, committed, and maximum heap and non-heap memory, in MB.
+ */
+ public static String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
+ MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
+ MemoryUsage nonHeap = memoryMXBean.getNonHeapMemoryUsage();
+
+ long heapUsed = heap.getUsed() >> 20; // >> 20 divides by 2^20: bytes to the MB printed below
+ long heapCommitted = heap.getCommitted() >> 20;
+ long heapMax = heap.getMax() >> 20; // getMax() is -1 when undefined; the shift keeps it at -1
+
+ long nonHeapUsed = nonHeap.getUsed() >> 20;
+ long nonHeapCommitted = nonHeap.getCommitted() >> 20;
+ long nonHeapMax = nonHeap.getMax() >> 20;
+
+ return String.format("Memory usage stats: [HEAP: %d/%d/%d MB, " +
+ "NON HEAP: %d/%d/%d MB (used/committed/max)]",
+ heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax);
+ }
+
+ /**
+ * Gets the statistics of the non-heap memory pools from the JVM; heap pools are skipped.
+ *
+ * @param poolBeans The collection of memory pool beans.
+ * @return A string denoting the names and sizes (used/committed/max, in MB) of the non-heap pools.
+ */
+ public static String getMemoryPoolStatsAsString(List<MemoryPoolMXBean> poolBeans) {
+ StringBuilder bld = new StringBuilder("Off-heap pool stats: ");
+ int count = 0; // number of pools appended so far; decides whether a ", " separator is needed
+
+ for (MemoryPoolMXBean bean : poolBeans) {
+ if (bean.getType() == MemoryType.NON_HEAP) {
+ if (count > 0) {
+ bld.append(", ");
+ }
+ count++;
+
+ MemoryUsage usage = bean.getUsage(); // NOTE(review): documented to return null for an invalid pool; not checked here
+ long used = usage.getUsed() >> 20;
+ long committed = usage.getCommitted() >> 20;
+ long max = usage.getMax() >> 20;
+
+ bld.append('[').append(bean.getName()).append(": ");
+ bld.append(used).append('/').append(committed).append('/').append(max);
+ bld.append(" MB (used/committed/max)]");
+ }
+ }
+
+ return bld.toString();
+ }
+
+ /**
+ * Gets the garbage collection statistics from the JVM.
+ *
+ * @param gcMXBeans The collection of garbage collector beans.
+ * @return A string denoting the number of times and total elapsed time in garbage collection.
+ */
+ public static String getGarbageCollectorStatsAsString(List<GarbageCollectorMXBean> gcMXBeans) {
+ StringBuilder bld = new StringBuilder("Garbage collector stats: ");
+
+ for (GarbageCollectorMXBean bean : gcMXBeans) {
+ bld.append('[').append(bean.getName()).append(", GC TIME (ms): ").append(bean.getCollectionTime());
+ bld.append(", GC COUNT: ").append(bean.getCollectionCount()).append(']');
+
+ bld.append(", ");
+ }
+
+ if (!gcMXBeans.isEmpty()) {
+ bld.setLength(bld.length() - 2); // drop the trailing ", " appended after the last collector
+ }
+
+ return bld.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/35ea6505/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e6fcca2..6a89624 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -176,7 +176,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
// log the initial memory utilization
if (log.isInfoEnabled) {
- log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
+ log.info(MemoryLogger.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
}
// kick off the registration
@@ -1278,25 +1278,7 @@ object TaskManager {
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
- val logger = new Thread("Memory Usage Logger") {
- override def run(): Unit = {
- try {
- val memoryMXBean = ManagementFactory.getMemoryMXBean
- val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
-
- while (!taskManagerSystem.isTerminated) {
- Thread.sleep(interval)
- LOG.info(getMemoryUsageStatsAsString(memoryMXBean))
- LOG.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
- }
- }
- catch {
- case t: Throwable => LOG.error("Memory usage logging thread died", t)
- }
- }
- }
- logger.setDaemon(true)
- logger.setPriority(Thread.MIN_PRIORITY)
+ val logger = new MemoryLogger(LOG.logger, interval, taskManagerSystem)
logger.start()
}
@@ -1718,45 +1700,6 @@ object TaskManager {
}
/**
- * Gets the memory footprint of the JVM in a string representation.
- *
- * @param memoryMXBean The memory management bean used to access the memory statistics.
- * @return A string describing how much heap memory and direct memory are allocated and used.
- */
- private def getMemoryUsageStatsAsString(memoryMXBean: MemoryMXBean): String = {
- val heap = memoryMXBean.getHeapMemoryUsage
- val nonHeap = memoryMXBean.getNonHeapMemoryUsage
-
- val heapUsed = heap.getUsed >> 20
- val heapCommitted = heap.getCommitted >> 20
- val heapMax = heap.getMax >> 20
-
- val nonHeapUsed = nonHeap.getUsed >> 20
- val nonHeapCommitted = nonHeap.getCommitted >> 20
- val nonHeapMax = nonHeap.getMax >> 20
-
- s"Memory usage stats: [HEAP: $heapUsed/$heapCommitted/$heapMax MB, " +
- s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB (used/committed/max)]"
- }
-
- /**
- * Gets the garbage collection statistics from the JVM.
- *
- * @param gcMXBeans The collection of garbage collector beans.
- * @return A string denoting the number of times and total elapsed time in garbage collection.
- */
- private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean])
- : String = {
- val beans = gcMXBeans map {
- bean =>
- s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " +
- s"GC COUNT: ${bean.getCollectionCount}]"
- } mkString ", "
-
- "Garbage collector stats: " + beans
- }
-
- /**
* Creates the registry of default metrics, including stats about garbage collection, memory
* usage, and system CPU load.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/35ea6505/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 09fd14c..9069573 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -76,9 +76,9 @@ public class ClassLoaderITCase {
testCluster.shutdown();
}
}
- catch (Throwable t) {
- t.printStackTrace();
- Assert.fail(t.getMessage());
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
}
}
}