You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/26 18:18:41 UTC

storm git commit: STORM-2912 Revert optimization of sharing tick tuple

Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch c269e4090 -> e07e98945


STORM-2912 Revert optimization of sharing tick tuple

* since it incurs side effect and messes metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e07e9894
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e07e9894
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e07e9894

Branch: refs/heads/1.1.x-branch
Commit: e07e9894520989ef185a81cc21eec8c59d5f7626
Parents: c269e40
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jan 25 15:25:01 2018 +0900
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jan 26 13:18:19 2018 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/executor.clj    | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e07e9894/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 66d1851..2473e1a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -356,13 +356,15 @@
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                    (= :spout (:type executor-data))))
         (log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data))
-        (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-          (schedule-recurring
-            (:user-timer worker)
-            tick-time-secs
-            tick-time-secs
-            (fn []
-                (disruptor/publish receive-queue val))))))))
+        (schedule-recurring
+          (:user-timer worker)
+          tick-time-secs
+          tick-time-secs
+          (fn []
+            ;; We should create a new tick tuple for each recurrence instead of sharing object
+            ;; More detail on https://issues.apache.org/jira/browse/STORM-2912
+            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+              (disruptor/publish receive-queue val))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)