You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/05 23:00:46 UTC

[14/37] storm git commit: do not use multimethods when it does not help

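Do not use multimethods when they do not help: the agg-pre-merge and merge-agg-comp-stats multimethods were only ever called with literal dispatch values (page type and component type), so replace each method with a plainly named function and call it directly.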


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/40ddae26
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/40ddae26
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/40ddae26

Branch: refs/heads/master
Commit: 40ddae268d7c56156db7ca164788f0625d48c505
Parents: c1cc0ba
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri May 22 14:53:17 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri May 22 14:53:17 2015 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/stats.clj | 81 ++++++++----------------
 1 file changed, 26 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/40ddae26/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index cce2b3a..ef9842b 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -472,16 +472,8 @@
       ((fn [[weighted-avg cnt]]
         (div weighted-avg (* 1000 (min uptime 600))))))))
 
-(defn- page+comp-dispatch
-  [page-type comp-type & _]
-  [page-type comp-type])
-
-(defmulti agg-pre-merge
-  page+comp-dispatch)
-
-(defmethod agg-pre-merge [:component-page :bolt]
-  [_ _
-   {exec-id :exec-id
+(defn agg-pre-merge-comp-page-bolt
+  [{exec-id :exec-id
     host :host
     port :port
     uptime :uptime
@@ -536,9 +528,8 @@
                          (get window)
                          handle-sys-components-fn)})}))
 
-(defmethod agg-pre-merge [:component-page :spout]
-  [_ _
-   {exec-id :exec-id
+(defn agg-pre-merge-comp-page-spout
+  [{exec-id :exec-id
     host :host
     port :port
     uptime :uptime
@@ -586,9 +577,8 @@
                            (get window)
                            handle-sys-components-fn)}))}))
 
-(defmethod agg-pre-merge [:topology-page :bolt]
-  [_ _
-   {comp-id :comp-id
+(defn agg-pre-merge-topo-page-bolt
+  [{comp-id :comp-id
     num-tasks :num-tasks
     statk->w->sid->num :stats
     uptime :uptime}
@@ -640,9 +630,8 @@
                     vals
                     sum)})}))
 
-(defmethod agg-pre-merge [:topology-page :spout]
-  [_ _
-   {comp-id :comp-id
+(defn agg-pre-merge-topo-page-spout
+  [{comp-id :comp-id
     num-tasks :num-tasks
     statk->w->sid->num :stats}
    window
@@ -698,13 +687,8 @@
   [& args]
   (apply apply-or-0 max args))
 
-(defmulti merge-agg-comp-stats
-  "Merges all bolt stats from one executor with the given accumulated stats."
-  page+comp-dispatch)
-
-(defmethod merge-agg-comp-stats [:component-page :bolt]
-  [_ _
-   {acc-in :cid+sid->input-stats
+(defn merge-agg-comp-stats-comp-page-bolt
+  [{acc-in :cid+sid->input-stats
     acc-out :sid->output-stats
     :as acc-bolt-stats}
    {bolt-in :cid+sid->input-stats
@@ -738,9 +722,8 @@
                (mapcat vector [:execute-latency :process-latency])
                (apply assoc {})))))})
 
-(defmethod merge-agg-comp-stats [:component-page :spout]
-  [_ _
-   {acc-out :sid->output-stats
+(defn merge-agg-comp-stats-comp-page-spout
+  [{acc-out :sid->output-stats
     :as acc-spout-stats}
    {spout-out :sid->output-stats
     :as spout-stats}]
@@ -765,8 +748,8 @@
                                        acked)
                                   nil)})))})
 
-(defmethod merge-agg-comp-stats [:topology-page :bolt]
-  [_ _ acc-bolt-stats bolt-stats]
+(defn merge-agg-comp-stats-topo-page-bolt
+  [acc-bolt-stats bolt-stats]
   {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
    :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
    :emitted (sum-or-0 (:emitted acc-bolt-stats) (:emitted bolt-stats))
@@ -783,8 +766,8 @@
    :acked (sum-or-0 (:acked acc-bolt-stats) (:acked bolt-stats))
    :failed (sum-or-0 (:failed acc-bolt-stats) (:failed bolt-stats))})
 
-(defmethod merge-agg-comp-stats [:topology-page :spout]
-  [_ _ acc-spout-stats spout-stats]
+(defn merge-agg-comp-stats-topo-page-spout
+  [acc-spout-stats spout-stats]
   {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
    :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats))
    :emitted (sum-or-0 (:emitted acc-spout-stats) (:emitted spout-stats))
@@ -870,8 +853,8 @@
                         include-sys?
                         acc-stats
                         new-data
-                        (partial agg-pre-merge :topology-page :bolt)
-                        (partial merge-agg-comp-stats :topology-page :bolt)
+                        agg-pre-merge-topo-page-bolt
+                        merge-agg-comp-stats-topo-page-bolt
                         :bolt-id->stats))
 
 (defmethod agg-topo-exec-stats :spout
@@ -880,8 +863,8 @@
                         include-sys?
                         acc-stats
                         new-data
-                        (partial agg-pre-merge :topology-page :spout)
-                        (partial merge-agg-comp-stats :topology-page :spout)
+                        agg-pre-merge-topo-page-spout
+                        merge-agg-comp-stats-topo-page-spout
                         :spout-id->stats))
 
 (defmethod agg-topo-exec-stats :default [_ _ acc-stats _] acc-stats)
@@ -1173,29 +1156,17 @@
 (defmethod agg-comp-exec-stats :bolt
   [window include-sys? acc-stats new-data]
   (assoc (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
-         :stats
-         (merge-agg-comp-stats :component-page
-                               :bolt
-                               (:stats acc-stats)
-                               (agg-pre-merge :component-page
-                                              :bolt
-                                              new-data
-                                              window
-                                              include-sys?))
+         :stats (merge-agg-comp-stats-comp-page-bolt
+                  (:stats acc-stats)
+                  (agg-pre-merge-comp-page-bolt new-data window include-sys?))
          :type :bolt))
 
 (defmethod agg-comp-exec-stats :spout
   [window include-sys? acc-stats new-data]
   (assoc (agg-spout-exec-win-stats acc-stats (:stats new-data) include-sys?)
-         :stats
-         (merge-agg-comp-stats :component-page
-                               :spout
-                               (:stats acc-stats)
-                               (agg-pre-merge :component-page
-                                              :spout
-                                              new-data
-                                              window
-                                              include-sys?))
+         :stats (merge-agg-comp-stats-comp-page-spout
+                  (:stats acc-stats)
+                  (agg-pre-merge-comp-page-spout new-data window include-sys?))
          :type :spout))
 
 (defn- aggregate-comp-stats*