You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/26 18:18:41 UTC
storm git commit: STORM-2912 Revert optimization of sharing tick tuple
Repository: storm
Updated Branches:
refs/heads/1.1.x-branch c269e4090 -> e07e98945
STORM-2912 Revert optimization of sharing tick tuple
* since it incurs side effect and messes metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e07e9894
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e07e9894
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e07e9894
Branch: refs/heads/1.1.x-branch
Commit: e07e9894520989ef185a81cc21eec8c59d5f7626
Parents: c269e40
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jan 25 15:25:01 2018 +0900
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jan 26 13:18:19 2018 -0500
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/executor.clj | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e07e9894/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 66d1851..2473e1a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -356,13 +356,15 @@
(and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data))))
(log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data))
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
- (schedule-recurring
- (:user-timer worker)
- tick-time-secs
- tick-time-secs
- (fn []
- (disruptor/publish receive-queue val))))))))
+ (schedule-recurring
+ (:user-timer worker)
+ tick-time-secs
+ tick-time-secs
+ (fn []
+ ;; We should create a new tick tuple for each recurrence instead of sharing object
+ ;; More detail on https://issues.apache.org/jira/browse/STORM-2912
+ (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+ (disruptor/publish receive-queue val))))))))
(defn mk-executor [worker executor-id initial-credentials]
(let [executor-data (mk-executor-data worker executor-id)