You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:47 UTC

[7/7] flink git commit: [FLINK-7876] Merge TaskExecutorMetricsInitializer and MetricUtils

[FLINK-7876] Merge TaskExecutorMetricsInitializer and MetricUtils

This commit removes the TaskExecutorMetricsInitializer and moves its methods
to MetricUtils.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb7e0b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb7e0b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb7e0b9

Branch: refs/heads/master
Commit: 7fb7e0b9775d1773d20e63732130ae140781a6f2
Parents: ad42ee2
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 1 12:31:52 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 1 15:52:01 2017 +0100

----------------------------------------------------------------------
 .../runtime/jobmaster/JobManagerRunner.java     |   3 +-
 .../flink/runtime/metrics/util/MetricUtils.java | 230 +++++++++--------
 .../utils/TaskExecutorMetricsInitializer.java   | 257 -------------------
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 4 files changed, 128 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 14baa6f..f95b5a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
@@ -127,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
 
 			final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
-			jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress);
+			jobManagerMetrics = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostAddress);
 			this.jobManagerMetricGroup = jobManagerMetrics;
 
 			// libraries and class loader first

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 08353e3..2a59a7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -22,23 +22,27 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.lang3.text.WordUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.management.BufferPoolMXBean;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
 import java.lang.management.ClassLoadingMXBean;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
-import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.List;
 
 /**
@@ -51,6 +55,21 @@ public class MetricUtils {
 	private MetricUtils() {
 	}
 
+	public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
+			final MetricRegistry metricRegistry,
+			final String hostname) {
+		final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup(
+			metricRegistry,
+			hostname);
+
+		MetricGroup statusGroup = jobManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
+		// initialize the JM metrics
+		instantiateStatusMetrics(statusGroup);
+
+		return jobManagerMetricGroup;
+	}
+
 	public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
 			MetricRegistry metricRegistry,
 			TaskManagerLocation taskManagerLocation,
@@ -60,59 +79,55 @@ public class MetricUtils {
 			taskManagerLocation.getHostname(),
 			taskManagerLocation.getResourceID().toString());
 
+		MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
 		// Initialize the TM metrics
-		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network);
+		instantiateStatusMetrics(statusGroup);
+		instantiateNetworkMetrics(statusGroup, network);
 
 		return taskManagerMetricGroup;
 	}
 
-	public static void instantiateNetworkMetrics(
-		MetricGroup metrics,
-		final NetworkEnvironment network) {
-		MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME);
+	public static void instantiateStatusMetrics(
+			MetricGroup metricGroup) {
+		MetricGroup jvm = metricGroup.addGroup("JVM");
 
-		MetricGroup networkGroup = status
-			.addGroup("Network");
+		instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+		instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+		instantiateMemoryMetrics(jvm.addGroup("Memory"));
+		instantiateThreadMetrics(jvm.addGroup("Threads"));
+		instantiateCPUMetrics(jvm.addGroup("CPU"));
+	}
 
-		networkGroup.gauge("TotalMemorySegments", new Gauge<Integer>() {
+	private static void instantiateNetworkMetrics(
+		MetricGroup metrics,
+		final NetworkEnvironment network) {
+		metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
 			@Override
-			public Integer getValue() {
-				return network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+			public Long getValue() {
+				return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
 			}
 		});
-		networkGroup.gauge("AvailableMemorySegments", new Gauge<Integer>() {
+
+		metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
 			@Override
-			public Integer getValue() {
-				return network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+			public Long getValue() {
+				return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
 			}
 		});
 	}
 
-	public static void instantiateStatusMetrics(
-		MetricGroup metrics) {
-		MetricGroup status = metrics
-			.addGroup(METRIC_GROUP_STATUS_NAME);
-
-		MetricGroup jvm = status
-			.addGroup("JVM");
-
-		instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
-		instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
-		instantiateMemoryMetrics(jvm.addGroup("Memory"));
-		instantiateThreadMetrics(jvm.addGroup("Threads"));
-		instantiateCPUMetrics(jvm.addGroup("CPU"));
-	}
-
 	private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
 		final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
 
-		metrics.gauge("ClassesLoaded", new Gauge<Long>() {
+		metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getTotalLoadedClassCount();
 			}
 		});
-		metrics.gauge("ClassesUnloaded", new Gauge<Long>() {
+
+		metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getUnloadedClassCount();
@@ -123,15 +138,17 @@ public class MetricUtils {
 	private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
 		List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
 
-		for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) {
+		for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
 			MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
-			gcGroup.gauge("Count", new Gauge<Long>() {
+
+			gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
 				@Override
 				public Long getValue() {
 					return garbageCollector.getCollectionCount();
 				}
 			});
-			gcGroup.gauge("Time", new Gauge<Long>() {
+
+			gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
 				@Override
 				public Long getValue() {
 					return garbageCollector.getCollectionTime();
@@ -142,20 +159,22 @@ public class MetricUtils {
 
 	private static void instantiateMemoryMetrics(MetricGroup metrics) {
 		final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+
 		MetricGroup heap = metrics.addGroup("Heap");
-		heap.gauge("Used", new Gauge<Long>() {
+
+		heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getHeapMemoryUsage().getUsed();
 			}
 		});
-		heap.gauge("Committed", new Gauge<Long>() {
+		heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getHeapMemoryUsage().getCommitted();
 			}
 		});
-		heap.gauge("Max", new Gauge<Long>() {
+		heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getHeapMemoryUsage().getMax();
@@ -163,54 +182,61 @@ public class MetricUtils {
 		});
 
 		MetricGroup nonHeap = metrics.addGroup("NonHeap");
-		nonHeap.gauge("Used", new Gauge<Long>() {
+
+		nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getNonHeapMemoryUsage().getUsed();
 			}
 		});
-		nonHeap.gauge("Committed", new Gauge<Long>() {
+		nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getNonHeapMemoryUsage().getCommitted();
 			}
 		});
-		nonHeap.gauge("Max", new Gauge<Long>() {
+		nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
 			@Override
 			public Long getValue() {
 				return mxBean.getNonHeapMemoryUsage().getMax();
 			}
 		});
 
-		List<BufferPoolMXBean> bufferMxBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+		final MBeanServer con = ManagementFactory.getPlatformMBeanServer();
 
-		for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) {
-			MetricGroup bufferGroup = metrics.addGroup(WordUtils.capitalize(bufferMxBean.getName()));
-			bufferGroup.gauge("Count", new Gauge<Long>() {
-				@Override
-				public Long getValue() {
-					return bufferMxBean.getCount();
-				}
-			});
-			bufferGroup.gauge("MemoryUsed", new Gauge<Long>() {
-				@Override
-				public Long getValue() {
-					return bufferMxBean.getMemoryUsed();
-				}
-			});
-			bufferGroup.gauge("TotalCapacity", new Gauge<Long>() {
-				@Override
-				public Long getValue() {
-					return bufferMxBean.getTotalCapacity();
-				}
-			});
+		final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";
+
+		try {
+			final ObjectName directObjectName = new ObjectName(directBufferPoolName);
+
+			MetricGroup direct = metrics.addGroup("Direct");
+
+			direct.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, directObjectName, "Count", -1L));
+			direct.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
+			direct.<Long, Gauge<Long>>gauge("TotalCapacity", new AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
+		} catch (MalformedObjectNameException e) {
+			LOG.warn("Could not create object name {}.", directBufferPoolName, e);
+		}
+
+		final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped";
+
+		try {
+			final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName);
+
+			MetricGroup mapped = metrics.addGroup("Mapped");
+
+			mapped.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, mappedObjectName, "Count", -1L));
+			mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
+			mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
+		} catch (MalformedObjectNameException e) {
+			LOG.warn("Could not create object name {}.", mappedBufferPoolName, e);
 		}
 	}
 
 	private static void instantiateThreadMetrics(MetricGroup metrics) {
 		final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
 
-		metrics.gauge("Count", new Gauge<Integer>() {
+		metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
 			@Override
 			public Integer getValue() {
 				return mxBean.getThreadCount();
@@ -220,54 +246,48 @@ public class MetricUtils {
 
 	private static void instantiateCPUMetrics(MetricGroup metrics) {
 		try {
-			final OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean();
-
-			final Method fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
-				.getMethod("getProcessCpuLoad");
-			// verify that we can invoke the method
-			fetchCPULoadMethod.invoke(mxBean);
+			final com.sun.management.OperatingSystemMXBean mxBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
 
-			final Method fetchCPUTimeMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
-				.getMethod("getProcessCpuTime");
-			// verify that we can invoke the method
-			fetchCPUTimeMethod.invoke(mxBean);
-
-			metrics.gauge("Load", new Gauge<Double>() {
+			metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
 				@Override
 				public Double getValue() {
-					try {
-						return (Double) fetchCPULoadMethod.invoke(mxBean);
-					} catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
-						return -1.0;
-					}
+					return mxBean.getProcessCpuLoad();
 				}
 			});
-			metrics.gauge("Time", new Gauge<Long>() {
+			metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
 				@Override
 				public Long getValue() {
-					try {
-						return (Long) fetchCPUTimeMethod.invoke(mxBean);
-					} catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
-						return -1L;
-					}
+					return mxBean.getProcessCpuTime();
 				}
 			});
-		} catch (ClassNotFoundException | InvocationTargetException | SecurityException | NoSuchMethodException | IllegalArgumentException | IllegalAccessException ignored) {
+		} catch (Exception e) {
 			LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-				" - CPU load metrics will not be available.");
-			// make sure that a metric still exists for the given name
-			metrics.gauge("Load", new Gauge<Double>() {
-				@Override
-				public Double getValue() {
-					return -1.0;
-				}
-			});
-			metrics.gauge("Time", new Gauge<Long>() {
-				@Override
-				public Long getValue() {
-					return -1L;
-				}
-			});
+				" - CPU load metrics will not be available.", e);
+		}
+	}
+
+	private static final class AttributeGauge<T> implements Gauge<T> {
+		private final MBeanServer server;
+		private final ObjectName objectName;
+		private final String attributeName;
+		private final T errorValue;
+
+		private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
+			this.server = Preconditions.checkNotNull(server);
+			this.objectName = Preconditions.checkNotNull(objectName);
+			this.attributeName = Preconditions.checkNotNull(attributeName);
+			this.errorValue = errorValue;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T getValue() {
+			try {
+				return (T) server.getAttribute(objectName, attributeName);
+			} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+				LOG.warn("Could not read attribute {}.", attributeName, e);
+				return errorValue;
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
deleted file mode 100644
index 1f8d5ed..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor.utils;
-
-import com.sun.management.OperatingSystemMXBean;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
-import java.lang.management.ClassLoadingMXBean;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.ThreadMXBean;
-import java.util.List;
-
-/**
- * Utility class ot initialize {@link TaskExecutor} specific metrics.
- */
-public class TaskExecutorMetricsInitializer {
-	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class);
-
-	public static void instantiateStatusMetrics(
-		MetricGroup taskManagerMetricGroup,
-		NetworkEnvironment network) {
-		MetricGroup status = taskManagerMetricGroup.addGroup("Status");
-
-		instantiateNetworkMetrics(status.addGroup("Network"), network);
-
-		MetricGroup jvm = status.addGroup("JVM");
-
-		instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
-		instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
-		instantiateMemoryMetrics(jvm.addGroup("Memory"));
-		instantiateThreadMetrics(jvm.addGroup("Threads"));
-		instantiateCPUMetrics(jvm.addGroup("CPU"));
-	}
-
-	private static void instantiateNetworkMetrics(
-		MetricGroup metrics,
-		final NetworkEnvironment network) {
-		metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
-			}
-		});
-
-		metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
-			}
-		});
-	}
-
-	private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
-		final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
-
-		metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getTotalLoadedClassCount();
-			}
-		});
-
-		metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getUnloadedClassCount();
-			}
-		});
-	}
-
-	private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
-		List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
-
-		for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
-			MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
-
-			gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
-				@Override
-				public Long getValue() {
-					return garbageCollector.getCollectionCount();
-				}
-			});
-
-			gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
-				@Override
-				public Long getValue() {
-					return garbageCollector.getCollectionTime();
-				}
-			});
-		}
-	}
-
-	private static void instantiateMemoryMetrics(MetricGroup metrics) {
-		final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
-
-		MetricGroup heap = metrics.addGroup("Heap");
-
-		heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getHeapMemoryUsage().getUsed();
-			}
-		});
-		heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getHeapMemoryUsage().getCommitted();
-			}
-		});
-		heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getHeapMemoryUsage().getMax();
-			}
-		});
-
-		MetricGroup nonHeap = metrics.addGroup("NonHeap");
-
-		nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getNonHeapMemoryUsage().getUsed();
-			}
-		});
-		nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getNonHeapMemoryUsage().getCommitted();
-			}
-		});
-		nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
-			@Override
-			public Long getValue() {
-				return mxBean.getNonHeapMemoryUsage().getMax();
-			}
-		});
-
-		final MBeanServer con = ManagementFactory.getPlatformMBeanServer();
-
-		final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";
-
-		try {
-			final ObjectName directObjectName = new ObjectName(directBufferPoolName);
-
-			MetricGroup direct = metrics.addGroup("Direct");
-
-			direct.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "Count", -1L));
-			direct.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
-			direct.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
-		} catch (MalformedObjectNameException e) {
-			LOG.warn("Could not create object name {}.", directBufferPoolName, e);
-		}
-
-		final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped";
-
-		try {
-			final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName);
-
-			MetricGroup mapped = metrics.addGroup("Mapped");
-
-			mapped.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "Count", -1L));
-			mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
-			mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
-		} catch (MalformedObjectNameException e) {
-			LOG.warn("Could not create object name {}.", mappedBufferPoolName, e);
-		}
-	}
-
-	private static void instantiateThreadMetrics(MetricGroup metrics) {
-		final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
-
-		metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
-			@Override
-			public Integer getValue() {
-				return mxBean.getThreadCount();
-			}
-		});
-	}
-
-	private static void instantiateCPUMetrics(MetricGroup metrics) {
-		try {
-			final OperatingSystemMXBean mxBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
-
-			metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
-				@Override
-				public Double getValue() {
-					return mxBean.getProcessCpuLoad();
-				}
-			});
-			metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
-				@Override
-				public Long getValue() {
-					return mxBean.getProcessCpuTime();
-				}
-			});
-		} catch (Exception e) {
-			LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-				" - CPU load metrics will not be available.", e);
-		}
-	}
-
-	private static final class AttributeGauge<T> implements Gauge<T> {
-		private final MBeanServer server;
-		private final ObjectName objectName;
-		private final String attributeName;
-		private final T errorValue;
-
-		private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
-			this.server = Preconditions.checkNotNull(server);
-			this.objectName = Preconditions.checkNotNull(objectName);
-			this.attributeName = Preconditions.checkNotNull(attributeName);
-			this.errorValue = errorValue;
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public T getValue() {
-			try {
-				return (T) server.getAttribute(objectName, attributeName);
-			} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
-				LOG.warn("Could not read attribute {}.", attributeName, e);
-				return errorValue;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d40a0fd..4fb1196 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1878,7 +1878,6 @@ class JobManager(
     jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] {
       override def getValue: Long = JobManager.this.currentJobs.size
     })
-    MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup)
   }
 }
 
@@ -2513,7 +2512,7 @@ object JobManager {
       }
     }
 
-    val jobManagerMetricGroup = new JobManagerMetricGroup(
+    val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
       metricRegistry,
       configuration.getString(JobManagerOptions.ADDRESS))