You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/04/27 20:29:55 UTC
[1/5] storm git commit: STORM-1729 Get rid of reflections while
recording stats
Repository: storm
Updated Branches:
refs/heads/1.x-branch 30e69e804 -> 5b5dd184e
STORM-1729 Get rid of reflections while recording stats
* define MultiCount/LatencyStatAndMetric in let statement and set type hint to there
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae78815e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae78815e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae78815e
Branch: refs/heads/1.x-branch
Commit: ae78815e6ad27e2301d80c258f4ad535fa1193ff
Parents: 7c6355d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Apr 26 12:53:38 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Apr 26 12:53:38 2016 +0900
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/stats.clj | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ae78815e/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj
index 83a0cce..26a4eb4 100644
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@ -116,11 +116,13 @@
(defn emitted-tuple!
[stats stream]
- (.incBy ^MultiCountStatAndMetric (stats-emitted stats) ^Object stream ^long (stats-rate stats)))
+ (let [^MultiCountStatAndMetric emitted (stats-emitted stats)]
+ (.incBy emitted ^Object stream ^long (stats-rate stats))))
(defn transferred-tuples!
[stats stream amt]
- (.incBy ^MultiCountStatAndMetric (stats-transferred stats) ^Object stream ^long (* (stats-rate stats) amt)))
+ (let [^MultiCountStatAndMetric transferred (stats-transferred stats)]
+ (.incBy transferred ^Object stream ^long (* (stats-rate stats) amt))))
(defn bolt-execute-tuple!
[^BoltExecutorStats stats component stream latency-ms]
@@ -146,12 +148,15 @@
(defn spout-acked-tuple!
[^SpoutExecutorStats stats stream latency-ms]
- (.incBy ^MultiCountStatAndMetric (stats-acked stats) stream (stats-rate stats))
- (.record ^MultiLatencyStatAndMetric (stats-complete-latencies stats) stream latency-ms))
+ (let [^MultiCountStatAndMetric acked (stats-acked stats)
+ ^MultiLatencyStatAndMetric complete-latencies (stats-complete-latencies stats)]
+ (.incBy acked stream (stats-rate stats))
+ (.record complete-latencies stream latency-ms)))
(defn spout-failed-tuple!
[^SpoutExecutorStats stats stream latency-ms]
- (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate stats)))
+ (let [^MultiCountStatAndMetric failed (stats-failed stats)]
+ (.incBy failed stream (stats-rate stats))))
(defn- close-stat! [stat]
(.close stat))
[2/5] storm git commit: STORM-1731 Avoid looking up debug /
backpressure enable flags within critical path
Posted by pt...@apache.org.
STORM-1731 Avoid looking up debug / backpressure enable flags within critical path
* preload the value of flags and use that value
* topology.debug
* topology.backpressure.enable
* also remove unnecessary lookup: receive-queue
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7a15ebc5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7a15ebc5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7a15ebc5
Branch: refs/heads/1.x-branch
Commit: 7a15ebc57300a027f10a65046c85e9d1d0bef4dc
Parents: ae78815
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Apr 26 19:29:44 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Apr 26 19:29:44 2016 +0900
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 50 +++++++++-----------
1 file changed, 23 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7a15ebc5/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 9ea4eb4..07925b8 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -214,12 +214,12 @@
;; in its own function so that it can be mocked out by tracked topologies
(defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
- (fn this
- [task tuple]
- (let [val (AddressedTuple. task tuple)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "TRANSFERING tuple " val))
- (disruptor/publish batch-transfer->worker val))))
+ (let [debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+ (fn this
+ [task tuple]
+ (let [val (AddressedTuple. task tuple)]
+ (when debug? (log-message "TRANSFERING tuple " val))
+ (disruptor/publish batch-transfer->worker val)))))
(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker)
@@ -426,24 +426,22 @@
(log-message "Shut down executor " component-id ":" (pr-str executor-id)))
)))
-(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
+(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?]
(let [^ISpout spout (:object task-data)
storm-conf (:storm-conf executor-data)
task-id (:task-id task-data)]
;;TODO: need to throttle these when there's lots of failures
- (when (= true (storm-conf TOPOLOGY-DEBUG))
+ (when debug?
(log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
(stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
-(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
- (let [storm-conf (:storm-conf executor-data)
- ^ISpout spout (:object task-data)
+(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
+ (let [^ISpout spout (:object task-data)
task-id (:task-id task-data)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "SPOUT Acking message " id " " msg-id))
+ (when debug? (log-message "SPOUT Acking message " id " " msg-id))
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
@@ -501,13 +499,14 @@
rand (Random. (Utils/secureRandomLong))
^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
debug? (= true (storm-conf TOPOLOGY-DEBUG))
+ backpressure-enabled? (= true (storm-conf TOPOLOGY-BACKPRESSURE-ENABLE))
pending (RotatingMap.
2 ;; microoptimize for performance of .size method
(reify RotatingMap$ExpiredCallback
(expire [this id [task-id spout-id tuple-info start-time-ms]]
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
- (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
+ (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id debug?)
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
(let [stream-id (.getSourceStreamId tuple)]
@@ -532,9 +531,9 @@
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(condp = stream-id
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta id)
+ spout-id tuple-finished-info time-delta id debug?)
ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
+ spout-id tuple-finished-info time-delta "FAIL-STREAM" id debug?)
)))
;; TODO: on failure, emit tuple to failure stream
))))
@@ -590,7 +589,7 @@
(when message-id
(ack-spout-msg executor-data task-data message-id
{:stream out-stream-id :values values}
- (if (sampler) 0) "0:")))
+ (if (sampler) 0) "0:" debug?)))
(or out-tasks [])
))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
@@ -627,8 +626,7 @@
(let [active? @(:storm-active-atom executor-data)
curr-count (.get emitted-count)
- backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
- throttle-on (and backpressure-enabled
+ throttle-on (and backpressure-enabled?
@(:throttle-on (:worker executor-data)))
reached-max-spout-pending (and max-spout-pending
(>= (.size pending) max-spout-pending))
@@ -685,12 +683,12 @@
(.put pending key (bit-xor curr id))))
(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
- (let [storm-conf (:storm-conf executor-data)
+ (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
+ open-or-prepare-was-called?]} executor-data
execute-sampler (mk-stats-sampler storm-conf)
executor-stats (:stats executor-data)
- {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
- open-or-prepare-was-called?]} executor-data
rand (Random. (Utils/secureRandomLong))
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
;; synchronization needs to be done with a key provided by this bolt, otherwise:
@@ -722,15 +720,14 @@
user-context (:user-context task-data)
sampler? (sampler)
execute-sampler? (execute-sampler)
- now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
- receive-queue (:receive-queue executor-data)]
+ now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
(when sampler?
(.setProcessSampleStartTime tuple now))
(when execute-sampler?
(.setExecuteSampleStartTime tuple now))
(.execute bolt-obj tuple)
(let [delta (tuple-execute-time-delta! tuple)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
+ (when debug?
(log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
(task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
@@ -808,8 +805,7 @@
(task/send-unanchored task-data
ACKER-ACK-STREAM-ID
[root (bit-xor id ack-val)])))
- (let [delta (tuple-time-delta! tuple)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+ (let [delta (tuple-time-delta! tuple)]
(when debug?
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
[5/5] storm git commit: add STORM-1729 and STORM-1731 to changelog
Posted by pt...@apache.org.
add STORM-1729 and STORM-1731 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5b5dd184
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5b5dd184
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5b5dd184
Branch: refs/heads/1.x-branch
Commit: 5b5dd184eb9b346c886725756b61f0901db72883
Parents: 6525223
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 27 13:20:08 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 27 13:20:08 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5b5dd184/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f83201..3607a71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,6 @@
## 1.0.1
+ * STORM-1729: Get rid of reflections while recording stats
+ * STORM-1731: Avoid looking up debug / backpressure enable flags within critical path
* STORM-1535: Make sure hdfs key tab login happens only once for multiple bolts/executors.
* STORM-1725: Kafka Spout New Consumer API - KafkaSpoutRetryExponential Backoff method should use HashMap instead of TreeMap not to throw Exception
* STORM-1544: Document Debug/Sampling of Topologies
[3/5] storm git commit: Merge branch 'STORM-1729-1.x-branch' of
github.com:HeartSaVioR/storm into 1.x-branch
Posted by pt...@apache.org.
Merge branch 'STORM-1729-1.x-branch' of github.com:HeartSaVioR/storm into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/baa0c4b7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/baa0c4b7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/baa0c4b7
Branch: refs/heads/1.x-branch
Commit: baa0c4b76505684970914f5422221bc4f8041e10
Parents: 30e69e8 ae78815
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 27 13:16:55 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 27 13:16:55 2016 -0400
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/stats.clj | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[4/5] storm git commit: Merge branch 'STORM-1731-1.x' of
github.com:HeartSaVioR/storm into 1.x-branch
Posted by pt...@apache.org.
Merge branch 'STORM-1731-1.x' of github.com:HeartSaVioR/storm into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/65252230
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/65252230
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/65252230
Branch: refs/heads/1.x-branch
Commit: 6525223014619c4ea1143255ea075eee968c6d52
Parents: baa0c4b 7a15ebc
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 27 13:17:33 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 27 13:17:33 2016 -0400
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 50 +++++++++-----------
1 file changed, 23 insertions(+), 27 deletions(-)
----------------------------------------------------------------------