You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/12 23:11:07 UTC

[09/14] Merge branch 'master' into idiomatic-clojure-01

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 6da383e..d8ad29a 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -20,7 +20,8 @@
   (:use [hiccup core page-helpers])
   (:use [backtype.storm config util log])
   (:use [backtype.storm.ui helpers])
-  (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]])
+  (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID
+                                              ACKER-ACK-STREAM-ID ACKER-FAIL-STREAM-ID system-id?]]])
   (:use [ring.adapter.jetty :only [run-jetty]])
   (:use [clojure.string :only [trim]])
   (:import [backtype.storm.utils Utils])
@@ -51,93 +52,8 @@
        (map #(.get_stats ^ExecutorSummary %))
        (filter not-nil?)))
 
-(def tips
-  "Defines a mapping of help texts for elements of the UI pages."
-  {:sys-stats "Use this to toggle inclusion of storm system components."
-   :version (str "The version of storm installed on the UI node. (Hopefully, "
-                 "this is the same on all storm nodes!)")
-   :nimbus-uptime (str "The duration the current Nimbus instance has been "
-                       "running. (Note that the storm cluster may have been "
-                       "deployed and available for a much longer period than "
-                       "the current Nimbus process has been running.)")
-   :num-supervisors "The number of nodes in the cluster currently."
-   :num-slots "Slots are Workers (processes)."
-   :num-execs "Executors are threads in a Worker process."
-   :num-tasks (str "A Task is an instance of a Bolt or Spout. The number of "
-                   "Tasks is almost always equal to the number of Executors.")
-   :name "The name given to the topology by when it was submitted."
-   :name-link "Click the name to view the Topology's information."
-   :topo-id "The unique ID given to a Topology each time it is launched."
-   :status "The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING."
-   :topo-uptime "The time since the Topology was submitted."
-   :num-workers "The number of Workers (processes)."
-   :sup-id (str "A unique identifier given to a Supervisor when it joins the "
-                "cluster.")
-   :sup-host (str "The hostname reported by the remote host. (Note that this "
-                  "hostname is not the result of a reverse lookup at the "
-                  "Nimbus node.)")
-   :sup-uptime (str "The length of time a Supervisor has been registered to the "
-                    "cluster.")
-   :window (str "The past period of time for which the statistics apply. "
-                "Click on a value to set the window for this page.")
-   :emitted "The number of Tuples emitted."
-   :transferred "The number of Tuples emitted that sent to one or more bolts."
-   :complete-lat (str "The average time a Tuple \"tree\" takes to be completely "
-                      "processed by the Topology. A value of 0 is expected "
-                      "if no acking is done.")
-   :spout-acked (str "The number of Tuple \"trees\" successfully processed. A "
-                     "value of 0 is expected if no acking is done.")
-   :spout-failed (str "The number of Tuple \"trees\" that were explicitly "
-                      "failed or timed out before acking was completed. A value "
-                      "of 0 is expected if no acking is done.")
-   :comp-id "The ID assigned to a the Component by the Topology."
-   :comp-id-link "Click on the name to view the Component's page."
-   :capacity (str "If this is around 1.0, the corresponding Bolt is running as "
-                  "fast as it can, so you may want to increase the Bolt's "
-                  "parallelism. This is (number executed * average execute "
-                  "latency) / measurement time.")
-   :exec-lat (str "The average time a Tuple spends in the execute method. The "
-                  "execute method may complete without sending an Ack for the "
-                  "tuple.")
-   :num-executed "The number of incoming Tuples processed."
-   :proc-lat (str "The average time it takes to Ack a Tuple after it is first "
-                  "received.  Bolts that join, aggregate or batch may not Ack a "
-                  "tuple until a number of other Tuples have been received.")
-   :bolt-acked "The number of Tuples acknowledged by this Bolt."
-   :bolt-failed "The number of tuples Failed by this Bolt."
-   :stream (str "The name of the Tuple stream given in the Topolgy, or \""
-                Utils/DEFAULT_STREAM_ID "\" if none was given.")
-   :exec-id "The unique executor ID."
-   :exec-uptime "The length of time an Executor (thread) has been alive."
-   :port (str "The port number used by the Worker to which an Executor is "
-              "assigned. Click on the port number to open the logviewer page "
-              "for this Worker.")})
-
-(defn mk-system-toggle-button
-  [include-sys?]
-  [:p {:class "js-only"}
-   [:span.tip.right {:title (:sys-stats tips)}
-    [:input {:type "button"
-             :value (str (if include-sys? "Hide" "Show") " System Stats")
-             :onclick "toggleSys()"}]]])
-
-(defn ui-template
-  [body]
-  (html4
-    [:head
-     [:title "Storm UI"]
-     (include-css "/css/bootstrap-1.4.0.css")
-     (include-css "/css/style.css")
-     (include-js "/js/jquery-1.6.2.min.js")
-     (include-js "/js/jquery.tablesorter.min.js")
-     (include-js "/js/jquery.cookies.2.2.0.min.js")
-     (include-js "/js/bootstrap-twipsy.js")
-     (include-js "/js/script.js")]
-    [:body
-     [:h1 (link-to "/" "Storm UI")]
-     (seq body)]))
-
 (defn read-storm-version
+  "Returns a string containing the Storm version or 'Unknown'."
   []
   (let [storm-home (System/getProperty "storm.home")
         release-path (format "%s/RELEASE" storm-home)
@@ -146,117 +62,12 @@
       (trim (slurp release-path))
       "Unknown")))
 
-(defn cluster-summary-table
-  [^ClusterSummary summ]
-  (let [sups (.get_supervisors summ)
-        used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups))
-        total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups))
-        free-slots (- total-slots used-slots)
-        total-tasks (->> (.get_topologies summ)
-                         (map #(.get_num_tasks ^TopologySummary %))
-                         (reduce +))
-        total-executors (->> (.get_topologies summ)
-                             (map #(.get_num_executors ^TopologySummary %))
-                             (reduce +))]
-    (table [{:text "Version" :attr {:class "tip right"
-                                    :title (:version tips)}}
-            {:text "Nimbus uptime" :attr {:class "tip right"
-                                          :title (:nimbus-uptime tips)}}
-            {:text "Supervisors" :attr {:class "tip above"
-                                        :title (:num-supervisors tips)}}
-            {:text "Used slots" :attr {:class "tip above"
-                                       :title (:num-slots tips)}}
-            {:text "Free slots" :attr {:class "tip above"
-                                       :title (:num-slots tips)}}
-            {:text "Total slots" :attr {:class "tip above"
-                                        :title (:num-slots tips)}}
-            {:text  "Executors" :attr {:class "tip above"
-                                       :title (:num-execs tips)}}
-            {:text "Tasks" :attr {:class "tip left"
-                                  :title (:num-tasks tips)}}]
-           [[(read-storm-version)
-             (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
-             (count sups)
-             used-slots
-             free-slots
-             total-slots
-             total-executors
-             total-tasks]])))
-
-(defn topology-link
-  ([id] (topology-link id id))
-  ([id content]
-   (link-to (url-format "/topology/%s" id) (escape-html content))))
-
-(defn main-topology-summary-table
-  [summs]
-  (sorted-table
-    [{:text "Name" :attr {:class "tip right"
-                          :title (str (:name tips) " " (:name-link tips))}}
-     {:text "Id" :attr {:class "tip right"
-                        :title (:topo-id tips)}}
-     {:text "Status" :attr {:class "tip above"
-                            :title (:status tips)}}
-     {:text "Uptime" :attr {:class "tip above"
-                            :title (:topo-uptime tips)}}
-     {:text "Num workers" :attr {:class "tip above"
-                                 :title (:num-workers tips)}}
-     {:text "Num executors" :attr {:class "tip above"
-                                   :title (:num-execs tips)}}
-     {:text "Num tasks" :attr {:class "tip above"
-                               :title (:num-tasks tips)}}]
-    (for [^TopologySummary t summs]
-      [(topology-link (.get_id t) (.get_name t))
-       (escape-html (.get_id t))
-       (.get_status t)
-       (pretty-uptime-sec (.get_uptime_secs t))
-       (.get_num_workers t)
-       (.get_num_executors t)
-       (.get_num_tasks t)])
-    :time-cols [3]
-    :sort-list "[[0,0]]"))
-
-(defn supervisor-summary-table
-  [summs]
-  (sorted-table
-    [{:text "Id" :attr {:class "tip right"
-                        :title (:sup-id tips)}}
-     {:text "Host" :attr {:class "tip above"
-                          :title (:sup-host tips)}}
-     {:text "Uptime" :attr {:class "tip above"
-                            :title (:sup-uptime tips)}}
-     {:text "Slots" :attr {:class "tip above"
-                           :title (:num-slots tips)}}
-     {:text "Used slots" :attr {:class "tip left"
-                                :title (:num-slots tips)}}]
-    (for [^SupervisorSummary s summs]
-      [(.get_supervisor_id s)
-       (.get_host s)
-       (pretty-uptime-sec (.get_uptime_secs s))
-       (.get_num_workers s)
-       (.get_num_used_workers s)])
-    :time-cols [2]))
-
-(defn configuration-table
-  [conf]
-  (sorted-table ["Key" "Value"]
-                (map #(vector (key %) (str (val %))) conf)))
-
-(defn main-page
-  []
-  (with-nimbus nimbus
-               (let [summ (.getClusterInfo ^Nimbus$Client nimbus)]
-                 (concat
-                   [[:h2 "Cluster Summary"]]
-                   [(cluster-summary-table summ)]
-                   [[:h2 "Topology summary"]]
-                   (main-topology-summary-table (.get_topologies summ))
-                   [[:h2 "Supervisor summary"]]
-                   (supervisor-summary-table (.get_supervisors summ))
-                   [[:h2 "Nimbus Configuration"]]
-                   (configuration-table (from-json (.getNimbusConf ^Nimbus$Client nimbus)))))))
-
+;; TODO: What is the desired behavior if a particular component id
+;; corresponds to both a bolt and a spout? Where should the error
+;; occur? -DCJ
 (defn component-type
+  "Returns the component type (either :bolt or :spout) for a given
+  topology and component id. Returns nil if not found."
   [^StormTopology topology id]
   (let [bolts (.get_bolts topology)
         spouts (.get_spouts topology)]
@@ -287,19 +98,14 @@
                        c]]
                      ))]))))
 
-
 (defn expand-averages-seq
   [average-seq counts-seq]
   (->> (map vector average-seq counts-seq)
        (map #(apply expand-averages %))
-       (apply merge-with
-              (fn [s1 s2]
-                (merge-with
-                  add-pairs
-                  s1
-                  s2)))))
+       (apply merge-with (fn [s1 s2] (merge-with add-pairs s1 s2)))))
 
-(defn- val-avg [[t c]]
+(defn- val-avg
+  [[t c]]
   (if (= t 0) 0
     (double (/ t c))))
 
@@ -341,6 +147,14 @@
     (fn [_] true)
     (fn [stream] (and (string? stream) (not (system-id? stream))))))
 
+(defn is-ack-stream
+  "Returns true when stream equals none of the acker init/ack/fail stream
+  ids, and false for those three. NOTE: that is the opposite of what the
+  name suggests."
+  [stream]
+  (not-any? #(= % stream)
+            [ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID ACKER-FAIL-STREAM-ID]))
+
 (defn pre-process
   [stream-summary include-sys?]
   (let [filter-fn (mk-include-sys-fn include-sys?)
@@ -423,86 +237,6 @@
   [topology s]
   (= :bolt (executor-summary-type topology s)))
 
-(defn topology-summary-table
-  [^TopologyInfo summ]
-  (let [executors (.get_executors summ)
-        workers (set (for [^ExecutorSummary e executors] [(.get_host e) (.get_port e)]))]
-    (table [{:text "Name" :attr {:class "tip right"
-                                 :title (:name tips)}}
-            {:text "Id" :attr {:class "tip right"
-                               :title (:topo-id tips)}}
-            {:text "Status" :attr {:class "tip above"
-                                   :title (:status tips)}}
-            {:text "Uptime" :attr {:class "tip above"
-                                   :title (:topo-uptime tips)}}
-            {:text "Num workers" :attr {:class "tip above"
-                                        :title (:num-workers tips)}}
-            {:text "Num executors" :attr {:class "tip above"
-                                          :title (:num-execs tips)}}
-            {:text "Num tasks" :attr {:class "tip above"
-                                      :title (:num-tasks tips)}}]
-           [[(escape-html (.get_name summ))
-             (escape-html (.get_id summ))
-             (.get_status summ)
-             (pretty-uptime-sec (.get_uptime_secs summ))
-             (count workers)
-             (count executors)
-             (sum-tasks executors)]])))
-
-(defn total-aggregate-stats
-  [spout-summs bolt-summs include-sys?]
-  (let [spout-stats (get-filled-stats spout-summs)
-        bolt-stats (get-filled-stats bolt-summs)
-        agg-spout-stats (-> spout-stats
-                            (aggregate-spout-stats include-sys?)
-                            aggregate-spout-streams)
-        agg-bolt-stats (-> bolt-stats
-                           (aggregate-bolt-stats include-sys?)
-                           aggregate-bolt-streams)]
-    (merge-with
-      (fn [s1 s2]
-        (merge-with + s1 s2))
-      (select-keys agg-bolt-stats [:emitted :transferred])
-      agg-spout-stats)))
-
-(defn stats-times
-  [stats-map]
-  (sort-by #(Integer/parseInt %)
-           (-> stats-map
-               clojurify-structure
-               (dissoc ":all-time")
-               keys)))
-
-(defn topology-stats-table
-  [id window stats]
-  (let [times (stats-times (:emitted stats))
-        display-map (into {} (for [t times] [t pretty-uptime-sec]))
-        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-    (sorted-table
-      [{:text "Window" :attr {:class "tip right"
-                              :title (:window tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}
-       {:text "Complete latency (ms)" :attr {:class "tip above"
-                                             :title (:complete-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:spout-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                              :title (:spout-failed tips)}}]
-      (for [k (concat times [":all-time"])
-            :let [disp ((display-map k) k)]]
-        [(link-to (if (= k window) {:class "red"} {})
-                  (url-format "/topology/%s?window=%s" id k)
-                  (escape-html disp))
-         (get-in stats [:emitted k])
-         (get-in stats [:transferred k])
-         (float-str (get-in stats [:complete-latencies k]))
-         (get-in stats [:acked k])
-         (get-in stats [:failed k])])
-      :time-cols [0])))
-
 (defn group-by-comp
   [summs]
   (let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
@@ -519,28 +253,23 @@
                    reverse
                    first)]
     (if error
-      [:span (if (< (time-delta (.get_error_time_secs ^ErrorInfo error))
-                    (* 60 30))
-               {:class "red"}
-               {})
-       (error-subset (.get_error ^ErrorInfo error))])))
-
-(defn component-link
-  [storm-id id]
-  (link-to (url-format "/topology/%s/component/%s" storm-id id) (escape-html id)))
-
-(defn worker-log-link
-  [host port]
-  (link-to (url-format "http://%s:%s/log?file=worker-%s.log"
-                       host (*STORM-CONF* LOGVIEWER-PORT) port) (str port)))
-
-(defn render-capacity
-  [capacity]
-  (let [capacity (nil-to-zero capacity)]
-    [:span (if (> capacity 0.9)
-             {:class "red"}
-             {})
-     (float-str capacity)]))
+      (error-subset (.get_error ^ErrorInfo error))
+      "")))
+
+(defn component-task-summs
+  "Returns the executor summaries of component id, taken from the spout
+  groups when id is a key there and from the bolt groups otherwise, sorted
+  by the task_start of each summary's executor info."
+  [^TopologyInfo summ topology id]
+  (let [by-comp #(group-by-comp (filter (partial % topology) (.get_executors summ)))
+        spouts (by-comp spout-summary?)
+        bolts (by-comp bolt-summary?)
+        summs ((if (contains? spouts id) spouts bolts) id)]
+    (sort-by (fn [^ExecutorSummary e] (-> e .get_executor_info .get_task_start)) summs)))
+
+(defn worker-log-link [host port]
+  (url-format "http://%s:%s/log?file=worker-%s.log" ; URL of worker-<port>.log on host
+              host (*STORM-CONF* LOGVIEWER-PORT) port)) ; middle %s is the configured LOGVIEWER-PORT
 
 (defn compute-executor-capacity
   [^ExecutorSummary e]
@@ -565,91 +294,56 @@
        (map nil-to-zero)
        (apply max)))
 
-(defn spout-comp-table
-  [top-id summ-map errors window include-sys?]
-  (sorted-table
-    [{:text "Id" :attr {:class "tip right"
-                        :title (str (:comp-id tips) " " (:comp-id-link tips))}}
-     {:text "Executors" :attr {:class "tip right"
-                               :title (:num-execs tips)}}
-     {:text "Tasks" :attr {:class "tip above"
-                           :title (:num-tasks tips)}}
-     {:text "Emitted" :attr {:class "tip above"
-                             :title (:emitted tips)}}
-     {:text "Transferred" :attr {:class "tip above"
-                                 :title (:transferred tips)}}
-     {:text "Complete latency (ms)" :attr {:class "tip above"
-                                           :title (:complete-lat tips)}}
-     {:text "Acked" :attr {:class "tip above"
-                           :title (:spout-acked tips)}}
-     {:text "Failed" :attr {:class "tip above"
-                            :title (:spout-failed tips)}}
-     "Last error"]
-    (for [[id summs] summ-map
-          :let [stats-seq (get-filled-stats summs)
-                stats (aggregate-spout-streams
-                        (aggregate-spout-stats
-                          stats-seq include-sys?))]]
-      [(component-link top-id id)
-       (count summs)
-       (sum-tasks summs)
-       (get-in stats [:emitted window])
-       (get-in stats [:transferred window])
-       (float-str (get-in stats [:complete-latencies window]))
-       (get-in stats [:acked window])
-       (get-in stats [:failed window])
-       (most-recent-error (get errors id))])))
-
-(defn bolt-comp-table
-  [top-id summ-map errors window include-sys?]
-  (sorted-table
-    [{:text "Id" :attr {:class "tip right"
-                        :title (str (:comp-id tips) " " (:comp-id-link tips))}}
-     {:text "Executors" :attr {:class "tip right"
-                               :title (:num-execs tips)}}
-     {:text "Tasks" :attr {:class "tip above"
-                           :title (:num-tasks tips)}}
-     {:text "Emitted" :attr {:class "tip above"
-                             :title (:emitted tips)}}
-     {:text "Transferred" :attr {:class "tip above"
-                                 :title (:transferred tips)}}
-     {:text "Capacity (last 10m)" :attr {:class "tip above"
-                                         :title (:capacity tips)}}
-     {:text "Execute latency (ms)" :attr {:class "tip above"
-                                          :title (:exec-lat tips)}}
-     {:text "Executed" :attr {:class "tip above"
-                              :title (:num-executed tips)}}
-     {:text "Process latency (ms)":attr {:class "tip above"
-                                         :title (:proc-lat tips)}}
-     {:text "Acked" :attr {:class "tip above"
-                           :title (:bolt-acked tips)}}
-     {:text "Failed" :attr {:class "tip left"
-                            :title (:bolt-failed tips)}}
-     "Last error"]
-    (for [[id summs] summ-map
-          :let [stats-seq (get-filled-stats summs)
-                stats (aggregate-bolt-streams
-                        (aggregate-bolt-stats
-                          stats-seq include-sys?))]]
-      [(component-link top-id id)
-       (count summs)
-       (sum-tasks summs)
-       (get-in stats [:emitted window])
-       (get-in stats [:transferred window])
-       (render-capacity (compute-bolt-capacity summs))
-       (float-str (get-in stats [:execute-latencies window]))
-       (get-in stats [:executed window])
-       (float-str (get-in stats [:process-latencies window]))
-       (get-in stats [:acked window])
-       (get-in stats [:failed window])
-       (most-recent-error (get errors id))])))
-
-(defn window-hint [window]
+(defn spout-streams-stats
+  "Passes (get-filled-stats summs) through aggregate-spout-stats, which
+  also receives include-sys?, and then through aggregate-spout-streams."
+  [summs include-sys?]
+  (-> (aggregate-spout-stats (get-filled-stats summs) include-sys?)
+      aggregate-spout-streams))
+
+(defn bolt-streams-stats
+  "Passes (get-filled-stats summs) through aggregate-bolt-stats, which
+  also receives include-sys?, and then through aggregate-bolt-streams."
+  [summs include-sys?]
+  (-> (aggregate-bolt-stats (get-filled-stats summs) include-sys?)
+      aggregate-bolt-streams))
+
+(defn total-aggregate-stats
+  "Combines the aggregated stream stats of the given spout and bolt
+  executor summaries into a single map.
+
+  Each side is reduced to the keys :emitted, :transferred, :acked, :failed
+  and :complete-latencies; when both sides carry a key, the two nested
+  maps are merged with +."
+  [spout-summs bolt-summs include-sys?]
+  (let [wanted [:emitted :transferred :acked :failed :complete-latencies]
+        agg-spouts (-> (get-filled-stats spout-summs)
+                       (aggregate-spout-stats include-sys?)
+                       aggregate-spout-streams)
+        agg-bolts (-> (get-filled-stats bolt-summs)
+                      (aggregate-bolt-stats include-sys?)
+                      aggregate-bolt-streams)]
+    (merge-with (fn [bolt-vals spout-vals]
+                  (merge-with + bolt-vals spout-vals))
+                (select-keys agg-bolts wanted)
+                (select-keys agg-spouts wanted))))
+
+(defn stats-times
+  "Returns the keys of stats-map (after clojurify-structure) other than
+  \":all-time\", sorted by their Integer/parseInt value."
+  [stats-map]
+  (let [timed (dissoc (clojurify-structure stats-map) ":all-time")]
+    (sort-by (fn [window] (Integer/parseInt window))
+             (keys timed))))
+
+(defn window-hint
+  [window]
   (if (= window ":all-time")
     "All time"
     (pretty-uptime-sec window)))
 
-(defn topology-action-button [id name action command is-wait default-wait enabled]
+(defn topology-action-button
+  [id name action command is-wait default-wait enabled]
   [:input {:type "button"
            :value action
            (if enabled :enabled :disabled) ""
@@ -658,427 +352,507 @@
                          (StringEscapeUtils/escapeJavaScript name) "', '"
                          command "', " is-wait ", " default-wait ")")}])
 
-(defn topology-page
-  [id window include-sys?]
+(defn sanitize-stream-name
+  "Returns name with every character matched by the symbol regex (anything
+  but ASCII letters, _, -, : and .) replaced by \"_\", prefixed with the
+  letter s unless it starts with an ASCII letter, plus (hash name) appended."
+  [name]
+  (let [prefixed (if (re-find #"^[A-Za-z]" name) name (str \s name))]
+    (str (clojure.string/replace prefixed #"(?![A-Za-z_\-:\.])." "_")
+         (hash name))))
+
+(defn sanitize-transferred
+  "Copies the nested map transferred into Clojure maps, keeping the outer
+  keys and passing every inner (stream) key through sanitize-stream-name."
+  [transferred]
+  (letfn [(clean [stream-map]
+            (into {} (map (fn [[stream trans]] [(sanitize-stream-name stream) trans]) stream-map)))]
+    (into {} (map (fn [[time streams]] [time (clean streams)]) transferred))))
+
+(defn visualization-data ; returns a map of component id -> attribute map, one entry per [id spec] in spout-bolt
+  [spout-bolt spout-comp-summs bolt-comp-summs window storm-id]
+  (let [components (for [[id spec] spout-bolt]
+            [id
+             (let [inputs (.get_inputs (.get_common spec))
+                   bolt-summs (get bolt-comp-summs id)
+                   spout-summs (get spout-comp-summs id)
+                   bolt-cap (if bolt-summs ; capacity is only computed when bolt summaries exist, else 0
+                              (compute-bolt-capacity bolt-summs)
+                              0)]
+               {:type (if bolt-summs "bolt" "spout") ; no bolt summaries for id => reported as a spout
+                :capacity bolt-cap
+                :latency (if bolt-summs
+                           (get-in
+                             (bolt-streams-stats bolt-summs true)
+                             [:process-latencies window])
+                           (get-in
+                             (spout-streams-stats spout-summs true)
+                             [:complete-latencies window]))
+                :transferred (or ; spout stats are tried first; bolt stats are the fallback
+                               (get-in
+                                 (spout-streams-stats spout-summs true)
+                                 [:transferred window])
+                               (get-in
+                                 (bolt-streams-stats bolt-summs true)
+                                 [:transferred window]))
+                :stats (let [mapfn (fn [dat] ; one map per executor summary in dat
+                                     (map (fn [^ExecutorSummary summ]
+                                            {:host (.get_host summ)
+                                             :port (.get_port summ)
+                                             :uptime_secs (.get_uptime_secs summ)
+                                             :transferred (if-let [stats (.get_stats summ)] ; nil when the executor has no stats
+                                                            (sanitize-transferred (.get_transferred stats)))})
+                                          dat))]
+                         (if bolt-summs
+                           (mapfn bolt-summs)
+                           (mapfn spout-summs)))
+                :link (url-format "/component.html?id=%s&topology_id=%s" id storm-id)
+                :inputs (for [[global-stream-id group] inputs] ; one map per input stream subscription of the component
+                          {:component (.get_componentId global-stream-id)
+                           :stream (.get_streamId global-stream-id)
+                           :sani-stream (sanitize-stream-name (.get_streamId global-stream-id))
+                           :grouping (clojure.core/name (thrift/grouping-type group))})})])]
+    (into {} (doall components)))) ; the lazy for is realized before the map is built
+
+(defn stream-boxes
+  "Collects the distinct :inputs streams of every component in datmap and
+  returns them as a seq of {:row streams} maps holding at most four
+  streams each. :checked is the result of is-ack-stream for the stream."
+  [datmap]
+  (let [streams (vec (distinct
+                      (for [[_ v] datmap
+                            m (get v :inputs)]
+                        {:stream (get m :stream)
+                         :sani-stream (get m :sani-stream)
+                         :checked (is-ack-stream (get m :stream))})))]
+    (map (fn [row] {:row row}) (partition 4 4 nil streams))))
+
+(defn mk-visualization-data
+  "Fetches the topology and executor summaries of id from Nimbus and passes
+  them to visualization-data; a nil window means \":all-time\"."
+  [id window include-sys?]
+  (with-nimbus nimbus
+    (let [window (or window ":all-time")
+          topology (.getTopology ^Nimbus$Client nimbus id)
+          spouts (.get_spouts topology)
+          bolts (.get_bolts topology)
+          summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+          execs (.get_executors summ)
+          spout-summs (filter (partial spout-summary? topology) execs)
+          bolt-summs (filter (partial bolt-summary? topology) execs)
+          spout-comp-summs (group-by-comp spout-summs)
+          bolt-comp-summs (group-by-comp bolt-summs)
+          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
+                                      bolt-comp-summs)]
+      (visualization-data
+        (merge (hashmap-to-persistent spouts)
+               (hashmap-to-persistent bolts))
+        spout-comp-summs bolt-comp-summs window id))))
+
+(defn cluster-configuration []
   (with-nimbus nimbus
-               (let [window (if window window ":all-time")
-                     window-hint (window-hint window)
-                     summ (.getTopologyInfo ^Nimbus$Client nimbus id)
-                     topology (.getTopology ^Nimbus$Client nimbus id)
-                     topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
-                     spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
-                     bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
-                     spout-comp-summs (group-by-comp spout-summs)
-                     bolt-comp-summs (group-by-comp bolt-summs)
-                     bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
-                     name (.get_name summ)
-                     status (.get_status summ)
-                     msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
-                 (concat
-                   [[:h2 "Topology summary"]]
-                   [(topology-summary-table summ)]
-                   [[:h2 {:class "js-only"} "Topology actions"]]
-                   [[:p {:class "js-only"}
-                     (concat
-                       [(topology-action-button id name "Activate" "activate" false 0 (= "INACTIVE" status))]
-                       [(topology-action-button id name "Deactivate" "deactivate" false 0 (= "ACTIVE" status))]
-                       [(topology-action-button id name "Rebalance" "rebalance" true msg-timeout (or (= "ACTIVE" status) (= "INACTIVE" status)))]
-                       [(topology-action-button id name "Kill" "kill" true msg-timeout (not= "KILLED" status))])]]
-                   [[:h2 "Topology stats"]]
-                   (topology-stats-table id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
-                   [[:h2 "Spouts (" window-hint ")"]]
-                   (spout-comp-table id spout-comp-summs (.get_errors summ) window include-sys?)
-                   [[:h2 "Bolts (" window-hint ")"]]
-                   (bolt-comp-table id bolt-comp-summs (.get_errors summ) window include-sys?)
-                   [[:h2 "Topology Configuration"]]
-                   (configuration-table topology-conf)))))
-
-(defn component-task-summs
-  [^TopologyInfo summ topology id]
-  (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
-        bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
-        spout-comp-summs (group-by-comp spout-summs)
-        bolt-comp-summs (group-by-comp bolt-summs)
-        ret (if (contains? spout-comp-summs id)
-              (spout-comp-summs id)
-              (bolt-comp-summs id))]
-    (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
-
-(defn spout-summary-table
-  [topology-id id stats window]
+    (.getNimbusConf ^Nimbus$Client nimbus)))
+
+(defn cluster-summary
+  "Returns a string-keyed map with the storm version, the nimbus uptime,
+  the supervisor count and the slot, executor and task totals of summ.
+  With no argument, summ is fetched from Nimbus via getClusterInfo."
+  ([]
+     (with-nimbus nimbus
+        (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus))))
+  ([^ClusterSummary summ]
+     (let [sups (.get_supervisors summ)
+           topos (.get_topologies summ)
+           total (fn [f coll] (reduce + (map f coll)))
+           used-slots (total #(.get_num_used_workers ^SupervisorSummary %) sups)
+           total-slots (total #(.get_num_workers ^SupervisorSummary %) sups)
+           total-executors (total #(.get_num_executors ^TopologySummary %) topos)
+           total-tasks (total #(.get_num_tasks ^TopologySummary %) topos)]
+       {"stormVersion" (read-storm-version)
+        "nimbusUptime" (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
+        "supervisors" (count sups)
+        "slotsTotal" total-slots
+        "slotsUsed"  used-slots
+        "slotsFree" (- total-slots used-slots)
+        "executorsTotal" total-executors
+        "tasksTotal" total-tasks})))
+
+(defn supervisor-summary
+  "Returns {\"supervisors\" seq}, with one map per SupervisorSummary holding
+  its id, host, pretty-printed uptime and total/used slot counts.
+  Called without arguments, the supervisors are first fetched from nimbus."
+  ([]
+   (with-nimbus nimbus
+     (let [cluster-info (.getClusterInfo ^Nimbus$Client nimbus)]
+       (supervisor-summary (.get_supervisors cluster-info)))))
+  ([summs]
+   (let [describe (fn [^SupervisorSummary sup]
+                    {"id" (.get_supervisor_id sup)
+                     "host" (.get_host sup)
+                     "uptime" (pretty-uptime-sec (.get_uptime_secs sup))
+                     "slotsTotal" (.get_num_workers sup)
+                     "slotsUsed" (.get_num_used_workers sup)})]
+     {"supervisors" (map describe summs)})))
+
+(defn all-topologies-summary
+  "Returns {\"topologies\" seq}, with one map per TopologySummary holding its
+  id, name, status, pretty-printed uptime and task/worker/executor totals.
+  Called without arguments, the topologies are first fetched from nimbus."
+  ([]
+   (with-nimbus nimbus
+     (let [cluster-info (.getClusterInfo ^Nimbus$Client nimbus)]
+       (all-topologies-summary (.get_topologies cluster-info)))))
+  ([summs]
+   (let [describe (fn [^TopologySummary topo]
+                    {"id" (.get_id topo)
+                     "name" (.get_name topo)
+                     "status" (.get_status topo)
+                     "uptime" (pretty-uptime-sec (.get_uptime_secs topo))
+                     "tasksTotal" (.get_num_tasks topo)
+                     "workersTotal" (.get_num_workers topo)
+                     "executorsTotal" (.get_num_executors topo)})]
+     {"topologies" (map describe summs)})))
+
+;; Builds one map per time window -- every window that stats-times reports for
+;; the :emitted stats, followed by ":all-time" -- holding that window's
+;; emitted, transferred, complete-latency, acked and failed values.
+;; "windowPretty" is the window passed through pretty-uptime-sec, or
+;; "All time" for ":all-time". The `id` and `window` arguments are not used
+;; in the body.
+(defn topology-stats [id window stats]
   (let [times (stats-times (:emitted stats))
         display-map (into {} (for [t times] [t pretty-uptime-sec]))
         display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-    (sorted-table
-      [{:text "Window" :attr {:class "tip right"
-                              :title (:window tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}
-       {:text "Complete latency (ms)" :attr {:class "tip above"
-                                             :title (:complete-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:spout-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                              :title (:spout-failed tips)}}]
-      (for [k (concat times [":all-time"])
-            :let [disp ((display-map k) k)]]
-        [(link-to (if (= k window) {:class "red"} {})
-                  (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
-                  (escape-html disp))
-         (get-in stats [:emitted k])
-         (get-in stats [:transferred k])
-         (float-str (get-in stats [:complete-latencies k]))
-         (get-in stats [:acked k])
-         (get-in stats [:failed k])])
-      :time-cols [0])))
-
-(defn spout-output-summary-table
+    (for [k (concat times [":all-time"])
+          :let [disp ((display-map k) k)]]
+      {"windowPretty" disp
+       "window" k
+       "emitted" (get-in stats [:emitted k])
+       "transferred" (get-in stats [:transferred k])
+       "completeLatency" (float-str (get-in stats [:complete-latencies k]))
+       "acked" (get-in stats [:acked k])
+       "failed" (get-in stats [:failed k])})))
+
+(defn spout-comp
+  "Returns one map per [component-id executor-summaries] entry of `summ-map`,
+  holding the spout id, executor and task counts, the aggregated
+  emitted/transferred/acked/failed values and complete latency for `window`,
+  and the most recent error recorded for that id in `errors`.
+  `top-id` is not used in the body."
+  [top-id summ-map errors window include-sys?]
+  (for [[id summs] summ-map]
+    (let [stats (-> (get-filled-stats summs)
+                    (aggregate-spout-stats include-sys?)
+                    aggregate-spout-streams)
+          in-window (fn [metric] (get-in stats [metric window]))]
+      {"spoutId" id
+       "executors" (count summs)
+       "tasks" (sum-tasks summs)
+       "emitted" (in-window :emitted)
+       "transferred" (in-window :transferred)
+       "completeLatency" (float-str (in-window :complete-latencies))
+       "acked" (in-window :acked)
+       "failed" (in-window :failed)
+       "lastError" (most-recent-error (get errors id))})))
+
+(defn bolt-comp
+  "Returns one map per [component-id executor-summaries] entry of `summ-map`,
+  holding the bolt id, executor and task counts, capacity, the aggregated
+  emitted/transferred/executed/acked/failed values and execute/process
+  latencies for `window`, and the most recent error recorded for that id in
+  `errors`. `top-id` is not used in the body."
+  [top-id summ-map errors window include-sys?]
+  (for [[id summs] summ-map]
+    (let [stats (-> (get-filled-stats summs)
+                    (aggregate-bolt-stats include-sys?)
+                    aggregate-bolt-streams)
+          in-window (fn [metric] (get-in stats [metric window]))]
+      {"boltId" id
+       "executors" (count summs)
+       "tasks" (sum-tasks summs)
+       "emitted" (in-window :emitted)
+       "transferred" (in-window :transferred)
+       "capacity" (float-str (nil-to-zero (compute-bolt-capacity summs)))
+       "executeLatency" (float-str (in-window :execute-latencies))
+       "executed" (in-window :executed)
+       "processLatency" (float-str (in-window :process-latencies))
+       "acked" (in-window :acked)
+       "failed" (in-window :failed)
+       "lastError" (most-recent-error (get errors id))})))
+
+(defn topology-summary
+  "Returns a map with the topology's id, name, status, pretty-printed uptime,
+  total tasks, number of distinct [host port] workers and number of executors,
+  all read from the given TopologyInfo."
+  [^TopologyInfo summ]
+  (let [executors (.get_executors summ)
+        host-and-port (fn [^ExecutorSummary e] [(.get_host e) (.get_port e)])
+        ;; executors sharing a host/port pair collapse into one worker
+        workers (into #{} (map host-and-port executors))]
+    {"id" (.get_id summ)
+     "name" (.get_name summ)
+     "status" (.get_status summ)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+     "tasksTotal" (sum-tasks executors)
+     "workersTotal" (count workers)
+     "executorsTotal" (count executors)}))
+
+(defn spout-summary-json
+  "Returns one map per time window -- every window that stats-times reports
+  for the :emitted stats, followed by \":all-time\" -- holding that window's
+  emitted, transferred, complete-latency, acked and failed values.
+  `topology-id`, `id` and `window` are not used in the body."
+  [topology-id id stats window]
+  (let [windows (concat (stats-times (:emitted stats)) [":all-time"])
+        ;; ":all-time" gets a fixed label, every other window is pretty-printed
+        label (fn [k]
+                (if (= k ":all-time")
+                  "All time"
+                  (pretty-uptime-sec k)))
+        in-window (fn [metric k] (get-in stats [metric k]))]
+    (map (fn [k]
+           {"windowPretty" (label k)
+            "window" k
+            "emitted" (in-window :emitted k)
+            "transferred" (in-window :transferred k)
+            "completeLatency" (float-str (in-window :complete-latencies k))
+            "acked" (in-window :acked k)
+            "failed" (in-window :failed k)})
+         windows)))
+
+(defn topology-page
+  "Fetches the topology info, topology and topology configuration for `id`
+  from nimbus and returns the topology summary merged with the window (which
+  defaults to \":all-time\"), its hint, the message timeout, the aggregated
+  topology stats, per-spout and per-bolt summaries, the configuration and the
+  visualization table. Bolt components are filtered through the predicate
+  built by mk-include-sys-fn from `include-sys?`."
+  [id window include-sys?]
+  (with-nimbus nimbus
+    (let [window (if window window ":all-time")
+          window-hint (window-hint window)
+          summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+          topology (.getTopology ^Nimbus$Client nimbus id)
+          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
+          executors (.get_executors summ)
+          errors (.get_errors summ)
+          spout-summs (filter (partial spout-summary? topology) executors)
+          bolt-summs (filter (partial bolt-summary? topology) executors)
+          spout-comp-summs (group-by-comp spout-summs)
+          bolt-comp-summs (group-by-comp bolt-summs)
+          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
+          ;; The former `name` and `status` locals only fed the removed HTML
+          ;; action buttons; they were unused here and `name` shadowed
+          ;; clojure.core/name, so they are no longer bound.
+          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+          spouts (.get_spouts topology)
+          bolts (.get_bolts topology)
+          visualizer-data (visualization-data (merge (hashmap-to-persistent spouts)
+                                                     (hashmap-to-persistent bolts))
+                                              spout-comp-summs
+                                              bolt-comp-summs
+                                              window
+                                              id)]
+      (merge
+       (topology-summary summ)
+       {"window" window
+        "windowHint" window-hint
+        "msgTimeout" msg-timeout
+        "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
+        "spouts" (spout-comp id spout-comp-summs errors window include-sys?)
+        "bolts" (bolt-comp id bolt-comp-summs errors window include-sys?)
+        "configuration" topology-conf
+        "visualizationTable" (stream-boxes visualizer-data)}))))
+
+(defn spout-output-stats
+  "Re-keys `stream-summary` with swap-map-order (defined elsewhere) and, for
+  each [stream stats] entry found under `window`, returns a map with the
+  stream name, its emitted/transferred/acked/failed values passed through
+  nil-to-zero, and its complete latency passed through float-str."
   [stream-summary window]
   (let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
-    (sorted-table
-      [{:text "Stream" :attr {:class "tip right"
-                              :title (:stream tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}
-       {:text "Complete latency (ms)" :attr {:class "tip above"
-                                             :title (:complete-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:spout-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                              :title (:spout-failed tips)}}]
-      (for [[s stats] (stream-summary window)]
-        [s
-         (nil-to-zero (:emitted stats))
-         (nil-to-zero (:transferred stats))
-         (float-str (:complete-latencies stats))
-         (nil-to-zero (:acked stats))
-         (nil-to-zero (:failed stats))]))))
-
-(defn spout-executor-table
+    (for [[s stats] (stream-summary window)]
+      {"stream" s
+       "emitted" (nil-to-zero (:emitted stats))
+       "transferred" (nil-to-zero (:transferred stats))
+       "completeLatency" (float-str (:complete-latencies stats))
+       "acked" (nil-to-zero (:acked stats))
+       "failed" (nil-to-zero (:failed stats))})))
+
+(defn spout-executor-stats
+  "Returns one map per ExecutorSummary in `executors`: executor id, uptime,
+  host, port, worker log link, and the emitted/transferred/acked/failed values
+  and complete latency taken from that executor's aggregated spout stats for
+  `window`. When an executor has no stats the lookups yield nil, which is then
+  passed through nil-to-zero / float-str. `topology-id` is not used in the
+  body."
   [topology-id executors window include-sys?]
-  (sorted-table
-    [{:text "Id" :attr {:class "tip right"
-                        :title (:exec-id tips)}}
-     {:text "Uptime" :attr {:class "tip right"
-                            :title (:exec-uptime tips)}}
-     {:text "Host" :attr {:class "tip above"
-                          :title (:sup-host tips)}}
-     {:text "Port" :attr {:class "tip above"
-                          :title (:port tips)}}
-     {:text "Emitted" :attr {:class "tip above"
-                             :title (:emitted tips)}}
-     {:text "Transferred" :attr {:class "tip above"
-                                 :title (:transferred tips)}}
-     {:text "Complete latency (ms)" :attr {:class "tip above"
-                                           :title (:complete-lat tips)}}
-     {:text "Acked" :attr {:class "tip above"
-                           :title (:spout-acked tips)}}
-     {:text "Failed" :attr {:class "tip left"
-                            :title (:spout-failed tips)}}]
-    (for [^ExecutorSummary e executors
-          :let [stats (.get_stats e)
-                stats (if stats
-                        (-> stats
-                            (aggregate-spout-stats include-sys?)
-                            aggregate-spout-streams
-                            swap-map-order
-                            (get window)))]]
-      [(pretty-executor-info (.get_executor_info e))
-       (pretty-uptime-sec (.get_uptime_secs e))
-       (.get_host e)
-       (worker-log-link (.get_host e) (.get_port e))
-       (nil-to-zero (:emitted stats))
-       (nil-to-zero (:transferred stats))
-       (float-str (:complete-latencies stats))
-       (nil-to-zero (:acked stats))
-       (nil-to-zero (:failed stats))])
-    :time-cols [1]))
-
-(defn spout-page
+  (for [^ExecutorSummary e executors
+        :let [stats (.get_stats e)
+              stats (if stats
+                      (-> stats
+                          (aggregate-spout-stats include-sys?)
+                          aggregate-spout-streams
+                          swap-map-order
+                          (get window)))]]
+    {"id" (pretty-executor-info (.get_executor_info e))
+     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
+     "host" (.get_host e)
+     "port" (.get_port e)
+     "emitted" (nil-to-zero (:emitted stats))
+     "transferred" (nil-to-zero (:transferred stats))
+     "completeLatency" (float-str (:complete-latencies stats))
+     "acked" (nil-to-zero (:acked stats))
+     "failed" (nil-to-zero (:failed stats))
+     "workerLogLink" (worker-log-link (.get_host e) (.get_port e))}))
+
+(defn component-errors
+  "Returns {\"componentErrors\" seq}, with one map per ErrorInfo in
+  `errors-list` holding its formatted time and error text, ordered by the
+  reverse of an ascending sort on error time."
+  [errors-list]
+  (let [oldest-first (sort-by #(.get_error_time_secs ^ErrorInfo %) errors-list)
+        describe (fn [^ErrorInfo err]
+                   {"time" (date-str (.get_error_time_secs err))
+                    "error" (.get_error err)})]
+    {"componentErrors" (map describe (reverse oldest-first))}))
+
+(defn spout-stats
+  "Aggregates the filled stats of `executors` and returns a map with
+  \"spoutSummary\" (per-window summary), \"outputStats\" (per-stream stats for
+  `window`) and \"executorStats\" (per-executor stats for `window`).
+  NOTE(review): the local `window-hint` is still computed but is not
+  referenced by the returned map."
   [window ^TopologyInfo topology-info component executors include-sys?]
   (let [window-hint (str " (" (window-hint window) ")")
         stats (get-filled-stats executors)
         stream-summary (-> stats (aggregate-spout-stats include-sys?))
         summary (-> stream-summary aggregate-spout-streams)]
-    (concat
-      [[:h2 "Spout stats"]]
-      (spout-summary-table (.get_id topology-info) component summary window)
-      [[:h2 "Output stats" window-hint]]
-      (spout-output-summary-table stream-summary window)
-      [[:h2 "Executors" window-hint]]
-      ;; task id, task uptime, stream aggregated stats, last error
-      (spout-executor-table (.get_id topology-info) executors window include-sys?))))
-
-(defn bolt-output-summary-table
+    {"spoutSummary" (spout-summary-json
+                      (.get_id topology-info) component summary window)
+     "outputStats" (spout-output-stats stream-summary window)
+     "executorStats" (spout-executor-stats (.get_id topology-info)
+                                           executors window include-sys?)}))
+
+(defn bolt-summary
+  "Returns one map per time window -- every window that stats-times reports
+  for the :emitted stats, followed by \":all-time\" -- holding that window's
+  emitted, transferred, executed, acked and failed values plus its execute
+  and process latencies. `topology-id`, `id` and `window` are not used in the
+  body."
+  [topology-id id stats window]
+  (let [windows (concat (stats-times (:emitted stats)) [":all-time"])
+        ;; ":all-time" gets a fixed label, every other window is pretty-printed
+        label (fn [k]
+                (if (= k ":all-time")
+                  "All time"
+                  (pretty-uptime-sec k)))
+        in-window (fn [metric k] (get-in stats [metric k]))]
+    (map (fn [k]
+           {"window" k
+            "windowPretty" (label k)
+            "emitted" (in-window :emitted k)
+            "transferred" (in-window :transferred k)
+            "executeLatency" (float-str (in-window :execute-latencies k))
+            "executed" (in-window :executed k)
+            "processLatency" (float-str (in-window :process-latencies k))
+            "acked" (in-window :acked k)
+            "failed" (in-window :failed k)})
+         windows)))
+
+(defn bolt-output-stats
+  "Narrows `stream-summary` to the :emitted and :transferred entries for
+  `window` (re-keyed with swap-map-order, defined elsewhere) and returns one
+  map per stream with its name and both values passed through nil-to-zero."
   [stream-summary window]
   (let [stream-summary (-> stream-summary
                            swap-map-order
                            (get window)
                            (select-keys [:emitted :transferred])
                            swap-map-order)]
-    (sorted-table
-      [{:text "Stream" :attr {:class "tip right"
-                              :title (:stream tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}]
-      (for [[s stats] stream-summary]
-        [s
-         (nil-to-zero (:emitted stats))
-         (nil-to-zero (:transferred stats))]))))
-
-(defn bolt-input-summary-table
+    (for [[s stats] stream-summary]
+      {"stream" s
+        "emitted" (nil-to-zero (:emitted stats))
+        "transferred" (nil-to-zero (:transferred stats))})))
+
+(defn bolt-input-stats
+  "Narrows `stream-summary` to the :acked, :failed, :process-latencies,
+  :executed and :execute-latencies entries for `window` (re-keyed with
+  swap-map-order, defined elsewhere) and returns one map per GlobalStreamId
+  with its component id, stream id, execute and process latencies (via
+  float-str) and executed/acked/failed values (via nil-to-zero)."
   [stream-summary window]
-  (let [stream-summary (-> stream-summary
-                           swap-map-order
-                           (get window)
-                           (select-keys [:acked :failed :process-latencies :executed :execute-latencies])
-                           swap-map-order)]
-    (sorted-table
-      [{:text "Component" :attr {:class "tip right"
-                                 :title (:comp-id tips)}}
-       {:text "Stream" :attr {:class "tip right"
-                              :title (:stream tips)}}
-       {:text "Execute latency (ms)" :attr {:class "tip above"
-                                            :title (:exec-lat tips)}}
-       {:text "Executed" :attr {:class "tip above"
-                                :title (:num-executed tips)}}
-       {:text "Process latency (ms)":attr {:class "tip above"
-                                           :title (:proc-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:bolt-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                              :title (:bolt-failed tips)}}]
-      (for [[^GlobalStreamId s stats] stream-summary]
-        [(escape-html (.get_componentId s))
-         (.get_streamId s)
-         (float-str (:execute-latencies stats))
-         (nil-to-zero (:executed stats))
-         (float-str (:process-latencies stats))
-         (nil-to-zero (:acked stats))
-         (nil-to-zero (:failed stats))]))))
-
-(defn bolt-executor-table
+  (let [stream-summary
+        (-> stream-summary
+            swap-map-order
+            (get window)
+            (select-keys [:acked :failed :process-latencies
+                          :executed :execute-latencies])
+            swap-map-order)]
+    (for [[^GlobalStreamId s stats] stream-summary]
+      {"component" (.get_componentId s)
+       "stream" (.get_streamId s)
+       "executeLatency" (float-str (:execute-latencies stats))
+       ;; process latency has its own stat; it must not mirror execute latency
+       "processLatency" (float-str (:process-latencies stats))
+       "executed" (nil-to-zero (:executed stats))
+       "acked" (nil-to-zero (:acked stats))
+       "failed" (nil-to-zero (:failed stats))})))
+
+(defn bolt-executor-stats
+  "Returns one map per ExecutorSummary in `executors`: executor id, uptime,
+  host, port, worker log link, capacity, and the emitted/transferred/executed/
+  acked/failed values and execute/process latencies taken from that
+  executor's aggregated bolt stats for `window`. When an executor has no
+  stats the lookups yield nil, which is then passed through nil-to-zero /
+  float-str. `topology-id` is not used in the body."
   [topology-id executors window include-sys?]
-  (sorted-table
-    [{:text "Id" :attr {:class "tip right"
-                        :title (:exec-id tips)}}
-     {:text "Uptime" :attr {:class "tip right"
-                            :title (:exec-uptime tips)}}
-     {:text "Host" :attr {:class "tip above"
-                          :title (:sup-host tips)}}
-     {:text "Port" :attr {:class "tip above"
-                          :title (:port tips)}}
-     {:text "Emitted" :attr {:class "tip above"
-                             :title (:emitted tips)}}
-     {:text "Transferred" :attr {:class "tip above"
-                                 :title (:transferred tips)}}
-     {:text "Capacity (last 10m)" :attr {:class "tip above"
-                                         :title (:capacity tips)}}
-     {:text "Execute latency (ms)" :attr {:class "tip above"
-                                          :title (:exec-lat tips)}}
-     {:text "Executed" :attr {:class "tip above"
-                              :title (:num-executed tips)}}
-     {:text "Process latency (ms)":attr {:class "tip above"
-                                         :title (:proc-lat tips)}}
-     {:text "Acked" :attr {:class "tip above"
-                           :title (:bolt-acked tips)}}
-     {:text "Failed" :attr {:class "tip left"
-                            :title (:bolt-failed tips)}}]
-    (for [^ExecutorSummary e executors
-          :let [stats (.get_stats e)
-                stats (if stats
-                        (-> stats
-                            (aggregate-bolt-stats include-sys?)
-                            (aggregate-bolt-streams)
-                            swap-map-order
-                            (get window)))]]
-      [(pretty-executor-info (.get_executor_info e))
-       (pretty-uptime-sec (.get_uptime_secs e))
-       (.get_host e)
-       (worker-log-link (.get_host e) (.get_port e))
-       (nil-to-zero (:emitted stats))
-       (nil-to-zero (:transferred stats))
-       (render-capacity (compute-executor-capacity e))
-       (float-str (:execute-latencies stats))
-       (nil-to-zero (:executed stats))
-       (float-str (:process-latencies stats))
-       (nil-to-zero (:acked stats))
-       (nil-to-zero (:failed stats))])
-    :time-cols [1]))
-
-(defn bolt-summary-table
-  [topology-id id stats window]
-  (let [times (stats-times (:emitted stats))
-        display-map (into {} (for [t times] [t pretty-uptime-sec]))
-        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-    (sorted-table
-      [{:text "Window" :attr {:class "tip right"
-                              :title (:window tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}
-       {:text "Execute latency (ms)" :attr {:class "tip above"
-                                            :title (:exec-lat tips)}}
-       {:text "Executed" :attr {:class "tip above"
-                                :title (:num-executed tips)}}
-       {:text "Process latency (ms)":attr {:class "tip above"
-                                           :title (:proc-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:bolt-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                              :title (:bolt-failed tips)}}]
-      (for [k (concat times [":all-time"])
-            :let [disp ((display-map k) k)]]
-        [(link-to (if (= k window) {:class "red"} {})
-                  (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
-                  (escape-html disp))
-         (get-in stats [:emitted k])
-         (get-in stats [:transferred k])
-         (float-str (get-in stats [:execute-latencies k]))
-         (get-in stats [:executed k])
-         (float-str (get-in stats [:process-latencies k]))
-         (get-in stats [:acked k])
-         (get-in stats [:failed k])])
-      :time-cols [0])))
-
-(defn bolt-page
+  (for [^ExecutorSummary e executors
+        :let [stats (.get_stats e)
+              stats (if stats
+                      (-> stats
+                          (aggregate-bolt-stats include-sys?)
+                          (aggregate-bolt-streams)
+                          swap-map-order
+                          (get window)))]]
+    {"id" (pretty-executor-info (.get_executor_info e))
+     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
+     "host" (.get_host e)
+     "port" (.get_port e)
+     "emitted" (nil-to-zero (:emitted stats))
+     "transferred" (nil-to-zero (:transferred stats))
+     "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
+     "executeLatency" (float-str (:execute-latencies stats))
+     "executed" (nil-to-zero (:executed stats))
+     "processLatency" (float-str (:process-latencies stats))
+     "acked" (nil-to-zero (:acked stats))
+     "failed" (nil-to-zero (:failed stats))
+     "workerLogLink" (worker-log-link (.get_host e) (.get_port e))}))
+
+(defn bolt-stats
+  "Aggregates the filled stats of `executors` and returns a map with
+  \"boltStats\" (per-window summary), \"inputStats\" and \"outputStats\"
+  (per-stream stats for `window`) and \"executorStats\" (per-executor stats
+  for `window`).
+  NOTE(review): the local `window-hint` is still computed but is not
+  referenced by the returned map."
   [window ^TopologyInfo topology-info component executors include-sys?]
   (let [window-hint (str " (" (window-hint window) ")")
         stats (get-filled-stats executors)
         stream-summary (-> stats (aggregate-bolt-stats include-sys?))
         summary (-> stream-summary aggregate-bolt-streams)]
-    (concat
-      [[:h2 "Bolt stats"]]
-      (bolt-summary-table (.get_id topology-info) component summary window)
-
-      [[:h2 "Input stats" window-hint]]
-      (bolt-input-summary-table stream-summary window)
-
-      [[:h2 "Output stats" window-hint]]
-      (bolt-output-summary-table stream-summary window)
-
-      [[:h2 "Executors"]]
-      (bolt-executor-table (.get_id topology-info) executors window include-sys?))))
-
-(defn errors-table
-  [errors-list]
-  (let [errors (->> errors-list
-                    (sort-by #(.get_error_time_secs ^ErrorInfo %))
-                    reverse)]
-    (sorted-table
-      ["Time" "Error"]
-      (for [^ErrorInfo e errors]
-        [(date-str (.get_error_time_secs e))
-         [:pre (.get_error e)]])
-      :sort-list "[[0,1]]")))
+    {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
+     "inputStats" (bolt-input-stats stream-summary window)
+     "outputStats" (bolt-output-stats stream-summary window)
+     "executorStats" (bolt-executor-stats
+                       (.get_id topology-info) executors window include-sys?)}))
 
 (defn component-page
+  "Fetches the topology info and topology for `topology-id` from nimbus and
+  returns a map describing `component`: its id, the topology name, executor
+  and task counts, topology id, window (defaulting to \":all-time\"),
+  component type and window hint, merged with the spout or bolt stats (chosen
+  by component type) and the component's errors.
+  NOTE(review): when component-type yields neither :spout nor :bolt, `spec`
+  is nil and (name type) would presumably throw on a nil type -- confirm how
+  an unknown component should be reported."
   [topology-id component window include-sys?]
   (with-nimbus nimbus
-               (let [window (if window window ":all-time")
-                     summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
-                     topology (.getTopology ^Nimbus$Client nimbus topology-id)
-                     type (component-type topology component)
-                     summs (component-task-summs summ topology component)
-                     spec (cond (= type :spout) (spout-page window summ component summs include-sys?)
-                                (= type :bolt) (bolt-page window summ component summs include-sys?))]
-                 (concat
-                   [[:h2 "Component summary"]
-                    (table [{:text "Id" :attr {:class "tip right"
-                                               :title (:comp-id tips)}}
-                            {:text "Topology" :attr {:class "tip above"
-                                                     :title (str (:name tips) " " (:name-link tips))}}
-                            {:text "Executors" :attr {:class "tip above"
-                                                      :title (:num-execs tips)}}
-                            {:text "Tasks" :attr {:class "tip above"
-                                                  :title (:num-tasks tips)}}]
-                           [[(escape-html component)
-                             (topology-link (.get_id summ) (.get_name summ))
-                             (count summs)
-                             (sum-tasks summs)
-                             ]])]
-                   spec
-                   [[:h2 "Errors"]
-                    (errors-table (get (.get_errors summ) component))]))))
-
-(defn get-include-sys?
-  [cookies]
-  (let [sys? (get cookies "sys")
-        sys? (if (or (nil? sys?) (= "false" (:value sys?))) false true)]
-    sys?))
+    (let [window (if window window ":all-time")
+          summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
+          topology (.getTopology ^Nimbus$Client nimbus topology-id)
+          type (component-type topology component)
+          summs (component-task-summs summ topology component)
+          spec (cond (= type :spout) (spout-stats window summ component summs include-sys?)
+                     (= type :bolt) (bolt-stats window summ component summs include-sys?))
+          errors (component-errors (get (.get_errors summ) component))]
+      (merge
+       {"id" component
+         "name" (.get_name summ)
+         "executors" (count summs)
+         "tasks" (sum-tasks summs)
+         "topologyId" topology-id
+         "window" window
+         "componentType" (name type)
+         "windowHint" (window-hint window)}
+       spec errors))))
+
+(defn check-include-sys?
+  "Returns false when `sys?` is nil or the string \"false\", true otherwise."
+  [sys?]
+  (not (or (nil? sys?)
+           (= "false" sys?))))
+
+(defn json-response
+  "Wraps `data`, serialized with to-json, in a response map whose
+  Content-Type is application/json. The optional `status` defaults to 200."
+  [data & [status]]
+  (let [code (if status status 200)]
+    {:status code
+     :headers {"Content-Type" "application/json"}
+     :body (to-json data)}))
 
 (defroutes main-routes
+  ;; Routes of the UI: GET endpoints under /api/v1 that answer with JSON, POST
+  ;; endpoints that trigger a topology action through nimbus and then
+  ;; redirect, a redirect from "/" to /index.html, and static resources.
-  (GET "/" [:as {cookies :cookies}]
-       (-> (main-page)
-           ui-template))
-  (GET "/topology/:id" [:as {cookies :cookies} id & m]
-       (let [include-sys? (get-include-sys? cookies)
-             id (url-decode id)]
-         (try
-           (-> (topology-page (url-decode id) (:window m) include-sys?)
-               (concat [(mk-system-toggle-button include-sys?)])
-               ui-template)
-           (catch Exception e (resp/redirect "/")))))
-  (GET "/topology/:id/component/:component" [:as {cookies :cookies} id component & m]
-       (let [include-sys? (get-include-sys? cookies)
-             id (url-decode id)
+  ;; cluster-configuration hands back nimbus' configuration as text
+  ;; (presumably already JSON-encoded -- confirm), so only the Content-Type is
+  ;; set here; json-response would run to-json over that text a second time.
+  (GET "/api/v1/cluster/configuration" []
+       (-> (resp/response (cluster-configuration))
+           (resp/content-type "application/json")))
+  (GET "/api/v1/cluster/summary" []
+       (json-response (cluster-summary)))
+  (GET "/api/v1/supervisor/summary" []
+       (json-response (supervisor-summary)))
+  (GET "/api/v1/topology/summary" []
+       (json-response (all-topologies-summary)))
+  (GET  "/api/v1/topology/:id" [id & m]
+        (let [id (url-decode id)]
+          (json-response (topology-page id (:window m) (check-include-sys? (:sys m))))))
+  (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
+       (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m)))))
+  (GET "/api/v1/topology/:id/component/:component" [id component & m]
+       (let [id (url-decode id)
              component (url-decode component)]
-         (-> (component-page id component (:window m) include-sys?)
-             (concat [(mk-system-toggle-button include-sys?)])
-             ui-template)))
-  (POST "/topology/:id/activate" [id]
-        (with-nimbus nimbus
-                     (let [id (url-decode id)
-                           tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-                           name (.get_name tplg)]
-                       (.activate nimbus name)
-                       (log-message "Activating topology '" name "'")))
-        (resp/redirect (str "/topology/" id)))
-  (POST "/topology/:id/deactivate" [id]
-        (with-nimbus nimbus
-                     (let [id (url-decode id)
-                           tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-                           name (.get_name tplg)]
-                       (.deactivate nimbus name)
-                       (log-message "Deactivating topology '" name "'")))
-        (resp/redirect (str "/topology/" id)))
-  (POST "/topology/:id/rebalance/:wait-time" [id wait-time]
-        (with-nimbus nimbus
-                     (let [id (url-decode id)
-                           tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-                           name (.get_name tplg)
-                           options (RebalanceOptions.)]
-                       (.set_wait_secs options (Integer/parseInt wait-time))
-                       (.rebalance nimbus name options)
-                       (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
-        (resp/redirect (str "/topology/" id)))
-  (POST "/topology/:id/kill/:wait-time" [id wait-time]
-        (with-nimbus nimbus
-                     (let [id (url-decode id)
-                           tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
-                           name (.get_name tplg)
-                           options (KillOptions.)]
-                       (.set_wait_secs options (Integer/parseInt wait-time))
-                       (.killTopologyWithOpts nimbus name options)
-                       (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
-        (resp/redirect (str "/topology/" id)))
+         (json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
+  (POST "/api/v1/topology/:id/activate" [id]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)]
+        (.activate nimbus name)
+        (log-message "Activating topology '" name "'")))
+    (resp/redirect (str "/api/v1/topology/" id)))
+
+  (POST "/api/v1/topology/:id/deactivate" [id]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)]
+        (.deactivate nimbus name)
+        (log-message "Deactivating topology '" name "'")))
+    (resp/redirect (str "/api/v1/topology/" id)))
+  (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)
+            options (RebalanceOptions.)]
+        (.set_wait_secs options (Integer/parseInt wait-time))
+        (.rebalance nimbus name options)
+        (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+    (resp/redirect (str "/api/v1/topology/" id)))
+  (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
+    (with-nimbus nimbus
+      (let [id (url-decode id)
+            tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+            name (.get_name tplg)
+            options (KillOptions.)]
+        (.set_wait_secs options (Integer/parseInt wait-time))
+        (.killTopologyWithOpts nimbus name options)
+        (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+    (resp/redirect (str "/api/v1/topology/" id)))
+
+  (GET "/" [:as {cookies :cookies}]
+       (resp/redirect "/index.html"))
   (route/resources "/")
   (route/not-found "Page not found"))
 
-(defn exception->html
+(defn exception->json
+  "Returns a map with a fixed \"error\" title and, under \"errorMessage\",
+  the full stack trace of `ex` rendered as a string."
   [ex]
-  (concat
-    [[:h2 "Internal Server Error"]]
-    [[:pre (let [sw (java.io.StringWriter.)]
-             (.printStackTrace ex (java.io.PrintWriter. sw))
-             (.toString sw))]]))
+  (let [trace-buffer (java.io.StringWriter.)
+        trace-writer (java.io.PrintWriter. trace-buffer)]
+    ;; the trace is written into the buffer before the buffer is read back
+    (.printStackTrace ex trace-writer)
+    {"error" "Internal Server Error"
+     "errorMessage" (.toString trace-buffer)}))
 
 (defn catch-errors
   [handler]
@@ -1086,9 +860,7 @@
     (try
       (handler request)
       (catch Exception ex
-        (-> (resp/response (ui-template (exception->html ex)))
-            (resp/status 500)
-            (resp/content-type "text/html"))))))
+        (json-response (exception->json ex) 500)))))
 
 (def app
   (handler/site (-> main-routes

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
index dcd4a21..c2de7c7 100644
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ b/storm-core/src/clj/backtype/storm/ui/helpers.clj
@@ -146,4 +146,3 @@ $(\"table#%s\").each(function(i) { $(this).tablesorter({ sortList: %s, headers:
 
 (defn pretty-executor-info [^ExecutorInfo e]
   (str "[" (.get_task_start e) "-" (.get_task_end e) "]"))
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 891d8eb..91b0713 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -914,12 +914,13 @@
 
 (defmacro -<>
   ([x] x)
-  ([x form]
-   (if (seq? form)
-     (with-meta
-       (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
-         (concat begin [x] end))
-       (meta form))
-     (list form x)))
-  ([x form & more]
-   `(-<> (-<> ~x ~form) ~@more)))
+  ([x form] (if (seq? form)
+              (with-meta
+                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+                  (concat begin [x] end))
+                (meta form))
+              (list form x)))
+  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
+
+(defn hashmap-to-persistent [^HashMap m] ; copies a Java HashMap into an immutable Clojure map
+  (into {} m)) ; one pass over the entries, rather than pairing separate keySet/values traversals

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 83ee232..ff309a5 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -84,8 +84,21 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads"; 
     public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
+    
+    /**
+     * If the Netty messaging layer is busy, the Netty client will try to batch as many messages as possible, up to STORM_NETTY_MESSAGE_BATCH_SIZE bytes.
+     */
+    public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
+    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
 
     /**
+     * The interval (in milliseconds) at which we check whether the Netty channel is writable and try to write pending messages.
+     */
+    public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
+    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
+    
+    
+    /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
     public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
@@ -462,6 +475,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
+     * Controls how many worker receiver threads we need per worker.
+     */
+    public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
+    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
+    
+    /**
      * How often this worker should heartbeat to the supervisor.
      */
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index 41ae3f5..ead4935 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -17,13 +17,17 @@
  */
 package backtype.storm.messaging;
 
+import java.util.Iterator;
+
 public interface IConnection {   
+    
     /**
-     * receive a message (consists taskId and payload)
+     * receive an iterator over a batch of messages (each consisting of a taskId and payload)
      * @param flags 0: block, 1: non-block
      * @return
      */
-    public TaskMessage recv(int flags);
+    public Iterator<TaskMessage> recv(int flags, int clientId);
+    
     /**
      * send a message with taskId and payload
      * @param taskId task ID
@@ -32,6 +36,13 @@ public interface IConnection {
     public void send(int taskId,  byte[] payload);
     
     /**
+     * send batch messages
+     * @param msgs
+     */
+
+    public void send(Iterator<TaskMessage> msgs);
+    
+    /**
      * close this connection
      */
     public void close();