You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/14 22:37:07 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1018] Report GC counts and durations from Gobblin containers …
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f001e58 [GOBBLIN-1018] Report GC counts and durations from Gobblin containers …
f001e58 is described below
commit f001e58fdaa6b9d2ca28317f6f16c11a847c348d
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Jan 14 14:37:00 2020 -0800
[GOBBLIN-1018] Report GC counts and durations from Gobblin containers …
Closes #2864 from sv2000/gcStats
---
.../gobblin/cluster/ContainerHealthMetrics.java | 7 +-
.../cluster/ContainerHealthMetricsService.java | 136 ++++++++++++++++-----
.../cluster/ContainerHealthMetricsServiceTest.java | 16 ++-
3 files changed, 124 insertions(+), 35 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
index 2bf187c..137e1ae 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
@@ -30,5 +30,10 @@ public class ContainerHealthMetrics {
public static final String NUM_AVAILABLE_PROCESSORS = CONTAINER_METRICS_PREFIX + "numAvailableProcessors";
public static final String TOTAL_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "totalPhysicalMemSize";
public static final String FREE_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "freePhysicalMemSize";
-
+ //GC metric names. Counts are numbers of collections and durations are collection times in milliseconds, as read
+ // from GarbageCollectorMXBean#getCollectionCount()/#getCollectionTime() by ContainerHealthMetricsService.
+ // Minor = young-generation collectors, major = old-generation collectors, unknown = any collector in neither group.
+ public static final String MINOR_GC_COUNT = CONTAINER_METRICS_PREFIX + "minorGcCount";
+ public static final String MINOR_GC_DURATION = CONTAINER_METRICS_PREFIX + "minorGcDuration";
+ public static final String MAJOR_GC_COUNT = CONTAINER_METRICS_PREFIX + "majorGcCount";
+ public static final String MAJOR_GC_DURATION = CONTAINER_METRICS_PREFIX + "majorGcDuration";
+ public static final String UNKNOWN_GC_COUNT = CONTAINER_METRICS_PREFIX + "unknownGcCount";
+ public static final String UNKNOWN_GC_DURATION = CONTAINER_METRICS_PREFIX + "unknownGcDuration";
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
index b043857..d603bae 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
@@ -17,18 +17,22 @@
package org.apache.gobblin.cluster;
+import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.AtomicDouble;
import com.sun.management.OperatingSystemMXBean;
import com.typesafe.config.Config;
+import lombok.Data;
+
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.util.ConfigUtils;
@@ -43,39 +47,73 @@ import org.apache.gobblin.util.ConfigUtils;
* {@link com.google.common.util.concurrent.ServiceManager} that manages the lifecycle of
* a {@link ContainerHealthMetricsService}.
* </p>
- * TODO: Add Garbage Collection metrics.
*/
public class ContainerHealthMetricsService extends AbstractScheduledService {
//Container metrics service configurations
private static final String CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS = "container.health.metrics.service.reportingIntervalSeconds";
private static final Long DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL = 30L;
+ //HotSpot collector names, as returned by GarbageCollectorMXBean#getName(), grouped by generation. Collectors whose
+ // name is in neither set (e.g. ZGC, Shenandoah) are reported under the "unknown" GC metrics.
+ private static final Set<String> YOUNG_GC_TYPES = new HashSet<>();
+ private static final Set<String> OLD_GC_TYPES = new HashSet<>();
+
+ static {
+ // young generation GC names: Serial, Parallel, ParNew and G1
+ YOUNG_GC_TYPES.add("Copy");
+ YOUNG_GC_TYPES.add("PS Scavenge");
+ YOUNG_GC_TYPES.add("ParNew");
+ YOUNG_GC_TYPES.add("G1 Young Generation");
+
+ // old generation GC names: Serial, Parallel, CMS and G1
+ OLD_GC_TYPES.add("MarkSweepCompact");
+ OLD_GC_TYPES.add("PS MarkSweep");
+ OLD_GC_TYPES.add("ConcurrentMarkSweep");
+ OLD_GC_TYPES.add("G1 Old Generation");
+ }
private final long metricReportingInterval;
private final OperatingSystemMXBean operatingSystemMXBean;
private final MemoryMXBean memoryMXBean;
+ private final List<GarbageCollectorMXBean> garbageCollectorMXBeans;
+ //OS, CPU and memory stats (only processHeapUsedSize is a heap stat)
AtomicDouble processCpuLoad = new AtomicDouble(0);
AtomicDouble systemCpuLoad = new AtomicDouble(0);
AtomicDouble systemLoadAvg = new AtomicDouble(0);
- AtomicLong committedVmemSize = new AtomicLong(0);
- AtomicLong processCpuTime = new AtomicLong(0);
- AtomicLong freeSwapSpaceSize = new AtomicLong(0);
- AtomicLong numAvailableProcessors = new AtomicLong(0);
- AtomicLong totalPhysicalMemSize = new AtomicLong(0);
- AtomicLong totalSwapSpaceSize = new AtomicLong(0);
- AtomicLong freePhysicalMemSize = new AtomicLong(0);
- AtomicLong processHeapUsedSize = new AtomicLong(0);
+ AtomicDouble committedVmemSize = new AtomicDouble(0);
+ AtomicDouble processCpuTime = new AtomicDouble(0);
+ AtomicDouble freeSwapSpaceSize = new AtomicDouble(0);
+ AtomicDouble numAvailableProcessors = new AtomicDouble(0);
+ AtomicDouble totalPhysicalMemSize = new AtomicDouble(0);
+ AtomicDouble totalSwapSpaceSize = new AtomicDouble(0);
+ AtomicDouble freePhysicalMemSize = new AtomicDouble(0);
+ AtomicDouble processHeapUsedSize = new AtomicDouble(0);
+
+ //GC stats: collection counts and collection times (ms) since the previous run, published through the GC gauges
+ AtomicDouble minorGcCount = new AtomicDouble(0);
+ AtomicDouble majorGcCount = new AtomicDouble(0);
+ AtomicDouble unknownGcCount = new AtomicDouble(0);
+ AtomicDouble minorGcDuration = new AtomicDouble(0);
+ AtomicDouble majorGcDuration = new AtomicDouble(0);
+ AtomicDouble unknownGcDuration = new AtomicDouble(0);
+
+ //Cumulative GC totals seen by the previous run. The GC beans only expose totals since JVM start, so the
+ // per-interval values above are computed as (current totals - lastGcStats).
+ private GcStats lastGcStats = new GcStats();
public ContainerHealthMetricsService(Config config) {
this.metricReportingInterval = ConfigUtils.getLong(config, CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS, DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL);
this.operatingSystemMXBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+ this.garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
//Build all the gauges and register them with the metrics registry.
List<ContextAwareGauge<Double>> systemMetrics = buildGaugeList();
systemMetrics.forEach(metric -> RootMetricContext.get().register(metric));
}
+ /**
+ * Cumulative GC collection counts and collection times (ms), summed per generation bucket
+ * (minor = young generation, major = old generation, unknown = unclassified collectors).
+ */
+ @Data
+ public static class GcStats {
+ long minorCount;
+ double minorDuration;
+ long majorCount;
+ double majorDuration;
+ long unknownCount;
+ double unknownDuration;
+ }
+
/**
* Run one iteration of the scheduled task. If any invocation of this method throws an exception,
* the service will transition to the {@link com.google.common.util.concurrent.Service.State#FAILED} state and this method will no
@@ -94,35 +132,69 @@ public class ContainerHealthMetricsService extends AbstractScheduledService {
this.totalSwapSpaceSize.set(this.operatingSystemMXBean.getTotalSwapSpaceSize());
this.freePhysicalMemSize.set(this.operatingSystemMXBean.getFreePhysicalMemorySize());
this.processHeapUsedSize.set(this.memoryMXBean.getHeapMemoryUsage().getUsed());
+
+ GcStats gcStats = collectGcStats();
+ //GC Beans report totals accumulated since JVM start, so subtract the totals seen by the previous run to obtain the
+ // counts/durations since the last measurement time (the first run reports everything since JVM start). The
+ // previously published delta must NOT be used as the baseline, or alternate intervals would be inflated.
+ this.minorGcCount.set(gcStats.getMinorCount() - this.lastGcStats.getMinorCount());
+ this.minorGcDuration.set(gcStats.getMinorDuration() - this.lastGcStats.getMinorDuration());
+ this.majorGcCount.set(gcStats.getMajorCount() - this.lastGcStats.getMajorCount());
+ this.majorGcDuration.set(gcStats.getMajorDuration() - this.lastGcStats.getMajorDuration());
+ this.unknownGcCount.set(gcStats.getUnknownCount() - this.lastGcStats.getUnknownCount());
+ this.unknownGcDuration.set(gcStats.getUnknownDuration() - this.lastGcStats.getUnknownDuration());
+ this.lastGcStats = gcStats;
}
+ /**
+ * Builds one {@link ContextAwareGauge} per tracked metric. Each gauge reads the current value of its backing
+ * {@link AtomicDouble}, which is refreshed on every scheduled run. The constructor registers the returned gauges
+ * with the {@link RootMetricContext}.
+ * NOTE(review): this overridable method is invoked from the constructor, so a subclass override would run before
+ * the subclass's own fields are initialized.
+ */
protected List<ContextAwareGauge<Double>> buildGaugeList() {
List<ContextAwareGauge<Double>> gaugeList = new ArrayList<>();
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD,
- () -> this.processCpuLoad.get()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD,
- () -> this.systemCpuLoad.get()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG,
- () -> this.systemLoadAvg.get()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE,
- () -> Long.valueOf(this.committedVmemSize.get()).doubleValue()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_TIME,
- () -> Long.valueOf(this.processCpuTime.get()).doubleValue()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE,
- () -> Long.valueOf(this.freeSwapSpaceSize.get()).doubleValue()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS,
- () -> Long.valueOf(this.numAvailableProcessors.get()).doubleValue()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE,
- () -> Long.valueOf(this.totalPhysicalMemSize.get()).doubleValue()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE,
- () -> Long.valueOf(this.totalSwapSpaceSize.get()).doubleValue()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE,
- () -> Long.valueOf(this.freePhysicalMemSize.get()).doubleValue()));
- gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE,
- () -> Long.valueOf(this.processHeapUsedSize.get()).doubleValue()));
+
+ gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD, this.processCpuLoad));
+ gaugeList.add(createGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD, this.systemCpuLoad));
+ gaugeList.add(createGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG, this.systemLoadAvg));
+ gaugeList.add(createGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE, this.committedVmemSize));
+ gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_CPU_TIME, this.processCpuTime));
+ gaugeList.add(createGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE, this.freeSwapSpaceSize));
+ gaugeList.add(createGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS, this.numAvailableProcessors));
+ gaugeList.add(createGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE, this.totalPhysicalMemSize));
+ gaugeList.add(createGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE, this.totalSwapSpaceSize));
+ gaugeList.add(createGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE, this.freePhysicalMemSize));
+ gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE, this.processHeapUsedSize));
+ gaugeList.add(createGauge(ContainerHealthMetrics.MINOR_GC_COUNT, this.minorGcCount));
+ gaugeList.add(createGauge(ContainerHealthMetrics.MINOR_GC_DURATION, this.minorGcDuration));
+ gaugeList.add(createGauge(ContainerHealthMetrics.MAJOR_GC_COUNT, this.majorGcCount));
+ gaugeList.add(createGauge(ContainerHealthMetrics.MAJOR_GC_DURATION, this.majorGcDuration));
+ gaugeList.add(createGauge(ContainerHealthMetrics.UNKNOWN_GC_COUNT, this.unknownGcCount));
+ gaugeList.add(createGauge(ContainerHealthMetrics.UNKNOWN_GC_DURATION, this.unknownGcDuration));
return gaugeList;
}
+ /**
+ * Creates a gauge named {@code name} whose value is whatever {@code metric} holds at the time the gauge is read.
+ */
+ private ContextAwareGauge<Double> createGauge(String name, AtomicDouble metric) {
+ return RootMetricContext.get().newContextAwareGauge(name, metric::get);
+ }
+
+ /**
+ * Sums the cumulative collection counts and collection times of every {@link GarbageCollectorMXBean} into the
+ * minor (young generation), major (old generation) and unknown buckets of a new {@link GcStats}.
+ * A collector with an undefined (negative) count is skipped; an undefined (negative) time contributes 0.
+ */
+ private GcStats collectGcStats() {
+ GcStats gcStats = new GcStats();
+
+ for (GarbageCollectorMXBean garbageCollectorMXBean : this.garbageCollectorMXBeans) {
+ long count = garbageCollectorMXBean.getCollectionCount();
+ if (count < 0) {
+ //The MXBean returns -1 when the collection count is undefined for this collector.
+ continue;
+ }
+ //getCollectionTime() may independently return -1 (undefined); never let that decrement the totals.
+ double duration = (double) Math.max(garbageCollectorMXBean.getCollectionTime(), 0L);
+ String name = garbageCollectorMXBean.getName();
+ if (YOUNG_GC_TYPES.contains(name)) {
+ gcStats.setMinorCount(gcStats.getMinorCount() + count);
+ gcStats.setMinorDuration(gcStats.getMinorDuration() + duration);
+ } else if (OLD_GC_TYPES.contains(name)) {
+ gcStats.setMajorCount(gcStats.getMajorCount() + count);
+ gcStats.setMajorDuration(gcStats.getMajorDuration() + duration);
+ } else {
+ gcStats.setUnknownCount(gcStats.getUnknownCount() + count);
+ gcStats.setUnknownDuration(gcStats.getUnknownDuration() + duration);
+ }
+ }
+ return gcStats;
+ }
+
/**
* Returns the {@link Scheduler} object used to configure this service. This method will only be
* called once.
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
index ac67c01..66efe74 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
@@ -30,10 +30,22 @@ public class ContainerHealthMetricsServiceTest {
Config config = ConfigFactory.empty();
ContainerHealthMetricsService service = new ContainerHealthMetricsService(config);
service.runOneIteration();
- long processCpuTime1 = service.processCpuTime.get();
+ //GC metrics are derived from accumulated (never decreasing) MXBean counters, so none of them may be negative.
+ Assert.assertTrue( service.minorGcCount.get() >= 0);
+ Assert.assertTrue( service.minorGcDuration.get() >= 0);
+ Assert.assertTrue( service.majorGcCount.get() >= 0);
+ Assert.assertTrue( service.majorGcDuration.get() >= 0);
+ Assert.assertTrue( service.unknownGcCount.get() >= 0);
+ Assert.assertTrue( service.unknownGcDuration.get() >= 0);
+ double processCpuTime1 = service.processCpuTime.get();
Thread.sleep(10);
service.runOneIteration();
- long processCpuTime2 = service.processCpuTime.get();
+ double processCpuTime2 = service.processCpuTime.get();
Assert.assertTrue( processCpuTime1 <= processCpuTime2);
+ Assert.assertTrue( service.minorGcCount.get() >= 0);
+ Assert.assertTrue( service.minorGcDuration.get() >= 0);
+ Assert.assertTrue( service.majorGcCount.get() >= 0);
+ Assert.assertTrue( service.majorGcDuration.get() >= 0);
+ Assert.assertTrue( service.unknownGcCount.get() >= 0);
+ Assert.assertTrue( service.unknownGcDuration.get() >= 0);
}
}