You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/11 20:40:21 UTC

storm git commit: STORM-2153: eliminate string concatenation when looking up metrics

Repository: storm
Updated Branches:
  refs/heads/metrics_v2 8d53800f1 -> d3c00ee77


STORM-2153: eliminate string concatenation when looking up metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3c00ee7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3c00ee7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3c00ee7

Branch: refs/heads/metrics_v2
Commit: d3c00ee7705b2d7b1bba4afd1146fb4c258a471d
Parents: 8d53800
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Jan 11 15:39:37 2018 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Jan 11 15:39:37 2018 -0500

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    | 11 +--
 .../src/clj/org/apache/storm/daemon/task.clj    | 12 ++--
 .../storm/metrics2/StormMetricRegistry.java     |  5 ++
 .../org/apache/storm/metrics2/TaskMetrics.java  | 72 ++++++++++++++++++++
 4 files changed, 89 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index e8d23e5..3dd7289 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -35,7 +35,7 @@
   (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
   (:import [org.apache.storm Config Constants])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
-  (:import [org.apache.storm.metrics2 StormMetricRegistry])
+  (:import [org.apache.storm.metrics2 StormMetricRegistry TaskMetrics])
   (:import [com.codahale.metrics Meter Counter])
   (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
   (:import [java.util.concurrent ConcurrentLinkedQueue])
@@ -265,6 +265,7 @@
      :stats (mk-executor-stats <> (sampling-rate storm-conf))
      :interval->task->metric-registry (HashMap.)
      :task->component (:task->component worker)
+     :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id)
      :stream->component->grouper (outbound-components worker-context component-id storm-conf)
      :report-error (throttled-report-error-fn <>)
      :report-error-and-die (fn [error]
@@ -442,7 +443,7 @@
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta))))
+      (stats/spout-failed-tuple! (:stats executor-data)  (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
   (let [^ISpout spout (:object task-data)
@@ -451,7 +452,7 @@
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta))))
+      (stats/spout-acked-tuple! (:stats executor-data) (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
   (let [task-ids (:task-ids executor-data)
@@ -820,7 +821,7 @@
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-acked-tuple! executor-stats
-                                                      (StormMetricRegistry/counter "acked" worker-context  (:component-id executor-data) task-id (.getSourceStreamId tuple))
+                                                      (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple))
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
                                                       delta))))
@@ -836,7 +837,7 @@
                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-failed-tuple! executor-stats
-                                                       (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) task-id (.getSourceStreamId tuple))
+                                                       (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple))
                                                        (.getSourceComponent tuple)
                                                        (.getSourceStreamId tuple)
                                                        delta))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 9e18331..26ce76c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -23,11 +23,11 @@
   (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo])
   (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
-  (:import [org.apache.storm.metrics2 StormMetricRegistry])
+  (:import [org.apache.storm.metrics2 StormMetricRegistry TaskMetrics])
   (:import [org.apache.storm.utils Utils])
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
-  (:import [java.util Collection List ArrayList])
+  (:import [java.util Collection List ArrayList Map])
   (:import [com.codahale.metrics Meter Counter])
   (:require [org.apache.storm
              [thrift :as thrift]
@@ -143,9 +143,9 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream)
+              (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream)
               (if out-task-id
-                (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream 1)))
+                (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream 1)))
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
@@ -163,8 +163,8 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
-               (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream)
-               (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream (count out-tasks)))
+               (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream)
+               (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream (count out-tasks)))
              out-tasks)))
     ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index e1305f9..e0023fd 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -78,6 +78,11 @@ public class StormMetricRegistry {
         return REGISTRY.counter(metricName);
     }
 
+    /** Looks up the registry counter named from the given metric name, topology, component, stream, task and worker port. */
+    public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId){
+        return REGISTRY.counter(metricName(name, topologyId, componentId, streamId,taskId, workerPort));
+    }
+
     public static void start(Map<String, Object> stormConfig, DaemonType type){
         try {
             hostName = dotToUnderScore(Utils.localHostname());

http://git-wip-us.apache.org/repos/asf/storm/blob/d3c00ee7/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
new file mode 100644
index 0000000..5bb01d2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -0,0 +1,72 @@
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Caches one task's acked/failed/emitted/transferred counters by stream id, so repeat lookups skip rebuilding metric names.
+ */
+public class TaskMetrics {
+    final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
+    final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
+    final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
+    final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+
+    private final String topologyId;
+    private final String componentId;
+    private final Integer taskId;
+    private final Integer workerPort;
+
+    /** Captures the topology id and worker port from {@code context}, plus the component and task ids, for metric naming. */
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid){
+        this.topologyId = context.getStormId();
+        this.componentId = componentId;
+        this.taskId = taskid;
+        this.workerPort = context.getThisWorkerPort();
+    }
+
+    /** Returns the "acked" counter for {@code streamId}, fetching it from StormMetricRegistry on first use. */
+    public Counter getAcked(String streamId) {
+        return counterFor(this.ackedByStream, "acked", streamId);
+    }
+
+    /** Returns the "failed" counter for {@code streamId}, fetching it from StormMetricRegistry on first use. */
+    public Counter getFailed(String streamId) {
+        // Must read failedByStream: reading ackedByStream here handed back the "acked" counter for already-acked streams.
+        return counterFor(this.failedByStream, "failed", streamId);
+    }
+
+    /** Returns the "emitted" counter for {@code streamId}, fetching it from StormMetricRegistry on first use. */
+    public Counter getEmitted(String streamId) {
+        return counterFor(this.emittedByStream, "emitted", streamId);
+    }
+
+    /** Returns the "transferred" counter for {@code streamId}, fetching it from StormMetricRegistry on first use. */
+    public Counter getTransferred(String streamId) {
+        return counterFor(this.transferredByStream, "transferred", streamId);
+    }
+
+    /** Shared lookup: returns {@code cache}'s entry for {@code streamId}, or fetches the {@code name} counter and caches it. */
+    private Counter counterFor(ConcurrentMap<String, Counter> cache, String name, String streamId) {
+        Counter c = cache.get(streamId);
+        if (c == null) {
+            c = StormMetricRegistry.counter(name, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
+            cache.put(streamId, c);
+        }
+        return c;
+    }
+
+    /** Builds a map holding one TaskMetrics per task id in the inclusive range {@code startTaskId..endTaskId}. */
+    public static Map<Integer, TaskMetrics> taskMetricsMap(Integer startTaskId, Integer endTaskId, WorkerTopologyContext context, String componentId){
+        Map<Integer, TaskMetrics> retval = new HashMap<>();
+        for (int i = startTaskId; i < endTaskId + 1; i++) {
+            retval.put(i, new TaskMetrics(context, componentId, i));
+        }
+        return retval;
+    }
+}