You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/07/01 15:18:10 UTC
[storm] branch master updated: STORM-3778 convert
SpoutThrottlingMetrics to V2 API (#3400)
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 8e4b235 STORM-3778 convert SpoutThrottlingMetrics to V2 API (#3400)
8e4b235 is described below
commit 8e4b235555d576a2a424fa2ac0086bd4bb96fe82
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Jul 1 10:18:01 2021 -0500
STORM-3778 convert SpoutThrottlingMetrics to V2 API (#3400)
---
.../daemon/metrics/SpoutThrottlingMetrics.java | 40 ----------------------
.../apache/storm/executor/spout/SpoutExecutor.java | 20 +++++++----
.../jvm/org/apache/storm/metrics2/RateCounter.java | 19 +++++++---
.../apache/storm/metrics2/StormMetricRegistry.java | 14 ++++++++
4 files changed, 42 insertions(+), 51 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
deleted file mode 100644
index a8e721e..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.metric.api.CountMetric;
-
-public class SpoutThrottlingMetrics extends BuiltinMetrics {
- private final CountMetric skippedMaxSpoutMs = new CountMetric();
- private final CountMetric skippedInactiveMs = new CountMetric();
- private final CountMetric skippedBackPressureMs = new CountMetric();
-
- public SpoutThrottlingMetrics() {
- metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
- metricMap.put("skipped-inactive-ms", skippedInactiveMs);
- metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
-
- }
-
- public void skippedMaxSpoutMs(long ms) {
- this.skippedMaxSpoutMs.incrBy(ms);
- }
-
- public void skippedInactiveMs(long ms) {
- this.skippedInactiveMs.incrBy(ms);
- }
-
- public void skippedBackPressureMs(long ms) {
- this.skippedBackPressureMs.incrBy(ms);
- }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index bd284c4..94e8899 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -23,12 +23,12 @@ import org.apache.storm.ICredentialsListener;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
-import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.hooks.info.SpoutAckInfo;
import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.metrics2.RateCounter;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.policy.IWaitStrategy.WaitSituation;
import org.apache.storm.spout.ISpout;
@@ -56,7 +56,6 @@ public class SpoutExecutor extends Executor {
private final AtomicBoolean lastActive;
private final MutableLong emittedCount;
private final MutableLong emptyEmitStreak;
- private final SpoutThrottlingMetrics spoutThrottlingMetrics;
private final boolean hasAckers;
private final SpoutExecutorStats stats;
SpoutOutputCollectorImpl spoutOutputCollector;
@@ -65,6 +64,9 @@ public class SpoutExecutor extends Executor {
private List<SpoutOutputCollector> outputCollectors;
private RotatingMap<Long, TupleInfo> pending;
private long threadId = 0;
+ private final RateCounter skippedMaxSpoutMs;
+ private final RateCounter skippedInactiveMs;
+ private final RateCounter skippedBackpressureMs;
public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
@@ -77,9 +79,14 @@ public class SpoutExecutor extends Executor {
this.hasAckers = StormCommon.hasAckers(topoConf);
this.emittedCount = new MutableLong(0);
this.emptyEmitStreak = new MutableLong(0);
- this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
this.stats = new SpoutExecutorStats(
ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+ this.skippedMaxSpoutMs = workerData.getMetricRegistry().rateCounter("__skipped-max-spout-ms", componentId,
+ taskIds.get(0));
+ this.skippedInactiveMs = workerData.getMetricRegistry().rateCounter("__skipped-inactive-ms", componentId,
+ taskIds.get(0));
+ this.skippedBackpressureMs = workerData.getMetricRegistry().rateCounter("__skipped-backpressure-ms", componentId,
+ taskIds.get(0));
}
@Override
@@ -116,7 +123,6 @@ public class SpoutExecutor extends Executor {
}
});
- this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
this.outputCollectors = new ArrayList<>();
for (int i = 0; i < idToTask.size(); ++i) {
@@ -224,7 +230,7 @@ public class SpoutExecutor extends Executor {
LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
}
bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
- spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
+ skippedBackpressureMs.inc(Time.currentTimeMillis() - start);
}
private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
@@ -232,7 +238,7 @@ public class SpoutExecutor extends Executor {
long start = Time.currentTimeMillis();
swIdleCount = spoutWaitStrategy.idle(swIdleCount);
if (reachedMaxSpoutPending) {
- spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
+ skippedMaxSpoutMs.inc(Time.currentTimeMillis() - start);
} else {
if (emptyStretch > 0) {
LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
@@ -275,7 +281,7 @@ public class SpoutExecutor extends Executor {
}
long start = Time.currentTimeMillis();
Time.sleep(100);
- spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - start);
+ skippedInactiveMs.inc(Time.currentTimeMillis() - start);
}
@Override
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
index 6f5c32c..4c9b563 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
@@ -28,16 +28,27 @@ public class RateCounter implements Gauge<Double> {
RateCounter(StormMetricRegistry metricRegistry, String metricName, String topologyId,
String componentId, int taskId, int workerPort, String streamId) {
- counter = metricRegistry.counter(metricName, topologyId, componentId,
- taskId, workerPort, streamId);
- metricRegistry.gauge(metricName + ".m1_rate", this, topologyId, componentId, streamId,
- taskId, workerPort);
+ if (streamId != null) {
+ counter = metricRegistry.counter(metricName, topologyId, componentId,
+ taskId, workerPort, streamId);
+ metricRegistry.gauge(metricName + ".m1_rate", this, topologyId, componentId, streamId,
+ taskId, workerPort);
+ } else {
+ counter = metricRegistry.counter(metricName, componentId, taskId);
+ metricRegistry.gauge(metricName + ".m1_rate", this, componentId, taskId);
+ }
+
this.timeSpanInSeconds = Math.max(60 - (60 % metricRegistry.getRateCounterUpdateIntervalSeconds()),
metricRegistry.getRateCounterUpdateIntervalSeconds());
this.values = new long[this.timeSpanInSeconds / metricRegistry.getRateCounterUpdateIntervalSeconds() + 1];
}
+ RateCounter(StormMetricRegistry metricRegistry, String metricName, String topologyId,
+ String componentId, int taskId, int workerPort) {
+ this(metricRegistry, metricName, topologyId, componentId, taskId, workerPort, null);
+ }
+
/**
* Reports the the average rate of events per second over 1 minute for the metric.
* @return the rate
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 18177b9..c442f3f 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -70,6 +70,13 @@ public class StormMetricRegistry implements MetricRegistryProvider {
return rateCounter;
}
+ public RateCounter rateCounter(String metricName, String componentId, int taskId) {
+ RateCounter rateCounter = new RateCounter(this, metricName, topologyId, componentId, taskId,
+ port);
+ rateCounters.add(rateCounter);
+ return rateCounter;
+ }
+
public <T> SimpleGauge<T> gauge(
T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
Gauge gauge = new SimpleGauge<>(initialValue);
@@ -151,6 +158,13 @@ public class StormMetricRegistry implements MetricRegistryProvider {
return counter;
}
+ public Counter counter(String name, String componentId, Integer taskId) {
+ MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
+ Counter counter = registerCounter(metricNames, new Counter(), taskId, componentId, null);
+ saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
+ return counter;
+ }
+
public Timer timer(String name, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
Timer timer = registerTimer(metricNames, new Timer(), context.getThisTaskId(), context.getThisComponentId(), null);