You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/10 19:25:59 UTC

[1/2] storm git commit: STORM-2153: Use StringBuilder instead of String.format for composing metric names

Repository: storm
Updated Branches:
  refs/heads/metrics_v2 44cd8ac3b -> 8d53800f1


STORM-2153: Use StringBuilder instead of String.format for composing metric names


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bf7252e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bf7252e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bf7252e

Branch: refs/heads/metrics_v2
Commit: 8bf7252ebd36e540515fedd70fb8c2004c1e4364
Parents: 44cd8ac
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Jan 10 13:43:33 2018 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Jan 10 13:43:33 2018 -0500

----------------------------------------------------------------------
 .../storm/metrics2/StormMetricRegistry.java     | 60 +++++++++++++-------
 1 file changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8bf7252e/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 789367b..aea4539 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -118,7 +118,6 @@ public class StormMetricRegistry {
 
     }
 
-
     public static void stop(){
         for(StormReporter sr : REPORTERS){
             sr.stop();
@@ -126,32 +125,49 @@ public class StormMetricRegistry {
     }
 
     public static String metricName(String name, String stormId, String componentId, String streamId, String executorId, Integer workerPort){
-        return String.format("storm.worker.%s.%s.%s.%s.%s.%s-%s",
-                stormId,
-                hostName,
-                dotToUnderScore(componentId),
-                dotToUnderScore(streamId),
-                dotToUnderScore(executorId),
-                workerPort,
-                name);
+        StringBuilder sb = new StringBuilder("storm.worker.");
+        sb.append(stormId);
+        sb.append(".");
+        sb.append(hostName);
+        sb.append(".");
+        sb.append(dotToUnderScore(componentId));
+        sb.append(".");
+        sb.append(dotToUnderScore(streamId));
+        sb.append(".");
+        sb.append(dotToUnderScore(executorId));
+        sb.append(".");
+        sb.append(workerPort);
+        sb.append("-");
+        sb.append(name);
+        return sb.toString();
     }
 
-    public static String metricName(String name, String stormId, String componentId, Integer workerPort){
-        return String.format("storm.worker.%s.%s.%s.%s-%s",
-                stormId,
-                hostName,
-                dotToUnderScore(componentId),
-                workerPort,
-                name);
+    public static String metricName(String name, String stormId, String componentId, Integer workerPort) {
+        StringBuilder sb = new StringBuilder("storm.worker.");
+        sb.append(stormId);
+        sb.append(".");
+        sb.append(hostName);
+        sb.append(".");
+        sb.append(dotToUnderScore(componentId));
+        sb.append(".");
+        sb.append(workerPort);
+        sb.append("-");
+        sb.append(name);
+        return sb.toString();
     }
 
     public static String metricName(String name, TopologyContext context){
-        return String.format("storm.topology.%s.%s.%s.%s.%s-%s",
-                context.getStormId(),
-                hostName,
-                dotToUnderScore(context.getThisComponentId()),
-                context.getThisWorkerPort(),
-                name);
+        StringBuilder sb = new StringBuilder("storm.topology.");
+        sb.append(context.getStormId());
+        sb.append(".");
+        sb.append(hostName);
+        sb.append(".");
+        sb.append(dotToUnderScore(context.getThisComponentId()));
+        sb.append(".");
+        sb.append(context.getThisWorkerPort());
+        sb.append("-");
+        sb.append(name);
+        return sb.toString();
     }
 
     private static String dotToUnderScore(String str){


[2/2] storm git commit: STORM-2153: use taskId in metrics names instead of executorId

Posted by pt...@apache.org.
STORM-2153: use taskId in metrics names instead of executorId


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d53800f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d53800f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d53800f

Branch: refs/heads/metrics_v2
Commit: 8d53800f14ced3fd630a02dfd9537d5900979562
Parents: 8bf7252
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Jan 10 14:25:50 2018 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Jan 10 14:25:50 2018 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/executor.clj |  8 ++++----
 storm-core/src/clj/org/apache/storm/daemon/task.clj     |  8 ++++----
 .../org/apache/storm/metrics2/StormMetricRegistry.java  | 12 ++++++------
 3 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8d53800f/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 993add6..e8d23e5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -442,7 +442,7 @@
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta))))
+      (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
   (let [^ISpout spout (:object task-data)
@@ -451,7 +451,7 @@
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) (pr-str (:executor-id executor-data)) (:stream tuple-info)) (:stream tuple-info) time-delta))))
+      (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" (:worker-context executor-data) (:component-id executor-data) task-id (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
   (let [task-ids (:task-ids executor-data)
@@ -820,7 +820,7 @@
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-acked-tuple! executor-stats
-                                                      (StormMetricRegistry/counter "acked" worker-context  (:component-id executor-data) (pr-str (:executor-id executor-data)) (.getSourceStreamId tuple))
+                                                      (StormMetricRegistry/counter "acked" worker-context  (:component-id executor-data) task-id (.getSourceStreamId tuple))
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
                                                       delta))))
@@ -836,7 +836,7 @@
                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-failed-tuple! executor-stats
-                                                       (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) (pr-str (:executor-id executor-data)) (.getSourceStreamId tuple))
+                                                       (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) task-id (.getSourceStreamId tuple))
                                                        (.getSourceComponent tuple)
                                                        (.getSourceStreamId tuple)
                                                        delta))))

http://git-wip-us.apache.org/repos/asf/storm/blob/8d53800f/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 7132fc1..9e18331 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -143,9 +143,9 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream)
+              (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream)
               (if out-task-id
-                (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream 1)))
+                (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream 1)))
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
@@ -163,8 +163,8 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
-               (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream)
-               (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id (pr-str (:executor-id executor-data)) stream) stream (count out-tasks)))
+               (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id task-id stream) stream)
+               (stats/transferred-tuples! executor-stats (StormMetricRegistry/counter "transferred" worker-context component-id task-id stream) stream (count out-tasks)))
              out-tasks)))
     ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8d53800f/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index aea4539..e1305f9 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -68,13 +68,13 @@ public class StormMetricRegistry {
         );
     }
 
-    public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){
-        String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort());
+    public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort());
         return REGISTRY.meter(metricName);
     }
 
-    public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){
-        String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort());
+    public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId){
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort());
         return REGISTRY.counter(metricName);
     }
 
@@ -124,7 +124,7 @@ public class StormMetricRegistry {
         }
     }
 
-    public static String metricName(String name, String stormId, String componentId, String streamId, String executorId, Integer workerPort){
+    public static String metricName(String name, String stormId, String componentId, String streamId, Integer taskId, Integer workerPort){
         StringBuilder sb = new StringBuilder("storm.worker.");
         sb.append(stormId);
         sb.append(".");
@@ -134,7 +134,7 @@ public class StormMetricRegistry {
         sb.append(".");
         sb.append(dotToUnderScore(streamId));
         sb.append(".");
-        sb.append(dotToUnderScore(executorId));
+        sb.append(taskId);
         sb.append(".");
         sb.append(workerPort);
         sb.append("-");