You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/12 23:11:07 UTC
[09/14] Merge branch 'master' into idiomatic-clojure-01
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 6da383e..d8ad29a 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -20,7 +20,8 @@
(:use [hiccup core page-helpers])
(:use [backtype.storm config util log])
(:use [backtype.storm.ui helpers])
- (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]])
+ (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID
+ ACKER-ACK-STREAM-ID ACKER-FAIL-STREAM-ID system-id?]]])
(:use [ring.adapter.jetty :only [run-jetty]])
(:use [clojure.string :only [trim]])
(:import [backtype.storm.utils Utils])
@@ -51,93 +52,8 @@
(map #(.get_stats ^ExecutorSummary %))
(filter not-nil?)))
-(def tips
- "Defines a mapping of help texts for elements of the UI pages."
- {:sys-stats "Use this to toggle inclusion of storm system components."
- :version (str "The version of storm installed on the UI node. (Hopefully, "
- "this is the same on all storm nodes!)")
- :nimbus-uptime (str "The duration the current Nimbus instance has been "
- "running. (Note that the storm cluster may have been "
- "deployed and available for a much longer period than "
- "the current Nimbus process has been running.)")
- :num-supervisors "The number of nodes in the cluster currently."
- :num-slots "Slots are Workers (processes)."
- :num-execs "Executors are threads in a Worker process."
- :num-tasks (str "A Task is an instance of a Bolt or Spout. The number of "
- "Tasks is almost always equal to the number of Executors.")
- :name "The name given to the topology by when it was submitted."
- :name-link "Click the name to view the Topology's information."
- :topo-id "The unique ID given to a Topology each time it is launched."
- :status "The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING."
- :topo-uptime "The time since the Topology was submitted."
- :num-workers "The number of Workers (processes)."
- :sup-id (str "A unique identifier given to a Supervisor when it joins the "
- "cluster.")
- :sup-host (str "The hostname reported by the remote host. (Note that this "
- "hostname is not the result of a reverse lookup at the "
- "Nimbus node.)")
- :sup-uptime (str "The length of time a Supervisor has been registered to the "
- "cluster.")
- :window (str "The past period of time for which the statistics apply. "
- "Click on a value to set the window for this page.")
- :emitted "The number of Tuples emitted."
- :transferred "The number of Tuples emitted that sent to one or more bolts."
- :complete-lat (str "The average time a Tuple \"tree\" takes to be completely "
- "processed by the Topology. A value of 0 is expected "
- "if no acking is done.")
- :spout-acked (str "The number of Tuple \"trees\" successfully processed. A "
- "value of 0 is expected if no acking is done.")
- :spout-failed (str "The number of Tuple \"trees\" that were explicitly "
- "failed or timed out before acking was completed. A value "
- "of 0 is expected if no acking is done.")
- :comp-id "The ID assigned to a the Component by the Topology."
- :comp-id-link "Click on the name to view the Component's page."
- :capacity (str "If this is around 1.0, the corresponding Bolt is running as "
- "fast as it can, so you may want to increase the Bolt's "
- "parallelism. This is (number executed * average execute "
- "latency) / measurement time.")
- :exec-lat (str "The average time a Tuple spends in the execute method. The "
- "execute method may complete without sending an Ack for the "
- "tuple.")
- :num-executed "The number of incoming Tuples processed."
- :proc-lat (str "The average time it takes to Ack a Tuple after it is first "
- "received. Bolts that join, aggregate or batch may not Ack a "
- "tuple until a number of other Tuples have been received.")
- :bolt-acked "The number of Tuples acknowledged by this Bolt."
- :bolt-failed "The number of tuples Failed by this Bolt."
- :stream (str "The name of the Tuple stream given in the Topolgy, or \""
- Utils/DEFAULT_STREAM_ID "\" if none was given.")
- :exec-id "The unique executor ID."
- :exec-uptime "The length of time an Executor (thread) has been alive."
- :port (str "The port number used by the Worker to which an Executor is "
- "assigned. Click on the port number to open the logviewer page "
- "for this Worker.")})
-
-(defn mk-system-toggle-button
- [include-sys?]
- [:p {:class "js-only"}
- [:span.tip.right {:title (:sys-stats tips)}
- [:input {:type "button"
- :value (str (if include-sys? "Hide" "Show") " System Stats")
- :onclick "toggleSys()"}]]])
-
-(defn ui-template
- [body]
- (html4
- [:head
- [:title "Storm UI"]
- (include-css "/css/bootstrap-1.4.0.css")
- (include-css "/css/style.css")
- (include-js "/js/jquery-1.6.2.min.js")
- (include-js "/js/jquery.tablesorter.min.js")
- (include-js "/js/jquery.cookies.2.2.0.min.js")
- (include-js "/js/bootstrap-twipsy.js")
- (include-js "/js/script.js")]
- [:body
- [:h1 (link-to "/" "Storm UI")]
- (seq body)]))
-
(defn read-storm-version
+ "Returns a string containing the Storm version or 'Unknown'."
[]
(let [storm-home (System/getProperty "storm.home")
release-path (format "%s/RELEASE" storm-home)
@@ -146,117 +62,12 @@
(trim (slurp release-path))
"Unknown")))
-(defn cluster-summary-table
- [^ClusterSummary summ]
- (let [sups (.get_supervisors summ)
- used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups))
- total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups))
- free-slots (- total-slots used-slots)
- total-tasks (->> (.get_topologies summ)
- (map #(.get_num_tasks ^TopologySummary %))
- (reduce +))
- total-executors (->> (.get_topologies summ)
- (map #(.get_num_executors ^TopologySummary %))
- (reduce +))]
- (table [{:text "Version" :attr {:class "tip right"
- :title (:version tips)}}
- {:text "Nimbus uptime" :attr {:class "tip right"
- :title (:nimbus-uptime tips)}}
- {:text "Supervisors" :attr {:class "tip above"
- :title (:num-supervisors tips)}}
- {:text "Used slots" :attr {:class "tip above"
- :title (:num-slots tips)}}
- {:text "Free slots" :attr {:class "tip above"
- :title (:num-slots tips)}}
- {:text "Total slots" :attr {:class "tip above"
- :title (:num-slots tips)}}
- {:text "Executors" :attr {:class "tip above"
- :title (:num-execs tips)}}
- {:text "Tasks" :attr {:class "tip left"
- :title (:num-tasks tips)}}]
- [[(read-storm-version)
- (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
- (count sups)
- used-slots
- free-slots
- total-slots
- total-executors
- total-tasks]])))
-
-(defn topology-link
- ([id] (topology-link id id))
- ([id content]
- (link-to (url-format "/topology/%s" id) (escape-html content))))
-
-(defn main-topology-summary-table
- [summs]
- (sorted-table
- [{:text "Name" :attr {:class "tip right"
- :title (str (:name tips) " " (:name-link tips))}}
- {:text "Id" :attr {:class "tip right"
- :title (:topo-id tips)}}
- {:text "Status" :attr {:class "tip above"
- :title (:status tips)}}
- {:text "Uptime" :attr {:class "tip above"
- :title (:topo-uptime tips)}}
- {:text "Num workers" :attr {:class "tip above"
- :title (:num-workers tips)}}
- {:text "Num executors" :attr {:class "tip above"
- :title (:num-execs tips)}}
- {:text "Num tasks" :attr {:class "tip above"
- :title (:num-tasks tips)}}]
- (for [^TopologySummary t summs]
- [(topology-link (.get_id t) (.get_name t))
- (escape-html (.get_id t))
- (.get_status t)
- (pretty-uptime-sec (.get_uptime_secs t))
- (.get_num_workers t)
- (.get_num_executors t)
- (.get_num_tasks t)])
- :time-cols [3]
- :sort-list "[[0,0]]"))
-
-(defn supervisor-summary-table
- [summs]
- (sorted-table
- [{:text "Id" :attr {:class "tip right"
- :title (:sup-id tips)}}
- {:text "Host" :attr {:class "tip above"
- :title (:sup-host tips)}}
- {:text "Uptime" :attr {:class "tip above"
- :title (:sup-uptime tips)}}
- {:text "Slots" :attr {:class "tip above"
- :title (:num-slots tips)}}
- {:text "Used slots" :attr {:class "tip left"
- :title (:num-slots tips)}}]
- (for [^SupervisorSummary s summs]
- [(.get_supervisor_id s)
- (.get_host s)
- (pretty-uptime-sec (.get_uptime_secs s))
- (.get_num_workers s)
- (.get_num_used_workers s)])
- :time-cols [2]))
-
-(defn configuration-table
- [conf]
- (sorted-table ["Key" "Value"]
- (map #(vector (key %) (str (val %))) conf)))
-
-(defn main-page
- []
- (with-nimbus nimbus
- (let [summ (.getClusterInfo ^Nimbus$Client nimbus)]
- (concat
- [[:h2 "Cluster Summary"]]
- [(cluster-summary-table summ)]
- [[:h2 "Topology summary"]]
- (main-topology-summary-table (.get_topologies summ))
- [[:h2 "Supervisor summary"]]
- (supervisor-summary-table (.get_supervisors summ))
- [[:h2 "Nimbus Configuration"]]
- (configuration-table (from-json (.getNimbusConf ^Nimbus$Client nimbus)))))))
-
+;; TODO: What is the desired behavior if a particular component id
+;; corresponds to both a bolt and a spout? Where should the error
+;; occur? -DCJ
(defn component-type
+ "Returns the component type (either :bolt or :spout) for a given
+ topology and component id. Returns nil if not found."
[^StormTopology topology id]
(let [bolts (.get_bolts topology)
spouts (.get_spouts topology)]
@@ -287,19 +98,14 @@
c]]
))]))))
-
(defn expand-averages-seq
[average-seq counts-seq]
(->> (map vector average-seq counts-seq)
(map #(apply expand-averages %))
- (apply merge-with
- (fn [s1 s2]
- (merge-with
- add-pairs
- s1
- s2)))))
+ (apply merge-with (fn [s1 s2] (merge-with add-pairs s1 s2)))))
-(defn- val-avg [[t c]]
+(defn- val-avg
+ [[t c]]
(if (= t 0) 0
(double (/ t c))))
@@ -341,6 +147,14 @@
(fn [_] true)
(fn [stream] (and (string? stream) (not (system-id? stream))))))
+(defn is-ack-stream
+  "Returns true when `stream` is NOT one of the acker streams
+  (ACKER-INIT-STREAM-ID, ACKER-ACK-STREAM-ID, ACKER-FAIL-STREAM-ID) and
+  false when it is one of them. NOTE: the result is the opposite of what
+  the name suggests; stream-boxes uses it as the :checked flag."
+  [stream]
+  (not-any? (fn [acker-id] (= acker-id stream))
+            [ACKER-INIT-STREAM-ID
+             ACKER-ACK-STREAM-ID
+             ACKER-FAIL-STREAM-ID]))
+
(defn pre-process
[stream-summary include-sys?]
(let [filter-fn (mk-include-sys-fn include-sys?)
@@ -423,86 +237,6 @@
[topology s]
(= :bolt (executor-summary-type topology s)))
-(defn topology-summary-table
- [^TopologyInfo summ]
- (let [executors (.get_executors summ)
- workers (set (for [^ExecutorSummary e executors] [(.get_host e) (.get_port e)]))]
- (table [{:text "Name" :attr {:class "tip right"
- :title (:name tips)}}
- {:text "Id" :attr {:class "tip right"
- :title (:topo-id tips)}}
- {:text "Status" :attr {:class "tip above"
- :title (:status tips)}}
- {:text "Uptime" :attr {:class "tip above"
- :title (:topo-uptime tips)}}
- {:text "Num workers" :attr {:class "tip above"
- :title (:num-workers tips)}}
- {:text "Num executors" :attr {:class "tip above"
- :title (:num-execs tips)}}
- {:text "Num tasks" :attr {:class "tip above"
- :title (:num-tasks tips)}}]
- [[(escape-html (.get_name summ))
- (escape-html (.get_id summ))
- (.get_status summ)
- (pretty-uptime-sec (.get_uptime_secs summ))
- (count workers)
- (count executors)
- (sum-tasks executors)]])))
-
-(defn total-aggregate-stats
- [spout-summs bolt-summs include-sys?]
- (let [spout-stats (get-filled-stats spout-summs)
- bolt-stats (get-filled-stats bolt-summs)
- agg-spout-stats (-> spout-stats
- (aggregate-spout-stats include-sys?)
- aggregate-spout-streams)
- agg-bolt-stats (-> bolt-stats
- (aggregate-bolt-stats include-sys?)
- aggregate-bolt-streams)]
- (merge-with
- (fn [s1 s2]
- (merge-with + s1 s2))
- (select-keys agg-bolt-stats [:emitted :transferred])
- agg-spout-stats)))
-
-(defn stats-times
- [stats-map]
- (sort-by #(Integer/parseInt %)
- (-> stats-map
- clojurify-structure
- (dissoc ":all-time")
- keys)))
-
-(defn topology-stats-table
- [id window stats]
- (let [times (stats-times (:emitted stats))
- display-map (into {} (for [t times] [t pretty-uptime-sec]))
- display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (sorted-table
- [{:text "Window" :attr {:class "tip right"
- :title (:window tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Complete latency (ms)" :attr {:class "tip above"
- :title (:complete-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:spout-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:spout-failed tips)}}]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- [(link-to (if (= k window) {:class "red"} {})
- (url-format "/topology/%s?window=%s" id k)
- (escape-html disp))
- (get-in stats [:emitted k])
- (get-in stats [:transferred k])
- (float-str (get-in stats [:complete-latencies k]))
- (get-in stats [:acked k])
- (get-in stats [:failed k])])
- :time-cols [0])))
-
(defn group-by-comp
[summs]
(let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
@@ -519,28 +253,23 @@
reverse
first)]
(if error
- [:span (if (< (time-delta (.get_error_time_secs ^ErrorInfo error))
- (* 60 30))
- {:class "red"}
- {})
- (error-subset (.get_error ^ErrorInfo error))])))
-
-(defn component-link
- [storm-id id]
- (link-to (url-format "/topology/%s/component/%s" storm-id id) (escape-html id)))
-
-(defn worker-log-link
- [host port]
- (link-to (url-format "http://%s:%s/log?file=worker-%s.log"
- host (*STORM-CONF* LOGVIEWER-PORT) port) (str port)))
-
-(defn render-capacity
- [capacity]
- (let [capacity (nil-to-zero capacity)]
- [:span (if (> capacity 0.9)
- {:class "red"}
- {})
- (float-str capacity)]))
+ (error-subset (.get_error ^ErrorInfo error))
+ "")))
+
+(defn component-task-summs
+  "Returns the executor summaries belonging to component `id`, sorted by
+  the task_start of each executor's ExecutorInfo. The id is looked up
+  among the spout executors first; if it is not a key there, the bolt
+  executors grouped under `id` are used instead."
+  [^TopologyInfo summ topology id]
+  (let [execs (.get_executors summ)
+        grouped (fn [pred] (group-by-comp (filter #(pred topology %) execs)))
+        spouts-by-id (grouped spout-summary?)
+        bolts-by-id (grouped bolt-summary?)
+        matching (if (contains? spouts-by-id id)
+                   (spouts-by-id id)
+                   (bolts-by-id id))]
+    (sort-by (fn [^ExecutorSummary e]
+               (.get_task_start (.get_executor_info e)))
+             matching)))
+
+(defn worker-log-link
+  "Returns the URL string (built with url-format) of the log page for
+  the worker on `host` and `port`, using the LOGVIEWER-PORT entry of
+  *STORM-CONF* as the URL's port."
+  [host port]
+  (let [logviewer-port (*STORM-CONF* LOGVIEWER-PORT)]
+    (url-format "http://%s:%s/log?file=worker-%s.log"
+                host logviewer-port port)))
(defn compute-executor-capacity
[^ExecutorSummary e]
@@ -565,91 +294,56 @@
(map nil-to-zero)
(apply max)))
-(defn spout-comp-table
- [top-id summ-map errors window include-sys?]
- (sorted-table
- [{:text "Id" :attr {:class "tip right"
- :title (str (:comp-id tips) " " (:comp-id-link tips))}}
- {:text "Executors" :attr {:class "tip right"
- :title (:num-execs tips)}}
- {:text "Tasks" :attr {:class "tip above"
- :title (:num-tasks tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Complete latency (ms)" :attr {:class "tip above"
- :title (:complete-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:spout-acked tips)}}
- {:text "Failed" :attr {:class "tip above"
- :title (:spout-failed tips)}}
- "Last error"]
- (for [[id summs] summ-map
- :let [stats-seq (get-filled-stats summs)
- stats (aggregate-spout-streams
- (aggregate-spout-stats
- stats-seq include-sys?))]]
- [(component-link top-id id)
- (count summs)
- (sum-tasks summs)
- (get-in stats [:emitted window])
- (get-in stats [:transferred window])
- (float-str (get-in stats [:complete-latencies window]))
- (get-in stats [:acked window])
- (get-in stats [:failed window])
- (most-recent-error (get errors id))])))
-
-(defn bolt-comp-table
- [top-id summ-map errors window include-sys?]
- (sorted-table
- [{:text "Id" :attr {:class "tip right"
- :title (str (:comp-id tips) " " (:comp-id-link tips))}}
- {:text "Executors" :attr {:class "tip right"
- :title (:num-execs tips)}}
- {:text "Tasks" :attr {:class "tip above"
- :title (:num-tasks tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Capacity (last 10m)" :attr {:class "tip above"
- :title (:capacity tips)}}
- {:text "Execute latency (ms)" :attr {:class "tip above"
- :title (:exec-lat tips)}}
- {:text "Executed" :attr {:class "tip above"
- :title (:num-executed tips)}}
- {:text "Process latency (ms)":attr {:class "tip above"
- :title (:proc-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:bolt-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:bolt-failed tips)}}
- "Last error"]
- (for [[id summs] summ-map
- :let [stats-seq (get-filled-stats summs)
- stats (aggregate-bolt-streams
- (aggregate-bolt-stats
- stats-seq include-sys?))]]
- [(component-link top-id id)
- (count summs)
- (sum-tasks summs)
- (get-in stats [:emitted window])
- (get-in stats [:transferred window])
- (render-capacity (compute-bolt-capacity summs))
- (float-str (get-in stats [:execute-latencies window]))
- (get-in stats [:executed window])
- (float-str (get-in stats [:process-latencies window]))
- (get-in stats [:acked window])
- (get-in stats [:failed window])
- (most-recent-error (get errors id))])))
-
-(defn window-hint [window]
+(defn spout-streams-stats
+  "Aggregates the filled stats of the spout executor summaries `summs`:
+  first with aggregate-spout-stats (honouring `include-sys?`), then
+  across streams with aggregate-spout-streams."
+  [summs include-sys?]
+  (-> (get-filled-stats summs)
+      (aggregate-spout-stats include-sys?)
+      aggregate-spout-streams))
+
+(defn bolt-streams-stats
+  "Aggregates the filled stats of the bolt executor summaries `summs`:
+  first with aggregate-bolt-stats (honouring `include-sys?`), then
+  across streams with aggregate-bolt-streams."
+  [summs include-sys?]
+  (-> (get-filled-stats summs)
+      (aggregate-bolt-stats include-sys?)
+      aggregate-bolt-streams))
+
+(defn total-aggregate-stats
+  "Combines the stream-aggregated spout and bolt stats into one map.
+  Only :emitted, :transferred, :acked, :failed and :complete-latencies
+  are kept from each side; values present on both sides are merged per
+  inner key with +.
+  NOTE(review): bolt :acked and :failed are summed into the totals as
+  well -- confirm this is intended for topology-level numbers."
+  [spout-summs bolt-summs include-sys?]
+  (let [kept-keys [:emitted :transferred :acked :failed :complete-latencies]
+        spout-totals (select-keys
+                       (spout-streams-stats spout-summs include-sys?)
+                       kept-keys)
+        bolt-totals (select-keys
+                      (bolt-streams-stats bolt-summs include-sys?)
+                      kept-keys)]
+    (merge-with (partial merge-with +) bolt-totals spout-totals)))
+
+(defn stats-times
+  "Returns the keys of `stats-map` (after clojurify-structure), without
+  \":all-time\", sorted by their value parsed as an integer."
+  [stats-map]
+  (let [windows (keys (dissoc (clojurify-structure stats-map) ":all-time"))]
+    (sort-by (fn [w] (Integer/parseInt w)) windows)))
+
+(defn window-hint
+ [window]
(if (= window ":all-time")
"All time"
(pretty-uptime-sec window)))
-(defn topology-action-button [id name action command is-wait default-wait enabled]
+(defn topology-action-button
+ [id name action command is-wait default-wait enabled]
[:input {:type "button"
:value action
(if enabled :enabled :disabled) ""
@@ -658,427 +352,507 @@
(StringEscapeUtils/escapeJavaScript name) "', '"
command "', " is-wait ", " default-wait ")")}])
-(defn topology-page
- [id window include-sys?]
+(defn sanitize-stream-name
+  "Returns a sanitized form of the stream name `name`. Every character
+  other than ASCII letters, '_', '-', ':' and '.' is replaced by '_'
+  (digits are replaced too). If the name does not start with an ASCII
+  letter it is prefixed with 's'. The hash of the original name is
+  appended to the result."
+  [name]
+  (let [illegal-char #"(?![A-Za-z_\-:\.])."
+        starts-with-letter? (re-find #"^[A-Za-z]" name)
+        candidate (if starts-with-letter? name (str \s name))]
+    (str (clojure.string/replace candidate illegal-char "_")
+         (hash name))))
+
+(defn sanitize-transferred
+  "Takes a two-level map of time -> (stream -> value) and returns a
+  Clojure map of the same shape in which every stream key has been
+  passed through sanitize-stream-name. Time keys and values are kept
+  unchanged."
+  [transferred]
+  (let [sanitize-streams (fn [stream-map]
+                           (into {}
+                                 (map (fn [[stream trans]]
+                                        [(sanitize-stream-name stream) trans])
+                                      stream-map)))]
+    (into {}
+          (map (fn [[time stream-map]]
+                 [time (sanitize-streams stream-map)])
+               transferred))))
+
+(defn visualization-data
+ "Builds a map from component id to a descriptor map with the keys
+ :type, :capacity, :latency, :transferred, :stats, :link and :inputs.
+ `spout-bolt` is a map of component id to component spec; the two
+ *-comp-summs arguments map component ids to executor summaries.
+ A component is reported as \"bolt\" when it has an entry in
+ `bolt-comp-summs` and as \"spout\" otherwise. All stream stats are
+ computed with include-sys? set to true."
+ [spout-bolt spout-comp-summs bolt-comp-summs window storm-id]
+ (let [components (for [[id spec] spout-bolt]
+ [id
+ (let [inputs (.get_inputs (.get_common spec))
+ bolt-summs (get bolt-comp-summs id)
+ spout-summs (get spout-comp-summs id)
+ ;; Capacity is only computed for bolts; everything else gets 0.
+ bolt-cap (if bolt-summs
+ (compute-bolt-capacity bolt-summs)
+ 0)]
+ {:type (if bolt-summs "bolt" "spout")
+ :capacity bolt-cap
+ ;; Bolts report process latency, spouts report complete latency.
+ :latency (if bolt-summs
+ (get-in
+ (bolt-streams-stats bolt-summs true)
+ [:process-latencies window])
+ (get-in
+ (spout-streams-stats spout-summs true)
+ [:complete-latencies window]))
+ ;; The spout value is tried first; the bolt value is the fallback.
+ :transferred (or
+ (get-in
+ (spout-streams-stats spout-summs true)
+ [:transferred window])
+ (get-in
+ (bolt-streams-stats bolt-summs true)
+ [:transferred window]))
+ ;; One entry per executor; :transferred is nil when the executor
+ ;; has no stats.
+ :stats (let [mapfn (fn [dat]
+ (map (fn [^ExecutorSummary summ]
+ {:host (.get_host summ)
+ :port (.get_port summ)
+ :uptime_secs (.get_uptime_secs summ)
+ :transferred (if-let [stats (.get_stats summ)]
+ (sanitize-transferred (.get_transferred stats)))})
+ dat))]
+ (if bolt-summs
+ (mapfn bolt-summs)
+ (mapfn spout-summs)))
+ :link (url-format "/component.html?id=%s&topology_id=%s" id storm-id)
+ ;; One entry per declared input, with both the raw and the
+ ;; sanitized stream name.
+ :inputs (for [[global-stream-id group] inputs]
+ {:component (.get_componentId global-stream-id)
+ :stream (.get_streamId global-stream-id)
+ :sani-stream (sanitize-stream-name (.get_streamId global-stream-id))
+ :grouping (clojure.core/name (thrift/grouping-type group))})})])]
+ (into {} (doall components))))
+
+(defn stream-boxes
+  "Collects the distinct input-stream descriptors found under the
+  :inputs key of every value in `datmap` and groups them into rows of
+  up to four. Each descriptor has :stream, :sani-stream and :checked,
+  where :checked is the result of is-ack-stream for the stream. Returns
+  a sequence of {:row [...]} maps; the last row may hold fewer than
+  four descriptors."
+  [datmap]
+  (let [streams (vec
+                  (distinct
+                    ;; A single `for` with two bindings walks every input
+                    ;; of every component in order.
+                    (for [[_ v] datmap
+                          m (get v :inputs)]
+                      {:stream (get m :stream)
+                       :sani-stream (get m :sani-stream)
+                       :checked (is-ack-stream (get m :stream))})))]
+    ;; The nil pad keeps a final partial row instead of dropping it.
+    (map (fn [row] {:row row})
+         (partition 4 4 nil streams))))
+
+(defn mk-visualization-data
+  "Fetches the topology and its TopologyInfo for `id` from Nimbus and
+  returns the result of visualization-data for all of its spouts and
+  bolts. `window` falls back to \":all-time\" when it is nil or false.
+  The grouped bolt summaries are passed through filter-key with the
+  predicate built by mk-include-sys-fn from `include-sys?`."
+  [id window include-sys?]
+  (with-nimbus
+    nimbus
+    ;; The topology configuration is not used here, so it is not
+    ;; requested from Nimbus.
+    (let [window (if window window ":all-time")
+          topology (.getTopology ^Nimbus$Client nimbus id)
+          spouts (.get_spouts topology)
+          bolts (.get_bolts topology)
+          summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+          execs (.get_executors summ)
+          spout-summs (filter (partial spout-summary? topology) execs)
+          bolt-summs (filter (partial bolt-summary? topology) execs)
+          spout-comp-summs (group-by-comp spout-summs)
+          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
+                                      (group-by-comp bolt-summs))]
+      (visualization-data
+        (merge (hashmap-to-persistent spouts)
+               (hashmap-to-persistent bolts))
+        spout-comp-summs bolt-comp-summs window id))))
+
+(defn cluster-configuration []
(with-nimbus nimbus
- (let [window (if window window ":all-time")
- window-hint (window-hint window)
- summ (.getTopologyInfo ^Nimbus$Client nimbus id)
- topology (.getTopology ^Nimbus$Client nimbus id)
- topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
- spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
- bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
- spout-comp-summs (group-by-comp spout-summs)
- bolt-comp-summs (group-by-comp bolt-summs)
- bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
- name (.get_name summ)
- status (.get_status summ)
- msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
- (concat
- [[:h2 "Topology summary"]]
- [(topology-summary-table summ)]
- [[:h2 {:class "js-only"} "Topology actions"]]
- [[:p {:class "js-only"}
- (concat
- [(topology-action-button id name "Activate" "activate" false 0 (= "INACTIVE" status))]
- [(topology-action-button id name "Deactivate" "deactivate" false 0 (= "ACTIVE" status))]
- [(topology-action-button id name "Rebalance" "rebalance" true msg-timeout (or (= "ACTIVE" status) (= "INACTIVE" status)))]
- [(topology-action-button id name "Kill" "kill" true msg-timeout (not= "KILLED" status))])]]
- [[:h2 "Topology stats"]]
- (topology-stats-table id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
- [[:h2 "Spouts (" window-hint ")"]]
- (spout-comp-table id spout-comp-summs (.get_errors summ) window include-sys?)
- [[:h2 "Bolts (" window-hint ")"]]
- (bolt-comp-table id bolt-comp-summs (.get_errors summ) window include-sys?)
- [[:h2 "Topology Configuration"]]
- (configuration-table topology-conf)))))
-
-(defn component-task-summs
- [^TopologyInfo summ topology id]
- (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
- bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
- spout-comp-summs (group-by-comp spout-summs)
- bolt-comp-summs (group-by-comp bolt-summs)
- ret (if (contains? spout-comp-summs id)
- (spout-comp-summs id)
- (bolt-comp-summs id))]
- (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
-
-(defn spout-summary-table
- [topology-id id stats window]
+ (.getNimbusConf ^Nimbus$Client nimbus)))
+
+(defn cluster-summary
+  "Returns a map with string keys summarizing the cluster. With no
+  arguments the ClusterSummary is first fetched from Nimbus. Slot counts
+  are summed over all supervisors; executor and task counts are summed
+  over all topologies; free slots are total minus used."
+  ([]
+   (with-nimbus nimbus
+     (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus))))
+  ([^ClusterSummary summ]
+   (let [sups (.get_supervisors summ)
+         topos (.get_topologies summ)
+         sum-of (fn [getter coll] (reduce + (map getter coll)))
+         slots-used (sum-of #(.get_num_used_workers ^SupervisorSummary %) sups)
+         slots-total (sum-of #(.get_num_workers ^SupervisorSummary %) sups)]
+     {"stormVersion" (read-storm-version)
+      "nimbusUptime" (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
+      "supervisors" (count sups)
+      "slotsTotal" slots-total
+      "slotsUsed" slots-used
+      "slotsFree" (- slots-total slots-used)
+      "executorsTotal" (sum-of #(.get_num_executors ^TopologySummary %) topos)
+      "tasksTotal" (sum-of #(.get_num_tasks ^TopologySummary %) topos)})))
+
+(defn supervisor-summary
+  "Returns {\"supervisors\" [...]} with one string-keyed map per
+  SupervisorSummary in `summs` (id, host, pretty-printed uptime, total
+  and used slots). With no arguments the supervisors are first fetched
+  from Nimbus via getClusterInfo."
+  ([]
+   (with-nimbus nimbus
+     (let [cluster-info (.getClusterInfo ^Nimbus$Client nimbus)]
+       (supervisor-summary (.get_supervisors cluster-info)))))
+  ([summs]
+   (let [describe (fn [^SupervisorSummary s]
+                    {"id" (.get_supervisor_id s)
+                     "host" (.get_host s)
+                     "uptime" (pretty-uptime-sec (.get_uptime_secs s))
+                     "slotsTotal" (.get_num_workers s)
+                     "slotsUsed" (.get_num_used_workers s)})]
+     {"supervisors" (map describe summs)})))
+
+(defn all-topologies-summary
+  "Returns {\"topologies\" [...]} with one string-keyed map per
+  TopologySummary in `summs` (id, name, status, pretty-printed uptime,
+  and task/worker/executor totals). With no arguments the topologies
+  are first fetched from Nimbus via getClusterInfo."
+  ([]
+   (with-nimbus nimbus
+     (let [cluster-info (.getClusterInfo ^Nimbus$Client nimbus)]
+       (all-topologies-summary (.get_topologies cluster-info)))))
+  ([summs]
+   (let [describe (fn [^TopologySummary t]
+                    {"id" (.get_id t)
+                     "name" (.get_name t)
+                     "status" (.get_status t)
+                     "uptime" (pretty-uptime-sec (.get_uptime_secs t))
+                     "tasksTotal" (.get_num_tasks t)
+                     "workersTotal" (.get_num_workers t)
+                     "executorsTotal" (.get_num_executors t)})]
+     {"topologies" (map describe summs)})))
+
+(defn topology-stats [id window stats]
(let [times (stats-times (:emitted stats))
display-map (into {} (for [t times] [t pretty-uptime-sec]))
display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (sorted-table
- [{:text "Window" :attr {:class "tip right"
- :title (:window tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Complete latency (ms)" :attr {:class "tip above"
- :title (:complete-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:spout-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:spout-failed tips)}}]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- [(link-to (if (= k window) {:class "red"} {})
- (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
- (escape-html disp))
- (get-in stats [:emitted k])
- (get-in stats [:transferred k])
- (float-str (get-in stats [:complete-latencies k]))
- (get-in stats [:acked k])
- (get-in stats [:failed k])])
- :time-cols [0])))
-
-(defn spout-output-summary-table
+ (for [k (concat times [":all-time"])
+ :let [disp ((display-map k) k)]]
+ {"windowPretty" disp
+ "window" k
+ "emitted" (get-in stats [:emitted k])
+ "transferred" (get-in stats [:transferred k])
+ "completeLatency" (float-str (get-in stats [:complete-latencies k]))
+ "acked" (get-in stats [:acked k])
+ "failed" (get-in stats [:failed k])})))
+
+(defn spout-comp
+  "Returns one string-keyed map per [id summs] entry of `summ-map`,
+  describing a spout component: executor and task counts, the stats
+  for `window` (latency formatted with float-str) and the most recent
+  error found under the component id in `errors`. `top-id` is accepted
+  but not used."
+  [top-id summ-map errors window include-sys?]
+  (map (fn [[id summs]]
+         (let [stats (spout-streams-stats summs include-sys?)
+               at-window (fn [k] (get-in stats [k window]))]
+           {"spoutId" id
+            "executors" (count summs)
+            "tasks" (sum-tasks summs)
+            "emitted" (at-window :emitted)
+            "transferred" (at-window :transferred)
+            "completeLatency" (float-str (at-window :complete-latencies))
+            "acked" (at-window :acked)
+            "failed" (at-window :failed)
+            "lastError" (most-recent-error (get errors id))}))
+       summ-map))
+
+(defn bolt-comp
+  "Returns one string-keyed map per [id summs] entry of `summ-map`,
+  describing a bolt component: executor and task counts, the stats for
+  `window` (latencies and capacity formatted with float-str) and the
+  most recent error found under the component id in `errors`. `top-id`
+  is accepted but not used."
+  [top-id summ-map errors window include-sys?]
+  (map (fn [[id summs]]
+         (let [stats (bolt-streams-stats summs include-sys?)
+               at-window (fn [k] (get-in stats [k window]))
+               capacity (nil-to-zero (compute-bolt-capacity summs))]
+           {"boltId" id
+            "executors" (count summs)
+            "tasks" (sum-tasks summs)
+            "emitted" (at-window :emitted)
+            "transferred" (at-window :transferred)
+            "capacity" (float-str capacity)
+            "executeLatency" (float-str (at-window :execute-latencies))
+            "executed" (at-window :executed)
+            "processLatency" (float-str (at-window :process-latencies))
+            "acked" (at-window :acked)
+            "failed" (at-window :failed)
+            "lastError" (most-recent-error (get errors id))}))
+       summ-map))
+
+(defn topology-summary
+  "Returns a string-keyed map describing the topology in `summ`: id,
+  name, status, pretty-printed uptime, total tasks (sum-tasks over the
+  executors), the number of distinct [host port] pairs among the
+  executors, and the number of executors."
+  [^TopologyInfo summ]
+  (let [executors (.get_executors summ)
+        worker-slots (into #{}
+                           (map (fn [^ExecutorSummary e]
+                                  [(.get_host e) (.get_port e)])
+                                executors))]
+    {"id" (.get_id summ)
+     "name" (.get_name summ)
+     "status" (.get_status summ)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+     "tasksTotal" (sum-tasks executors)
+     "workersTotal" (count worker-slots)
+     "executorsTotal" (count executors)}))
+
+(defn spout-summary-json
+  "Returns one string-keyed map per time window found in
+  (:emitted stats), in the order given by stats-times and followed by
+  \":all-time\". Each map carries the window key, its display text and
+  the emitted/transferred/acked/failed values and complete latency for
+  that window. `topology-id`, `id` and `window` are accepted but not
+  used."
+  [topology-id id stats window]
+  (let [all-time ":all-time"
+        pretty-window (fn [k]
+                        (if (= k all-time)
+                          "All time"
+                          (pretty-uptime-sec k)))
+        value-at (fn [stat k] (get-in stats [stat k]))]
+    (map (fn [k]
+           {"windowPretty" (pretty-window k)
+            "window" k
+            "emitted" (value-at :emitted k)
+            "transferred" (value-at :transferred k)
+            "completeLatency" (float-str (value-at :complete-latencies k))
+            "acked" (value-at :acked k)
+            "failed" (value-at :failed k)})
+         (concat (stats-times (:emitted stats)) [all-time]))))
+
+;; Builds the data map for a single topology page. Fetches the topology
+;; info, the topology and its configuration for `id` from Nimbus, then
+;; merges the result of topology-summary with the window settings, the
+;; message timeout, aggregated topology stats, per-spout and per-bolt
+;; rows, the topology configuration and the stream-boxes rows derived
+;; from the visualization data. `window` falls back to ":all-time" when
+;; it is nil or false.
+;; NOTE(review): `name` and `status` are bound below but are not
+;; referenced anywhere else in this function.
+(defn topology-page [id window include-sys?]
+ (with-nimbus nimbus
+ (let [window (if window window ":all-time")
+ window-hint (window-hint window)
+ summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+ topology (.getTopology ^Nimbus$Client nimbus id)
+ topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
+ spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
+ bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
+ spout-comp-summs (group-by-comp spout-summs)
+ bolt-comp-summs (group-by-comp bolt-summs)
+ ;; The grouped bolts are filtered by key with the include-sys predicate.
+ bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
+ name (.get_name summ)
+ status (.get_status summ)
+ msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+ spouts (.get_spouts topology)
+ bolts (.get_bolts topology)
+ visualizer-data (visualization-data (merge (hashmap-to-persistent spouts)
+ (hashmap-to-persistent bolts))
+ spout-comp-summs
+ bolt-comp-summs
+ window
+ id)]
+ (merge
+ (topology-summary summ)
+ {"window" window
+ "windowHint" window-hint
+ "msgTimeout" msg-timeout
+ "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
+ "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys?)
+ "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys?)
+ "configuration" topology-conf
+ "visualizationTable" (stream-boxes visualizer-data)}))))
+
+(defn spout-output-stats
[stream-summary window]
(let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
- (sorted-table
- [{:text "Stream" :attr {:class "tip right"
- :title (:stream tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Complete latency (ms)" :attr {:class "tip above"
- :title (:complete-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:spout-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:spout-failed tips)}}]
- (for [[s stats] (stream-summary window)]
- [s
- (nil-to-zero (:emitted stats))
- (nil-to-zero (:transferred stats))
- (float-str (:complete-latencies stats))
- (nil-to-zero (:acked stats))
- (nil-to-zero (:failed stats))]))))
-
-(defn spout-executor-table
+ (for [[s stats] (stream-summary window)]
+ {"stream" s
+ "emitted" (nil-to-zero (:emitted stats))
+ "transferred" (nil-to-zero (:transferred stats))
+ "completeLatency" (float-str (:complete-latencies stats))
+ "acked" (nil-to-zero (:acked stats))
+ "failed" (nil-to-zero (:failed stats))})))
+
+(defn spout-executor-stats
[topology-id executors window include-sys?]
- (sorted-table
- [{:text "Id" :attr {:class "tip right"
- :title (:exec-id tips)}}
- {:text "Uptime" :attr {:class "tip right"
- :title (:exec-uptime tips)}}
- {:text "Host" :attr {:class "tip above"
- :title (:sup-host tips)}}
- {:text "Port" :attr {:class "tip above"
- :title (:port tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Complete latency (ms)" :attr {:class "tip above"
- :title (:complete-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:spout-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:spout-failed tips)}}]
- (for [^ExecutorSummary e executors
- :let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-spout-stats include-sys?)
- aggregate-spout-streams
- swap-map-order
- (get window)))]]
- [(pretty-executor-info (.get_executor_info e))
- (pretty-uptime-sec (.get_uptime_secs e))
- (.get_host e)
- (worker-log-link (.get_host e) (.get_port e))
- (nil-to-zero (:emitted stats))
- (nil-to-zero (:transferred stats))
- (float-str (:complete-latencies stats))
- (nil-to-zero (:acked stats))
- (nil-to-zero (:failed stats))])
- :time-cols [1]))
-
-(defn spout-page
+ (for [^ExecutorSummary e executors
+ :let [stats (.get_stats e)
+ stats (if stats
+ (-> stats
+ (aggregate-spout-stats include-sys?)
+ aggregate-spout-streams
+ swap-map-order
+ (get window)))]]
+ {"id" (pretty-executor-info (.get_executor_info e))
+ "uptime" (pretty-uptime-sec (.get_uptime_secs e))
+ "host" (.get_host e)
+ "port" (.get_port e)
+ "emitted" (nil-to-zero (:emitted stats))
+ "transferred" (nil-to-zero (:transferred stats))
+ "completeLatency" (float-str (:complete-latencies stats))
+ "acked" (nil-to-zero (:acked stats))
+ "failed" (nil-to-zero (:failed stats))
+ "workerLogLink" (worker-log-link (.get_host e) (.get_port e))}))
+
+(defn component-errors
+  "Returns a map with the single key \"componentErrors\" whose value is a seq of
+  {\"time\" ... \"error\" ...} maps, one per ErrorInfo in errors-list, ordered by
+  descending error time."
+  [errors-list]
+  (let [by-time (sort-by #(.get_error_time_secs ^ErrorInfo %) errors-list)
+        newest-first (reverse by-time)]
+    {"componentErrors"
+     (map (fn [^ErrorInfo err]
+            {"time" (date-str (.get_error_time_secs err))
+             "error" (.get_error err)})
+          newest-first)}))
+
+(defn spout-stats
[window ^TopologyInfo topology-info component executors include-sys?]
(let [window-hint (str " (" (window-hint window) ")")
stats (get-filled-stats executors)
stream-summary (-> stats (aggregate-spout-stats include-sys?))
summary (-> stream-summary aggregate-spout-streams)]
- (concat
- [[:h2 "Spout stats"]]
- (spout-summary-table (.get_id topology-info) component summary window)
- [[:h2 "Output stats" window-hint]]
- (spout-output-summary-table stream-summary window)
- [[:h2 "Executors" window-hint]]
- ;; task id, task uptime, stream aggregated stats, last error
- (spout-executor-table (.get_id topology-info) executors window include-sys?))))
-
-(defn bolt-output-summary-table
+ {"spoutSummary" (spout-summary-json
+ (.get_id topology-info) component summary window)
+ "outputStats" (spout-output-stats stream-summary window)
+ "executorStats" (spout-executor-stats (.get_id topology-info)
+ executors window include-sys?)}))
+
+(defn bolt-summary
+  "Returns one stats map for each entry of (stats-times (:emitted stats)),
+  followed by one for \":all-time\". Each map carries the window key, its
+  display string, and the emitted/transferred/executed/acked/failed values and
+  execute/process latencies looked up in stats for that window.
+  topology-id, id and window are accepted but not read here."
+  [topology-id id stats window]
+  (let [windows (concat (stats-times (:emitted stats)) [":all-time"])
+        ;; \":all-time\" gets a fixed label; every other window is formatted
+        ;; with pretty-uptime-sec
+        pretty-window (fn [w]
+                        (if (= w ":all-time")
+                          "All time"
+                          (pretty-uptime-sec w)))]
+    (map (fn [w]
+           {"window" w
+            "windowPretty" (pretty-window w)
+            "emitted" (get-in stats [:emitted w])
+            "transferred" (get-in stats [:transferred w])
+            "executeLatency" (float-str (get-in stats [:execute-latencies w]))
+            "executed" (get-in stats [:executed w])
+            "processLatency" (float-str (get-in stats [:process-latencies w]))
+            "acked" (get-in stats [:acked w])
+            "failed" (get-in stats [:failed w])})
+         windows)))
+
+(defn bolt-output-stats
[stream-summary window]
(let [stream-summary (-> stream-summary
swap-map-order
(get window)
(select-keys [:emitted :transferred])
swap-map-order)]
- (sorted-table
- [{:text "Stream" :attr {:class "tip right"
- :title (:stream tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}]
- (for [[s stats] stream-summary]
- [s
- (nil-to-zero (:emitted stats))
- (nil-to-zero (:transferred stats))]))))
-
-(defn bolt-input-summary-table
+ (for [[s stats] stream-summary]
+ {"stream" s
+ "emitted" (nil-to-zero (:emitted stats))
+ "transferred" (nil-to-zero (:transferred stats))})))
+
+(defn bolt-input-stats
[stream-summary window]
- (let [stream-summary (-> stream-summary
- swap-map-order
- (get window)
- (select-keys [:acked :failed :process-latencies :executed :execute-latencies])
- swap-map-order)]
- (sorted-table
- [{:text "Component" :attr {:class "tip right"
- :title (:comp-id tips)}}
- {:text "Stream" :attr {:class "tip right"
- :title (:stream tips)}}
- {:text "Execute latency (ms)" :attr {:class "tip above"
- :title (:exec-lat tips)}}
- {:text "Executed" :attr {:class "tip above"
- :title (:num-executed tips)}}
- {:text "Process latency (ms)":attr {:class "tip above"
- :title (:proc-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:bolt-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:bolt-failed tips)}}]
- (for [[^GlobalStreamId s stats] stream-summary]
- [(escape-html (.get_componentId s))
- (.get_streamId s)
- (float-str (:execute-latencies stats))
- (nil-to-zero (:executed stats))
- (float-str (:process-latencies stats))
- (nil-to-zero (:acked stats))
- (nil-to-zero (:failed stats))]))))
-
-(defn bolt-executor-table
+  ;; Builds one map per input stream (keyed by GlobalStreamId) for the given
+  ;; window, restricted to the input-side metrics selected below.
+  (let [stream-summary
+        (-> stream-summary
+            swap-map-order
+            (get window)
+            (select-keys [:acked :failed :process-latencies
+                          :executed :execute-latencies])
+            swap-map-order)]
+    (for [[^GlobalStreamId s stats] stream-summary]
+      {"component" (.get_componentId s)
+       "stream" (.get_streamId s)
+       "executeLatency" (float-str (:execute-latencies stats))
+       ;; process latency has its own key; it must not reuse :execute-latencies
+       "processLatency" (float-str (:process-latencies stats))
+       "executed" (nil-to-zero (:executed stats))
+       "acked" (nil-to-zero (:acked stats))
+       "failed" (nil-to-zero (:failed stats))})))
+
+(defn bolt-executor-stats
[topology-id executors window include-sys?]
- (sorted-table
- [{:text "Id" :attr {:class "tip right"
- :title (:exec-id tips)}}
- {:text "Uptime" :attr {:class "tip right"
- :title (:exec-uptime tips)}}
- {:text "Host" :attr {:class "tip above"
- :title (:sup-host tips)}}
- {:text "Port" :attr {:class "tip above"
- :title (:port tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Capacity (last 10m)" :attr {:class "tip above"
- :title (:capacity tips)}}
- {:text "Execute latency (ms)" :attr {:class "tip above"
- :title (:exec-lat tips)}}
- {:text "Executed" :attr {:class "tip above"
- :title (:num-executed tips)}}
- {:text "Process latency (ms)":attr {:class "tip above"
- :title (:proc-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:bolt-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:bolt-failed tips)}}]
- (for [^ExecutorSummary e executors
- :let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-bolt-stats include-sys?)
- (aggregate-bolt-streams)
- swap-map-order
- (get window)))]]
- [(pretty-executor-info (.get_executor_info e))
- (pretty-uptime-sec (.get_uptime_secs e))
- (.get_host e)
- (worker-log-link (.get_host e) (.get_port e))
- (nil-to-zero (:emitted stats))
- (nil-to-zero (:transferred stats))
- (render-capacity (compute-executor-capacity e))
- (float-str (:execute-latencies stats))
- (nil-to-zero (:executed stats))
- (float-str (:process-latencies stats))
- (nil-to-zero (:acked stats))
- (nil-to-zero (:failed stats))])
- :time-cols [1]))
-
-(defn bolt-summary-table
- [topology-id id stats window]
- (let [times (stats-times (:emitted stats))
- display-map (into {} (for [t times] [t pretty-uptime-sec]))
- display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (sorted-table
- [{:text "Window" :attr {:class "tip right"
- :title (:window tips)}}
- {:text "Emitted" :attr {:class "tip above"
- :title (:emitted tips)}}
- {:text "Transferred" :attr {:class "tip above"
- :title (:transferred tips)}}
- {:text "Execute latency (ms)" :attr {:class "tip above"
- :title (:exec-lat tips)}}
- {:text "Executed" :attr {:class "tip above"
- :title (:num-executed tips)}}
- {:text "Process latency (ms)":attr {:class "tip above"
- :title (:proc-lat tips)}}
- {:text "Acked" :attr {:class "tip above"
- :title (:bolt-acked tips)}}
- {:text "Failed" :attr {:class "tip left"
- :title (:bolt-failed tips)}}]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- [(link-to (if (= k window) {:class "red"} {})
- (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
- (escape-html disp))
- (get-in stats [:emitted k])
- (get-in stats [:transferred k])
- (float-str (get-in stats [:execute-latencies k]))
- (get-in stats [:executed k])
- (float-str (get-in stats [:process-latencies k]))
- (get-in stats [:acked k])
- (get-in stats [:failed k])])
- :time-cols [0])))
-
-(defn bolt-page
+ (for [^ExecutorSummary e executors
+ :let [stats (.get_stats e)
+ stats (if stats
+ (-> stats
+ (aggregate-bolt-stats include-sys?)
+ (aggregate-bolt-streams)
+ swap-map-order
+ (get window)))]]
+ {"id" (pretty-executor-info (.get_executor_info e))
+ "uptime" (pretty-uptime-sec (.get_uptime_secs e))
+ "host" (.get_host e)
+ "port" (.get_port e)
+ "emitted" (nil-to-zero (:emitted stats))
+ "transferred" (nil-to-zero (:transferred stats))
+ "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
+ "executeLatency" (float-str (:execute-latencies stats))
+ "executed" (nil-to-zero (:executed stats))
+ "processLatency" (float-str (:process-latencies stats))
+ "acked" (nil-to-zero (:acked stats))
+ "failed" (nil-to-zero (:failed stats))
+ "workerLogLink" (worker-log-link (.get_host e) (.get_port e))}))
+
+(defn bolt-stats
[window ^TopologyInfo topology-info component executors include-sys?]
(let [window-hint (str " (" (window-hint window) ")")
stats (get-filled-stats executors)
stream-summary (-> stats (aggregate-bolt-stats include-sys?))
summary (-> stream-summary aggregate-bolt-streams)]
- (concat
- [[:h2 "Bolt stats"]]
- (bolt-summary-table (.get_id topology-info) component summary window)
-
- [[:h2 "Input stats" window-hint]]
- (bolt-input-summary-table stream-summary window)
-
- [[:h2 "Output stats" window-hint]]
- (bolt-output-summary-table stream-summary window)
-
- [[:h2 "Executors"]]
- (bolt-executor-table (.get_id topology-info) executors window include-sys?))))
-
-(defn errors-table
- [errors-list]
- (let [errors (->> errors-list
- (sort-by #(.get_error_time_secs ^ErrorInfo %))
- reverse)]
- (sorted-table
- ["Time" "Error"]
- (for [^ErrorInfo e errors]
- [(date-str (.get_error_time_secs e))
- [:pre (.get_error e)]])
- :sort-list "[[0,1]]")))
+ {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
+ "inputStats" (bolt-input-stats stream-summary window)
+ "outputStats" (bolt-output-stats stream-summary window)
+ "executorStats" (bolt-executor-stats
+ (.get_id topology-info) executors window include-sys?)}))
(defn component-page
[topology-id component window include-sys?]
(with-nimbus nimbus
- (let [window (if window window ":all-time")
- summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
- topology (.getTopology ^Nimbus$Client nimbus topology-id)
- type (component-type topology component)
- summs (component-task-summs summ topology component)
- spec (cond (= type :spout) (spout-page window summ component summs include-sys?)
- (= type :bolt) (bolt-page window summ component summs include-sys?))]
- (concat
- [[:h2 "Component summary"]
- (table [{:text "Id" :attr {:class "tip right"
- :title (:comp-id tips)}}
- {:text "Topology" :attr {:class "tip above"
- :title (str (:name tips) " " (:name-link tips))}}
- {:text "Executors" :attr {:class "tip above"
- :title (:num-execs tips)}}
- {:text "Tasks" :attr {:class "tip above"
- :title (:num-tasks tips)}}]
- [[(escape-html component)
- (topology-link (.get_id summ) (.get_name summ))
- (count summs)
- (sum-tasks summs)
- ]])]
- spec
- [[:h2 "Errors"]
- (errors-table (get (.get_errors summ) component))]))))
-
-(defn get-include-sys?
- [cookies]
- (let [sys? (get cookies "sys")
- sys? (if (or (nil? sys?) (= "false" (:value sys?))) false true)]
- sys?))
+ (let [window (if window window ":all-time")
+ summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
+ topology (.getTopology ^Nimbus$Client nimbus topology-id)
+ type (component-type topology component)
+ summs (component-task-summs summ topology component)
+ spec (cond (= type :spout) (spout-stats window summ component summs include-sys?)
+ (= type :bolt) (bolt-stats window summ component summs include-sys?))
+ errors (component-errors (get (.get_errors summ) component))]
+ (merge
+ {"id" component
+ "name" (.get_name summ)
+ "executors" (count summs)
+ "tasks" (sum-tasks summs)
+ "topologyId" topology-id
+ "window" window
+ "componentType" (name type)
+ "windowHint" (window-hint window)}
+ spec errors))))
+
+(defn check-include-sys?
+  "Returns false when sys? is nil or the string \"false\"; true for any other
+  value."
+  [sys?]
+  (not (or (nil? sys?)
+           (= "false" sys?))))
+
+(defn json-response
+  "Builds a response map with a :status, a Content-Type header of
+  application/json, and a :body holding data serialized with to-json.
+  status is optional and defaults to 200."
+  [data & [status]]
+  (let [code (if status status 200)]
+    {:status code
+     :headers {"Content-Type" "application/json"}
+     :body (to-json data)}))
(defroutes main-routes
- (GET "/" [:as {cookies :cookies}]
- (-> (main-page)
- ui-template))
- (GET "/topology/:id" [:as {cookies :cookies} id & m]
- (let [include-sys? (get-include-sys? cookies)
- id (url-decode id)]
- (try
- (-> (topology-page (url-decode id) (:window m) include-sys?)
- (concat [(mk-system-toggle-button include-sys?)])
- ui-template)
- (catch Exception e (resp/redirect "/")))))
- (GET "/topology/:id/component/:component" [:as {cookies :cookies} id component & m]
- (let [include-sys? (get-include-sys? cookies)
- id (url-decode id)
+ (GET "/api/v1/cluster/configuration" []
+ (cluster-configuration))
+ (GET "/api/v1/cluster/summary" []
+ (json-response (cluster-summary)))
+ (GET "/api/v1/supervisor/summary" []
+ (json-response (supervisor-summary)))
+ (GET "/api/v1/topology/summary" []
+ (json-response (all-topologies-summary)))
+ (GET "/api/v1/topology/:id" [id & m]
+ (let [id (url-decode id)]
+ (json-response (topology-page id (:window m) (check-include-sys? (:sys m))))))
+ (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
+ (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m)))))
+ (GET "/api/v1/topology/:id/component/:component" [id component & m]
+ (let [id (url-decode id)
component (url-decode component)]
- (-> (component-page id component (:window m) include-sys?)
- (concat [(mk-system-toggle-button include-sys?)])
- ui-template)))
- (POST "/topology/:id/activate" [id]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)]
- (.activate nimbus name)
- (log-message "Activating topology '" name "'")))
- (resp/redirect (str "/topology/" id)))
- (POST "/topology/:id/deactivate" [id]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)]
- (.deactivate nimbus name)
- (log-message "Deactivating topology '" name "'")))
- (resp/redirect (str "/topology/" id)))
- (POST "/topology/:id/rebalance/:wait-time" [id wait-time]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)
- options (RebalanceOptions.)]
- (.set_wait_secs options (Integer/parseInt wait-time))
- (.rebalance nimbus name options)
- (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
- (resp/redirect (str "/topology/" id)))
- (POST "/topology/:id/kill/:wait-time" [id wait-time]
- (with-nimbus nimbus
- (let [id (url-decode id)
- tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- name (.get_name tplg)
- options (KillOptions.)]
- (.set_wait_secs options (Integer/parseInt wait-time))
- (.killTopologyWithOpts nimbus name options)
- (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
- (resp/redirect (str "/topology/" id)))
+ (json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
+ (POST "/api/v1/topology/:id/activate" [id]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.activate nimbus name)
+ (log-message "Activating topology '" name "'")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+
+ (POST "/api/v1/topology/:id/deactivate" [id]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.deactivate nimbus name)
+ (log-message "Deactivating topology '" name "'")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (RebalanceOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.rebalance nimbus name options)
+ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (KillOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.killTopologyWithOpts nimbus name options)
+ (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+
+ (GET "/" [:as {cookies :cookies}]
+ (resp/redirect "/index.html"))
(route/resources "/")
(route/not-found "Page not found"))
-(defn exception->html
+(defn exception->json
[ex]
- (concat
- [[:h2 "Internal Server Error"]]
- [[:pre (let [sw (java.io.StringWriter.)]
- (.printStackTrace ex (java.io.PrintWriter. sw))
- (.toString sw))]]))
+ {"error" "Internal Server Error"
+ "errorMessage"
+ (let [sw (java.io.StringWriter.)]
+ (.printStackTrace ex (java.io.PrintWriter. sw))
+ (.toString sw))})
(defn catch-errors
[handler]
@@ -1086,9 +860,7 @@
(try
(handler request)
(catch Exception ex
- (-> (resp/response (ui-template (exception->html ex)))
- (resp/status 500)
- (resp/content-type "text/html"))))))
+ (json-response (exception->json ex) 500)))))
(def app
(handler/site (-> main-routes
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
index dcd4a21..c2de7c7 100644
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ b/storm-core/src/clj/backtype/storm/ui/helpers.clj
@@ -146,4 +146,3 @@ $(\"table#%s\").each(function(i) { $(this).tablesorter({ sortList: %s, headers:
(defn pretty-executor-info [^ExecutorInfo e]
(str "[" (.get_task_start e) "-" (.get_task_end e) "]"))
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 891d8eb..91b0713 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -914,12 +914,13 @@
(defmacro -<>
([x] x)
- ([x form]
- (if (seq? form)
- (with-meta
- (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
- (concat begin [x] end))
- (meta form))
- (list form x)))
- ([x form & more]
- `(-<> (-<> ~x ~form) ~@more)))
+ ([x form] (if (seq? form)
+ (with-meta
+ (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+ (concat begin [x] end))
+ (meta form))
+ (list form x)))
+ ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
+
+(defn hashmap-to-persistent
+  "Copies the entries of the java.util.HashMap m into a persistent Clojure map.
+  Returns an empty map when m is nil."
+  [^HashMap m]
+  ;; into conj's each Map.Entry directly, so keys and values stay paired
+  ;; without depending on keySet/values iterating in the same order
+  (into {} m))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 83ee232..ff309a5 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -84,8 +84,21 @@ public class Config extends HashMap<String, Object> {
*/
public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
+
+ /**
+ * If the Netty messaging layer is busy, the Netty client will try to batch as many messages as possible, up to a total size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes.
+ */
+ public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
+ public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
/**
+ * The interval (in milliseconds) at which we check whether the Netty channel is writable and try to write pending messages.
+ */
+ public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
+ public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
+
+
+ /**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
@@ -462,6 +475,12 @@ public class Config extends HashMap<String, Object> {
public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
+ * Controls how many worker receiver threads are needed per worker.
+ */
+ public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
+ public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
+
+ /**
* How often this worker should heartbeat to the supervisor.
*/
public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index 41ae3f5..ead4935 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -17,13 +17,17 @@
*/
package backtype.storm.messaging;
+import java.util.Iterator;
+
public interface IConnection {
+
/**
- * receive a message (consists taskId and payload)
+ * receive an iterator over a batch of messages (each consisting of a taskId and payload)
* @param flags 0: block, 1: non-block
* @return
*/
- public TaskMessage recv(int flags);
+ public Iterator<TaskMessage> recv(int flags, int clientId);
+
/**
* send a message with taskId and payload
* @param taskId task ID
@@ -32,6 +36,13 @@ public interface IConnection {
public void send(int taskId, byte[] payload);
/**
+ * send batch messages
+ * @param msgs
+ */
+
+ public void send(Iterator<TaskMessage> msgs);
+
+ /**
* close this connection
*/
public void close();