You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/04/27 20:29:56 UTC

[2/5] storm git commit: STORM-1731 Avoid looking up debug / backpressure enable flags within critical path

STORM-1731 Avoid looking up debug / backpressure enable flags within critical path

* preload the value of each flag once, outside the critical path, and reuse that value
  * topology.debug
  * topology.backpressure.enable
* also remove an unnecessary lookup: receive-queue


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7a15ebc5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7a15ebc5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7a15ebc5

Branch: refs/heads/1.x-branch
Commit: 7a15ebc57300a027f10a65046c85e9d1d0bef4dc
Parents: ae78815
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Apr 26 19:29:44 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Apr 26 19:29:44 2016 +0900

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    | 50 +++++++++-----------
 1 file changed, 23 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7a15ebc5/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 9ea4eb4..07925b8 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -214,12 +214,12 @@
 
 ;; in its own function so that it can be mocked out by tracked topologies
 (defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
-  (fn this
-    [task tuple]
-    (let [val (AddressedTuple. task tuple)]
-      (when (= true (storm-conf TOPOLOGY-DEBUG))
-        (log-message "TRANSFERING tuple " val))
-      (disruptor/publish batch-transfer->worker val))))
+  (let [debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+    (fn this
+      [task tuple]
+      (let [val (AddressedTuple. task tuple)]
+        (when debug? (log-message "TRANSFERING tuple " val))
+        (disruptor/publish batch-transfer->worker val)))))
 
 (defn mk-executor-data [worker executor-id]
   (let [worker-context (worker-context worker)
@@ -426,24 +426,22 @@
         (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
         )))
 
-(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
+(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?]
   (let [^ISpout spout (:object task-data)
         storm-conf (:storm-conf executor-data)
         task-id (:task-id task-data)]
     ;;TODO: need to throttle these when there's lots of failures
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
+    (when debug?
       (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
       (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
 
-(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
-  (let [storm-conf (:storm-conf executor-data)
-        ^ISpout spout (:object task-data)
+(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
+  (let [^ISpout spout (:object task-data)
         task-id (:task-id task-data)]
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
-      (log-message "SPOUT Acking message " id " " msg-id))
+    (when debug? (log-message "SPOUT Acking message " id " " msg-id))
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
@@ -501,13 +499,14 @@
         rand (Random. (Utils/secureRandomLong))
         ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
         debug? (= true (storm-conf TOPOLOGY-DEBUG))
+        backpressure-enabled? (= true (storm-conf TOPOLOGY-BACKPRESSURE-ENABLE))
 
         pending (RotatingMap.
                  2 ;; microoptimize for performance of .size method
                  (reify RotatingMap$ExpiredCallback
                    (expire [this id [task-id spout-id tuple-info start-time-ms]]
                      (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
-                       (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
+                       (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id debug?)
                        ))))
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
                           (let [stream-id (.getSourceStreamId tuple)]
@@ -532,9 +531,9 @@
                                   (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                                     (condp = stream-id
                                       ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
-                                                                         spout-id tuple-finished-info time-delta id)
+                                                                         spout-id tuple-finished-info time-delta id debug?)
                                       ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
-                                                                           spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
+                                                                           spout-id tuple-finished-info time-delta "FAIL-STREAM" id debug?)
                                       )))
                                 ;; TODO: on failure, emit tuple to failure stream
                                 ))))
@@ -590,7 +589,7 @@
                                            (when message-id
                                              (ack-spout-msg executor-data task-data message-id
                                                             {:stream out-stream-id :values values}
-                                                            (if (sampler) 0) "0:")))
+                                                            (if (sampler) 0) "0:" debug?)))
                                          (or out-tasks [])
                                          ))]]
           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
@@ -627,8 +626,7 @@
           
           (let [active? @(:storm-active-atom executor-data)
                 curr-count (.get emitted-count)
-                backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
-                throttle-on (and backpressure-enabled
+                throttle-on (and backpressure-enabled?
                               @(:throttle-on (:worker executor-data)))
                 reached-max-spout-pending (and max-spout-pending
                                                (>= (.size pending) max-spout-pending))
@@ -685,12 +683,12 @@
     (.put pending key (bit-xor curr id))))
 
 (defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
-  (let [storm-conf (:storm-conf executor-data)
+  (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
+                open-or-prepare-was-called?]} executor-data
         execute-sampler (mk-stats-sampler storm-conf)
         executor-stats (:stats executor-data)
-        {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
-                open-or-prepare-was-called?]} executor-data
         rand (Random. (Utils/secureRandomLong))
+        debug? (= true (storm-conf TOPOLOGY-DEBUG))
 
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
                           ;; synchronization needs to be done with a key provided by this bolt, otherwise:
@@ -722,15 +720,14 @@
                                     user-context (:user-context task-data)
                                     sampler? (sampler)
                                     execute-sampler? (execute-sampler)
-                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
-                                    receive-queue (:receive-queue executor-data)]
+                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
                                 (when sampler?
                                   (.setProcessSampleStartTime tuple now))
                                 (when execute-sampler?
                                   (.setExecuteSampleStartTime tuple now))
                                 (.execute bolt-obj tuple)
                                 (let [delta (tuple-execute-time-delta! tuple)]
-                                  (when (= true (storm-conf TOPOLOGY-DEBUG))
+                                  (when debug?
                                     (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
  
                                   (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
@@ -808,8 +805,7 @@
                                           (task/send-unanchored task-data
                                                                 ACKER-ACK-STREAM-ID
                                                                 [root (bit-xor id ack-val)])))
-                         (let [delta (tuple-time-delta! tuple)
-                               debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+                         (let [delta (tuple-time-delta! tuple)]
                            (when debug? 
                              (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))