You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/28 11:42:22 UTC
flink git commit: [FLINK-4544] Refactor JM/TM metrics
Repository: flink
Updated Branches:
refs/heads/master 3ce8596b4 -> 5b54009eb
[FLINK-4544] Refactor JM/TM metrics
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b54009e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b54009e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b54009e
Branch: refs/heads/master
Commit: 5b54009ebdf1602bdc9860b46ee34e65ef74246a
Parents: 3ce8596
Author: zentol <ch...@apache.org>
Authored: Fri Oct 28 12:09:56 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 28 13:41:39 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/metrics/util/MetricUtils.java | 245 +++++++++++++++++++
.../flink/runtime/jobmanager/JobManager.scala | 128 +---------
.../flink/runtime/taskmanager/TaskManager.scala | 186 ++------------
3 files changed, 266 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
new file mode 100644
index 0000000..64d06ce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+public class MetricUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class);
+ private static final String METRIC_GROUP_STATUS_NAME = "Status";
+
+ private MetricUtils() {
+ }
+
+ public static void instantiateNetworkMetrics(
+ MetricGroup metrics,
+ final NetworkEnvironment network) {
+ MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME);
+
+ status.gauge("TotalMemorySegments", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+ }
+ });
+ status.gauge("AvailableMemorySegments", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+ }
+ });
+ }
+
+ public static void instantiateStatusMetrics(
+ MetricGroup metrics) {
+ MetricGroup status = metrics
+ .addGroup(METRIC_GROUP_STATUS_NAME);
+
+ MetricGroup jvm = status
+ .addGroup("JVM");
+
+ instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+ instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+ instantiateMemoryMetrics(jvm.addGroup("Memory"));
+ instantiateThreadMetrics(jvm.addGroup("Threads"));
+ instantiateCPUMetrics(jvm.addGroup("CPU"));
+ }
+
+ private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
+ final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
+
+ metrics.gauge("ClassesLoaded", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getTotalLoadedClassCount();
+ }
+ });
+ metrics.gauge("ClassesUnloaded", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getUnloadedClassCount();
+ }
+ });
+ }
+
+ private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
+ List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
+
+ for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) {
+ MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
+ gcGroup.gauge("Count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return garbageCollector.getCollectionCount();
+ }
+ });
+ gcGroup.gauge("Time", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return garbageCollector.getCollectionTime();
+ }
+ });
+ }
+ }
+
+ private static void instantiateMemoryMetrics(MetricGroup metrics) {
+ final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+ MetricGroup heap = metrics.addGroup("Heap");
+ heap.gauge("Used", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getHeapMemoryUsage().getUsed();
+ }
+ });
+ heap.gauge("Committed", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getHeapMemoryUsage().getCommitted();
+ }
+ });
+ heap.gauge("Max", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getHeapMemoryUsage().getMax();
+ }
+ });
+
+ MetricGroup nonHeap = metrics.addGroup("NonHeap");
+ nonHeap.gauge("Used", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getNonHeapMemoryUsage().getUsed();
+ }
+ });
+ nonHeap.gauge("Committed", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getNonHeapMemoryUsage().getCommitted();
+ }
+ });
+ nonHeap.gauge("Max", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return mxBean.getNonHeapMemoryUsage().getMax();
+ }
+ });
+
+ List<BufferPoolMXBean> bufferMxBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+
+ for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) {
+ MetricGroup bufferGroup = metrics.addGroup(bufferMxBean.getName());
+ bufferGroup.gauge("Count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return bufferMxBean.getCount();
+ }
+ });
+ bufferGroup.gauge("MemoryUsed", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return bufferMxBean.getMemoryUsed();
+ }
+ });
+ bufferGroup.gauge("TotalCapacity", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return bufferMxBean.getTotalCapacity();
+ }
+ });
+ }
+ }
+
+ private static void instantiateThreadMetrics(MetricGroup metrics) {
+ final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+ metrics.gauge("Count", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return mxBean.getThreadCount();
+ }
+ });
+ }
+
+ private static void instantiateCPUMetrics(MetricGroup metrics) {
+ try {
+ final OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean();
+
+ final Method fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
+ .getMethod("getProcessCpuLoad");
+ // verify that we can invoke the method
+ fetchCPULoadMethod.invoke(mxBean);
+
+ final Method fetchCPUTimeMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
+ .getMethod("getProcessCpuTime");
+ // verify that we can invoke the method
+ fetchCPUTimeMethod.invoke(mxBean);
+
+ metrics.gauge("Load", new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ try {
+ return (Double) fetchCPULoadMethod.invoke(mxBean);
+ } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
+ return -1.0;
+ }
+ }
+ });
+ metrics.gauge("Time", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ try {
+ return (Long) fetchCPUTimeMethod.invoke(mxBean);
+ } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
+ return -1L;
+ }
+ }
+ });
+ } catch (ClassNotFoundException | InvocationTargetException | SecurityException | NoSuchMethodException | IllegalArgumentException | IllegalAccessException ignored) {
+ LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+ " - CPU load metrics will not be available.");
+ // make sure that a metric still exists for the given name
+ metrics.gauge("Load", new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return -1.0;
+ }
+ });
+ metrics.gauge("Time", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return -1L;
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8ea053e..516bbbe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,11 +19,9 @@
package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
-import java.lang.management.ManagementFactory
import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException}
import java.util.UUID
import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
-import javax.management.ObjectName
import akka.actor.Status.{Failure, Success}
import akka.actor._
@@ -69,6 +67,7 @@ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
@@ -1842,130 +1841,7 @@ class JobManager(
jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] {
override def getValue: Long = JobManager.this.currentJobs.size
})
- instantiateStatusMetrics(jobManagerMetricGroup)
- }
-
- private def instantiateStatusMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
- val jvm = jobManagerMetricGroup
- .addGroup("Status")
- .addGroup("JVM")
-
- instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
- instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
- instantiateMemoryMetrics(jvm.addGroup("Memory"))
- instantiateThreadMetrics(jvm.addGroup("Threads"))
- instantiateCPUMetrics(jvm.addGroup("CPU"))
- }
-
- private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
- val mxBean = ManagementFactory.getClassLoadingMXBean
-
- metrics.gauge[Long, Gauge[Long]]("ClassesLoaded", new Gauge[Long] {
- override def getValue: Long = mxBean.getTotalLoadedClassCount
- })
- metrics.gauge[Long, Gauge[Long]]("ClassesUnloaded", new Gauge[Long] {
- override def getValue: Long = mxBean.getUnloadedClassCount
- })
- }
-
- private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
- val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
-
- for (garbageCollector <- garbageCollectors.asScala) {
- val gcGroup = metrics.addGroup(garbageCollector.getName)
- gcGroup.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
- override def getValue: Long = garbageCollector.getCollectionCount
- })
- gcGroup.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
- override def getValue: Long = garbageCollector.getCollectionTime
- })
- }
- }
-
- private def instantiateMemoryMetrics(metrics: MetricGroup) {
- val mxBean = ManagementFactory.getMemoryMXBean
- val heap = metrics.addGroup("Heap")
- heap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
- })
- heap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
- })
- heap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
- })
-
- val nonHeap = metrics.addGroup("NonHeap")
- nonHeap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
- })
- nonHeap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
- })
- nonHeap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
- })
-
- val con = ManagementFactory.getPlatformMBeanServer;
-
- val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
-
- val direct = metrics.addGroup("Direct")
- direct.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "Count").asInstanceOf[Long]
- })
- direct.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
- })
- direct.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
- })
-
- val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped")
-
- val mapped = metrics.addGroup("Mapped")
- mapped.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
- })
- mapped.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
- })
- mapped.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
- })
- }
-
- private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
- val mxBean = ManagementFactory.getThreadMXBean
-
- metrics.gauge[Int, Gauge[Int]]("Count", new Gauge[Int] {
- override def getValue: Int = mxBean.getThreadCount
- })
- }
-
- private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
- try {
- val mxBean = ManagementFactory.getOperatingSystemMXBean
- .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
- metrics.gauge[Double, Gauge[Double]]("Load", new Gauge[Double] {
- override def getValue: Double = mxBean.getProcessCpuLoad
- })
- metrics.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
- override def getValue: Long = mxBean.getProcessCpuTime
- })
- }
- catch {
- case t: Throwable =>
- log.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
- " - CPU load metrics will not be available.")
- }
+ MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8b597d0..9727860 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,12 +20,10 @@ package org.apache.flink.runtime.taskmanager
import java.io.{File, FileInputStream, IOException}
import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
-import java.lang.reflect.Method
import java.net.{InetAddress, InetSocketAddress}
import java.util
import java.util.UUID
import java.util.concurrent.TimeUnit
-import javax.management.ObjectName
import _root_.akka.actor._
import _root_.akka.pattern.ask
@@ -39,7 +37,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
-import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.clusterframework.messages.StopCluster
import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -69,6 +66,7 @@ import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.query.KvStateRegistry
import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
@@ -998,7 +996,8 @@ class TaskManager(
taskManagerMetricGroup =
new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
- TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
+ MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup)
+ MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network)
// watch job manager to detect when it dies
context.watch(jobManager)
@@ -2502,176 +2501,29 @@ object TaskManager {
ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
})
- // Pre-processing steps for registering cpuLoad
val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
-
- val fetchCPULoadMethod: Option[Method] =
- try {
- Class.forName("com.sun.management.OperatingSystemMXBean")
- .getMethods()
- .find( _.getName() == "getProcessCpuLoad" )
- }
- catch {
- case t: Throwable =>
- LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
- " - CPU load metrics will not be available.")
- None
- }
-
- metricRegistry.register("cpuLoad", new Gauge[Double] {
- override def getValue: Double = {
- try{
- fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
- }
- catch {
- case t: Throwable =>
- LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
- -1.0
- }
- }
- })
- metricRegistry
- }
-
- private def instantiateStatusMetrics(
- taskManagerMetricGroup: MetricGroup,
- network: NetworkEnvironment)
- : Unit = {
- val status = taskManagerMetricGroup
- .addGroup("Status")
-
- instantiateNetworkMetrics(status.addGroup("Network"), network)
- val jvm = status
- .addGroup("JVM")
-
- instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
- instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
- instantiateMemoryMetrics(jvm.addGroup("Memory"))
- instantiateThreadMetrics(jvm.addGroup("Threads"))
- instantiateCPUMetrics(jvm.addGroup("CPU"))
- }
-
- private def instantiateNetworkMetrics(
- metrics: MetricGroup,
- network: NetworkEnvironment)
- : Unit = {
- metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
- override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
- })
- metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
- override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
- })
- }
-
- private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
- val mxBean = ManagementFactory.getClassLoadingMXBean
-
- metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getTotalLoadedClassCount
- })
- metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getUnloadedClassCount
- })
- }
+ try {
+ val fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
+ .getMethods()
+ .find( _.getName() == "getProcessCpuLoad" )
- private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
- val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
+ // verify that we can invoke the method
+ fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
- for (garbageCollector <- garbageCollectors.asScala) {
- val gcGroup = metrics.addGroup(garbageCollector.getName)
- gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
- override def getValue: Long = garbageCollector.getCollectionCount
- })
- gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
- override def getValue: Long = garbageCollector.getCollectionTime
+ metricRegistry.register("cpuLoad", new Gauge[Double] {
+ override def getValue: Double = fetchCPULoadMethod
+ .map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
})
}
- }
-
- private def instantiateMemoryMetrics(metrics: MetricGroup) {
- val mxBean = ManagementFactory.getMemoryMXBean
- val heap = metrics.addGroup("Heap")
- heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
- })
- heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
- })
- heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
- })
-
- val nonHeap = metrics.addGroup("NonHeap")
- nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
- })
- nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
- })
- nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
- })
-
- val con = ManagementFactory.getPlatformMBeanServer;
-
- val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
-
- val direct = metrics.addGroup("Direct")
- direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "Count").asInstanceOf[Long]
- })
- direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
- })
- direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
- })
-
- val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped")
-
- val mapped = metrics.addGroup("Mapped")
- mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
- })
- mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
- })
- mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
- override def getValue: Long = con
- .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
- })
- }
-
- private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
- val mxBean = ManagementFactory.getThreadMXBean
-
- metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
- override def getValue: Int = mxBean.getThreadCount
- })
- }
-
- private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
- try {
- val mxBean = ManagementFactory.getOperatingSystemMXBean
- .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
- metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
- override def getValue: Double = mxBean.getProcessCpuLoad
- })
- metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
- override def getValue: Long = mxBean.getProcessCpuTime
- })
- }
catch {
- case t: Throwable =>
- LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
- " - CPU load metrics will not be available.")
+ case t: Throwable =>
+ LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+ " - CPU load metrics will not be available.")
+ metricRegistry.register("cpuLoad", new Gauge[Double] {
+ override def getValue: Double = -1.0
+ })
}
+ metricRegistry
}
}