You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/14 22:37:07 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1018] Report GC counts and durations from Gobblin containers …

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f001e58  [GOBBLIN-1018] Report GC counts and durations from Gobblin containers …
f001e58 is described below

commit f001e58fdaa6b9d2ca28317f6f16c11a847c348d
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Jan 14 14:37:00 2020 -0800

    [GOBBLIN-1018] Report GC counts and durations from Gobblin containers …
    
    Closes #2864 from sv2000/gcStats
---
 .../gobblin/cluster/ContainerHealthMetrics.java    |   7 +-
 .../cluster/ContainerHealthMetricsService.java     | 136 ++++++++++++++++-----
 .../cluster/ContainerHealthMetricsServiceTest.java |  16 ++-
 3 files changed, 124 insertions(+), 35 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
index 2bf187c..137e1ae 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
@@ -30,5 +30,10 @@ public class ContainerHealthMetrics {
   public static final String NUM_AVAILABLE_PROCESSORS = CONTAINER_METRICS_PREFIX + "numAvailableProcessors";
   public static final String TOTAL_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "totalPhysicalMemSize";
   public static final String FREE_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "freePhysicalMemSize";
-
+  public static final String MINOR_GC_COUNT = CONTAINER_METRICS_PREFIX + "minorGcCount"; // collections by young-generation (minor) collectors
+  public static final String MINOR_GC_DURATION = CONTAINER_METRICS_PREFIX + "minorGcDuration"; // young-generation GC time in ms (GarbageCollectorMXBean.getCollectionTime())
+  public static final String MAJOR_GC_COUNT = CONTAINER_METRICS_PREFIX + "majorGcCount"; // collections by old-generation (major) collectors
+  public static final String MAJOR_GC_DURATION = CONTAINER_METRICS_PREFIX + "majorGcDuration"; // old-generation GC time in ms
+  public static final String UNKNOWN_GC_COUNT = CONTAINER_METRICS_PREFIX + "unknownGcCount"; // collections by collectors classified as neither young nor old generation
+  public static final String UNKNOWN_GC_DURATION = CONTAINER_METRICS_PREFIX + "unknownGcDuration"; // GC time in ms of those unclassified collectors
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
index b043857..d603bae 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
@@ -17,18 +17,22 @@
 
 package org.apache.gobblin.cluster;
 
+import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.AbstractScheduledService;
 import com.google.common.util.concurrent.AtomicDouble;
 import com.sun.management.OperatingSystemMXBean;
 import com.typesafe.config.Config;
 
+import lombok.Data;
+
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.util.ConfigUtils;
@@ -43,39 +47,73 @@ import org.apache.gobblin.util.ConfigUtils;
  *   {@link com.google.common.util.concurrent.ServiceManager} that manages the lifecycle of
  *   a {@link ContainerHealthMetricsService}.
  * </p>
- * TODO: Add Garbage Collection metrics.
 */
 public class ContainerHealthMetricsService extends AbstractScheduledService {
   //Container metrics service configurations
   private static final String CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS = "container.health.metrics.service.reportingIntervalSeconds";
   private static final Long DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL = 30L;
+  // GarbageCollectorMXBean names counted as minor (young-gen) or major (old-gen) GCs; any other name is reported as "unknown".
+  private static final Set<String> YOUNG_GC_TYPES = new HashSet<>(3);
+  private static final Set<String> OLD_GC_TYPES = new HashSet<>(3);
+
+  static {
+    // young generation GC names
+    YOUNG_GC_TYPES.add("PS Scavenge");
+    YOUNG_GC_TYPES.add("ParNew");
+    YOUNG_GC_TYPES.add("G1 Young Generation");
+    // old generation GC names
+    OLD_GC_TYPES.add("PS MarkSweep");
+    OLD_GC_TYPES.add("ConcurrentMarkSweep");
+    OLD_GC_TYPES.add("G1 Old Generation");
+  }
 
   private final long metricReportingInterval;
   private final OperatingSystemMXBean operatingSystemMXBean;
   private final MemoryMXBean memoryMXBean;
+  private final List<GarbageCollectorMXBean> garbageCollectorMXBeans; // from ManagementFactory.getGarbageCollectorMXBeans(); iterated by collectGcStats()
 
+  //CPU, OS and memory stats (processHeapUsedSize is the only JVM-heap metric here)
   AtomicDouble processCpuLoad = new AtomicDouble(0);
   AtomicDouble systemCpuLoad = new AtomicDouble(0);
   AtomicDouble systemLoadAvg = new AtomicDouble(0);
-  AtomicLong committedVmemSize = new AtomicLong(0);
-  AtomicLong processCpuTime = new AtomicLong(0);
-  AtomicLong freeSwapSpaceSize = new AtomicLong(0);
-  AtomicLong numAvailableProcessors = new AtomicLong(0);
-  AtomicLong totalPhysicalMemSize = new AtomicLong(0);
-  AtomicLong totalSwapSpaceSize = new AtomicLong(0);
-  AtomicLong freePhysicalMemSize = new AtomicLong(0);
-  AtomicLong processHeapUsedSize = new AtomicLong(0);
+  AtomicDouble committedVmemSize = new AtomicDouble(0);
+  AtomicDouble processCpuTime = new AtomicDouble(0);
+  AtomicDouble freeSwapSpaceSize = new AtomicDouble(0);
+  AtomicDouble numAvailableProcessors = new AtomicDouble(0);
+  AtomicDouble totalPhysicalMemSize = new AtomicDouble(0);
+  AtomicDouble totalSwapSpaceSize = new AtomicDouble(0);
+  AtomicDouble freePhysicalMemSize = new AtomicDouble(0);
+  AtomicDouble processHeapUsedSize = new AtomicDouble(0);
+
+  //GC stats, meant as per-interval deltas. NOTE(review): runOneIteration() subtracts the previously *reported delta*, not the previous cumulative total, so only the first two reports are correct; keep the last cumulative GcStats in a separate field.
+  AtomicDouble minorGcCount = new AtomicDouble(0);
+  AtomicDouble majorGcCount = new AtomicDouble(0);
+  AtomicDouble unknownGcCount = new AtomicDouble(0);
+  AtomicDouble minorGcDuration = new AtomicDouble(0);
+  AtomicDouble majorGcDuration = new AtomicDouble(0);
+  AtomicDouble unknownGcDuration = new AtomicDouble(0);
 
   public ContainerHealthMetricsService(Config config) {
     this.metricReportingInterval = ConfigUtils.getLong(config, CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS, DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL);
     this.operatingSystemMXBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
     this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+    this.garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans(); // the JVM's GC beans; summed by collectGcStats()
 
     //Build all the gauges and register them with the metrics registry.
     List<ContextAwareGauge<Double>> systemMetrics = buildGaugeList();
     systemMetrics.forEach(metric -> RootMetricContext.get().register(metric));
   }
 
+  @Data
+  public static class GcStats { // GC totals summed over all GC beans by collectGcStats(), bucketed by collector name
+    long minorCount;        // collections by collectors named in YOUNG_GC_TYPES
+    double minorDuration;   // their summed getCollectionTime() values (ms)
+    long majorCount;        // collections by collectors named in OLD_GC_TYPES
+    double majorDuration;   // their summed getCollectionTime() values (ms)
+    long unknownCount;      // collections by collectors in neither name set
+    double unknownDuration; // their summed getCollectionTime() values (ms)
+  }
+
   /**
    * Run one iteration of the scheduled task. If any invocation of this method throws an exception,
    * the service will transition to the {@link com.google.common.util.concurrent.Service.State#FAILED} state and this method will no
@@ -94,35 +132,69 @@ public class ContainerHealthMetricsService extends AbstractScheduledService {
     this.totalSwapSpaceSize.set(this.operatingSystemMXBean.getTotalSwapSpaceSize());
     this.freePhysicalMemSize.set(this.operatingSystemMXBean.getFreePhysicalMemorySize());
     this.processHeapUsedSize.set(this.memoryMXBean.getHeapMemoryUsage().getUsed());
+
+    GcStats gcStats = collectGcStats();
+    //Since GC Beans report accumulated counts/durations, we need to subtract the previous values to obtain the counts/durations
+    // since the last measurement time.
+    this.minorGcCount.set(gcStats.getMinorCount() - this.minorGcCount.get());
+    this.minorGcDuration.set(gcStats.getMinorDuration() - this.minorGcDuration.get());
+    this.majorGcCount.set(gcStats.getMajorCount() - this.majorGcCount.get());
+    this.majorGcDuration.set(gcStats.getMajorDuration() - this.majorGcDuration.get());
+    this.unknownGcCount.set(gcStats.getUnknownCount() - this.unknownGcCount.get());
+    this.unknownGcDuration.set(gcStats.getUnknownDuration() - this.unknownGcDuration.get());
   }
 
   protected List<ContextAwareGauge<Double>> buildGaugeList() {
     List<ContextAwareGauge<Double>> gaugeList = new ArrayList<>();
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD,
-        () -> this.processCpuLoad.get()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD,
-        () -> this.systemCpuLoad.get()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG,
-        () -> this.systemLoadAvg.get()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE,
-        () -> Long.valueOf(this.committedVmemSize.get()).doubleValue()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_TIME,
-        () -> Long.valueOf(this.processCpuTime.get()).doubleValue()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE,
-        () -> Long.valueOf(this.freeSwapSpaceSize.get()).doubleValue()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS,
-        () -> Long.valueOf(this.numAvailableProcessors.get()).doubleValue()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE,
-        () -> Long.valueOf(this.totalPhysicalMemSize.get()).doubleValue()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE,
-        () -> Long.valueOf(this.totalSwapSpaceSize.get()).doubleValue()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE,
-        () -> Long.valueOf(this.freePhysicalMemSize.get()).doubleValue()));
-    gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE,
-        () -> Long.valueOf(this.processHeapUsedSize.get()).doubleValue()));
+    // One gauge per metric field; each gauge reports the field's current value, which runOneIteration() updates.
+    gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD, this.processCpuLoad));
+    gaugeList.add(createGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD, this.systemCpuLoad));
+    gaugeList.add(createGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG, this.systemLoadAvg));
+    gaugeList.add(createGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE, this.committedVmemSize));
+    gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_CPU_TIME, this.processCpuTime));
+    gaugeList.add(createGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE, this.freeSwapSpaceSize));
+    gaugeList.add(createGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS, this.numAvailableProcessors));
+    gaugeList.add(createGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE, this.totalPhysicalMemSize));
+    gaugeList.add(createGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE, this.totalSwapSpaceSize));
+    gaugeList.add(createGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE, this.freePhysicalMemSize));
+    gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE, this.processHeapUsedSize));
+    gaugeList.add(createGauge(ContainerHealthMetrics.MINOR_GC_COUNT, this.minorGcCount));
+    gaugeList.add(createGauge(ContainerHealthMetrics.MINOR_GC_DURATION, this.minorGcDuration));
+    gaugeList.add(createGauge(ContainerHealthMetrics.MAJOR_GC_COUNT, this.majorGcCount));
+    gaugeList.add(createGauge(ContainerHealthMetrics.MAJOR_GC_DURATION, this.majorGcDuration));
+    gaugeList.add(createGauge(ContainerHealthMetrics.UNKNOWN_GC_COUNT, this.unknownGcCount));
+    gaugeList.add(createGauge(ContainerHealthMetrics.UNKNOWN_GC_DURATION, this.unknownGcDuration));
     return gaugeList;
   }
 
+  private ContextAwareGauge<Double> createGauge(String gaugeName, AtomicDouble source) { // gauge that reads source's live value on each report
+    return RootMetricContext.get().newContextAwareGauge(gaugeName, source::get);
+  }
+
+  /**
+   * Sums the cumulative (since JVM start) GC counts and times (ms) of all GC beans, bucketed by collector name.
+   * A count or time that the JVM reports as undefined (-1) contributes 0; the two are checked independently.
+   */
+  private GcStats collectGcStats() {
+    GcStats gcStats = new GcStats();
+    for (GarbageCollectorMXBean garbageCollectorMXBean : this.garbageCollectorMXBeans) {
+      String gcName = garbageCollectorMXBean.getName();
+      long count = Math.max(0L, garbageCollectorMXBean.getCollectionCount());
+      double duration = Math.max(0L, garbageCollectorMXBean.getCollectionTime());
+      if (YOUNG_GC_TYPES.contains(gcName)) {
+        gcStats.setMinorCount(gcStats.getMinorCount() + count);
+        gcStats.setMinorDuration(gcStats.getMinorDuration() + duration);
+      } else if (OLD_GC_TYPES.contains(gcName)) {
+        gcStats.setMajorCount(gcStats.getMajorCount() + count);
+        gcStats.setMajorDuration(gcStats.getMajorDuration() + duration);
+      } else {
+        gcStats.setUnknownCount(gcStats.getUnknownCount() + count);
+        gcStats.setUnknownDuration(gcStats.getUnknownDuration() + duration);
+      }
+    }
+    return gcStats;
+  }
+
   /**
    * Returns the {@link Scheduler} object used to configure this service.  This method will only be
    * called once.
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
index ac67c01..66efe74 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
@@ -30,10 +30,22 @@ public class ContainerHealthMetricsServiceTest {
     Config config = ConfigFactory.empty();
     ContainerHealthMetricsService service = new ContainerHealthMetricsService(config);
     service.runOneIteration();
-    long processCpuTime1 = service.processCpuTime.get();
+    Assert.assertTrue( service.minorGcCount.get() >= 0);
+    Assert.assertTrue( service.minorGcDuration.get() >= 0);
+    Assert.assertTrue( service.majorGcCount.get() >= 0);
+    Assert.assertTrue( service.minorGcDuration.get() >= 0);
+    Assert.assertTrue( service.unknownGcCount.get() >= 0);
+    Assert.assertTrue( service.unknownGcDuration.get() >= 0);
+    double processCpuTime1 = service.processCpuTime.get();
     Thread.sleep(10);
     service.runOneIteration();
-    long processCpuTime2 = service.processCpuTime.get();
+    double processCpuTime2 = service.processCpuTime.get();
     Assert.assertTrue( processCpuTime1 <= processCpuTime2);
+    Assert.assertTrue( service.minorGcCount.get() >= 0);
+    Assert.assertTrue( service.minorGcDuration.get() >= 0);
+    Assert.assertTrue( service.majorGcCount.get() >= 0);
+    Assert.assertTrue( service.minorGcDuration.get() >= 0);
+    Assert.assertTrue( service.unknownGcCount.get() >= 0);
+    Assert.assertTrue( service.unknownGcDuration.get() >= 0);
   }
 }