You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/05 23:00:46 UTC
[14/37] storm git commit: do not use multimethods when it does not
help
do not use multimethods when it does not help
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/40ddae26
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/40ddae26
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/40ddae26
Branch: refs/heads/master
Commit: 40ddae268d7c56156db7ca164788f0625d48c505
Parents: c1cc0ba
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri May 22 14:53:17 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri May 22 14:53:17 2015 -0500
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/stats.clj | 81 ++++++++----------------
1 file changed, 26 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/40ddae26/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index cce2b3a..ef9842b 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -472,16 +472,8 @@
((fn [[weighted-avg cnt]]
(div weighted-avg (* 1000 (min uptime 600))))))))
-(defn- page+comp-dispatch
- [page-type comp-type & _]
- [page-type comp-type])
-
-(defmulti agg-pre-merge
- page+comp-dispatch)
-
-(defmethod agg-pre-merge [:component-page :bolt]
- [_ _
- {exec-id :exec-id
+(defn agg-pre-merge-comp-page-bolt
+ [{exec-id :exec-id
host :host
port :port
uptime :uptime
@@ -536,9 +528,8 @@
(get window)
handle-sys-components-fn)})}))
-(defmethod agg-pre-merge [:component-page :spout]
- [_ _
- {exec-id :exec-id
+(defn agg-pre-merge-comp-page-spout
+ [{exec-id :exec-id
host :host
port :port
uptime :uptime
@@ -586,9 +577,8 @@
(get window)
handle-sys-components-fn)}))}))
-(defmethod agg-pre-merge [:topology-page :bolt]
- [_ _
- {comp-id :comp-id
+(defn agg-pre-merge-topo-page-bolt
+ [{comp-id :comp-id
num-tasks :num-tasks
statk->w->sid->num :stats
uptime :uptime}
@@ -640,9 +630,8 @@
vals
sum)})}))
-(defmethod agg-pre-merge [:topology-page :spout]
- [_ _
- {comp-id :comp-id
+(defn agg-pre-merge-topo-page-spout
+ [{comp-id :comp-id
num-tasks :num-tasks
statk->w->sid->num :stats}
window
@@ -698,13 +687,8 @@
[& args]
(apply apply-or-0 max args))
-(defmulti merge-agg-comp-stats
- "Merges all bolt stats from one executor with the given accumulated stats."
- page+comp-dispatch)
-
-(defmethod merge-agg-comp-stats [:component-page :bolt]
- [_ _
- {acc-in :cid+sid->input-stats
+(defn merge-agg-comp-stats-comp-page-bolt
+ [{acc-in :cid+sid->input-stats
acc-out :sid->output-stats
:as acc-bolt-stats}
{bolt-in :cid+sid->input-stats
@@ -738,9 +722,8 @@
(mapcat vector [:execute-latency :process-latency])
(apply assoc {})))))})
-(defmethod merge-agg-comp-stats [:component-page :spout]
- [_ _
- {acc-out :sid->output-stats
+(defn merge-agg-comp-stats-comp-page-spout
+ [{acc-out :sid->output-stats
:as acc-spout-stats}
{spout-out :sid->output-stats
:as spout-stats}]
@@ -765,8 +748,8 @@
acked)
nil)})))})
-(defmethod merge-agg-comp-stats [:topology-page :bolt]
- [_ _ acc-bolt-stats bolt-stats]
+(defn merge-agg-comp-stats-topo-page-bolt
+ [acc-bolt-stats bolt-stats]
{:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
:num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
:emitted (sum-or-0 (:emitted acc-bolt-stats) (:emitted bolt-stats))
@@ -783,8 +766,8 @@
:acked (sum-or-0 (:acked acc-bolt-stats) (:acked bolt-stats))
:failed (sum-or-0 (:failed acc-bolt-stats) (:failed bolt-stats))})
-(defmethod merge-agg-comp-stats [:topology-page :spout]
- [_ _ acc-spout-stats spout-stats]
+(defn merge-agg-comp-stats-topo-page-spout
+ [acc-spout-stats spout-stats]
{:num-executors (inc (or (:num-executors acc-spout-stats) 0))
:num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats))
:emitted (sum-or-0 (:emitted acc-spout-stats) (:emitted spout-stats))
@@ -870,8 +853,8 @@
include-sys?
acc-stats
new-data
- (partial agg-pre-merge :topology-page :bolt)
- (partial merge-agg-comp-stats :topology-page :bolt)
+ agg-pre-merge-topo-page-bolt
+ merge-agg-comp-stats-topo-page-bolt
:bolt-id->stats))
(defmethod agg-topo-exec-stats :spout
@@ -880,8 +863,8 @@
include-sys?
acc-stats
new-data
- (partial agg-pre-merge :topology-page :spout)
- (partial merge-agg-comp-stats :topology-page :spout)
+ agg-pre-merge-topo-page-spout
+ merge-agg-comp-stats-topo-page-spout
:spout-id->stats))
(defmethod agg-topo-exec-stats :default [_ _ acc-stats _] acc-stats)
@@ -1173,29 +1156,17 @@
(defmethod agg-comp-exec-stats :bolt
[window include-sys? acc-stats new-data]
(assoc (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
- :stats
- (merge-agg-comp-stats :component-page
- :bolt
- (:stats acc-stats)
- (agg-pre-merge :component-page
- :bolt
- new-data
- window
- include-sys?))
+ :stats (merge-agg-comp-stats-comp-page-bolt
+ (:stats acc-stats)
+ (agg-pre-merge-comp-page-bolt new-data window include-sys?))
:type :bolt))
(defmethod agg-comp-exec-stats :spout
[window include-sys? acc-stats new-data]
(assoc (agg-spout-exec-win-stats acc-stats (:stats new-data) include-sys?)
- :stats
- (merge-agg-comp-stats :component-page
- :spout
- (:stats acc-stats)
- (agg-pre-merge :component-page
- :spout
- new-data
- window
- include-sys?))
+ :stats (merge-agg-comp-stats-comp-page-spout
+ (:stats acc-stats)
+ (agg-pre-merge-comp-page-spout new-data window include-sys?))
:type :spout))
(defn- aggregate-comp-stats*