You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/28 11:42:22 UTC

flink git commit: [FLINK-4544[ Refactor JM/TM metrics

Repository: flink
Updated Branches:
  refs/heads/master 3ce8596b4 -> 5b54009eb


[FLINK-4544[ Refactor JM/TM metrics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b54009e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b54009e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b54009e

Branch: refs/heads/master
Commit: 5b54009ebdf1602bdc9860b46ee34e65ef74246a
Parents: 3ce8596
Author: zentol <ch...@apache.org>
Authored: Fri Oct 28 12:09:56 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 28 13:41:39 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/metrics/util/MetricUtils.java | 245 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 128 +---------
 .../flink/runtime/taskmanager/TaskManager.scala | 186 ++------------
 3 files changed, 266 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
new file mode 100644
index 0000000..64d06ce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http//www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+public class MetricUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class);
+	private static final String METRIC_GROUP_STATUS_NAME = "Status";
+
+	private MetricUtils() {
+	}
+
+	public static void instantiateNetworkMetrics(
+		MetricGroup metrics,
+		final NetworkEnvironment network) {
+		MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME);
+
+		status.gauge("TotalMemorySegments", new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+			}
+		});
+		status.gauge("AvailableMemorySegments", new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+			}
+		});
+	}
+
+	public static void instantiateStatusMetrics(
+		MetricGroup metrics) {
+		MetricGroup status = metrics
+			.addGroup(METRIC_GROUP_STATUS_NAME);
+
+		MetricGroup jvm = status
+			.addGroup("JVM");
+
+		instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+		instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+		instantiateMemoryMetrics(jvm.addGroup("Memory"));
+		instantiateThreadMetrics(jvm.addGroup("Threads"));
+		instantiateCPUMetrics(jvm.addGroup("CPU"));
+	}
+
+	private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
+		final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
+
+		metrics.gauge("ClassesLoaded", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getTotalLoadedClassCount();
+			}
+		});
+		metrics.gauge("ClassesUnloaded", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getUnloadedClassCount();
+			}
+		});
+	}
+
+	private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
+		List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
+
+		for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) {
+			MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
+			gcGroup.gauge("Count", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return garbageCollector.getCollectionCount();
+				}
+			});
+			gcGroup.gauge("Time", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return garbageCollector.getCollectionTime();
+				}
+			});
+		}
+	}
+
+	private static void instantiateMemoryMetrics(MetricGroup metrics) {
+		final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+		MetricGroup heap = metrics.addGroup("Heap");
+		heap.gauge("Used", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getUsed();
+			}
+		});
+		heap.gauge("Committed", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getCommitted();
+			}
+		});
+		heap.gauge("Max", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getMax();
+			}
+		});
+
+		MetricGroup nonHeap = metrics.addGroup("NonHeap");
+		nonHeap.gauge("Used", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getUsed();
+			}
+		});
+		nonHeap.gauge("Committed", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getCommitted();
+			}
+		});
+		nonHeap.gauge("Max", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getMax();
+			}
+		});
+
+		List<BufferPoolMXBean> bufferMxBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+
+		for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) {
+			MetricGroup bufferGroup = metrics.addGroup(bufferMxBean.getName());
+			bufferGroup.gauge("Count", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return bufferMxBean.getCount();
+				}
+			});
+			bufferGroup.gauge("MemoryUsed", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return bufferMxBean.getMemoryUsed();
+				}
+			});
+			bufferGroup.gauge("TotalCapacity", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return bufferMxBean.getTotalCapacity();
+				}
+			});
+		}
+	}
+
+	private static void instantiateThreadMetrics(MetricGroup metrics) {
+		final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+		metrics.gauge("Count", new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return mxBean.getThreadCount();
+			}
+		});
+	}
+
+	private static void instantiateCPUMetrics(MetricGroup metrics) {
+		try {
+			final OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean();
+
+			final Method fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
+				.getMethod("getProcessCpuLoad");
+			// verify that we can invoke the method
+			fetchCPULoadMethod.invoke(mxBean);
+
+			final Method fetchCPUTimeMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
+				.getMethod("getProcessCpuTime");
+			// verify that we can invoke the method
+			fetchCPUTimeMethod.invoke(mxBean);
+
+			metrics.gauge("Load", new Gauge<Double>() {
+				@Override
+				public Double getValue() {
+					try {
+						return (Double) fetchCPULoadMethod.invoke(mxBean);
+					} catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
+						return -1.0;
+					}
+				}
+			});
+			metrics.gauge("Time", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					try {
+						return (Long) fetchCPUTimeMethod.invoke(mxBean);
+					} catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
+						return -1L;
+					}
+				}
+			});
+		} catch (ClassNotFoundException | InvocationTargetException | SecurityException | NoSuchMethodException | IllegalArgumentException | IllegalAccessException ignored) {
+			LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+				" - CPU load metrics will not be available.");
+			// make sure that a metric still exists for the given name
+			metrics.gauge("Load", new Gauge<Double>() {
+				@Override
+				public Double getValue() {
+					return -1.0;
+				}
+			});
+			metrics.gauge("Time", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return -1L;
+				}
+			});
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8ea053e..516bbbe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.lang.management.ManagementFactory
 import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException}
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
-import javax.management.ObjectName
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -69,6 +67,7 @@ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
@@ -1842,130 +1841,7 @@ class JobManager(
     jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] {
       override def getValue: Long = JobManager.this.currentJobs.size
     })
-    instantiateStatusMetrics(jobManagerMetricGroup)
-  }
-
-  private def instantiateStatusMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
-    val jvm = jobManagerMetricGroup
-      .addGroup("Status")
-      .addGroup("JVM")
-
-    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
-    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
-    instantiateMemoryMetrics(jvm.addGroup("Memory"))
-    instantiateThreadMetrics(jvm.addGroup("Threads"))
-    instantiateCPUMetrics(jvm.addGroup("CPU"))
-  }
-
-  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getClassLoadingMXBean
-
-    metrics.gauge[Long, Gauge[Long]]("ClassesLoaded", new Gauge[Long] {
-      override def getValue: Long = mxBean.getTotalLoadedClassCount
-    })
-    metrics.gauge[Long, Gauge[Long]]("ClassesUnloaded", new Gauge[Long] {
-      override def getValue: Long = mxBean.getUnloadedClassCount
-    })
-  }
-
-  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
-    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
-
-    for (garbageCollector <- garbageCollectors.asScala) {
-      val gcGroup = metrics.addGroup(garbageCollector.getName)
-      gcGroup.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionCount
-      })
-      gcGroup.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionTime
-      })
-    }
-  }
-
-  private def instantiateMemoryMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getMemoryMXBean
-    val heap = metrics.addGroup("Heap")
-    heap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
-    })
-    heap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
-    })
-    heap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
-    })
-
-    val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
-    })
-    nonHeap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
-    })
-    nonHeap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
-    })
-
-    val con = ManagementFactory.getPlatformMBeanServer;
-
-    val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
-
-    val direct = metrics.addGroup("Direct")
-    direct.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "Count").asInstanceOf[Long]
-    })
-    direct.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    direct.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-
-    val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped")
-
-    val mapped = metrics.addGroup("Mapped")
-    mapped.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-  }
-
-  private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
-    val mxBean = ManagementFactory.getThreadMXBean
-
-    metrics.gauge[Int, Gauge[Int]]("Count", new Gauge[Int] {
-      override def getValue: Int = mxBean.getThreadCount
-    })
-  }
-
-  private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
-    try {
-      val mxBean = ManagementFactory.getOperatingSystemMXBean
-        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
-      metrics.gauge[Double, Gauge[Double]]("Load", new Gauge[Double] {
-        override def getValue: Double = mxBean.getProcessCpuLoad
-      })
-      metrics.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
-        override def getValue: Long = mxBean.getProcessCpuTime
-      })
-    }
-    catch {
-      case t: Throwable =>
-        log.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-          " - CPU load metrics will not be available.")
-    }
+    MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b54009e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8b597d0..9727860 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,12 +20,10 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, FileInputStream, IOException}
 import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
-import java.lang.reflect.Method
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-import javax.management.ObjectName
 
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
@@ -39,7 +37,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
-import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -69,6 +66,7 @@ import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateRegistry
 import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
@@ -998,7 +996,8 @@ class TaskManager(
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
     
-    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
+    MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup)
+    MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network)
     
     // watch job manager to detect when it dies
     context.watch(jobManager)
@@ -2502,176 +2501,29 @@ object TaskManager {
         ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
     })
 
-    // Pre-processing steps for registering cpuLoad
     val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
-        
-    val fetchCPULoadMethod: Option[Method] = 
-      try {
-        Class.forName("com.sun.management.OperatingSystemMXBean")
-          .getMethods()
-          .find( _.getName() == "getProcessCpuLoad" )
-      }
-      catch {
-        case t: Throwable =>
-          LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-            " - CPU load metrics will not be available.")
-          None
-      }
-
-    metricRegistry.register("cpuLoad", new Gauge[Double] {
-      override def getValue: Double = {
-        try{
-          fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
-        }
-        catch {
-          case t: Throwable =>
-            LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
-            -1.0
-        }
-      }
-    })
-    metricRegistry
-  }
-
-  private def instantiateStatusMetrics(
-      taskManagerMetricGroup: MetricGroup,
-      network: NetworkEnvironment)
-    : Unit = {
-    val status = taskManagerMetricGroup
-      .addGroup("Status")
-
-    instantiateNetworkMetrics(status.addGroup("Network"), network)
 
-    val jvm = status
-      .addGroup("JVM")
-
-    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
-    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
-    instantiateMemoryMetrics(jvm.addGroup("Memory"))
-    instantiateThreadMetrics(jvm.addGroup("Threads"))
-    instantiateCPUMetrics(jvm.addGroup("CPU"))
-  }
-
-  private def instantiateNetworkMetrics(
-        metrics: MetricGroup,
-        network: NetworkEnvironment)
-    : Unit = {
-    metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
-      override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
-      override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
-    })
-  }
-
-  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getClassLoadingMXBean
-
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getTotalLoadedClassCount
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getUnloadedClassCount
-    })
-  }
+    try {
+      val fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
+        .getMethods()
+        .find( _.getName() == "getProcessCpuLoad" )
 
-  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
-    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
+      // verify that we can invoke the method
+      fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
 
-    for (garbageCollector <- garbageCollectors.asScala) {
-      val gcGroup = metrics.addGroup(garbageCollector.getName)
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionCount
-      })
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionTime
+      metricRegistry.register("cpuLoad", new Gauge[Double] {
+        override def getValue: Double = fetchCPULoadMethod
+          .map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
       })
     }
-  }
-
-  private def instantiateMemoryMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getMemoryMXBean
-    val heap = metrics.addGroup("Heap")
-    heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
-    })
-
-    val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
-    })
-
-    val con = ManagementFactory.getPlatformMBeanServer;
-
-    val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
-
-    val direct = metrics.addGroup("Direct")
-    direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "Count").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-
-    val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped")
-
-    val mapped = metrics.addGroup("Mapped")
-    mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-  }
-
-  private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
-    val mxBean = ManagementFactory.getThreadMXBean
-
-    metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
-      override def getValue: Int = mxBean.getThreadCount
-    })
-  }
-
-  private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
-    try {
-      val mxBean = ManagementFactory.getOperatingSystemMXBean
-        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
-      metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
-          override def getValue: Double = mxBean.getProcessCpuLoad
-        })
-      metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-          override def getValue: Long = mxBean.getProcessCpuTime
-        })
-    }
     catch {
-     case t: Throwable =>
-       LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-        " - CPU load metrics will not be available.") 
+      case t: Throwable =>
+        LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+          " - CPU load metrics will not be available.")
+        metricRegistry.register("cpuLoad", new Gauge[Double] {
+          override def getValue: Double = -1.0
+        })
     }
+    metricRegistry
   }
 }