Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/08/11 21:22:10 UTC

[GitHub] [storm] agresch opened a new pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

agresch opened a new pull request #3323:
URL: https://github.com/apache/storm/pull/3323


   ## What is the purpose of the change
   
   This moves the tuple-counting metrics exclusively to the V2 API. They currently use Counters. We could possibly use ResettingAverageGauge to map more closely to the V1 getValueAndReset(), but since TaskMetrics was already using Counters, I left this as is for now. 
   
   ResettingAverageGauge was added to track latencies with a lower performance impact than a Histogram.
   
   Classes such as MultiLatencyStatAndMetric no longer need their metric portion and have been renamed to reflect the loss of the IMetric implementation.
   
   ## How was the change tested
   
   Ran word count and validated metric output.
   
   NOTE: The LatencyStat unit test called getValueAndReset(). When I removed this functionality, the unit test saw unexpected differences in the reported values. If it's important to remove the unused getValueAndReset() method, I suggest opening a new JIRA to debug the issue.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [storm] Ethanlm commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

Ethanlm commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r469678349



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {

Review comment:
       I am not sure about spoutAckedTuple at the moment, but methods like `boltFailedTuple` and `boltAckedTuple` might be invoked from multiple threads if the bolt is multi-threaded.




----------------------------------------------------------------



[GitHub] [storm] Ethanlm commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

Ethanlm commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r477432315



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
##########
@@ -14,36 +14,22 @@
 
 import com.codahale.metrics.Gauge;
 
-public class ResettingAverageGauge implements Gauge<Long> {
-    private long total = 0L;
-    private long samples = 0L;
+public class RollingAverageGauge implements Gauge<Double> {
+    private long[] samples = new long[3];
+    private int index = 0;
 
-    public ResettingAverageGauge() {
-    }
-
-    public void addValue(long value) {
+    @Override
+    public Double getValue() {
         synchronized (this) {
-            total++;
-            samples += value;
+            long total = samples[0] + samples[1] + samples[2];

Review comment:
       At the beginning, the values are all 0, so the result is inaccurate while there are fewer than 3 samples.
   For example, if there is only one value, x, the result will be `x / 3.0`, while it should have been `x`.
   
   But this is unlikely to matter in practice, since the number of values will quickly exceed 3. I think it is acceptable.  
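   If the warm-up inaccuracy ever matters, one option is to divide by the number of slots actually filled instead of by 3. The sketch below is hypothetical. The class name `WarmupAwareRollingAverage`, the `filled` field, and the `addValue` body are illustrative assumptions, not code from the PR. It keeps the same three-slot layout as `RollingAverageGauge`:

   ```java
   /**
    * Hypothetical sketch: a three-slot rolling average that only averages the
    * slots filled so far, so a single sample x reports x instead of x / 3.0.
    */
   class WarmupAwareRollingAverage {
       private final long[] samples = new long[3];
       private int index = 0;  // next slot to overwrite
       private int filled = 0; // number of slots holding real samples (max 3)

       public synchronized void addValue(long value) {
           samples[index] = value;
           index = (index + 1) % samples.length;
           if (filled < samples.length) {
               filled++;
           }
       }

       /** Average of the filled slots; 0.0 before any sample is recorded. */
       public synchronized Double getValue() {
           if (filled == 0) {
               return 0.0;
           }
           long total = 0L;
           // Slots fill in order 0, 1, 2, so the first `filled` slots are the real ones.
           for (int i = 0; i < filled; i++) {
               total += samples[i];
           }
           return total / (double) filled;
       }
   }
   ```

   The cost is one extra int field. Once three samples have arrived, the result is identical to the `total / 3.0` behavior in the diff.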




----------------------------------------------------------------



[GitHub] [storm] agresch commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

agresch commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r469373981



##########
File path: storm-core/test/clj/org/apache/storm/metrics_test.clj
##########
@@ -238,143 +238,4 @@
       )))
 
 
-(deftest test-builtin-metrics-1

Review comment:
       No.  This seemed like an integration test.  I'm not sure how valuable this is.  




----------------------------------------------------------------



[GitHub] [storm] Ethanlm commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

Ethanlm commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r468885240



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/ResettingAverageGauge.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class ResettingAverageGauge implements Gauge<Long> {
+    private long total = 0L;
+    private long samples = 0L;
+
+    public ResettingAverageGauge() {
+    }
+
+    public void addValue(long value) {
+        synchronized (this) {
+            total++;
+            samples += value;
+        }
+    }
+
+    /**
+     * Returns the metric's current average value.  The data stored is reset when read.
+     *
+     * @return the metric's average value
+     */
+    @Override
+    public Long getValue() {
+        synchronized (this) {
+            if (samples <= 0L) {
+                return 0L;
+            } else {
+                long result = total / samples;

Review comment:
       Does `total` mean the sum and `samples` mean the number of records? 
   
   Line 26
   ```
               total++;
               samples += value;
   ```
   doesn't match: `total` is incremented once per value, while `samples` accumulates the values themselves.
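   With the fields as written, `total / samples` actually computes count divided by sum. For example, recording 10 and 20 gives `2 / 30`, which is 0 in `long` arithmetic. Below is a minimal sketch of what the fields appear to be intended for, with unambiguous names. `ResettingAverage`, `sum`, and `count` are illustrative assumptions, not the PR's final code. The sketch keeps the reset-on-read behavior and returns a double, as another review comment in this thread requests:

   ```java
   /**
    * Hypothetical sketch of a resetting average with unambiguous field names.
    * addValue() accumulates; getValue() returns the mean and clears the state.
    */
   class ResettingAverage {
       private long sum = 0L;   // running total of recorded values
       private long count = 0L; // number of values recorded

       public synchronized void addValue(long value) {
           sum += value;
           count++;
       }

       /** Returns the mean since the last read (0.0 if empty), then resets. */
       public synchronized double getValue() {
           if (count == 0L) {
               return 0.0;
           }
           double result = (double) sum / count;
           sum = 0L;
           count = 0L;
           return result;
       }
   }
   ```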

##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/ResettingAverageGauge.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class ResettingAverageGauge implements Gauge<Long> {
+    private long total = 0L;
+    private long samples = 0L;
+
+    public ResettingAverageGauge() {
+    }
+
+    public void addValue(long value) {
+        synchronized (this) {
+            total++;
+            samples += value;
+        }
+    }
+
+    /**
+     * Returns the metric's current average value.  The data stored is reset when read.
+     *
+     * @return the metric's average value
+     */
+    @Override
+    public Long getValue() {

Review comment:
       The latency value should be a double, as before.
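   To illustrate why the return type matters: latencies are passed in as whole milliseconds (`long latencyMs`), so an integer average silently truncates, and sub-millisecond averages report 0. The helper class below is hypothetical and only contrasts the two divisions:

   ```java
   /** Hypothetical helpers contrasting integer and floating-point averages. */
   final class AverageTypes {
       private AverageTypes() {
       }

       /** Integer division truncates toward zero: (1 + 2) / 2 == 1. */
       static long longAverage(long sum, long count) {
           return count == 0 ? 0L : sum / count;
       }

       /** Floating-point division keeps the fraction: (1 + 2) / 2.0 == 1.5. */
       static double doubleAverage(long sum, long count) {
           return count == 0 ? 0.0 : (double) sum / count;
       }
   }
   ```

   For example, `longAverage(1, 3)` returns 0, while `doubleAverage(1, 3)` returns about 0.333.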

##########
File path: storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStat.java
##########
@@ -123,11 +122,6 @@ public void record(long latency) {
         }
     }
 
-    @Override
-    public synchronized Object getValueAndReset() {
-        return getValueAndReset(System.currentTimeMillis());
-    }
-
     synchronized Object getValueAndReset(long now) {

Review comment:
       Maybe add a comment explaining why it is not removed, and consider adding `@VisibleForTesting` if appropriate.

##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {
+            c = metricRegistry.counter(metricName, this.topologyId, this.componentId,
+                    this.taskId, this.workerPort, streamId);
+            this.counters.put(metricName, c);
+        }
+        c.inc(this.samplingRate);
+
+        metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId;
+        ResettingAverageGauge gauge = this.gauges.get(metricName);
+        if (gauge == null) {
+            gauge = new ResettingAverageGauge();
+            metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId,
+                    streamId, this.taskId, this.workerPort);
+            this.gauges.put(metricName, gauge);
+        }
+        gauge.addValue(latencyMs);
     }
 
-    public Counter getAcked(String streamId) {
-        Counter c = this.ackedByStream.get(streamId);
+    public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
+        String key = sourceComponentId + ":" + sourceStreamId;
+        String metricName = METRIC_NAME_ACKED + "-" + key;
+        Counter c = this.counters.get(metricName);
         if (c == null) {
-            c = metricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
-            this.ackedByStream.put(streamId, c);
+            c = metricRegistry.counter(metricName, this.topologyId, this.componentId,
+                    this.taskId, this.workerPort, sourceStreamId);
+            this.counters.put(metricName, c);
         }
-        return c;
+        c.inc(this.samplingRate);
+
+        metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key;
+        ResettingAverageGauge gauge = this.gauges.get(metricName);
+        if (gauge == null) {
+            gauge = new ResettingAverageGauge();
+            metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId,
+                    sourceStreamId, this.taskId, this.workerPort);
+            this.gauges.put(metricName, gauge);
+        }
+        gauge.addValue(latencyMs);
     }
 
-    public Counter getFailed(String streamId) {
-        Counter c = this.failedByStream.get(streamId);
+    public void spoutFailedTuple(String streamId) {

Review comment:
       Not directly related to this PR, but at some point we need to update https://github.com/apache/storm/blob/master/docs/Metrics.md#__complete-latency
   and https://github.com/apache/storm/blob/master/docs/Metrics.md#__process-latency
   
   They say "It is the average amount of time it took for ack or fail to be called", but the latency is only calculated on `ack`, not on `fail`.  
   
   The original code at https://github.com/apache/storm/blob/0.5.0/src/clj/backtype/storm/stats.clj#L192-L210 behaves the same way. I believe this documentation mistake was introduced in https://issues.apache.org/jira/browse/STORM-2616
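   Below is a simplified, hypothetical sketch of the behavior described. The ack path updates both a count and a latency average, while the fail path only updates a count. This mirrors the diff, where `spoutFailedTuple(String streamId)` takes no latency argument. `SpoutStatsSketch`, `ackedTuple`, and `failedTuple` are illustrative names, not Storm APIs, and plain `AtomicLong`s stand in for the metrics classes:

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   /**
    * Hypothetical sketch: latency is recorded only when a tuple is acked,
    * so the reported average reflects acked tuples only.
    */
   class SpoutStatsSketch {
       final AtomicLong ackCount = new AtomicLong();
       final AtomicLong failCount = new AtomicLong();
       private long latencySum = 0L;     // guarded by this
       private long latencySamples = 0L; // guarded by this

       void ackedTuple(long latencyMs) {
           ackCount.incrementAndGet();
           synchronized (this) {
               latencySum += latencyMs;
               latencySamples++;
           }
       }

       void failedTuple() {
           // No latency is passed or recorded: failures never affect the average.
           failCount.incrementAndGet();
       }

       synchronized double averageLatencyMs() {
           return latencySamples == 0 ? 0.0 : (double) latencySum / latencySamples;
       }
   }
   ```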

##########
File path: storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStat.java
##########
@@ -68,20 +67,9 @@ protected String keyToString(T key) {
         return key.toString();
     }
 
-    @Override
-    public Object getValueAndReset() {
-        Map<String, Double> ret = new HashMap<String, Double>();
-        for (Map.Entry<T, LatencyStatAndMetric> entry : lat.entrySet()) {
-            String key = keyToString(entry.getKey());

Review comment:
       `keyToString` method can be removed too

##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/ResettingAverageGauge.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class ResettingAverageGauge implements Gauge<Long> {

Review comment:
       Sorry, I just realized a resetting gauge won't work when there are multiple reporters. Because every reporter invokes `getValue`, the first reporter resets the value, so the following reporters won't get a useful metric.
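   Below is a small demonstration of the problem. It uses a hypothetical reset-on-read average and simulates two reporters polling it in the same reporting interval. The first read drains the state, so the second sees an empty gauge. `ResetOnReadAverage` and `MultiReporterDemo` are illustrative stand-ins and do not use the Dropwizard reporter API:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Supplier;

   /** Hypothetical reset-on-read average, used to show the multi-reporter issue. */
   class ResetOnReadAverage implements Supplier<Double> {
       private long sum = 0L;
       private long count = 0L;

       synchronized void addValue(long value) {
           sum += value;
           count++;
       }

       @Override
       public synchronized Double get() {
           double result = count == 0 ? 0.0 : (double) sum / count;
           sum = 0L;   // reading clears the state...
           count = 0L; // ...so any later reader in the same interval sees nothing
           return result;
       }
   }

   final class MultiReporterDemo {
       private MultiReporterDemo() {
       }

       /** Simulates each configured reporter polling the same gauge once. */
       static List<Double> pollAll(Supplier<Double> gauge, int reporters) {
           List<Double> seen = new ArrayList<>();
           for (int i = 0; i < reporters; i++) {
               seen.add(gauge.get());
           }
           return seen;
       }
   }
   ```

   With values 4 and 8 recorded, the first reporter sees 6.0 and the second sees 0.0.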

##########
File path: storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStat.java
##########
@@ -16,32 +16,31 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.metric.api.IMetric;
 
 /**
- * Acts as a Latnecy Metric for multiple keys, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for
+ * Keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for

Review comment:
       nit:  `approximate latency`

##########
File path: storm-core/test/clj/org/apache/storm/metrics_test.clj
##########
@@ -238,143 +238,4 @@
       )))
 
 
-(deftest test-builtin-metrics-1

Review comment:
       Do we have a replacement for these tests?




----------------------------------------------------------------



[GitHub] [storm] agresch commented on pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

agresch commented on pull request #3323:
URL: https://github.com/apache/storm/pull/3323#issuecomment-672978482


   Good point on the ResettingAverageGauge. I spoke with Ethan and suggested creating a Gauge that averages the last 3 reported samples instead.  
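   Below is a minimal sketch of the suggested alternative. It assumes the gauge keeps the last 3 values in a ring buffer and that reading does not modify any state, so every reporter sees the same result. It is modeled on the `RollingAverageGauge` diff elsewhere in this thread (`long[] samples = new long[3]`, an `index`, and a `Double` value). The `addValue` body is an assumption because that part is not shown, and the class name `LastThreeAverageGauge` is hypothetical:

   ```java
   /**
    * Hypothetical sketch: averages the three most recent samples.
    * getValue() is read-only, so multiple reporters all see the same value.
    */
   class LastThreeAverageGauge {
       private final long[] samples = new long[3];
       private int index = 0; // slot to overwrite next

       public synchronized void addValue(long value) {
           samples[index] = value;
           index = (index + 1) % samples.length;
       }

       public synchronized Double getValue() {
           long total = samples[0] + samples[1] + samples[2];
           return total / 3.0; // empty slots count as 0 until three samples arrive
       }
   }
   ```

   The trade-off is that the value reflects only the most recent samples rather than every sample since the last report, but no reporter can disturb what another one reads.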
   
   


----------------------------------------------------------------



[GitHub] [storm] Ethanlm merged pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

Ethanlm merged pull request #3323:
URL: https://github.com/apache/storm/pull/3323


   


----------------------------------------------------------------



[GitHub] [storm] agresch commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

agresch commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r470013538



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {

Review comment:
       ok, will address this 




----------------------------------------------------------------



[GitHub] [storm] Ethanlm commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

Ethanlm commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r469421513



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {

Review comment:
       Do we need synchronization here? Will the object be created multiple times?
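   The `get` followed by `put` in the diff is a check-then-act sequence. Two threads can both see `null`, and both then create and register the metric. A common way to make the lazy lookup safe, if `TaskMetrics` is called from several threads, is `ConcurrentHashMap.computeIfAbsent`. It performs the whole operation atomically and applies the mapping function at most once per key. The sketch below is hypothetical. `CounterRegistry` and `AtomicLong` stand in for `StormMetricRegistry` and the Dropwizard `Counter`, and it is not the PR's final code:

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.atomic.AtomicLong;

   /** Hypothetical stand-in for the metric registry: creates and registers a counter. */
   interface CounterRegistry {
       AtomicLong counter(String name);
   }

   /** Hypothetical sketch of race-free lazy counter creation. */
   class LazyCounters {
       private final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();
       private final CounterRegistry registry;

       LazyCounters(CounterRegistry registry) {
           this.registry = registry;
       }

       /**
        * Unlike get()/put(), computeIfAbsent() is atomic per key on a
        * ConcurrentHashMap, so two threads cannot both register the same metric.
        */
       void inc(String metricName, long amount) {
           counters.computeIfAbsent(metricName, registry::counter).addAndGet(amount);
       }

       /** Current count for a metric, or 0 if it was never incremented. */
       long get(String metricName) {
           AtomicLong c = counters.get(metricName);
           return c == null ? 0L : c.get();
       }
   }
   ```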




----------------------------------------------------------------



[GitHub] [storm] agresch commented on a change in pull request #3323: STORM-3673 remove v1 built in metrics in favor of TaskMetrics

agresch commented on a change in pull request #3323:
URL: https://github.com/apache/storm/pull/3323#discussion_r469436135



##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
##########
@@ -13,69 +13,151 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ResettingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
+    }
+
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.counters.get(metricName);
+        if (c == null) {

Review comment:
       I assumed the caller already guaranteed thread safety, but my understanding could be wrong.



