You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:21 UTC

[15/38] storm git commit: STORM-2153 New Metrics Reporting API

STORM-2153 New Metrics Reporting API

* address missing sampling rate
* rename field names cause we use Counter instead of Meter


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/00a382b0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/00a382b0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/00a382b0

Branch: refs/heads/1.x-branch
Commit: 00a382b017c1e29863ac4d9a4449086ef79384e4
Parents: 85dbacd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Nov 30 10:38:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Nov 30 10:41:13 2017 +0900

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    | 22 ++++------
 .../src/clj/org/apache/storm/daemon/task.clj    |  8 ++--
 storm-core/src/clj/org/apache/storm/stats.clj   | 43 ++++++++++++--------
 3 files changed, 39 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 720bfa7..0aca4bd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -257,7 +257,7 @@
      :batch-transfer-queue batch-transfer->worker
      :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
      :suicide-fn (:suicide-fn worker)
-     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker) 
+     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker)
                                                           :acls (Utils/getWorkerACL storm-conf)
                                                           :context (ClusterStateContext. DaemonType/WORKER))
      :type executor-type
@@ -280,8 +280,8 @@
                                (log-message "Got interrupted excpetion shutting thread down...")
                                ((:suicide-fn <>))))
      :sampler (mk-stats-sampler storm-conf)
-     :failed-meter (StormMetricRegistry/counter "failed" worker-context component-id)
-     :acked-meter (StormMetricRegistry/counter "acked" worker-context component-id)
+     :failed-counter (StormMetricRegistry/counter "failed" worker-context component-id)
+     :acked-counter (StormMetricRegistry/counter "acked" worker-context component-id)
      :spout-throttling-metrics (if (= executor-type :spout)
                                 (builtin-metrics/make-spout-throttling-data)
                                 nil)
@@ -437,27 +437,23 @@
 (defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?]
   (let [^ISpout spout (:object task-data)
         storm-conf (:storm-conf executor-data)
-        task-id (:task-id task-data)
-        failed-meter (:failed-meter executor-data)]
+        task-id (:task-id task-data)]
     ;;TODO: need to throttle these when there's lots of failures
     (when debug?
       (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (.inc ^Counter failed-meter)
-      (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+      (stats/spout-failed-tuple! (:stats executor-data) (:failed-counter executor-data) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
   (let [^ISpout spout (:object task-data)
-        task-id (:task-id task-data)
-        acked-meter (:acked-meter executor-data)]
+        task-id (:task-id task-data)]
     (when debug? (log-message "SPOUT Acking message " id " " msg-id))
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (.inc ^Counter acked-meter)
-      (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+      (stats/spout-acked-tuple! (:stats executor-data) (:acked-counter executor-data) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
   (let [task-ids (:task-ids executor-data)
@@ -825,8 +821,8 @@
                              (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                            (when (<= 0 delta)
-                             (.inc ^Counter (:acked-meter (:executor-data task-data)))
                              (stats/bolt-acked-tuple! executor-stats
+                                                      (:acked-counter (:executor-data task-data))
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
                                                       delta))))
@@ -841,8 +837,8 @@
                              (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                            (when (<= 0 delta)
-                             (.inc  ^Counter (:failed-meter (:executor-data task-data)))
                              (stats/bolt-failed-tuple! executor-stats
+                                                       (:failed-counter (:executor-data task-data))
                                                        (.getSourceComponent tuple)
                                                        (.getSourceStreamId tuple)
                                                        delta))))

http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index c43d20d..a2f6c54 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -131,7 +131,7 @@
         user-context (:user-context task-data)
         executor-stats (:stats executor-data)
         debug? (= true (storm-conf TOPOLOGY-DEBUG))
-        ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)]
+        ^Counter emitted-counter (StormMetricRegistry/counter "emitted" worker-context component-id)]
         
     (fn ([^Integer out-task-id ^String stream ^List values]
           (when debug?
@@ -144,8 +144,7 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (.inc ^Counter emitted-meter)
-              (stats/emitted-tuple! executor-stats stream)
+              (stats/emitted-tuple! executor-stats emitted-counter stream)
               (if out-task-id
                 (stats/transferred-tuples! executor-stats stream 1)))
             (if out-task-id [out-task-id])
@@ -165,8 +164,7 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
-               (.inc ^Counter emitted-meter)
-               (stats/emitted-tuple! executor-stats stream)
+               (stats/emitted-tuple! executor-stats emitted-counter stream)
                (stats/transferred-tuples! executor-stats stream (count out-tasks)))
              out-tasks)))
     ))

http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj
index 17d0219..41aaf04 100644
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@ -26,7 +26,8 @@
             WorkerResources])
   (:import [org.apache.storm.utils Utils])
   (:import [org.apache.storm.scheduler WorkerSlot])
-  (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
+  (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]
+           (com.codahale.metrics Counter))
   (:use [org.apache.storm log util])
   (:use [clojure.math.numeric-tower :only [ceil]]))
 
@@ -117,9 +118,11 @@
   `(:complete-latencies ~stats))
 
 (defn emitted-tuple!
-  [stats stream]
-  (let [^MultiCountStatAndMetric emitted (stats-emitted stats)]
-    (.incBy emitted ^Object stream ^long (stats-rate stats))))
+  [stats ^Counter emitted-counter stream]
+  (let [^MultiCountStatAndMetric emitted (stats-emitted stats)
+        ^long rate (stats-rate stats)]
+    (.incBy emitted ^Object stream rate)
+    (.inc emitted-counter rate)))
 
 (defn transferred-tuples!
   [stats stream amt]
@@ -135,30 +138,38 @@
     (.record exec-lat key latency-ms)))
 
 (defn bolt-acked-tuple!
-  [^BoltExecutorStats stats component stream latency-ms]
+  [^BoltExecutorStats stats ^Counter acked-counter component stream latency-ms]
   (let [key [component stream]
         ^MultiCountStatAndMetric acked (stats-acked stats)
-        ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)]
-    (.incBy acked key (stats-rate stats))
+        ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)
+        ^long rate (stats-rate stats)]
+    (.incBy acked key rate)
+    (.inc acked-counter rate)
     (.record process-lat key latency-ms)))
 
 (defn bolt-failed-tuple!
-  [^BoltExecutorStats stats component stream latency-ms]
+  [^BoltExecutorStats stats ^Counter failed-counter component stream latency-ms]
   (let [key [component stream]
-        ^MultiCountStatAndMetric failed (stats-failed stats)]
-    (.incBy failed key (stats-rate stats))))
+        ^MultiCountStatAndMetric failed (stats-failed stats)
+        ^long rate (stats-rate stats)]
+    (.incBy failed key rate)
+    (.inc failed-counter rate)))
 
 (defn spout-acked-tuple!
-  [^SpoutExecutorStats stats stream latency-ms]
+  [^SpoutExecutorStats stats ^Counter acked-counter stream latency-ms]
   (let [^MultiCountStatAndMetric acked (stats-acked stats)
-        ^MultiLatencyStatAndMetric complete-latencies (stats-complete-latencies stats)]
-    (.incBy acked stream (stats-rate stats))
+        ^MultiLatencyStatAndMetric complete-latencies (stats-complete-latencies stats)
+        ^long rate (stats-rate stats)]
+    (.incBy acked stream rate)
+    (.inc acked-counter rate)
     (.record complete-latencies stream latency-ms)))
 
 (defn spout-failed-tuple!
-  [^SpoutExecutorStats stats stream latency-ms]
-  (let [^MultiCountStatAndMetric failed (stats-failed stats)]
-    (.incBy failed stream (stats-rate stats))))
+  [^SpoutExecutorStats stats ^Counter failed-counter stream latency-ms]
+  (let [^MultiCountStatAndMetric failed (stats-failed stats)
+        ^long rate (stats-rate stats)]
+    (.incBy failed stream rate)
+    (.inc failed-counter rate)))
 
 (defn- close-stat! [stat]
   (.close stat))