You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:47 UTC
[7/7] flink git commit: [FLINK-7876] Merge
TaskExecutorMetricsInitializer and MetricUtils
[FLINK-7876] Merge TaskExecutorMetricsInitializer and MetricUtils
This commit removes the TaskExecutorMetricsInitializer and moves its methods
to MetricUtils.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb7e0b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb7e0b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb7e0b9
Branch: refs/heads/master
Commit: 7fb7e0b9775d1773d20e63732130ae140781a6f2
Parents: ad42ee2
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 1 12:31:52 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 1 15:52:01 2017 +0100
----------------------------------------------------------------------
.../runtime/jobmaster/JobManagerRunner.java | 3 +-
.../flink/runtime/metrics/util/MetricUtils.java | 230 +++++++++--------
.../utils/TaskExecutorMetricsInitializer.java | 257 -------------------
.../flink/runtime/jobmanager/JobManager.scala | 3 +-
4 files changed, 128 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 14baa6f..f95b5a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
@@ -127,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
- jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress);
+ jobManagerMetrics = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostAddress);
this.jobManagerMetricGroup = jobManagerMetrics;
// libraries and class loader first
http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 08353e3..2a59a7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -22,23 +22,27 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
-import org.apache.commons.lang3.text.WordUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.management.BufferPoolMXBean;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
-import java.lang.management.OperatingSystemMXBean;
import java.lang.management.ThreadMXBean;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.List;
/**
@@ -51,6 +55,21 @@ public class MetricUtils {
private MetricUtils() {
}
+ public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
+ final MetricRegistry metricRegistry,
+ final String hostname) {
+ final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup(
+ metricRegistry,
+ hostname);
+
+ MetricGroup statusGroup = jobManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
+ // initialize the JM metrics
+ instantiateStatusMetrics(statusGroup);
+
+ return jobManagerMetricGroup;
+ }
+
public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
MetricRegistry metricRegistry,
TaskManagerLocation taskManagerLocation,
@@ -60,59 +79,55 @@ public class MetricUtils {
taskManagerLocation.getHostname(),
taskManagerLocation.getResourceID().toString());
+ MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
// Initialize the TM metrics
- TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network);
+ instantiateStatusMetrics(statusGroup);
+ instantiateNetworkMetrics(statusGroup, network);
return taskManagerMetricGroup;
}
- public static void instantiateNetworkMetrics(
- MetricGroup metrics,
- final NetworkEnvironment network) {
- MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME);
+ public static void instantiateStatusMetrics(
+ MetricGroup metricGroup) {
+ MetricGroup jvm = metricGroup.addGroup("JVM");
- MetricGroup networkGroup = status
- .addGroup("Network");
+ instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+ instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+ instantiateMemoryMetrics(jvm.addGroup("Memory"));
+ instantiateThreadMetrics(jvm.addGroup("Threads"));
+ instantiateCPUMetrics(jvm.addGroup("CPU"));
+ }
- networkGroup.gauge("TotalMemorySegments", new Gauge<Integer>() {
+ private static void instantiateNetworkMetrics(
+ MetricGroup metrics,
+ final NetworkEnvironment network) {
+ metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
@Override
- public Integer getValue() {
- return network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+ public Long getValue() {
+ return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
}
});
- networkGroup.gauge("AvailableMemorySegments", new Gauge<Integer>() {
+
+ metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
@Override
- public Integer getValue() {
- return network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+ public Long getValue() {
+ return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
}
});
}
- public static void instantiateStatusMetrics(
- MetricGroup metrics) {
- MetricGroup status = metrics
- .addGroup(METRIC_GROUP_STATUS_NAME);
-
- MetricGroup jvm = status
- .addGroup("JVM");
-
- instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
- instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
- instantiateMemoryMetrics(jvm.addGroup("Memory"));
- instantiateThreadMetrics(jvm.addGroup("Threads"));
- instantiateCPUMetrics(jvm.addGroup("CPU"));
- }
-
private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
- metrics.gauge("ClassesLoaded", new Gauge<Long>() {
+ metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getTotalLoadedClassCount();
}
});
- metrics.gauge("ClassesUnloaded", new Gauge<Long>() {
+
+ metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getUnloadedClassCount();
@@ -123,15 +138,17 @@ public class MetricUtils {
private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
- for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) {
+ for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
- gcGroup.gauge("Count", new Gauge<Long>() {
+
+ gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
@Override
public Long getValue() {
return garbageCollector.getCollectionCount();
}
});
- gcGroup.gauge("Time", new Gauge<Long>() {
+
+ gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
@Override
public Long getValue() {
return garbageCollector.getCollectionTime();
@@ -142,20 +159,22 @@ public class MetricUtils {
private static void instantiateMemoryMetrics(MetricGroup metrics) {
final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+
MetricGroup heap = metrics.addGroup("Heap");
- heap.gauge("Used", new Gauge<Long>() {
+
+ heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getHeapMemoryUsage().getUsed();
}
});
- heap.gauge("Committed", new Gauge<Long>() {
+ heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getHeapMemoryUsage().getCommitted();
}
});
- heap.gauge("Max", new Gauge<Long>() {
+ heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getHeapMemoryUsage().getMax();
@@ -163,54 +182,61 @@ public class MetricUtils {
});
MetricGroup nonHeap = metrics.addGroup("NonHeap");
- nonHeap.gauge("Used", new Gauge<Long>() {
+
+ nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getNonHeapMemoryUsage().getUsed();
}
});
- nonHeap.gauge("Committed", new Gauge<Long>() {
+ nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getNonHeapMemoryUsage().getCommitted();
}
});
- nonHeap.gauge("Max", new Gauge<Long>() {
+ nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
@Override
public Long getValue() {
return mxBean.getNonHeapMemoryUsage().getMax();
}
});
- List<BufferPoolMXBean> bufferMxBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+ final MBeanServer con = ManagementFactory.getPlatformMBeanServer();
- for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) {
- MetricGroup bufferGroup = metrics.addGroup(WordUtils.capitalize(bufferMxBean.getName()));
- bufferGroup.gauge("Count", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return bufferMxBean.getCount();
- }
- });
- bufferGroup.gauge("MemoryUsed", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return bufferMxBean.getMemoryUsed();
- }
- });
- bufferGroup.gauge("TotalCapacity", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return bufferMxBean.getTotalCapacity();
- }
- });
+ final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";
+
+ try {
+ final ObjectName directObjectName = new ObjectName(directBufferPoolName);
+
+ MetricGroup direct = metrics.addGroup("Direct");
+
+ direct.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, directObjectName, "Count", -1L));
+ direct.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
+ direct.<Long, Gauge<Long>>gauge("TotalCapacity", new AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
+ } catch (MalformedObjectNameException e) {
+ LOG.warn("Could not create object name {}.", directBufferPoolName, e);
+ }
+
+ final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped";
+
+ try {
+ final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName);
+
+ MetricGroup mapped = metrics.addGroup("Mapped");
+
+ mapped.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, mappedObjectName, "Count", -1L));
+ mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
+ mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
+ } catch (MalformedObjectNameException e) {
+ LOG.warn("Could not create object name {}.", mappedBufferPoolName, e);
}
}
private static void instantiateThreadMetrics(MetricGroup metrics) {
final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
- metrics.gauge("Count", new Gauge<Integer>() {
+ metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
@Override
public Integer getValue() {
return mxBean.getThreadCount();
@@ -220,54 +246,48 @@ public class MetricUtils {
private static void instantiateCPUMetrics(MetricGroup metrics) {
try {
- final OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean();
-
- final Method fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
- .getMethod("getProcessCpuLoad");
- // verify that we can invoke the method
- fetchCPULoadMethod.invoke(mxBean);
+ final com.sun.management.OperatingSystemMXBean mxBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
- final Method fetchCPUTimeMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
- .getMethod("getProcessCpuTime");
- // verify that we can invoke the method
- fetchCPUTimeMethod.invoke(mxBean);
-
- metrics.gauge("Load", new Gauge<Double>() {
+ metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
@Override
public Double getValue() {
- try {
- return (Double) fetchCPULoadMethod.invoke(mxBean);
- } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
- return -1.0;
- }
+ return mxBean.getProcessCpuLoad();
}
});
- metrics.gauge("Time", new Gauge<Long>() {
+ metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
@Override
public Long getValue() {
- try {
- return (Long) fetchCPUTimeMethod.invoke(mxBean);
- } catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
- return -1L;
- }
+ return mxBean.getProcessCpuTime();
}
});
- } catch (ClassNotFoundException | InvocationTargetException | SecurityException | NoSuchMethodException | IllegalArgumentException | IllegalAccessException ignored) {
+ } catch (Exception e) {
LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
- " - CPU load metrics will not be available.");
- // make sure that a metric still exists for the given name
- metrics.gauge("Load", new Gauge<Double>() {
- @Override
- public Double getValue() {
- return -1.0;
- }
- });
- metrics.gauge("Time", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return -1L;
- }
- });
+ " - CPU load metrics will not be available.", e);
+ }
+ }
+
+ private static final class AttributeGauge<T> implements Gauge<T> {
+ private final MBeanServer server;
+ private final ObjectName objectName;
+ private final String attributeName;
+ private final T errorValue;
+
+ private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
+ this.server = Preconditions.checkNotNull(server);
+ this.objectName = Preconditions.checkNotNull(objectName);
+ this.attributeName = Preconditions.checkNotNull(attributeName);
+ this.errorValue = errorValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T getValue() {
+ try {
+ return (T) server.getAttribute(objectName, attributeName);
+ } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+ LOG.warn("Could not read attribute {}.", attributeName, e);
+ return errorValue;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
deleted file mode 100644
index 1f8d5ed..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor.utils;
-
-import com.sun.management.OperatingSystemMXBean;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
-import java.lang.management.ClassLoadingMXBean;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.ThreadMXBean;
-import java.util.List;
-
-/**
- * Utility class ot initialize {@link TaskExecutor} specific metrics.
- */
-public class TaskExecutorMetricsInitializer {
- private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class);
-
- public static void instantiateStatusMetrics(
- MetricGroup taskManagerMetricGroup,
- NetworkEnvironment network) {
- MetricGroup status = taskManagerMetricGroup.addGroup("Status");
-
- instantiateNetworkMetrics(status.addGroup("Network"), network);
-
- MetricGroup jvm = status.addGroup("JVM");
-
- instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
- instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
- instantiateMemoryMetrics(jvm.addGroup("Memory"));
- instantiateThreadMetrics(jvm.addGroup("Threads"));
- instantiateCPUMetrics(jvm.addGroup("CPU"));
- }
-
- private static void instantiateNetworkMetrics(
- MetricGroup metrics,
- final NetworkEnvironment network) {
- metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
- }
- });
-
- metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
- }
- });
- }
-
- private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
- final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
-
- metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getTotalLoadedClassCount();
- }
- });
-
- metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getUnloadedClassCount();
- }
- });
- }
-
- private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
- List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
-
- for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
- MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
-
- gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return garbageCollector.getCollectionCount();
- }
- });
-
- gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return garbageCollector.getCollectionTime();
- }
- });
- }
- }
-
- private static void instantiateMemoryMetrics(MetricGroup metrics) {
- final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
-
- MetricGroup heap = metrics.addGroup("Heap");
-
- heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getHeapMemoryUsage().getUsed();
- }
- });
- heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getHeapMemoryUsage().getCommitted();
- }
- });
- heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getHeapMemoryUsage().getMax();
- }
- });
-
- MetricGroup nonHeap = metrics.addGroup("NonHeap");
-
- nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getNonHeapMemoryUsage().getUsed();
- }
- });
- nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getNonHeapMemoryUsage().getCommitted();
- }
- });
- nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getNonHeapMemoryUsage().getMax();
- }
- });
-
- final MBeanServer con = ManagementFactory.getPlatformMBeanServer();
-
- final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";
-
- try {
- final ObjectName directObjectName = new ObjectName(directBufferPoolName);
-
- MetricGroup direct = metrics.addGroup("Direct");
-
- direct.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "Count", -1L));
- direct.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
- direct.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
- } catch (MalformedObjectNameException e) {
- LOG.warn("Could not create object name {}.", directBufferPoolName, e);
- }
-
- final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped";
-
- try {
- final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName);
-
- MetricGroup mapped = metrics.addGroup("Mapped");
-
- mapped.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "Count", -1L));
- mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
- mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
- } catch (MalformedObjectNameException e) {
- LOG.warn("Could not create object name {}.", mappedBufferPoolName, e);
- }
- }
-
- private static void instantiateThreadMetrics(MetricGroup metrics) {
- final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
-
- metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
- @Override
- public Integer getValue() {
- return mxBean.getThreadCount();
- }
- });
- }
-
- private static void instantiateCPUMetrics(MetricGroup metrics) {
- try {
- final OperatingSystemMXBean mxBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
-
- metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
- @Override
- public Double getValue() {
- return mxBean.getProcessCpuLoad();
- }
- });
- metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
- @Override
- public Long getValue() {
- return mxBean.getProcessCpuTime();
- }
- });
- } catch (Exception e) {
- LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
- " - CPU load metrics will not be available.", e);
- }
- }
-
- private static final class AttributeGauge<T> implements Gauge<T> {
- private final MBeanServer server;
- private final ObjectName objectName;
- private final String attributeName;
- private final T errorValue;
-
- private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
- this.server = Preconditions.checkNotNull(server);
- this.objectName = Preconditions.checkNotNull(objectName);
- this.attributeName = Preconditions.checkNotNull(attributeName);
- this.errorValue = errorValue;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T getValue() {
- try {
- return (T) server.getAttribute(objectName, attributeName);
- } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
- LOG.warn("Could not read attribute {}.", attributeName, e);
- return errorValue;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d40a0fd..4fb1196 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1878,7 +1878,6 @@ class JobManager(
jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] {
override def getValue: Long = JobManager.this.currentJobs.size
})
- MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup)
}
}
@@ -2513,7 +2512,7 @@ object JobManager {
}
}
- val jobManagerMetricGroup = new JobManagerMetricGroup(
+ val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
metricRegistry,
configuration.getString(JobManagerOptions.ADDRESS))