You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:24 UTC

[18/38] storm git commit: STORM-2153: add streamId and executorId to metrics names; replace '.' with '_' in metrics names

STORM-2153: add streamId and executorId to metrics names; replace '.' with '_' in metrics names


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c91da676
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c91da676
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c91da676

Branch: refs/heads/1.x-branch
Commit: c91da676e7fc550ab44fd2d4d91dc95243059eb7
Parents: dd977e8
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Dec 13 15:25:56 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Dec 13 15:25:56 2017 -0500

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  6 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |  7 ++---
 .../storm/metrics2/StormMetricRegistry.java     | 32 +++++++++++++++-----
 3 files changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 0aca4bd..fa7d44c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -280,8 +280,6 @@
                                (log-message "Got interrupted excpetion shutting thread down...")
                                ((:suicide-fn <>))))
      :sampler (mk-stats-sampler storm-conf)
-     :failed-counter (StormMetricRegistry/counter "failed" worker-context component-id)
-     :acked-counter (StormMetricRegistry/counter "acked" worker-context component-id)
      :spout-throttling-metrics (if (= executor-type :spout)
                                 (builtin-metrics/make-spout-throttling-data)
                                 nil)
@@ -444,7 +442,7 @@
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-failed-tuple! (:stats executor-data) (:failed-counter executor-data) (:stream tuple-info) time-delta))))
+      (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) (:executor-id executor-data) (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
   (let [^ISpout spout (:object task-data)
@@ -453,7 +451,7 @@
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-acked-tuple! (:stats executor-data) (:acked-counter executor-data) (:stream tuple-info) time-delta))))
+      (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" worker-context (:component-id executor-data) (:executor-id executor-data) (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
   (let [task-ids (:task-ids executor-data)

http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index a2f6c54..edc144c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -130,8 +130,7 @@
         stream->component->grouper (:stream->component->grouper executor-data)
         user-context (:user-context task-data)
         executor-stats (:stats executor-data)
-        debug? (= true (storm-conf TOPOLOGY-DEBUG))
-        ^Counter emitted-counter (StormMetricRegistry/counter "emitted" worker-context component-id)]
+        debug? (= true (storm-conf TOPOLOGY-DEBUG))]
         
     (fn ([^Integer out-task-id ^String stream ^List values]
           (when debug?
@@ -144,7 +143,7 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (stats/emitted-tuple! executor-stats emitted-counter stream)
+              (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (:executor-id executor-data) stream) stream)
               (if out-task-id
                 (stats/transferred-tuples! executor-stats stream 1)))
             (if out-task-id [out-task-id])
@@ -164,7 +163,7 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
-               (stats/emitted-tuple! executor-stats emitted-counter stream)
+               (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (:executor-id executor-data) stream) stream)
                (stats/transferred-tuples! executor-stats stream (count out-tasks)))
              out-tasks)))
     ))

http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 200ddcf..2bab4e9 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -68,20 +68,20 @@ public class StormMetricRegistry {
         );
     }
 
-    public static Meter meter(String name, WorkerTopologyContext context, String componentId){
-        String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort());
+    public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort());
         return REGISTRY.meter(metricName);
     }
 
-    public static Counter counter(String name, WorkerTopologyContext context, String componentId){
-        String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort());
+    public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){
+        String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort());
         return REGISTRY.counter(metricName);
     }
 
     public static void start(Map<String, Object> stormConfig, DaemonType type){
         String localHost = "localhost";
         try {
-            hostName = Utils.localHostname();
+            hostName = dotToUnderScore(Utils.localHostname());
         } catch (UnknownHostException e) {
              LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" +
                      " as 'localhost'.");
@@ -130,9 +130,27 @@ public class StormMetricRegistry {
         }
     }
 
-    public static String metricName(String name, String stormId, String componentId, Integer workerPort){
-        return String.format("storm.worker.%s.%s.%s.%s-%s", stormId, hostName, componentId, workerPort, name);
+    public static String metricName(String name, String stormId, String componentId, String streamId, String executorId, Integer workerPort){
+        return String.format("storm.worker.%s.%s.%s.%s.%s.%s-%s",
+                stormId,
+                hostName,
+                dotToUnderScore(componentId),
+                dotToUnderScore(streamId),
+                dotToUnderScore(executorId),
+                workerPort,
+                name);
     }
 
+    public static String metricName(String name, String stormId, String componentId, Integer workerPort){
+        return String.format("storm.worker.%s.%s.%s.%s-%s",
+                stormId,
+                hostName,
+                dotToUnderScore(componentId),
+                workerPort,
+                name);
+    }
 
+    private static String dotToUnderScore(String str){
+        return str.replace('.', '_');
+    }
 }