You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 21:46:50 UTC

flink git commit: [runtime] Extend memory and GC monitor logging.

Repository: flink
Updated Branches:
  refs/heads/master 8b904ae21 -> 35ea6505c


[runtime] Extend memory and GC monitor logging.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35ea6505
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35ea6505
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35ea6505

Branch: refs/heads/master
Commit: 35ea6505cd66f29d9e9d382fc4133dc4aac75923
Parents: 8b904ae
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 20:38:31 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 20:38:31 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/MemoryLogger.java | 183 +++++++++++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  61 +------
 .../test/classloading/ClassLoaderITCase.java    |   6 +-
 3 files changed, 188 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35ea6505/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
new file mode 100644
index 0000000..5c821e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -0,0 +1,183 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorSystem;
+
+import org.slf4j.Logger;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.management.MemoryUsage;
+import java.util.List;
+
+/**
+ * A thread the periodically logs statistics about:
+ * <ul>
+ *     <li>Heap and non-heap memory usage</li>
+ *     <li>Memory pools and pool usage</li>
+ *     <li>Garbage collection times and counts</li>
+ * </ul>
+ */
+public class MemoryLogger extends Thread {
+	
+	private final Logger logger;
+
+	private final long interval;
+	
+	private final MemoryMXBean memoryBean;
+
+	private final List<MemoryPoolMXBean> poolBeans;
+	
+	private final List<GarbageCollectorMXBean> gcBeans;
+	
+	private final ActorSystem monitored;
+	
+	private volatile boolean running = true;
+
+	
+	public MemoryLogger(Logger logger, long interval) {
+		this(logger, interval, null);
+	}
+		
+	public MemoryLogger(Logger logger, long interval, ActorSystem monitored) {
+		super("Memory Logger");
+		setDaemon(true);
+		setPriority(Thread.MIN_PRIORITY);
+		
+		this.logger = logger;
+		this.interval = interval;
+		this.monitored = monitored;
+
+		this.memoryBean = ManagementFactory.getMemoryMXBean();
+		this.poolBeans = ManagementFactory.getMemoryPoolMXBeans();
+		this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+	}
+	
+	public void shutdown() {
+		this.running = false;
+		interrupt();
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void run() {
+		try {
+			while (running && (monitored == null || !monitored.isTerminated())) {
+				logger.info(getMemoryUsageStatsAsString(memoryBean));
+				logger.info(getMemoryPoolStatsAsString(poolBeans));
+				logger.info(getGarbageCollectorStatsAsString(gcBeans));
+				
+				try {
+					Thread.sleep(interval);
+				}
+				catch (InterruptedException e) {
+					if (running) {
+						throw e;
+					}
+				}
+			}
+		}
+		catch (Throwable t) {
+			logger.error("Memory logger terminated with exception", t);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the memory footprint of the JVM in a string representation.
+	 *
+	 * @return A string describing how much heap memory and direct memory are allocated and used.
+	 */
+	public static String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
+		MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
+		MemoryUsage nonHeap = memoryMXBean.getNonHeapMemoryUsage();
+
+		long heapUsed = heap.getUsed() >> 20;
+		long heapCommitted = heap.getCommitted() >> 20;
+		long heapMax = heap.getMax() >> 20;
+
+		long nonHeapUsed = nonHeap.getUsed() >> 20;
+		long nonHeapCommitted = nonHeap.getCommitted() >> 20;
+		long nonHeapMax = nonHeap.getMax() >> 20;
+
+		return String.format("Memory usage stats: [HEAP: %d/%d/%d MB, " +
+				"NON HEAP: %d/%d/%d MB (used/committed/max)]",
+				heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax);
+	}
+
+	/**
+	 * Gets the memory pool statistics from the JVM.
+	 *
+	 * @param poolBeans The collection of memory pool beans.
+	 * @return A string denoting the names and sizes of the memory pools.
+	 */
+	public static String getMemoryPoolStatsAsString(List<MemoryPoolMXBean> poolBeans) {
+		StringBuilder bld = new StringBuilder("Off-heap pool stats: ");
+		int count = 0;
+		
+		for (MemoryPoolMXBean bean : poolBeans) {
+			if (bean.getType() == MemoryType.NON_HEAP) {
+				if (count > 0) {
+					bld.append(", ");
+				}
+				count++;
+
+				MemoryUsage usage = bean.getUsage();
+				long used = usage.getUsed() >> 20;
+				long committed = usage.getCommitted() >> 20;
+				long max = usage.getMax() >> 20;
+				
+				bld.append('[').append(bean.getName()).append(": ");
+				bld.append(used).append('/').append(committed).append('/').append(max);
+				bld.append(" MB (used/committed/max)]");
+			}
+		}
+
+		return bld.toString();
+	}
+	
+	/**
+	 * Gets the garbage collection statistics from the JVM.
+	 *
+	 * @param gcMXBeans The collection of garbage collector beans.
+	 * @return A string denoting the number of times and total elapsed time in garbage collection.
+	 */
+	public static String getGarbageCollectorStatsAsString(List<GarbageCollectorMXBean> gcMXBeans) {
+		StringBuilder bld = new StringBuilder("Garbage collector stats: ");
+		
+		for (GarbageCollectorMXBean bean : gcMXBeans) {
+			bld.append('[').append(bean.getName()).append(", GC TIME (ms): ").append(bean.getCollectionTime());
+			bld.append(", GC COUNT: ").append(bean.getCollectionCount()).append(']');
+			
+			bld.append(", ");
+		}
+		
+		if (!gcMXBeans.isEmpty()) {
+			bld.setLength(bld.length() - 2);
+		}
+		
+		return bld.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35ea6505/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e6fcca2..6a89624 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -176,7 +176,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
     // log the initial memory utilization
     if (log.isInfoEnabled) {
-      log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
+      log.info(MemoryLogger.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
     }
 
     // kick off the registration
@@ -1278,25 +1278,7 @@ object TaskManager {
           ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
           ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
 
-        val logger = new Thread("Memory Usage Logger") {
-          override def run(): Unit = {
-            try {
-              val memoryMXBean = ManagementFactory.getMemoryMXBean
-              val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
-
-              while (!taskManagerSystem.isTerminated) {
-                Thread.sleep(interval)
-                LOG.info(getMemoryUsageStatsAsString(memoryMXBean))
-                LOG.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
-              }
-            }
-            catch {
-              case t: Throwable => LOG.error("Memory usage logging thread died", t)
-            }
-          }
-        }
-        logger.setDaemon(true)
-        logger.setPriority(Thread.MIN_PRIORITY)
+        val logger = new MemoryLogger(LOG.logger, interval, taskManagerSystem)
         logger.start()
       }
 
@@ -1718,45 +1700,6 @@ object TaskManager {
   }
 
   /**
-   * Gets the memory footprint of the JVM in a string representation.
-   *
-   * @param memoryMXBean The memory management bean used to access the memory statistics.
-   * @return A string describing how much heap memory and direct memory are allocated and used.
-   */
-  private def getMemoryUsageStatsAsString(memoryMXBean: MemoryMXBean): String = {
-    val heap = memoryMXBean.getHeapMemoryUsage
-    val nonHeap = memoryMXBean.getNonHeapMemoryUsage
-
-    val heapUsed = heap.getUsed >> 20
-    val heapCommitted = heap.getCommitted >> 20
-    val heapMax = heap.getMax >> 20
-
-    val nonHeapUsed = nonHeap.getUsed >> 20
-    val nonHeapCommitted = nonHeap.getCommitted >> 20
-    val nonHeapMax = nonHeap.getMax >> 20
-
-    s"Memory usage stats: [HEAP: $heapUsed/$heapCommitted/$heapMax MB, " +
-      s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB (used/committed/max)]"
-  }
-
-  /**
-   * Gets the garbage collection statistics from the JVM.
-   *
-   * @param gcMXBeans The collection of garbage collector beans.
-   * @return A string denoting the number of times and total elapsed time in garbage collection.
-   */
-  private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean])
-  : String = {
-    val beans = gcMXBeans map {
-      bean =>
-        s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " +
-          s"GC COUNT: ${bean.getCollectionCount}]"
-    } mkString ", "
-
-    "Garbage collector stats: " + beans
-  }
-
-  /**
    * Creates the registry of default metrics, including stats about garbage collection, memory
    * usage, and system CPU load.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/35ea6505/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 09fd14c..9069573 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -76,9 +76,9 @@ public class ClassLoaderITCase {
 				testCluster.shutdown();
 			}
 		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			Assert.fail(t.getMessage());
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
 	}
 }