You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/07/01 15:18:10 UTC

[storm] branch master updated: STORM-3778 convert SpoutThrottlingMetrics to V2 API (#3400)

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e4b235  STORM-3778 convert SpoutThrottlingMetrics to V2 API (#3400)
8e4b235 is described below

commit 8e4b235555d576a2a424fa2ac0086bd4bb96fe82
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Jul 1 10:18:01 2021 -0500

    STORM-3778 convert SpoutThrottlingMetrics to V2 API (#3400)
---
 .../daemon/metrics/SpoutThrottlingMetrics.java     | 40 ----------------------
 .../apache/storm/executor/spout/SpoutExecutor.java | 20 +++++++----
 .../jvm/org/apache/storm/metrics2/RateCounter.java | 19 +++++++---
 .../apache/storm/metrics2/StormMetricRegistry.java | 14 ++++++++
 4 files changed, 42 insertions(+), 51 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
deleted file mode 100644
index a8e721e..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.metric.api.CountMetric;
-
-public class SpoutThrottlingMetrics extends BuiltinMetrics {
-    private final CountMetric skippedMaxSpoutMs = new CountMetric();
-    private final CountMetric skippedInactiveMs = new CountMetric();
-    private final CountMetric skippedBackPressureMs = new CountMetric();
-
-    public SpoutThrottlingMetrics() {
-        metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
-        metricMap.put("skipped-inactive-ms", skippedInactiveMs);
-        metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
-
-    }
-
-    public void skippedMaxSpoutMs(long ms) {
-        this.skippedMaxSpoutMs.incrBy(ms);
-    }
-
-    public void skippedInactiveMs(long ms) {
-        this.skippedInactiveMs.incrBy(ms);
-    }
-
-    public void skippedBackPressureMs(long ms) {
-        this.skippedBackPressureMs.incrBy(ms);
-    }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index bd284c4..94e8899 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -23,12 +23,12 @@ import org.apache.storm.ICredentialsListener;
 import org.apache.storm.daemon.Acker;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.Task;
-import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.executor.Executor;
 import org.apache.storm.executor.TupleInfo;
 import org.apache.storm.hooks.info.SpoutAckInfo;
 import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.metrics2.RateCounter;
 import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.policy.IWaitStrategy.WaitSituation;
 import org.apache.storm.spout.ISpout;
@@ -56,7 +56,6 @@ public class SpoutExecutor extends Executor {
     private final AtomicBoolean lastActive;
     private final MutableLong emittedCount;
     private final MutableLong emptyEmitStreak;
-    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
     private final boolean hasAckers;
     private final SpoutExecutorStats stats;
     SpoutOutputCollectorImpl spoutOutputCollector;
@@ -65,6 +64,9 @@ public class SpoutExecutor extends Executor {
     private List<SpoutOutputCollector> outputCollectors;
     private RotatingMap<Long, TupleInfo> pending;
     private long threadId = 0;
+    private final RateCounter skippedMaxSpoutMs;
+    private final RateCounter skippedInactiveMs;
+    private final RateCounter skippedBackpressureMs;
 
     public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
         super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
@@ -77,9 +79,14 @@ public class SpoutExecutor extends Executor {
         this.hasAckers = StormCommon.hasAckers(topoConf);
         this.emittedCount = new MutableLong(0);
         this.emptyEmitStreak = new MutableLong(0);
-        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
         this.stats = new SpoutExecutorStats(
             ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+        this.skippedMaxSpoutMs = workerData.getMetricRegistry().rateCounter("__skipped-max-spout-ms", componentId,
+                taskIds.get(0));
+        this.skippedInactiveMs = workerData.getMetricRegistry().rateCounter("__skipped-inactive-ms", componentId,
+                taskIds.get(0));
+        this.skippedBackpressureMs = workerData.getMetricRegistry().rateCounter("__skipped-backpressure-ms", componentId,
+                taskIds.get(0));
     }
 
     @Override
@@ -116,7 +123,6 @@ public class SpoutExecutor extends Executor {
             }
         });
 
-        this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
         this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
         this.outputCollectors = new ArrayList<>();
         for (int i = 0; i < idToTask.size(); ++i) {
@@ -224,7 +230,7 @@ public class SpoutExecutor extends Executor {
                     LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                 }
                 bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
-                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
+                skippedBackpressureMs.inc(Time.currentTimeMillis() - start);
             }
 
             private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
@@ -232,7 +238,7 @@ public class SpoutExecutor extends Executor {
                 long start = Time.currentTimeMillis();
                 swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                 if (reachedMaxSpoutPending) {
-                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
+                    skippedMaxSpoutMs.inc(Time.currentTimeMillis() - start);
                 } else {
                     if (emptyStretch > 0) {
                         LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
@@ -275,7 +281,7 @@ public class SpoutExecutor extends Executor {
         }
         long start = Time.currentTimeMillis();
         Time.sleep(100);
-        spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - start);
+        skippedInactiveMs.inc(Time.currentTimeMillis() - start);
     }
 
     @Override
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
index 6f5c32c..4c9b563 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
@@ -28,16 +28,27 @@ public class RateCounter implements Gauge<Double> {
 
     RateCounter(StormMetricRegistry metricRegistry, String metricName, String topologyId,
                 String componentId, int taskId, int workerPort, String streamId) {
-        counter = metricRegistry.counter(metricName, topologyId, componentId,
-                taskId, workerPort, streamId);
-        metricRegistry.gauge(metricName + ".m1_rate", this, topologyId, componentId, streamId,
-                taskId, workerPort);
+        if (streamId != null) {
+            counter = metricRegistry.counter(metricName, topologyId, componentId,
+                    taskId, workerPort, streamId);
+            metricRegistry.gauge(metricName + ".m1_rate", this, topologyId, componentId, streamId,
+                    taskId, workerPort);
+        } else {
+            counter = metricRegistry.counter(metricName, componentId, taskId);
+            metricRegistry.gauge(metricName + ".m1_rate", this, componentId, taskId);
+        }
+
         this.timeSpanInSeconds = Math.max(60 - (60 % metricRegistry.getRateCounterUpdateIntervalSeconds()),
                 metricRegistry.getRateCounterUpdateIntervalSeconds());
         this.values = new long[this.timeSpanInSeconds / metricRegistry.getRateCounterUpdateIntervalSeconds() + 1];
 
     }
 
+    RateCounter(StormMetricRegistry metricRegistry, String metricName, String topologyId,
+                String componentId, int taskId, int workerPort) {
+        this(metricRegistry, metricName, topologyId, componentId, taskId, workerPort, null);
+    }
+
     /**
      * Reports the average rate of events per second over 1 minute for the metric.
      * @return the rate
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 18177b9..c442f3f 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -70,6 +70,13 @@ public class StormMetricRegistry implements MetricRegistryProvider {
         return rateCounter;
     }
 
+    public RateCounter rateCounter(String metricName, String componentId, int taskId) {
+        RateCounter rateCounter = new RateCounter(this, metricName, topologyId, componentId, taskId,
+                port);
+        rateCounters.add(rateCounter);
+        return rateCounter;
+    }
+
     public <T> SimpleGauge<T> gauge(
         T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
         Gauge gauge = new SimpleGauge<>(initialValue);
@@ -151,6 +158,13 @@ public class StormMetricRegistry implements MetricRegistryProvider {
         return counter;
     }
 
+    public Counter counter(String name, String componentId, Integer taskId) {
+        MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
+        Counter counter = registerCounter(metricNames, new Counter(), taskId, componentId, null);
+        saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
+        return counter;
+    }
+
     public Timer timer(String name, TopologyContext context) {
         MetricNames metricNames = topologyMetricName(name, context);
         Timer timer = registerTimer(metricNames, new Timer(), context.getThisTaskId(), context.getThisComponentId(), null);