You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/30 21:24:40 UTC
[storm] branch master updated: STORM-3697 add capacity metric (#3333)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 7bef73a STORM-3697 add capacity metric (#3333)
7bef73a is described below
commit 7bef73a6faa14558ef254efe74cbe4bfef81c2e2
Author: agresch <ag...@gmail.com>
AuthorDate: Wed Sep 30 16:24:22 2020 -0500
STORM-3697 add capacity metric (#3333)
---
.../apache/storm/executor/bolt/BoltExecutor.java | 8 ++--
.../storm/metric/internal/MultiCountStat.java | 2 +
.../apache/storm/metrics2/RollingAverageGauge.java | 6 +--
.../jvm/org/apache/storm/metrics2/TaskMetrics.java | 9 ++++
.../org/apache/storm/stats/BoltExecutorStats.java | 50 +++++++++++++++++++++-
5 files changed, 68 insertions(+), 7 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index bc9d6b0..ddd830d 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -227,9 +227,11 @@ public class BoltExecutor extends Executor {
new BoltExecuteInfo(tuple, taskId, delta).applyOn(topologyContext);
}
if (delta >= 0) {
- stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
- Task task = idToTask.get(taskId - idToTaskBase);
- task.getTaskMetrics().boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+ Task firstTask = idToTask.get(taskIds.get(0) - idToTaskBase);
+ stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta,
+ workerData.getUptime().upTime(), firstTask);
+ Task currentTask = idToTask.get(taskId - idToTaskBase);
+ currentTask.getTaskMetrics().boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
}
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
index 25fa270..8bf18d5 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
@@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentHashMap;
* Acts as a MultiCount Stat, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for the same keys
*/
public class MultiCountStat<T> {
+ public static final int TEN_MIN_IN_SECONDS = 60 * 10;
+ public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + "";
private final int numBuckets;
private ConcurrentHashMap<T, CountStat> counts = new ConcurrentHashMap<>();
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
index 650fdee..9a50bb9 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
@@ -15,18 +15,18 @@ package org.apache.storm.metrics2;
import com.codahale.metrics.Gauge;
public class RollingAverageGauge implements Gauge<Double> {
- private long[] samples = new long[3];
+ private double[] samples = new double[3];
private int index = 0;
@Override
public Double getValue() {
synchronized (this) {
- long total = samples[0] + samples[1] + samples[2];
+ double total = samples[0] + samples[1] + samples[2];
return total / 3.0;
}
}
- public void addValue(long value) {
+ public void addValue(double value) {
synchronized (this) {
samples[index] = value;
index = (++index % 3);
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index d52190f..43903fe 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -18,6 +18,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
public class TaskMetrics {
private static final String METRIC_NAME_ACKED = "__ack-count";
@@ -28,6 +29,7 @@ public class TaskMetrics {
private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
+ private static final String METRIC_NAME_CAPACITY = "__capacity";
private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
@@ -50,6 +52,13 @@ public class TaskMetrics {
this.samplingRate = ConfigUtils.samplingRate(topoConf);
}
+ public void setCapacity(double capacity) {
+ String metricName = METRIC_NAME_CAPACITY;
+ // capacity is over all streams, will report using the default streamId
+ RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, Utils.DEFAULT_STREAM_ID);
+ gauge.addValue(capacity);
+ }
+
public void spoutAckedTuple(String streamId, long latencyMs) {
String metricName = METRIC_NAME_ACKED + "-" + streamId;
Counter c = this.getCounter(metricName, streamId);
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 030b0d7..26e3776 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -12,7 +12,12 @@
package org.apache.storm.stats;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.daemon.Task;
import org.apache.storm.generated.BoltStats;
import org.apache.storm.generated.ExecutorSpecificStats;
import org.apache.storm.generated.ExecutorStats;
@@ -53,10 +58,53 @@ public class BoltExecutorStats extends CommonStats {
super.cleanupStats();
}
- public void boltExecuteTuple(String component, String stream, long latencyMs) {
+ public void boltExecuteTuple(String component, String stream, long latencyMs, long workerUptimeSecs,
+ Task firstExecutorTask) {
List key = Lists.newArrayList(component, stream);
this.getExecuted().incBy(key, this.rate);
this.getExecuteLatencies().record(key, latencyMs);
+
+ // Calculate capacity: This is really for the whole executor, but we will use the executor's first task
+ // for reporting the metric.
+ double capacity = calculateCapacity(workerUptimeSecs);
+ firstExecutorTask.getTaskMetrics().setCapacity(capacity);
+ }
+
+ private double calculateCapacity(long workerUptimeSecs) {
+ if (workerUptimeSecs > 0) {
+ Map<String, Double> execAvg = valueStat(this.getExecuteLatencies()).get(MultiCountStat.TEN_MIN_IN_SECONDS_STR);
+ Map<String, Long> exec = valueStat(this.getExecuted()).get(MultiCountStat.TEN_MIN_IN_SECONDS_STR);
+
+ Set<Object> allKeys = new HashSet<>();
+ if (execAvg != null) {
+ allKeys.addAll(execAvg.keySet());
+ }
+ if (exec != null) {
+ allKeys.addAll(exec.keySet());
+ }
+
+ double totalAvg = 0;
+ for (Object k : allKeys) {
+ double avg = getOr0(execAvg, k).doubleValue();
+ long cnt = getOr0(exec, k).longValue();
+ totalAvg += avg * cnt;
+ }
+
+ return totalAvg / (Math.min(workerUptimeSecs, MultiCountStat.TEN_MIN_IN_SECONDS) * 1000);
+ }
+ return 0.0;
+ }
+
+ private static Number getOr0(Map m, Object k) {
+ if (m == null) {
+ return 0;
+ }
+
+ Number n = (Number) m.get(k);
+ if (n == null) {
+ return 0;
+ }
+ return n;
}
public void boltAckedTuple(String component, String stream, long latencyMs) {