Posted to commits@storm.apache.org by et...@apache.org on 2020/04/08 14:28:39 UTC
[storm] branch master updated: STORM-3614 switch SystemBolt metrics to V2 API
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new a34e0cb STORM-3614 switch SystemBolt metrics to V2 API
new 4e12603 Merge pull request #3242 from agresch/agresch_storm_3614
a34e0cb is described below
commit a34e0cb54f4712a8f10fd2c082ee105c38f24ca7
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Wed Apr 1 14:58:18 2020 -0500
STORM-3614 switch SystemBolt metrics to V2 API
---
DEPENDENCY-LICENSES | 3 +-
LICENSE-binary | 3 +-
docs/Metrics.md | 19 ++--
pom.xml | 5 +
storm-client/pom.xml | 4 +
.../jvm/org/apache/storm/metric/SystemBolt.java | 104 +++++++--------------
.../jvm/org/apache/storm/task/IMetricsContext.java | 3 +
.../jvm/org/apache/storm/task/TopologyContext.java | 6 ++
.../trident/operation/TridentOperationContext.java | 6 ++
.../test/clj/org/apache/storm/metrics_test.clj | 21 -----
10 files changed, 67 insertions(+), 107 deletions(-)
diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index fc1d7db..2346f54 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -359,8 +359,7 @@ List of third-party dependencies grouped by their license type.
* JSON.simple (com.googlecode.json-simple:json-simple:1.1 - http://code.google.com/p/json-simple/)
* JSON Small and Fast Parser (net.minidev:json-smart:2.3 - http://www.minidev.net/)
* JTA 1.1 (org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1.1 - http://geronimo.apache.org/specs/geronimo-jta_1.1_spec)
- * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.1.0 - http://metrics.codahale.com/metrics-jvm/)
- * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:4.0.2 - http://metrics.dropwizard.io/metrics-jvm)
+ * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.2.6 - http://metrics.dropwizard.io/metrics-jvm/)
* kafka-avro-serializer (io.confluent:kafka-avro-serializer:1.0 - http://confluent.io/kafka-avro-serializer)
* kafka-schema-registry-client (io.confluent:kafka-schema-registry-client:1.0 - http://confluent.io/kafka-schema-registry-client)
* Logging (commons-logging:commons-logging:1.0.3 - http://jakarta.apache.org/commons/logging/)
diff --git a/LICENSE-binary b/LICENSE-binary
index 958df2b..e16d0c4 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -877,8 +877,7 @@ The license texts of these dependencies can be found in the licenses directory.
* JSON.simple (com.googlecode.json-simple:json-simple:1.1 - http://code.google.com/p/json-simple/)
* JSON Small and Fast Parser (net.minidev:json-smart:2.3 - http://www.minidev.net/)
* JTA 1.1 (org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1.1 - http://geronimo.apache.org/specs/geronimo-jta_1.1_spec)
- * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.1.0 - http://metrics.codahale.com/metrics-jvm/)
- * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:4.0.2 - http://metrics.dropwizard.io/metrics-jvm)
+ * JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.2.6 - http://metrics.dropwizard.io/metrics-jvm/)
* LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
* Metrics Core (io.dropwizard.metrics:metrics-core:3.2.6 - http://metrics.dropwizard.io/metrics-core/)
* Metrics Health Checks (io.dropwizard.metrics:metrics-healthchecks:4.0.2 - http://metrics.dropwizard.io/metrics-healthchecks)
diff --git a/docs/Metrics.md b/docs/Metrics.md
index e92f5db..8df1a45 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -299,29 +299,28 @@ The value is a map where the key is a NodeInfo class for the downstream worker i
##### JVM Memory
-JVM memory usage is reported through `memory/nonHeap` for off heap memory and `memory/heap` for on heap memory. These values come from the [MemoryUsage](https://docs.oracle.com/javase/8/docs/api/index.html?java/lang/management/MemoryUsage.html) mxbean. Each of the metrics are reported as a map with the following keys, and values returned by the corresponding java code.
+JVM memory usage is reported through `memory.non-heap` for off heap memory, `memory.heap` for on heap memory and `memory.total` for combined values. These values come from the [MemoryUsage](https://docs.oracle.com/javase/8/docs/api/index.html?java/lang/management/MemoryUsage.html) mxbean. Each of the metrics are reported as a map with the following keys, and values returned by the corresponding java code.
| Key | Corresponding Code |
|--------|--------------------|
-| `maxBytes` | `memUsage.getMax()` |
-| `committedBytes` | `memUsage.getCommitted()` |
-| `initBytes` | `memUsage.getInit()` |
-| `usedBytes` | `memUsage.getUsed()` |
-| `virtualFreeBytes` | `memUsage.getMax() - memUsage.getUsed()` |
-| `unusedBytes` | `memUsage.getCommitted() - memUsage.getUsed()` |
+| `max` | `memUsage.getMax()` |
+| `committed` | `memUsage.getCommitted()` |
+| `init` | `memUsage.getInit()` |
+| `used` | `memUsage.getUsed()` |
+| `usage` | `Ratio.of(memUsage.getUsed(), memUsage.getMax())` |
##### JVM Garbage Collection
-The exact GC metric name depends on the garbage collector that your worker uses. The data is all collected from `ManagementFactory.getGarbageCollectorMXBeans()` and the name of the metrics is `"GC/"` followed by the name of the returned bean with white space removed. The reported metrics are just
+The exact GC metric name depends on the garbage collector that your worker uses. The data is all collected from `ManagementFactory.getGarbageCollectorMXBeans()` and the name of the metrics is `"GC"` followed by the name of the returned bean with white space removed. The reported metrics are just
* `count` the number of gc events that happened and
-* `timeMs` the total number of milliseconds that were spent doing gc.
+* `time` the total number of milliseconds that were spent doing gc.
Please refer to the [JVM documentation](https://docs.oracle.com/javase/8/docs/api/java/lang/management/ManagementFactory.html#getGarbageCollectorMXBeans--) for more details.
##### JVM Misc
-* `threadCount` is the number of threads currently in the JVM.
+* There are metrics prefixed with `threads` providing the number of threads, daemon threads, blocked and deadlocked threads.
##### Uptime
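
Reviewer note on the new memory keys in docs/Metrics.md: the table maps `max`, `committed`, `init` and `used` to `MemoryUsage` getters, and `usage` to the ratio of used to max. A minimal stdlib-only sketch of those values follows. `HeapUsageSketch` and its methods are hypothetical names, and the NaN guard for an undefined max (`getMax()` returns -1 in that case) is an assumption of this sketch, not necessarily what Dropwizard's `Ratio` does.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeapUsageSketch {

    // Ratio of used to max. Assumption of this sketch: report NaN when
    // max is zero or undefined (-1) instead of dividing by it.
    public static double usageRatio(long used, long max) {
        return max <= 0 ? Double.NaN : (double) used / (double) max;
    }

    // Collects the five documented keys from a single MemoryUsage snapshot.
    public static Map<String, Number> describe(MemoryUsage u) {
        Map<String, Number> m = new LinkedHashMap<>();
        m.put("max", u.getMax());
        m.put("committed", u.getCommitted());
        m.put("init", u.getInit());
        m.put("used", u.getUsed());
        m.put("usage", usageRatio(u.getUsed(), u.getMax()));
        return m;
    }

    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        // The "heap." prefix is illustrative; the full name Storm reports is longer.
        describe(heap).forEach((k, v) -> System.out.println("heap." + k + " = " + v));
    }
}
```

The old `virtualFreeBytes` and `unusedBytes` keys have no direct replacement in the new table; both can still be derived from `max`, `committed` and `used`.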
diff --git a/pom.xml b/pom.xml
index a4e81a8..92ccf51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -977,6 +977,11 @@
<version>${metrics.version}</version>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 0ff861c..37bd61f 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -100,6 +100,10 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ </dependency>
<!-- end of transitive dependency management -->
diff --git a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
index f2efc2f..9c68c89 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -12,15 +12,14 @@
package org.apache.storm.metric;
-import java.lang.management.GarbageCollectorMXBean;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
import java.lang.management.RuntimeMXBean;
-import java.lang.management.ThreadMXBean;
-import java.util.HashMap;
import java.util.Map;
-import java.util.function.Supplier;
+import java.util.concurrent.TimeUnit;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.task.IBolt;
@@ -46,14 +45,29 @@ public class SystemBolt implements IBolt {
int bucketSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+ context.registerMetricSet("GC", new GarbageCollectorMetricSet());
+ context.registerMetricSet("threads", new CachedThreadStatesGaugeSet(bucketSize, TimeUnit.SECONDS));
+ context.registerMetricSet("memory", new MemoryUsageGaugeSet());
+
final RuntimeMXBean jvmRt = ManagementFactory.getRuntimeMXBean();
- context.registerMetric("uptimeSecs", () -> jvmRt.getUptime() / 1000.0, bucketSize);
- context.registerMetric("startTimeSecs", () -> jvmRt.getStartTime() / 1000.0, bucketSize);
- final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- context.registerMetric("threadCount", threadBean::getThreadCount, bucketSize);
+ context.registerGauge("uptimeSecs", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return jvmRt.getUptime() / 1000L;
+ }
+ });
- context.registerMetric("newWorkerEvent", new IMetric() {
+ context.registerGauge("startTimeSecs", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return jvmRt.getStartTime() / 1000L;
+ }
+ });
+
+ // newWorkerEvent: 1 when a worker is first started and 0 all other times.
+ // This can be used to tell when a worker has crashed and is restarted.
+ final IMetric newWorkerEvent = new IMetric() {
boolean doEvent = true;
@Override
@@ -65,17 +79,13 @@ public class SystemBolt implements IBolt {
return 0;
}
}
- }, bucketSize);
-
- @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
- final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
-
- context.registerMetric("memory/heap", new MemoryUsageMetric(jvmMemRT::getHeapMemoryUsage), bucketSize);
- context.registerMetric("memory/nonHeap", new MemoryUsageMetric(jvmMemRT::getNonHeapMemoryUsage), bucketSize);
-
- for (GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
- context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
- }
+ };
+ context.registerGauge("newWorkerEvent", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return (Integer) newWorkerEvent.getValueAndReset();
+ }
+ });
registerMetrics(context, (Map<String, String>) topoConf.get(Config.WORKER_METRICS), bucketSize, topoConf);
registerMetrics(context, (Map<String, String>) topoConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize, topoConf);
@@ -102,54 +112,4 @@ public class SystemBolt implements IBolt {
@Override
public void cleanup() {
}
-
- private static class MemoryUsageMetric implements IMetric {
- Supplier<MemoryUsage> getUsage;
-
- MemoryUsageMetric(Supplier<MemoryUsage> getUsage) {
- this.getUsage = getUsage;
- }
-
- @Override
- public Object getValueAndReset() {
- MemoryUsage memUsage = getUsage.get();
- HashMap<String, Object> m = new HashMap<>();
- m.put("maxBytes", memUsage.getMax());
- m.put("committedBytes", memUsage.getCommitted());
- m.put("initBytes", memUsage.getInit());
- m.put("usedBytes", memUsage.getUsed());
- m.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed());
- m.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed());
- return m;
- }
- }
-
- // canonically the metrics data exported is time bucketed when doing counts.
- // convert the absolute values here into time buckets.
- private static class GarbageCollectorMetric implements IMetric {
- GarbageCollectorMXBean gcBean;
- Long collectionCount;
- Long collectionTime;
-
- GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
- this.gcBean = gcBean;
- }
-
- @Override
- public Object getValueAndReset() {
- Long collectionCountP = gcBean.getCollectionCount();
- Long collectionTimeP = gcBean.getCollectionTime();
-
- Map<String, Object> ret = null;
- if (collectionCount != null && collectionTime != null) {
- ret = new HashMap<>();
- ret.put("count", collectionCountP - collectionCount);
- ret.put("timeMs", collectionTimeP - collectionTime);
- }
-
- collectionCount = collectionCountP;
- collectionTime = collectionTimeP;
- return ret;
- }
- }
}
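
Reviewer note on the removed GarbageCollectorMetric: it reported per-bucket deltas (the current reading minus the previous one) and returned null on the first sample. If the replacement `GarbageCollectorMetricSet` reports the cumulative MXBean values, which is my understanding of the Dropwizard class but is not shown in this diff, a consumer that still wants per-interval numbers would have to do the subtraction itself. A minimal stdlib-only sketch of that delta logic follows; `GcDeltaSketch` and `delta` are hypothetical names, not Storm or Dropwizard API.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcDeltaSketch {
    // Previous cumulative reading; null until the first sample arrives.
    private Long previous;

    // Returns the change since the last call, or null on the first call,
    // mirroring the behavior of the removed GarbageCollectorMetric.
    public Long delta(long cumulative) {
        Long result = (previous == null) ? null : cumulative - previous;
        previous = cumulative;
        return result;
    }

    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            GcDeltaSketch counts = new GcDeltaSketch();
            counts.delta(gc.getCollectionCount()); // first sample only sets the baseline
            System.gc();                           // a hint; the JVM may ignore it
            System.out.println(gc.getName() + " collections since last sample: "
                + counts.delta(gc.getCollectionCount()));
        }
    }
}
```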
diff --git a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
index 958ce5f..de5cbc5 100644
--- a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
@@ -16,6 +16,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import org.apache.storm.metric.api.CombinedMetric;
import org.apache.storm.metric.api.ICombiner;
@@ -55,4 +56,6 @@ public interface IMetricsContext {
Counter registerCounter(String name);
<T> Gauge<T> registerGauge(String name, Gauge<T> gauge);
+
+ MetricSet registerMetricSet(String prefix, MetricSet set);
}
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index 5a3f4c1..b46e981 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -16,6 +16,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.Collections;
@@ -424,6 +425,11 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
return metricRegistry.registry().register(metricName(name), gauge);
}
+ @Override
+ public MetricSet registerMetricSet(String prefix, MetricSet set) {
+ return metricRegistry.registry().register(metricName(prefix), set);
+ }
+
private String metricName(String name) {
return metricRegistry.metricName(name, this);
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
index 68111a6..b1cc163 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
@@ -16,6 +16,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import org.apache.storm.metric.api.CombinedMetric;
import org.apache.storm.metric.api.ICombiner;
@@ -92,4 +93,9 @@ public class TridentOperationContext implements IMetricsContext {
public <T> Gauge<T> registerGauge(String name, Gauge<T> gauge) {
return topoContext.registerGauge(name, gauge);
}
+
+ @Override
+ public MetricSet registerMetricSet(String prefix, MetricSet set) {
+ return topoContext.registerMetricSet(prefix, set);
+ }
}
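
Reviewer note on registerMetricSet: the new method passes a prefix and a whole MetricSet to the registry in one call. As far as I know, Dropwizard's `MetricRegistry` then registers each member under the prefix joined to the member's own name with a dot, which would match the `memory.heap` and `memory.non-heap` names in the updated docs. A stdlib-only sketch of that naming scheme follows, with a `Supplier` standing in for a `Gauge`. `MetricSetPrefixSketch`, `registerAll` and the member names are hypothetical, and Storm's own `metricName(...)` decoration of the prefix is not reproduced.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

public class MetricSetPrefixSketch {

    // Registers every member of the set under "<prefix>.<member name>".
    public static Map<String, Supplier<Object>> registerAll(
            String prefix, Map<String, Supplier<Object>> set) {
        Map<String, Supplier<Object>> registry = new TreeMap<>();
        for (Map.Entry<String, Supplier<Object>> e : set.entrySet()) {
            registry.put(prefix + "." + e.getKey(), e.getValue());
        }
        return registry;
    }

    public static void main(String[] args) {
        // Illustrative members only; the real set is built by the library.
        Map<String, Supplier<Object>> threads = new LinkedHashMap<>();
        threads.put("count", () -> Thread.activeCount());
        threads.put("daemon.count", () -> 0);

        registerAll("threads", threads)
            .forEach((name, gauge) -> System.out.println(name + " = " + gauge.get()));
    }
}
```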
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 8ce8fbb..c779ced 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -376,26 +376,5 @@
(assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 4 cluster)
(assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 4 cluster))))
-(deftest test-system-bolt
- (with-open [cluster (.build (doto (LocalCluster$Builder.)
- (.withSimulatedTime)
- (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
- TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60})))]
- (let [feeder (FeederSpout. ["field1"])
- topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails feeder)}
- {})]
- (.submitTopology cluster "metrics-tester" {} topology)
-
- (.feed feeder ["a"] 1)
- (.advanceClusterTime cluster 70)
- (assert-metric-running-sum! "__system" "newWorkerEvent" 1 1 cluster)
- (assert-metric-data-exists! "__system" "uptimeSecs")
- (assert-metric-data-exists! "__system" "startTimeSecs")
-
- (.advanceClusterTime cluster 180)
- (assert-metric-running-sum! "__system" "newWorkerEvent" 1 4 cluster)
- )))