Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/08/12 15:59:59 UTC

[GitHub] [storm] Ethanlm commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

Ethanlm commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r468885240



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/ResettingAverageGauge.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class ResettingAverageGauge implements Gauge<Long> {
+    private long total = 0L;
+    private long samples = 0L;
+
+    public ResettingAverageGauge() {
+    }
+
+    public void addValue(long value) {
+        synchronized (this) {
+            total++;
+            samples += value;
+        }
+    }
+
+    /**
+     * Returns the metric's current average value.  The data stored is reset when read.
+     *
+     * @return the metric's average value
+     */
+    @Override
+    public Long getValue() {
+        synchronized (this) {
+            if (samples <= 0L) {
+                return 0L;
+            } else {
+                long result = total / samples;

Review comment:
       Does `total` mean the sum and `samples` mean the number of records?
   
   If so, line 26
   ```
               total++;
               samples += value;
   ```
   doesn't match: it increments `total` and adds the value to `samples`, so the two updates look swapped.
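
If `total` is meant to be the running sum and `samples` the number of recorded values, the update would look roughly like the sketch below. This is a minimal standalone illustration, not the PR's code: the class name `AverageSketch` and the method name `getValueAndReset` are hypothetical, and the class deliberately does not implement the Codahale `Gauge` interface so that it compiles without dependencies.

```java
// Hypothetical standalone sketch; not the class from the PR.
class AverageSketch {
    private long total = 0L;    // running sum of the recorded values
    private long samples = 0L;  // number of recorded values

    // Adds one observation: the value goes into the sum, the count grows by one.
    public synchronized void addValue(long value) {
        total += value;
        samples++;
    }

    // Returns the integer average and clears the stored data,
    // mirroring the reset-on-read behavior described in the Javadoc.
    public synchronized long getValueAndReset() {
        if (samples <= 0L) {
            return 0L;
        }
        long result = total / samples;
        total = 0L;
        samples = 0L;
        return result;
    }
}
```

With the update as quoted in the diff, recording 10 and 20 leaves `total = 2` and `samples = 30`, so `total / samples` evaluates to 0 rather than 15.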

##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/ResettingAverageGauge.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class ResettingAverageGauge implements Gauge<Long> {
+    private long total = 0L;
+    private long samples = 0L;
+
+    public ResettingAverageGauge() {
+    }
+
+    public void addValue(long value) {
+        synchronized (this) {
+            total++;
+            samples += value;
+        }
+    }
+
+    /**
+     * Returns the metric's current average value.  The data stored is reset when read.
+     *
+     * @return the metric's average value
+     */
+    @Override
+    public Long getValue() {

Review comment:
       The latency value should be a double, as before.
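
To illustrate why the return type matters: `long` division truncates, so the fractional part of an average latency is lost. The following is a small sketch with hypothetical names (`LatencyAverageSketch`, `averageAsLong`, `averageAsDouble`); it is not taken from the PR.

```java
// Hypothetical helper comparing integer and floating-point averages.
class LatencyAverageSketch {
    // Integer division: the fractional part of the average is dropped.
    static long averageAsLong(long sum, long count) {
        return count <= 0L ? 0L : sum / count;
    }

    // Casting the sum to double before dividing keeps the fractional part.
    static double averageAsDouble(long sum, long count) {
        return count <= 0L ? 0.0 : ((double) sum) / count;
    }
}
```

For a sum of 3 ms over 2 samples, `averageAsLong` yields 1 while `averageAsDouble` yields 1.5.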

##########
File path: storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStat.java
##########
@@ -123,11 +122,6 @@ public void record(long latency) {
         }
     }
 
-    @Override
-    public synchronized Object getValueAndReset() {
-        return getValueAndReset(System.currentTimeMillis());
-    }
-
     synchronized Object getValueAndReset(long now) {

Review comment:
       Maybe add a comment explaining why this method is not removed, and consider adding `@VisibleForTesting` if appropriate.

##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {
+            c = metricRegistry.counter(metricName, this.topologyId, this.componentId,
+                    this.taskId, this.workerPort, streamId);
+            this.counters.put(metricName, c);
+        }
+        c.inc(this.samplingRate);
+
+        metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId;
+        ResettingAverageGauge gauge = this.gauges.get(metricName);
+        if (gauge == null) {
+            gauge = new ResettingAverageGauge();
+            metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId,
+                    streamId, this.taskId, this.workerPort);
+            this.gauges.put(metricName, gauge);
+        }
+        gauge.addValue(latencyMs);
     }
 
-    public Counter getAcked(String streamId) {
-        Counter c = this.ackedByStream.get(streamId);
+    public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
+        String key = sourceComponentId + ":" + sourceStreamId;
+        String metricName = METRIC_NAME_ACKED + "-" + key;
+        Counter c = this.counters.get(metricName);
         if (c == null) {
-            c = metricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
-            this.ackedByStream.put(streamId, c);
+            c = metricRegistry.counter(metricName, this.topologyId, this.componentId,
+                    this.taskId, this.workerPort, sourceStreamId);
+            this.counters.put(metricName, c);
         }
-        return c;
+        c.inc(this.samplingRate);
+
+        metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key;
+        ResettingAverageGauge gauge = this.gauges.get(metricName);
+        if (gauge == null) {
+            gauge = new ResettingAverageGauge();
+            metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId,
+                    sourceStreamId, this.taskId, this.workerPort);
+            this.gauges.put(metricName, gauge);
+        }
+        gauge.addValue(latencyMs);
     }
 
-    public Counter getFailed(String streamId) {
-        Counter c = this.failedByStream.get(streamId);
+    public void spoutFailedTuple(String streamId) {

Review comment:
       Not directly related to this PR, but at some point we need to update https://github.com/apache/storm/blob/master/docs/Metrics.md#__complete-latency and
   https://github.com/apache/storm/blob/master/docs/Metrics.md#__process-latency
   
   They say "It is the average amount of time it took for ack or fail to be called", but the latency is only calculated on `ack`, not on `fail`.
   
   The original code https://github.com/apache/storm/blob/0.5.0/src/clj/backtype/storm/stats.clj#L192-L210 is implemented the same way. I believe this is a mistake introduced in https://issues.apache.org/jira/browse/STORM-2616

##########
File path: storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStat.java
##########
@@ -68,20 +67,9 @@ protected String keyToString(T key) {
         return key.toString();
     }
 
-    @Override
-    public Object getValueAndReset() {
-        Map<String, Double> ret = new HashMap<String, Double>();
-        for (Map.Entry<T, LatencyStatAndMetric> entry : lat.entrySet()) {
-            String key = keyToString(entry.getKey());

Review comment:
       The `keyToString` method can be removed too.

##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/ResettingAverageGauge.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class ResettingAverageGauge implements Gauge<Long> {

Review comment:
       Sorry, I just realized that a resetting gauge won't work when there are multiple reporters. Every reporter invokes `getValue`, so the value is reset by the first reporter and the following reporters won't get a useful metric.
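
The effect can be reproduced with a small standalone sketch. Everything here is hypothetical (`ResetOnReadDemo`, `simulateTwoReporters`), and the two "reporters" are simply two consecutive reads of the same gauge, which is an assumption about how reporters poll rather than code from Storm.

```java
// Hypothetical demo of a reset-on-read average polled by two readers.
class ResetOnReadDemo {
    private long total = 0L;    // running sum of recorded values
    private long samples = 0L;  // number of recorded values

    synchronized void addValue(long value) {
        total += value;
        samples++;
    }

    // Returns the average and resets the stored data, as a resetting gauge would.
    synchronized long getValue() {
        if (samples <= 0L) {
            return 0L;
        }
        long result = total / samples;
        total = 0L;
        samples = 0L;
        return result;
    }

    // Records two values, then reads the gauge twice in a row,
    // standing in for two reporters polling the same metric.
    static long[] simulateTwoReporters() {
        ResetOnReadDemo gauge = new ResetOnReadDemo();
        gauge.addValue(10L);
        gauge.addValue(20L);
        long firstReporter = gauge.getValue();
        long secondReporter = gauge.getValue();
        return new long[] {firstReporter, secondReporter};
    }
}
```

In this sketch the first read returns the real average and the second returns 0, because the first read already cleared the data.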

##########
File path: storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStat.java
##########
@@ -16,32 +16,31 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.metric.api.IMetric;
 
 /**
- * Acts as a Latnecy Metric for multiple keys, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for
+ * Keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for

Review comment:
       nit:  `approximate latency`

##########
File path: storm-core/test/clj/org/apache/storm/metrics_test.clj
##########
@@ -238,143 +238,4 @@
       )))
 
 
-(deftest test-builtin-metrics-1

Review comment:
       Do we have a replacement for these tests?



