Posted to commits@storm.apache.org by et...@apache.org on 2020/04/08 14:28:39 UTC

[storm] branch master updated: STORM-3614 switch SystemBolt metrics to V2 API

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new a34e0cb  STORM-3614 switch SystemBolt metrics to V2 API
     new 4e12603  Merge pull request #3242 from agresch/agresch_storm_3614
a34e0cb is described below

commit a34e0cb54f4712a8f10fd2c082ee105c38f24ca7
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Wed Apr 1 14:58:18 2020 -0500

    STORM-3614 switch SystemBolt metrics to V2 API
---
 DEPENDENCY-LICENSES                                |   3 +-
 LICENSE-binary                                     |   3 +-
 docs/Metrics.md                                    |  19 ++--
 pom.xml                                            |   5 +
 storm-client/pom.xml                               |   4 +
 .../jvm/org/apache/storm/metric/SystemBolt.java    | 104 +++++++--------------
 .../jvm/org/apache/storm/task/IMetricsContext.java |   3 +
 .../jvm/org/apache/storm/task/TopologyContext.java |   6 ++
 .../trident/operation/TridentOperationContext.java |   6 ++
 .../test/clj/org/apache/storm/metrics_test.clj     |  21 -----
 10 files changed, 67 insertions(+), 107 deletions(-)
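
For context, the V2 metrics API registers Dropwizard metrics directly on the
TopologyContext instead of passing an IMetric together with a bucket size. A
minimal illustrative sketch of the pattern the patch below applies in
SystemBolt (the class name MetricsV2Sketch is made up; the metric names are
taken from the diff):

    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
    import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
    import java.lang.management.ManagementFactory;
    import java.lang.management.RuntimeMXBean;
    import org.apache.storm.task.TopologyContext;

    public class MetricsV2Sketch {
        static void register(TopologyContext context) {
            // Whole Dropwizard metric sets are registered under a prefix; the
            // registry expands them into dotted names such as memory.heap.used.
            context.registerMetricSet("GC", new GarbageCollectorMetricSet());
            context.registerMetricSet("memory", new MemoryUsageGaugeSet());

            // Single values become plain Gauges, with no bucket-size argument.
            final RuntimeMXBean jvmRt = ManagementFactory.getRuntimeMXBean();
            context.registerGauge("uptimeSecs",
                (Gauge<Long>) () -> jvmRt.getUptime() / 1000L);
        }
    }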

diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index fc1d7db..2346f54 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -359,8 +359,7 @@ List of third-party dependencies grouped by their license type.
         * JSON.simple (com.googlecode.json-simple:json-simple:1.1 - http://code.google.com/p/json-simple/)
         * JSON Small and Fast Parser (net.minidev:json-smart:2.3 - http://www.minidev.net/)
         * JTA 1.1 (org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1.1 - http://geronimo.apache.org/specs/geronimo-jta_1.1_spec)
-        * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.1.0 - http://metrics.codahale.com/metrics-jvm/)
-        * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:4.0.2 - http://metrics.dropwizard.io/metrics-jvm)
+        * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.2.6 - http://metrics.dropwizard.io/metrics-jvm/)
         * kafka-avro-serializer (io.confluent:kafka-avro-serializer:1.0 - http://confluent.io/kafka-avro-serializer)
         * kafka-schema-registry-client (io.confluent:kafka-schema-registry-client:1.0 - http://confluent.io/kafka-schema-registry-client)
         * Logging (commons-logging:commons-logging:1.0.3 - http://jakarta.apache.org/commons/logging/)
diff --git a/LICENSE-binary b/LICENSE-binary
index 958df2b..e16d0c4 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -877,8 +877,7 @@ The license texts of these dependencies can be found in the licenses directory.
         * JSON.simple (com.googlecode.json-simple:json-simple:1.1 - http://code.google.com/p/json-simple/)
         * JSON Small and Fast Parser (net.minidev:json-smart:2.3 - http://www.minidev.net/)
         * JTA 1.1 (org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1.1 - http://geronimo.apache.org/specs/geronimo-jta_1.1_spec)
-        * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.1.0 - http://metrics.codahale.com/metrics-jvm/)
-        * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:4.0.2 - http://metrics.dropwizard.io/metrics-jvm)
+        * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.2.6 - http://metrics.dropwizard.io/metrics-jvm/)
         * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
         * Metrics Core (io.dropwizard.metrics:metrics-core:3.2.6 - http://metrics.dropwizard.io/metrics-core/)
         * Metrics Health Checks (io.dropwizard.metrics:metrics-healthchecks:4.0.2 - http://metrics.dropwizard.io/metrics-healthchecks)
diff --git a/docs/Metrics.md b/docs/Metrics.md
index e92f5db..8df1a45 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -299,29 +299,28 @@ The value is a map where the key is a NodeInfo class for the downstream worker i
 
 ##### JVM Memory
 
-JVM memory usage is reported through `memory/nonHeap` for off heap memory and `memory/heap` for on heap memory.  These values come from the [MemoryUsage](https://docs.oracle.com/javase/8/docs/api/index.html?java/lang/management/MemoryUsage.html) mxbean.  Each of the metrics are reported as a map with the following keys, and values returned by the corresponding java code.
+JVM memory usage is reported through `memory.non-heap` for off heap memory, `memory.heap` for on heap memory and `memory.total` for combined values.  These values come from the [MemoryUsage](https://docs.oracle.com/javase/8/docs/api/index.html?java/lang/management/MemoryUsage.html) mxbean.  Each of the metrics is reported as a map with the following keys, and values returned by the corresponding java code.
 
 | Key | Corresponding Code |
 |--------|--------------------|
-| `maxBytes` | `memUsage.getMax()` |
-| `committedBytes` | `memUsage.getCommitted()` |
-| `initBytes` | `memUsage.getInit()` |
-| `usedBytes` | `memUsage.getUsed()` |
-| `virtualFreeBytes` | `memUsage.getMax() - memUsage.getUsed()` |
-| `unusedBytes` | `memUsage.getCommitted() - memUsage.getUsed()` |
+| `max` | `memUsage.getMax()` |
+| `committed` | `memUsage.getCommitted()` |
+| `init` | `memUsage.getInit()` |
+| `used` | `memUsage.getUsed()` |
+| `usage` | `Ratio.of(memUsage.getUsed(), memUsage.getMax())` |
 
 ##### JVM Garbage Collection
 
-The exact GC metric name depends on the garbage collector that your worker uses.  The data is all collected from `ManagementFactory.getGarbageCollectorMXBeans()` and the name of the metrics is `"GC/"` followed by the name of the returned bean with white space removed.  The reported metrics are just
+The exact GC metric name depends on the garbage collector that your worker uses.  The data is all collected from `ManagementFactory.getGarbageCollectorMXBeans()` and the name of the metrics is `"GC"` followed by the name of the returned bean with white space removed.  The reported metrics are just
 
 * `count` the number of gc events that happened and
-* `timeMs` the total number of milliseconds that were spent doing gc.  
+* `time` the total number of milliseconds that were spent doing gc.  
 
 Please refer to the [JVM documentation](https://docs.oracle.com/javase/8/docs/api/java/lang/management/ManagementFactory.html#getGarbageCollectorMXBeans--) for more details.
 
 ##### JVM Misc
 
-* `threadCount` is the number of threads currently in the JVM.
+* There are metrics prefixed with `threads` providing the number of threads, daemon threads, blocked and deadlocked threads.
 
 ##### Uptime
 
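The keys documented above correspond to the gauge names that Dropwizard's
MemoryUsageGaugeSet exposes. A stand-alone snippet (not part of the patch)
that prints those keys, which show up under the memory prefix once registered,
e.g. memory.heap.used:

    import com.codahale.metrics.jvm.MemoryUsageGaugeSet;

    public class ListMemoryGauges {
        public static void main(String[] args) {
            // Prints keys such as heap.used, heap.max, non-heap.committed,
            // total.used, plus per-memory-pool entries under pools.*
            new MemoryUsageGaugeSet().getMetrics().keySet()
                .forEach(System.out::println);
        }
    }
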
diff --git a/pom.xml b/pom.xml
index a4e81a8..92ccf51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -977,6 +977,11 @@
                 <version>${metrics.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-jvm</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.mockito</groupId>
                 <artifactId>mockito-core</artifactId>
                 <version>${mockito.version}</version>
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 0ff861c..37bd61f 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -100,6 +100,10 @@
             <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-graphite</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+        </dependency>
 
         <!-- end of transitive dependency management -->
 
diff --git a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
index f2efc2f..9c68c89 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -12,15 +12,14 @@
 
 package org.apache.storm.metric;
 
-import java.lang.management.GarbageCollectorMXBean;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
 import java.lang.management.RuntimeMXBean;
-import java.lang.management.ThreadMXBean;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.function.Supplier;
+import java.util.concurrent.TimeUnit;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.IBolt;
@@ -46,14 +45,29 @@ public class SystemBolt implements IBolt {
 
         int bucketSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
 
+        context.registerMetricSet("GC", new GarbageCollectorMetricSet());
+        context.registerMetricSet("threads", new CachedThreadStatesGaugeSet(bucketSize, TimeUnit.SECONDS));
+        context.registerMetricSet("memory", new MemoryUsageGaugeSet());
+
         final RuntimeMXBean jvmRt = ManagementFactory.getRuntimeMXBean();
-        context.registerMetric("uptimeSecs", () -> jvmRt.getUptime() / 1000.0, bucketSize);
-        context.registerMetric("startTimeSecs", () -> jvmRt.getStartTime() / 1000.0, bucketSize);
 
-        final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-        context.registerMetric("threadCount", threadBean::getThreadCount, bucketSize);
+        context.registerGauge("uptimeSecs", new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return jvmRt.getUptime() / 1000L;
+            }
+        });
 
-        context.registerMetric("newWorkerEvent", new IMetric() {
+        context.registerGauge("startTimeSecs", new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return jvmRt.getStartTime() / 1000L;
+            }
+        });
+
+        // newWorkerEvent: 1 when a worker is first started and 0 all other times.
+        // This can be used to tell when a worker has crashed and is restarted.
+        final IMetric newWorkerEvent = new IMetric() {
             boolean doEvent = true;
 
             @Override
@@ -65,17 +79,13 @@ public class SystemBolt implements IBolt {
                     return 0;
                 }
             }
-        }, bucketSize);
-
-        @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
-        final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
-
-        context.registerMetric("memory/heap", new MemoryUsageMetric(jvmMemRT::getHeapMemoryUsage), bucketSize);
-        context.registerMetric("memory/nonHeap", new MemoryUsageMetric(jvmMemRT::getNonHeapMemoryUsage), bucketSize);
-
-        for (GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
-            context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
-        }
+        };
+        context.registerGauge("newWorkerEvent", new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return (Integer) newWorkerEvent.getValueAndReset();
+            }
+        });
 
         registerMetrics(context, (Map<String, String>) topoConf.get(Config.WORKER_METRICS), bucketSize, topoConf);
         registerMetrics(context, (Map<String, String>) topoConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize, topoConf);
@@ -102,54 +112,4 @@ public class SystemBolt implements IBolt {
     @Override
     public void cleanup() {
     }
-
-    private static class MemoryUsageMetric implements IMetric {
-        Supplier<MemoryUsage> getUsage;
-
-        MemoryUsageMetric(Supplier<MemoryUsage> getUsage) {
-            this.getUsage = getUsage;
-        }
-
-        @Override
-        public Object getValueAndReset() {
-            MemoryUsage memUsage = getUsage.get();
-            HashMap<String, Object> m = new HashMap<>();
-            m.put("maxBytes", memUsage.getMax());
-            m.put("committedBytes", memUsage.getCommitted());
-            m.put("initBytes", memUsage.getInit());
-            m.put("usedBytes", memUsage.getUsed());
-            m.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed());
-            m.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed());
-            return m;
-        }
-    }
-
-    // canonically the metrics data exported is time bucketed when doing counts.
-    // convert the absolute values here into time buckets.
-    private static class GarbageCollectorMetric implements IMetric {
-        GarbageCollectorMXBean gcBean;
-        Long collectionCount;
-        Long collectionTime;
-
-        GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
-            this.gcBean = gcBean;
-        }
-
-        @Override
-        public Object getValueAndReset() {
-            Long collectionCountP = gcBean.getCollectionCount();
-            Long collectionTimeP = gcBean.getCollectionTime();
-
-            Map<String, Object> ret = null;
-            if (collectionCount != null && collectionTime != null) {
-                ret = new HashMap<>();
-                ret.put("count", collectionCountP - collectionCount);
-                ret.put("timeMs", collectionTimeP - collectionTime);
-            }
-
-            collectionCount = collectionCountP;
-            collectionTime = collectionTimeP;
-            return ret;
-        }
-    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
index 958ce5f..de5cbc5 100644
--- a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
@@ -16,6 +16,7 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
 import org.apache.storm.metric.api.CombinedMetric;
 import org.apache.storm.metric.api.ICombiner;
@@ -55,4 +56,6 @@ public interface IMetricsContext {
     Counter registerCounter(String name);
 
     <T> Gauge<T> registerGauge(String name, Gauge<T> gauge);
+
+    MetricSet registerMetricSet(String prefix, MetricSet set);
 }
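
The new interface method also lets user components register their own
Dropwizard MetricSets. A hypothetical example (QueueDepthMetricSet and the
"depth"/"pending-queue" names are invented for illustration; only
registerMetricSet itself comes from this change):

    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.Metric;
    import com.codahale.metrics.MetricSet;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;

    // Exposes the depth of an in-memory queue as a single gauge.
    class QueueDepthMetricSet implements MetricSet {
        private final Queue<?> queue;

        QueueDepthMetricSet(Queue<?> queue) {
            this.queue = queue;
        }

        @Override
        public Map<String, Metric> getMetrics() {
            Map<String, Metric> metrics = new HashMap<>();
            metrics.put("depth", (Gauge<Integer>) queue::size);
            return metrics;
        }
    }

    // In a bolt's prepare(...):
    //     context.registerMetricSet("pending-queue", new QueueDepthMetricSet(pending));
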
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index 5a3f4c1..b46e981 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -16,6 +16,7 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -424,6 +425,11 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         return metricRegistry.registry().register(metricName(name), gauge);
     }
 
+    @Override
+    public MetricSet registerMetricSet(String prefix, MetricSet set) {
+        return metricRegistry.registry().register(metricName(prefix), set);
+    }
+
     private String metricName(String name) {
         return metricRegistry.metricName(name, this);
     }
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
index 68111a6..b1cc163 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
@@ -16,6 +16,7 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
 import org.apache.storm.metric.api.CombinedMetric;
 import org.apache.storm.metric.api.ICombiner;
@@ -92,4 +93,9 @@ public class TridentOperationContext implements IMetricsContext {
     public <T> Gauge<T> registerGauge(String name, Gauge<T> gauge) {
         return topoContext.registerGauge(name, gauge);
     }
+
+    @Override
+    public MetricSet registerMetricSet(String prefix, MetricSet set) {
+        return topoContext.registerMetricSet(prefix, set);
+    }
 }
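
Since TridentOperationContext simply delegates to the underlying
TopologyContext, Trident operations gain the same hook. A small illustrative
helper (names are made up), assuming it is called from an operation's
prepare(conf, context):

    import com.codahale.metrics.MetricSet;
    import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
    import org.apache.storm.trident.operation.TridentOperationContext;

    final class TridentMetricsExample {
        // Registers the JVM memory gauges under an operation-specific prefix.
        static MetricSet registerJvmMemory(TridentOperationContext context) {
            return context.registerMetricSet("fn-memory", new MemoryUsageGaugeSet());
        }
    }
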
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 8ce8fbb..c779ced 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -376,26 +376,5 @@
       (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 4 cluster)
       (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 4 cluster))))
 
-(deftest test-system-bolt
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
-                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60})))]
-    (let [feeder (FeederSpout. ["field1"])
-          topology (Thrift/buildTopology
-                    {"1" (Thrift/prepareSpoutDetails feeder)}
-                    {})]      
-      (.submitTopology cluster "metrics-tester" {} topology)
-
-      (.feed feeder ["a"] 1)
-      (.advanceClusterTime cluster 70)
-      (assert-metric-running-sum! "__system" "newWorkerEvent" 1 1 cluster)
-      (assert-metric-data-exists! "__system" "uptimeSecs")
-      (assert-metric-data-exists! "__system" "startTimeSecs")
-
-      (.advanceClusterTime cluster 180)
-      (assert-metric-running-sum! "__system" "newWorkerEvent" 1 4 cluster)
-      )))