You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:20 UTC

[14/38] storm git commit: WIP apply sampling to new metrics

WIP apply sampling to new metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85dbacdd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85dbacdd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85dbacdd

Branch: refs/heads/1.x-branch
Commit: 85dbacdd058ee8b3246ff6982a4079713923b66e
Parents: e9a9f50
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Nov 28 11:51:45 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Nov 28 11:51:45 2017 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/executor.clj | 8 ++++----
 storm-core/src/clj/org/apache/storm/daemon/task.clj     | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/85dbacdd/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 94bd7af..720bfa7 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -442,10 +442,10 @@
     ;;TODO: need to throttle these when there's lots of failures
     (when debug?
       (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
-    (.inc ^Counter failed-meter)
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
+      (.inc ^Counter failed-meter)
       (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
@@ -453,10 +453,10 @@
         task-id (:task-id task-data)
         acked-meter (:acked-meter executor-data)]
     (when debug? (log-message "SPOUT Acking message " id " " msg-id))
-    (.inc ^Counter acked-meter)
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
+      (.inc ^Counter acked-meter)
       (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
@@ -823,9 +823,9 @@
                          (let [delta (tuple-time-delta! tuple)]
                            (when debug? 
                              (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                           (.inc  ^Counter (:acked-meter (:executor-data task-data)))
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                            (when (<= 0 delta)
+                             (.inc ^Counter (:acked-meter (:executor-data task-data)))
                              (stats/bolt-acked-tuple! executor-stats
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
@@ -839,9 +839,9 @@
                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                            (when debug? 
                              (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                           (.inc  ^Counter (:failed-meter (:executor-data task-data)))
                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                            (when (<= 0 delta)
+                             (.inc  ^Counter (:failed-meter (:executor-data task-data)))
                              (stats/bolt-failed-tuple! executor-stats
                                                        (.getSourceComponent tuple)
                                                        (.getSourceStreamId tuple)

http://git-wip-us.apache.org/repos/asf/storm/blob/85dbacdd/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 7162f7f..c43d20d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -134,7 +134,6 @@
         ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)]
         
     (fn ([^Integer out-task-id ^String stream ^List values]
-          (.inc ^Counter emitted-meter)
           (when debug?
             (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
           (let [target-component (.getComponentId worker-context out-task-id)
@@ -145,13 +144,13 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
+              (.inc ^Counter emitted-meter)
               (stats/emitted-tuple! executor-stats stream)
               (if out-task-id
                 (stats/transferred-tuples! executor-stats stream 1)))
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
-           (.inc ^Counter emitted-meter)
            (when debug?
              (log-message "Emitting: " component-id " " stream " " values))
            (let [out-tasks (ArrayList.)]
@@ -166,6 +165,7 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
+               (.inc ^Counter emitted-meter)
                (stats/emitted-tuple! executor-stats stream)
                (stats/transferred-tuples! executor-stats stream (count out-tasks)))
              out-tasks)))