You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/30 21:24:40 UTC

[storm] branch master updated: STORM-3697 add capacity metric (#3333)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bef73a  STORM-3697 add capacity metric (#3333)
7bef73a is described below

commit 7bef73a6faa14558ef254efe74cbe4bfef81c2e2
Author: agresch <ag...@gmail.com>
AuthorDate: Wed Sep 30 16:24:22 2020 -0500

    STORM-3697 add capacity metric (#3333)
---
 .../apache/storm/executor/bolt/BoltExecutor.java   |  8 ++--
 .../storm/metric/internal/MultiCountStat.java      |  2 +
 .../apache/storm/metrics2/RollingAverageGauge.java |  6 +--
 .../jvm/org/apache/storm/metrics2/TaskMetrics.java |  9 ++++
 .../org/apache/storm/stats/BoltExecutorStats.java  | 50 +++++++++++++++++++++-
 5 files changed, 68 insertions(+), 7 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index bc9d6b0..ddd830d 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -227,9 +227,11 @@ public class BoltExecutor extends Executor {
                 new BoltExecuteInfo(tuple, taskId, delta).applyOn(topologyContext);
             }
             if (delta >= 0) {
-                stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
-                Task task = idToTask.get(taskId - idToTaskBase);
-                task.getTaskMetrics().boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+                Task firstTask = idToTask.get(taskIds.get(0) - idToTaskBase);
+                stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta,
+                        workerData.getUptime().upTime(), firstTask);
+                Task currentTask = idToTask.get(taskId - idToTaskBase);
+                currentTask.getTaskMetrics().boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
             }
         }
     }
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
index 25fa270..8bf18d5 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
@@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentHashMap;
  * Acts as a MultiCount Stat, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for the same keys
  */
 public class MultiCountStat<T> {
+    public static final int TEN_MIN_IN_SECONDS = 60 * 10;
+    public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + "";
     private final int numBuckets;
     private ConcurrentHashMap<T, CountStat> counts = new ConcurrentHashMap<>();
 
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
index 650fdee..9a50bb9 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
@@ -15,18 +15,18 @@ package org.apache.storm.metrics2;
 import com.codahale.metrics.Gauge;
 
 public class RollingAverageGauge implements Gauge<Double> {
-    private long[] samples = new long[3];
+    private double[] samples = new double[3];
     private int index = 0;
 
     @Override
     public Double getValue() {
         synchronized (this) {
-            long total = samples[0] + samples[1] + samples[2];
+            double total = samples[0] + samples[1] + samples[2];
             return total / 3.0;
         }
     }
 
-    public void addValue(long value) {
+    public void addValue(double value) {
         synchronized (this) {
             samples[index] = value;
             index = (++index % 3);
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index d52190f..43903fe 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -18,6 +18,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
 
 public class TaskMetrics {
     private static final String METRIC_NAME_ACKED = "__ack-count";
@@ -28,6 +29,7 @@ public class TaskMetrics {
     private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
     private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
     private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
+    private static final String METRIC_NAME_CAPACITY = "__capacity";
 
     private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
@@ -50,6 +52,13 @@ public class TaskMetrics {
         this.samplingRate = ConfigUtils.samplingRate(topoConf);
     }
 
+    public void setCapacity(double capacity) {
+        String metricName = METRIC_NAME_CAPACITY;
+        // capacity is over all streams, will report using the default streamId
+        RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, Utils.DEFAULT_STREAM_ID);
+        gauge.addValue(capacity);
+    }
+
     public void spoutAckedTuple(String streamId, long latencyMs) {
         String metricName = METRIC_NAME_ACKED + "-" + streamId;
         Counter c = this.getCounter(metricName, streamId);
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 030b0d7..26e3776 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -12,7 +12,12 @@
 
 package org.apache.storm.stats;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.daemon.Task;
 import org.apache.storm.generated.BoltStats;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
@@ -53,10 +58,53 @@ public class BoltExecutorStats extends CommonStats {
         super.cleanupStats();
     }
 
-    public void boltExecuteTuple(String component, String stream, long latencyMs) {
+    public void boltExecuteTuple(String component, String stream, long latencyMs, long workerUptimeSecs,
+                                 Task firstExecutorTask) {
         List key = Lists.newArrayList(component, stream);
         this.getExecuted().incBy(key, this.rate);
         this.getExecuteLatencies().record(key, latencyMs);
+
+        // Calculate capacity:  This is really for the whole executor, but we will use the executor's first task
+        // for reporting the metric.
+        double capacity = calculateCapacity(workerUptimeSecs);
+        firstExecutorTask.getTaskMetrics().setCapacity(capacity);
+    }
+
+    private double calculateCapacity(long workerUptimeSecs) {
+        if (workerUptimeSecs > 0) {
+            Map<String, Double> execAvg = valueStat(this.getExecuteLatencies()).get(MultiCountStat.TEN_MIN_IN_SECONDS_STR);
+            Map<String, Long> exec = valueStat(this.getExecuted()).get(MultiCountStat.TEN_MIN_IN_SECONDS_STR);
+
+            Set<Object> allKeys = new HashSet<>();
+            if (execAvg != null) {
+                allKeys.addAll(execAvg.keySet());
+            }
+            if (exec != null) {
+                allKeys.addAll(exec.keySet());
+            }
+
+            double totalAvg = 0;
+            for (Object k : allKeys) {
+                double avg = getOr0(execAvg, k).doubleValue();
+                long cnt = getOr0(exec, k).longValue();
+                totalAvg += avg * cnt;
+            }
+
+            return totalAvg / (Math.min(workerUptimeSecs, MultiCountStat.TEN_MIN_IN_SECONDS) * 1000);
+        }
+        return 0.0;
+    }
+
+    private static Number getOr0(Map m, Object k) {
+        if (m == null) {
+            return 0;
+        }
+
+        Number n = (Number) m.get(k);
+        if (n == null) {
+            return 0;
+        }
+        return n;
     }
 
     public void boltAckedTuple(String component, String stream, long latencyMs) {