You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/05 23:00:42 UTC

[10/37] storm git commit: Aggregate topo stats on nimbus, not ui

http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index c440043..e5fb708 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -21,7 +21,7 @@
         ring.middleware.multipart-params)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
-  (:use [backtype.storm config util log])
+  (:use [backtype.storm config util log stats])
   (:use [backtype.storm.ui helpers])
   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
                                               ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
@@ -31,7 +31,10 @@
             ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
             ErrorInfo ClusterSummary SupervisorSummary TopologySummary
             Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
-            KillOptions GetInfoOptions NumErrorsChoice])
+            KillOptions GetInfoOptions NumErrorsChoice TopologyPageInfo
+            TopologyStats CommonAggregateStats ComponentAggregateStats
+            ComponentType BoltAggregateStats SpoutAggregateStats
+            ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo])
   (:import [backtype.storm.security.auth AuthUtils ReqContext])
   (:import [backtype.storm.generated AuthorizationException])
   (:import [backtype.storm.security.auth AuthUtils])
@@ -87,94 +90,10 @@
            (throw (AuthorizationException.
                    (str "UI request '" op "' for '" user "' user is not authorized")))))))))
 
-(defn get-filled-stats
-  [summs]
-  (->> summs
-       (map #(.get_stats ^ExecutorSummary %))
-       (filter not-nil?)))
-
-(defn component-type
-  "Returns the component type (either :bolt or :spout) for a given
-  topology and component id. Returns nil if not found."
-  [^StormTopology topology id]
-  (let [bolts (.get_bolts topology)
-        spouts (.get_spouts topology)]
-    (cond
-      (.containsKey bolts id) :bolt
-      (.containsKey spouts id) :spout)))
-
 (defn executor-summary-type
   [topology ^ExecutorSummary s]
   (component-type topology (.get_component_id s)))
 
-(defn add-pairs
-  ([] [0 0])
-  ([[a1 a2] [b1 b2]]
-   [(+ a1 b1) (+ a2 b2)]))
-
-(defn expand-averages
-  [avg counts]
-  (let [avg (clojurify-structure avg)
-        counts (clojurify-structure counts)]
-    (into {}
-          (for [[slice streams] counts]
-            [slice
-             (into {}
-                   (for [[stream c] streams]
-                     [stream
-                      [(* c (get-in avg [slice stream]))
-                       c]]
-                     ))]))))
-
-(defn expand-averages-seq
-  [average-seq counts-seq]
-  (->> (map vector average-seq counts-seq)
-       (map #(apply expand-averages %))
-       (apply merge-with (fn [s1 s2] (merge-with add-pairs s1 s2)))))
-
-(defn- val-avg
-  [[t c]]
-  (if (= t 0) 0
-    (double (/ t c))))
-
-(defn aggregate-averages
-  [average-seq counts-seq]
-  (->> (expand-averages-seq average-seq counts-seq)
-       (map-val
-         (fn [s]
-           (map-val val-avg s)))))
-
-(defn aggregate-counts
-  [counts-seq]
-  (->> counts-seq
-       (map clojurify-structure)
-       (apply merge-with
-              (fn [s1 s2]
-                (merge-with + s1 s2)))))
-
-(defn aggregate-avg-streams
-  [avg counts]
-  (let [expanded (expand-averages avg counts)]
-    (->> expanded
-         (map-val #(reduce add-pairs (vals %)))
-         (map-val val-avg))))
-
-(defn aggregate-count-streams
-  [stats]
-  (->> stats
-       (map-val #(reduce + (vals %)))))
-
-(defn aggregate-common-stats
-  [stats-seq]
-  {:emitted (aggregate-counts (map #(.get_emitted ^ExecutorStats %) stats-seq))
-   :transferred (aggregate-counts (map #(.get_transferred ^ExecutorStats %) stats-seq))})
-
-(defn mk-include-sys-fn
-  [include-sys?]
-  (if include-sys?
-    (fn [_] true)
-    (fn [stream] (and (string? stream) (not (system-id? stream))))))
-
 (defn is-ack-stream
   [stream]
   (let [acker-streams
@@ -183,80 +102,6 @@
          ACKER-FAIL-STREAM-ID]]
     (every? #(not= %1 stream) acker-streams)))
 
-(defn pre-process
-  [stream-summary include-sys?]
-  (let [filter-fn (mk-include-sys-fn include-sys?)
-        emitted (:emitted stream-summary)
-        emitted (into {} (for [[window stat] emitted]
-                           {window (filter-key filter-fn stat)}))
-        transferred (:transferred stream-summary)
-        transferred (into {} (for [[window stat] transferred]
-                               {window (filter-key filter-fn stat)}))
-        stream-summary (-> stream-summary (dissoc :emitted) (assoc :emitted emitted))
-        stream-summary (-> stream-summary (dissoc :transferred) (assoc :transferred transferred))]
-    stream-summary))
-
-(defn aggregate-bolt-stats
-  [stats-seq include-sys?]
-  (let [stats-seq (collectify stats-seq)]
-    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
-           {:acked
-            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
-                                   stats-seq))
-            :failed
-            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_failed)
-                                   stats-seq))
-            :executed
-            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
-                                   stats-seq))
-            :process-latencies
-            (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_process_ms_avg)
-                                     stats-seq)
-                                (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
-                                     stats-seq))
-            :execute-latencies
-            (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg)
-                                     stats-seq)
-                                (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
-                                     stats-seq))})))
-
-(defn aggregate-spout-stats
-  [stats-seq include-sys?]
-  (let [stats-seq (collectify stats-seq)]
-    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
-           {:acked
-            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
-                                   stats-seq))
-            :failed
-            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_spout get_failed)
-                                   stats-seq))
-            :complete-latencies
-            (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_spout get_complete_ms_avg)
-                                     stats-seq)
-                                (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
-                                     stats-seq))})))
-
-(defn aggregate-bolt-streams
-  [stats]
-  {:acked (aggregate-count-streams (:acked stats))
-   :failed (aggregate-count-streams (:failed stats))
-   :emitted (aggregate-count-streams (:emitted stats))
-   :transferred (aggregate-count-streams (:transferred stats))
-   :process-latencies (aggregate-avg-streams (:process-latencies stats)
-                                             (:acked stats))
-   :executed (aggregate-count-streams (:executed stats))
-   :execute-latencies (aggregate-avg-streams (:execute-latencies stats)
-                                             (:executed stats))})
-
-(defn aggregate-spout-streams
-  [stats]
-  {:acked (aggregate-count-streams (:acked stats))
-   :failed (aggregate-count-streams (:failed stats))
-   :emitted (aggregate-count-streams (:emitted stats))
-   :transferred (aggregate-count-streams (:transferred stats))
-   :complete-latencies (aggregate-avg-streams (:complete-latencies stats)
-                                              (:acked stats))})
-
 (defn spout-summary?
   [topology s]
   (= :spout (executor-summary-type topology s)))
@@ -270,57 +115,11 @@
   (let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
     (into (sorted-map) ret )))
 
-(defn error-subset
-  [error-str]
-  (apply str (take 200 error-str)))
-
-(defn most-recent-error
-  [errors-list]
-  (let [error (->> errors-list
-                   (sort-by #(.get_error_time_secs ^ErrorInfo %))
-                   reverse
-                   first)]
-     error))
-
-(defn component-task-summs
-  [^TopologyInfo summ topology id]
-  (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
-        bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
-        spout-comp-summs (group-by-comp spout-summs)
-        bolt-comp-summs (group-by-comp bolt-summs)
-        ret (if (contains? spout-comp-summs id)
-              (spout-comp-summs id)
-              (bolt-comp-summs id))]
-    (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
-
 (defn worker-log-link [host port topology-id]
   (let [fname (logs-filename topology-id port)]
     (url-format (str "http://%s:%s/log?file=%s")
           host (*STORM-CONF* LOGVIEWER-PORT) fname)))
 
-(defn compute-executor-capacity
-  [^ExecutorSummary e]
-  (let [stats (.get_stats e)
-        stats (if stats
-                (-> stats
-                    (aggregate-bolt-stats true)
-                    (aggregate-bolt-streams)
-                    swap-map-order
-                    (get "600")))
-        uptime (nil-to-zero (.get_uptime_secs e))
-        window (if (< uptime 600) uptime 600)
-        executed (-> stats :executed nil-to-zero)
-        latency (-> stats :execute-latencies nil-to-zero)]
-    (if (> window 0)
-      (div (* executed latency) (* 1000 window)))))
-
-(defn compute-bolt-capacity
-  [executors]
-  (->> executors
-       (map compute-executor-capacity)
-       (map nil-to-zero)
-       (apply max)))
-
 (defn get-error-time
   [error]
   (if error
@@ -333,10 +132,9 @@
     ""))
 
 (defn get-error-port
-  [error error-host top-id]
+  [error]
   (if error
-    (.get_port ^ErrorInfo error)
-    ""))
+    (.get_port ^ErrorInfo error)))
 
 (defn get-error-host
   [error]
@@ -344,41 +142,6 @@
     (.get_host ^ErrorInfo error)
     ""))
 
-(defn spout-streams-stats
-  [summs include-sys?]
-  (let [stats-seq (get-filled-stats summs)]
-    (aggregate-spout-streams
-      (aggregate-spout-stats
-        stats-seq include-sys?))))
-
-(defn bolt-streams-stats
-  [summs include-sys?]
-  (let [stats-seq (get-filled-stats summs)]
-    (aggregate-bolt-streams
-      (aggregate-bolt-stats
-        stats-seq include-sys?))))
-
-(defn total-aggregate-stats
-  [spout-summs bolt-summs include-sys?]
-  (let [spout-stats (get-filled-stats spout-summs)
-        bolt-stats (get-filled-stats bolt-summs)
-        agg-spout-stats (-> spout-stats
-                            (aggregate-spout-stats include-sys?)
-                            aggregate-spout-streams)
-        agg-bolt-stats (-> bolt-stats
-                           (aggregate-bolt-stats include-sys?)
-                           aggregate-bolt-streams)]
-    (merge-with
-      (fn [s1 s2]
-        (merge-with + s1 s2))
-      (select-keys
-        agg-bolt-stats
-        ;; Include only keys that will be used.  We want to count acked and
-        ;; failed only for the "tuple trees," so we do not include those keys
-        ;; from the bolt executors.
-        [:emitted :transferred])
-      agg-spout-stats)))
-
 (defn stats-times
   [stats-map]
   (sort-by #(Integer/parseInt %)
@@ -393,16 +156,6 @@
     "All time"
     (pretty-uptime-sec window)))
 
-(defn topology-action-button
-  [id name action command is-wait default-wait enabled]
-  [:input {:type "button"
-           :value action
-           (if enabled :enabled :disabled) ""
-           :onclick (str "confirmAction('"
-                         (StringEscapeUtils/escapeJavaScript id) "', '"
-                         (StringEscapeUtils/escapeJavaScript name) "', '"
-                         command "', " is-wait ", " default-wait ")")}])
-
 (defn sanitize-stream-name
   [name]
   (let [sym-regex #"(?![A-Za-z_\-:\.])."]
@@ -496,9 +249,7 @@
           spout-comp-summs (group-by-comp spout-summs)
           bolt-comp-summs (group-by-comp bolt-summs)
           bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
-                                      bolt-comp-summs)
-          topology-conf (from-json
-                          (.getTopologyConf ^Nimbus$Client nimbus id))]
+                                      bolt-comp-summs)]
       (visualization-data
        (merge (hashmap-to-persistent spouts)
               (hashmap-to-persistent bolts))
@@ -604,170 +355,140 @@
        "executorsTotal" (.get_num_executors t)
        "schedulerInfo" (.get_sched_status t)})}))
 
-(defn topology-stats [id window stats]
+(defn topology-stats [window stats]
   (let [times (stats-times (:emitted stats))
         display-map (into {} (for [t times] [t pretty-uptime-sec]))
         display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-    (for [k (concat times [":all-time"])
-          :let [disp ((display-map k) k)]]
+    (for [w (concat times [":all-time"])
+          :let [disp ((display-map w) w)]]
       {"windowPretty" disp
-       "window" k
-       "emitted" (get-in stats [:emitted k])
-       "transferred" (get-in stats [:transferred k])
-       "completeLatency" (float-str (get-in stats [:complete-latencies k]))
-       "acked" (get-in stats [:acked k])
-       "failed" (get-in stats [:failed k])})))
-
-(defn spout-comp [top-id summ-map errors window include-sys?]
-  (for [[id summs] summ-map
-        :let [stats-seq (get-filled-stats summs)
-              stats (aggregate-spout-streams
-                     (aggregate-spout-stats
-                      stats-seq include-sys?))
-              last-error (most-recent-error (get errors id))
-              error-host (get-error-host last-error)
-              error-port (get-error-port last-error error-host top-id)]]
-    {"spoutId" id
-     "encodedSpoutId" (url-encode id)
-     "executors" (count summs)
-     "tasks" (sum-tasks summs)
-     "emitted" (get-in stats [:emitted window])
-     "transferred" (get-in stats [:transferred window])
-     "completeLatency" (float-str (get-in stats [:complete-latencies window]))
-     "acked" (get-in stats [:acked window])
-     "failed" (get-in stats [:failed window])
-     "errorHost" error-host
-     "errorPort" error-port
-     "errorWorkerLogLink" (worker-log-link error-host error-port top-id)
-     "errorLapsedSecs" (get-error-time last-error)
-     "lastError" (get-error-data last-error)}))
-
-(defn bolt-comp [top-id summ-map errors window include-sys?]
-  (for [[id summs] summ-map
-        :let [stats-seq (get-filled-stats summs)
-              stats (aggregate-bolt-streams
-                     (aggregate-bolt-stats
-                      stats-seq include-sys?))
-              last-error (most-recent-error (get errors id))
-              error-host (get-error-host last-error)
-              error-port (get-error-port last-error error-host top-id)]]
-    {"boltId" id
-     "encodedBoltId" (url-encode id)
-     "executors" (count summs)
-     "tasks" (sum-tasks summs)
-     "emitted" (get-in stats [:emitted window])
-     "transferred" (get-in stats [:transferred window])
-     "capacity" (float-str (nil-to-zero (compute-bolt-capacity summs)))
-     "executeLatency" (float-str (get-in stats [:execute-latencies window]))
-     "executed" (get-in stats [:executed window])
-     "processLatency" (float-str (get-in stats [:process-latencies window]))
-     "acked" (get-in stats [:acked window])
-     "failed" (get-in stats [:failed window])
-     "errorHost" error-host
-     "errorPort" error-port
-     "errorWorkerLogLink" (worker-log-link error-host error-port top-id)
-     "errorLapsedSecs" (get-error-time last-error)
-     "lastError" (get-error-data last-error)}))
-
-(defn topology-summary [^TopologyInfo summ]
-  (let [executors (.get_executors summ)
-        workers (set (for [^ExecutorSummary e executors]
-                       [(.get_host e) (.get_port e)]))]
-      {"id" (.get_id summ)
-       "encodedId" (url-encode (.get_id summ))
-       "owner" (.get_owner summ)
-       "name" (.get_name summ)
-       "status" (.get_status summ)
-       "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
-       "tasksTotal" (sum-tasks executors)
-       "workersTotal" (count workers)
-       "executorsTotal" (count executors)
-       "schedulerInfo" (.get_sched_status summ)}))
-
-(defn spout-summary-json [topology-id id stats window]
-  (let [times (stats-times (:emitted stats))
-        display-map (into {} (for [t times] [t pretty-uptime-sec]))
-        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-     (for [k (concat times [":all-time"])
-           :let [disp ((display-map k) k)]]
-       {"windowPretty" disp
-        "window" k
-        "emitted" (get-in stats [:emitted k])
-        "transferred" (get-in stats [:transferred k])
-        "completeLatency" (float-str (get-in stats [:complete-latencies k]))
-        "acked" (get-in stats [:acked k])
-        "failed" (get-in stats [:failed k])})))
-
-(defn topology-page [id window include-sys? user]
+       "window" w
+       "emitted" (get-in stats [:emitted w])
+       "transferred" (get-in stats [:transferred w])
+       "completeLatency" (float-str (get-in stats [:complete-latencies w]))
+       "acked" (get-in stats [:acked w])
+       "failed" (get-in stats [:failed w])})))
+
+(defn build-visualization [id window include-sys? user]
   (with-nimbus nimbus
     (let [window (if window window ":all-time")
-          window-hint (window-hint window)
-          summ (->> (doto
-                      (GetInfoOptions.)
-                      (.set_num_err_choice NumErrorsChoice/ONE))
-                    (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-          topology (.getTopology ^Nimbus$Client nimbus id)
-          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
-          spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
-          bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
-          spout-comp-summs (group-by-comp spout-summs)
-          bolt-comp-summs (group-by-comp bolt-summs)
-          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
-          name (.get_name summ)
-          status (.get_status summ)
-          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
-          spouts (.get_spouts topology)
-          bolts (.get_bolts topology)
-          visualizer-data (visualization-data (merge (hashmap-to-persistent spouts)
-                                                     (hashmap-to-persistent bolts))
-                                              spout-comp-summs
-                                              bolt-comp-summs
+          topology-info (->> (doto
+                               (GetInfoOptions.)
+                               (.set_num_err_choice NumErrorsChoice/ONE))
+                             (.getTopologyInfoWithOpts ^Nimbus$Client nimbus
+                                                       id))
+          storm-topology (.getTopology ^Nimbus$Client nimbus id)
+          spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info))
+          bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info))
+          spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries)
+          bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries)
+          bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries)
+          id->spout-spec (.get_spouts storm-topology)
+          id->bolt (.get_bolts storm-topology)
+          visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec)
+                                                     (hashmap-to-persistent id->bolt))
+                                              spout-comp-id->executor-summaries
+                                              bolt-comp-id->executor-summaries
                                               window
                                               id)]
+       {"visualizationTable" (stream-boxes visualizer-data)})))
+
+(defn- get-error-json
+  [topo-id error-info]
+  (let [host (get-error-host error-info)
+        port (get-error-port error-info)]
+    {"lastError" (get-error-data error-info)
+     "errorHost" host
+     "errorPort" port
+     "errorLapsedSecs" (get-error-time error-info)
+     "errorWorkerLogLink" (worker-log-link host port topo-id)}))
+
+(defn- common-agg-stats-json
+  "Returns a JSON representation of a common aggregated statistics."
+  [^CommonAggregateStats common-stats]
+  {"executors" (.get_num_executors common-stats)
+   "tasks" (.get_num_tasks common-stats)
+   "emitted" (.get_emitted common-stats)
+   "transferred" (.get_transferred common-stats)
+   "acked" (.get_acked common-stats)
+   "failed" (.get_failed common-stats)})
+
+(defmulti comp-agg-stats-json
+  "Returns a JSON representation of aggregated statistics."
+  (fn [_ [id ^ComponentAggregateStats s]] (.get_type s)))
+
+(defmethod comp-agg-stats-json ComponentType/SPOUT
+  [topo-id [id ^ComponentAggregateStats s]]
+  (let [^SpoutAggregateStats ss (.. s get_specific_stats get_spout)
+        cs (.get_common_stats s)]
+    (merge
+      (common-agg-stats-json cs)
+      (get-error-json topo-id (.get_last_error s))
+      {"spoutId" id
+       "encodedSpoutId" (url-encode id)
+       "completeLatency" (float-str (.get_complete_latency_ms ss))})))
+
+(defmethod comp-agg-stats-json ComponentType/BOLT
+  [topo-id [id ^ComponentAggregateStats s]]
+  (let [^BoltAggregateStats ss (.. s get_specific_stats get_bolt)
+        cs (.get_common_stats s)]
+    (merge
+      (common-agg-stats-json cs)
+      (get-error-json topo-id (.get_last_error s))
+      {"boltId" id
+       "encodedBoltId" (url-encode id)
+       "capacity" (float-str (.get_capacity ss))
+       "executeLatency" (float-str (.get_execute_latency_ms ss))
+       "executed" (.get_executed ss)
+       "processLatency" (float-str (.get_process_latency_ms ss))})))
+
+(defn- unpack-topology-page-info
+  "Unpacks the serialized object to data structures"
+  [^TopologyPageInfo topo-info window]
+  (let [id (.get_id topo-info)
+        ^TopologyStats topo-stats (.get_topology_stats topo-info)
+        stat->window->number
+          {:emitted (.get_window_to_emitted topo-stats)
+           :transferred (.get_window_to_transferred topo-stats)
+           :complete-latencies (.get_window_to_complete_latencies_ms topo-stats)
+           :acked (.get_window_to_acked topo-stats)
+           :failed (.get_window_to_failed topo-stats)}
+        topo-stats (topology-stats window stat->window->number)]
+    {"id" id
+     "encodedId" (url-encode id)
+     "owner" (.get_owner topo-info)
+     "name" (.get_name topo-info)
+     "status" (.get_status topo-info)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs topo-info))
+     "tasksTotal" (.get_num_tasks topo-info)
+     "workersTotal" (.get_num_workers topo-info)
+     "executorsTotal" (.get_num_executors topo-info)
+     "schedulerInfo" (.get_sched_status topo-info)
+     "topologyStats" topo-stats
+     "spouts" (map (partial comp-agg-stats-json id)
+                   (.get_id_to_spout_agg_stats topo-info))
+     "bolts" (map (partial comp-agg-stats-json id)
+                  (.get_id_to_bolt_agg_stats topo-info))
+     "configuration" (.get_topology_conf topo-info)}))
+
+(defn topology-page [id window include-sys? user]
+  (with-nimbus nimbus
+    (let [window (or window ":all-time")
+          window-hint (window-hint window)
+          topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus
+                                               id
+                                               window
+                                               include-sys?)
+          topology-conf (from-json (.get_topology_conf topo-page-info))
+          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
       (merge
-       (topology-summary summ)
+       (unpack-topology-page-info topo-page-info window)
        {"user" user
         "window" window
         "windowHint" window-hint
         "msgTimeout" msg-timeout
-        "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
-        "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys?)
-        "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys?)
         "configuration" topology-conf
-        "visualizationTable" (stream-boxes visualizer-data)}))))
-
-(defn spout-output-stats
-  [stream-summary window]
-  (let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
-    (for [[s stats] (stream-summary window)]
-      {"stream" s
-       "emitted" (nil-to-zero (:emitted stats))
-       "transferred" (nil-to-zero (:transferred stats))
-       "completeLatency" (float-str (:complete-latencies stats))
-       "acked" (nil-to-zero (:acked stats))
-       "failed" (nil-to-zero (:failed stats))})))
-
-(defn spout-executor-stats
-  [topology-id executors window include-sys?]
-  (for [^ExecutorSummary e executors
-        :let [stats (.get_stats e)
-              stats (if stats
-                      (-> stats
-                          (aggregate-spout-stats include-sys?)
-                          aggregate-spout-streams
-                          swap-map-order
-                          (get window)))]]
-    {"id" (pretty-executor-info (.get_executor_info e))
-     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
-     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
-     "host" (.get_host e)
-     "port" (.get_port e)
-     "emitted" (nil-to-zero (:emitted stats))
-     "transferred" (nil-to-zero (:transferred stats))
-     "completeLatency" (float-str (:complete-latencies stats))
-     "acked" (nil-to-zero (:acked stats))
-     "failed" (nil-to-zero (:failed stats))
-     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)}))
+        "visualizationTable" []}))))
 
 (defn component-errors
   [errors-list topology-id]
@@ -783,128 +504,180 @@
         "errorLapsedSecs" (get-error-time e)
         "error" (.get_error e)})}))
 
-(defn spout-stats
-  [window ^TopologyInfo topology-info component executors include-sys?]
-  (let [window-hint (str " (" (window-hint window) ")")
-        stats (get-filled-stats executors)
-        stream-summary (-> stats (aggregate-spout-stats include-sys?))
-        summary (-> stream-summary aggregate-spout-streams)]
-    {"spoutSummary" (spout-summary-json
-                      (.get_id topology-info) component summary window)
-     "outputStats" (spout-output-stats stream-summary window)
-     "executorStats" (spout-executor-stats (.get_id topology-info)
-                                           executors window include-sys?)}))
-
-(defn bolt-summary
-  [topology-id id stats window]
-  (let [times (stats-times (:emitted stats))
-        display-map (into {} (for [t times] [t pretty-uptime-sec]))
-        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-    (for [k (concat times [":all-time"])
-          :let [disp ((display-map k) k)]]
-      {"window" k
-       "windowPretty" disp
-       "emitted" (get-in stats [:emitted k])
-       "transferred" (get-in stats [:transferred k])
-       "executeLatency" (float-str (get-in stats [:execute-latencies k]))
-       "executed" (get-in stats [:executed k])
-       "processLatency" (float-str (get-in stats [:process-latencies k]))
-       "acked" (get-in stats [:acked k])
-       "failed" (get-in stats [:failed k])})))
-
-(defn bolt-output-stats
-  [stream-summary window]
-  (let [stream-summary (-> stream-summary
-                           swap-map-order
-                           (get window)
-                           (select-keys [:emitted :transferred])
-                           swap-map-order)]
-    (for [[s stats] stream-summary]
-      {"stream" s
-        "emitted" (nil-to-zero (:emitted stats))
-        "transferred" (nil-to-zero (:transferred stats))})))
-
-(defn bolt-input-stats
-  [stream-summary window]
-  (let [stream-summary
-        (-> stream-summary
-            swap-map-order
-            (get window)
-            (select-keys [:acked :failed :process-latencies
-                          :executed :execute-latencies])
-            swap-map-order)]
-    (for [[^GlobalStreamId s stats] stream-summary]
-      {"component" (.get_componentId s)
-       "encodedComponent" (url-encode (.get_componentId s))
-       "stream" (.get_streamId s)
-       "executeLatency" (float-str (:execute-latencies stats))
-       "processLatency" (float-str (:process-latencies stats))
-       "executed" (nil-to-zero (:executed stats))
-       "acked" (nil-to-zero (:acked stats))
-       "failed" (nil-to-zero (:failed stats))})))
-
-(defn bolt-executor-stats
-  [topology-id executors window include-sys?]
-  (for [^ExecutorSummary e executors
-        :let [stats (.get_stats e)
-              stats (if stats
-                      (-> stats
-                          (aggregate-bolt-stats include-sys?)
-                          (aggregate-bolt-streams)
-                          swap-map-order
-                          (get window)))]]
-    {"id" (pretty-executor-info (.get_executor_info e))
-     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
-     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
-     "host" (.get_host e)
-     "port" (.get_port e)
-     "emitted" (nil-to-zero (:emitted stats))
-     "transferred" (nil-to-zero (:transferred stats))
-     "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
-     "executeLatency" (float-str (:execute-latencies stats))
-     "executed" (nil-to-zero (:executed stats))
-     "processLatency" (float-str (:process-latencies stats))
-     "acked" (nil-to-zero (:acked stats))
-     "failed" (nil-to-zero (:failed stats))
-     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)}))
-
-(defn bolt-stats
-  [window ^TopologyInfo topology-info component executors include-sys?]
-  (let [window-hint (str " (" (window-hint window) ")")
-        stats (get-filled-stats executors)
-        stream-summary (-> stats (aggregate-bolt-stats include-sys?))
-        summary (-> stream-summary aggregate-bolt-streams)]
-    {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
-     "inputStats" (bolt-input-stats stream-summary window)
-     "outputStats" (bolt-output-stats stream-summary window)
-     "executorStats" (bolt-executor-stats
-                       (.get_id topology-info) executors window include-sys?)}))
+(defmulti unpack-comp-agg-stat
+  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
+
+(defmethod unpack-comp-agg-stat ComponentType/BOLT
+  [[window ^ComponentAggregateStats s]]
+  (let [^CommonAggregateStats comm-s (.get_common_stats s)
+        ^SpecificAggregateStats spec-s (.get_specific_stats s)
+        ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
+    {"window" window
+     "windowPretty" (window-hint window)
+     "emitted" (.get_emitted comm-s)
+     "transferred" (.get_transferred comm-s)
+     "acked" (.get_acked comm-s)
+     "failed" (.get_failed comm-s)
+     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
+     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
+     "executed" (.get_executed bolt-s)
+     "capacity" (float-str (.get_capacity bolt-s))}))
+
+(defmethod unpack-comp-agg-stat ComponentType/SPOUT
+  [[window ^ComponentAggregateStats s]]
+  (let [^CommonAggregateStats comm-s (.get_common_stats s)
+        ^SpecificAggregateStats spec-s (.get_specific_stats s)
+        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
+    {"window" window
+     "windowPretty" (window-hint window)
+     "emitted" (.get_emitted comm-s)
+     "transferred" (.get_transferred comm-s)
+     "acked" (.get_acked comm-s)
+     "failed" (.get_failed comm-s)
+     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
+
+(defn- unpack-bolt-input-stat
+  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
+  (let [^SpecificAggregateStats sas (.get_specific_stats stats)
+        ^BoltAggregateStats bas (.get_bolt sas)
+        ^CommonAggregateStats cas (.get_common_stats stats)
+        comp-id (.get_componentId s)]
+    {"component" comp-id
+     "encodedComponentId" (url-encode comp-id)
+     "stream" (.get_streamId s)
+     "executeLatency" (float-str (.get_execute_latency_ms bas))
+     "processLatency" (float-str (.get_process_latency_ms bas))
+     "executed" (nil-to-zero (.get_executed bas))
+     "acked" (nil-to-zero (.get_acked cas))
+     "failed" (nil-to-zero (.get_failed cas))}))
+
+(defmulti unpack-comp-output-stat
+  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
+
+(defmethod unpack-comp-output-stat ComponentType/BOLT
+  [[stream-id ^ComponentAggregateStats stats]]
+  (let [^CommonAggregateStats cas (.get_common_stats stats)]
+    {"stream" stream-id
+     "emitted" (nil-to-zero (.get_emitted cas))
+     "transferred" (nil-to-zero (.get_transferred cas))}))
+
+(defmethod unpack-comp-output-stat ComponentType/SPOUT
+  [[stream-id ^ComponentAggregateStats stats]]
+  (let [^CommonAggregateStats cas (.get_common_stats stats)
+        ^SpecificAggregateStats spec-s (.get_specific_stats stats)
+        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
+    {"stream" stream-id
+     "emitted" (nil-to-zero (.get_emitted cas))
+     "transferred" (nil-to-zero (.get_transferred cas))
+     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
+     "acked" (nil-to-zero (.get_acked cas))
+     "failed" (nil-to-zero (.get_failed cas))}))
+
+(defmulti unpack-comp-exec-stat
+  (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
+
+(defmethod unpack-comp-exec-stat ComponentType/BOLT
+  [topology-id ^ExecutorAggregateStats eas]
+  (let [^ExecutorSummary summ (.get_exec_summary eas)
+        ^ExecutorInfo info (.get_executor_info summ)
+        ^ComponentAggregateStats stats (.get_stats eas)
+        ^SpecificAggregateStats ss (.get_specific_stats stats)
+        ^BoltAggregateStats bas (.get_bolt ss)
+        ^CommonAggregateStats cas (.get_common_stats stats)
+        host (.get_host summ)
+        port (.get_port summ)
+        exec-id (pretty-executor-info info)]
+    {"id" exec-id
+     "encodedId" (url-encode exec-id)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+     "host" host
+     "port" port
+     "emitted" (nil-to-zero (.get_emitted cas))
+     "transferred" (nil-to-zero (.get_transferred cas))
+     "capacity" (float-str (nil-to-zero (.get_capacity bas)))
+     "executeLatency" (float-str (.get_execute_latency_ms bas))
+     "executed" (nil-to-zero (.get_executed bas))
+     "processLatency" (float-str (.get_process_latency_ms bas))
+     "acked" (nil-to-zero (.get_acked cas))
+     "failed" (nil-to-zero (.get_failed cas))
+     "workerLogLink" (worker-log-link host port topology-id)}))
+
+(defmethod unpack-comp-exec-stat ComponentType/SPOUT
+  [topology-id ^ExecutorAggregateStats eas]
+  (let [^ExecutorSummary summ (.get_exec_summary eas)
+        ^ExecutorInfo info (.get_executor_info summ)
+        ^ComponentAggregateStats stats (.get_stats eas)
+        ^SpecificAggregateStats ss (.get_specific_stats stats)
+        ^SpoutAggregateStats sas (.get_spout ss)
+        ^CommonAggregateStats cas (.get_common_stats stats)
+        host (.get_host summ)
+        port (.get_port summ)
+        exec-id (pretty-executor-info info)]
+    {"id" exec-id
+     "encodedId" (url-encode exec-id)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+     "host" host
+     "port" port
+     "emitted" (nil-to-zero (.get_emitted cas))
+     "transferred" (nil-to-zero (.get_transferred cas))
+     "completeLatency" (float-str (.get_complete_latency_ms sas))
+     "acked" (nil-to-zero (.get_acked cas))
+     "failed" (nil-to-zero (.get_failed cas))
+     "workerLogLink" (worker-log-link host port topology-id)}))
+
+(defmulti unpack-component-page-info
+  "Unpacks component-specific info to clojure data structures"
+  (fn [^ComponentPageInfo info & _]
+    (.get_component_type info)))
+
+(defmethod unpack-component-page-info ComponentType/BOLT
+  [^ComponentPageInfo info topology-id window include-sys?]
+  (merge
+    {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
+     "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
+     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
+     "executorStats" (map (partial unpack-comp-exec-stat topology-id)
+                          (.get_exec_stats info))}
+    (-> info .get_errors (component-errors topology-id))))
+
+(defmethod unpack-component-page-info ComponentType/SPOUT
+  [^ComponentPageInfo info topology-id window include-sys?]
+  (merge
+    {"spoutStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
+     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
+     "executorStats" (map (partial unpack-comp-exec-stat topology-id)
+                          (.get_exec_stats info))}
+    (-> info .get_errors (component-errors topology-id))))
 
 (defn component-page
   [topology-id component window include-sys? user]
   (with-nimbus nimbus
-    (let [window (if window window ":all-time")
-          summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
-          topology (.getTopology ^Nimbus$Client nimbus topology-id)
-          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))
-          type (component-type topology component)
-          summs (component-task-summs summ topology component)
-          spec (cond (= type :spout) (spout-stats window summ component summs include-sys?)
-                     (= type :bolt) (bolt-stats window summ component summs include-sys?))
-          errors (component-errors (get (.get_errors summ) component) topology-id)]
-      (merge
-        {"user" user
-         "id" component
-         "encodedId" (url-encode component)
-         "name" (.get_name summ)
-         "executors" (count summs)
-         "tasks" (sum-tasks summs)
-         "topologyId" topology-id
-         "encodedTopologyId" (url-encode topology-id)
-         "window" window
-         "componentType" (name type)
-         "windowHint" (window-hint window)}
-       spec errors))))
+    (let [window (or window ":all-time")
+          window-hint (window-hint window)
+          comp-page-info (.getComponentPageInfo ^Nimbus$Client nimbus
+                                                topology-id
+                                                component
+                                                window
+                                                include-sys?)
+          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus
+                                                     topology-id))
+          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
+      (assoc
+       (unpack-component-page-info comp-page-info
+                                   topology-id
+                                   window
+                                   include-sys?)
+       "user" user
+       "id" component
+       "encodedId" (url-encode component)
+       "name" (.get_topology_name comp-page-info)
+       "executors" (.get_num_executors comp-page-info)
+       "tasks" (.get_num_tasks comp-page-info)
+       "topologyId" topology-id
+       "encodedTopologyId" (url-encode topology-id)
+       "window" window
+       "componentType" (-> comp-page-info .get_component_type str lower-case)
+       "windowHint" window-hint))))
 
 (defn topology-config [topology-id]
   (with-nimbus nimbus
@@ -955,6 +728,11 @@
         (let [user (.getUserName http-creds-handler servlet-request)]
           (assert-authorized-user servlet-request "getTopology" (topology-config id))
           (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
+  (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
+        (assert-authorized-user servlet-request "getTopology" (topology-config id))
+        (let [id (url-decode id)
+              user (.getUserName http-creds-handler servlet-request)]
+          (json-response (build-visualization id (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
   (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
         (assert-authorized-user servlet-request "getTopology" (topology-config id))
         (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
index 70ef179..a5ff810 100644
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ b/storm-core/src/clj/backtype/storm/ui/helpers.clj
@@ -96,40 +96,10 @@
       )]
    ])
 
-(defn float-str [n]
-  (if n
-    (format "%.3f" (float n))
-    "0"
-    ))
-
-(defn swap-map-order [m]
-  (->> m
-       (map (fn [[k v]]
-              (into
-               {}
-               (for [[k2 v2] v]
-                 [k2 {k v2}]
-                 ))
-              ))
-       (apply merge-with merge)
-       ))
-
 (defn url-format [fmt & args]
   (String/format fmt
     (to-array (map #(url-encode (str %)) args))))
 
-(defn to-tasks [^ExecutorInfo e]
-  (let [start (.get_task_start e)
-        end (.get_task_end e)]
-    (range start (inc end))
-    ))
-
-(defn sum-tasks [executors]
-  (reduce + (->> executors
-                 (map #(.get_executor_info ^ExecutorSummary %))
-                 (map to-tasks)
-                 (map count))))
-
 (defn pretty-executor-info [^ExecutorInfo e]
   (str "[" (.get_task_start e) "-" (.get_task_end e) "]"))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/jvm/backtype/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Assignment.java b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
index 2e6e8a3..bb74a78 100644
--- a/storm-core/src/jvm/backtype/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-3-5")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-5-14")
 public class Assignment implements org.apache.thrift.TBase<Assignment, Assignment._Fields>, java.io.Serializable, Cloneable, Comparable<Assignment> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Assignment");
 
@@ -678,15 +678,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 2: // NODE_HOST
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map370 = iprot.readMapBegin();
-                struct.node_host = new HashMap<String,String>(2*_map370.size);
-                String _key371;
-                String _val372;
-                for (int _i373 = 0; _i373 < _map370.size; ++_i373)
+                org.apache.thrift.protocol.TMap _map486 = iprot.readMapBegin();
+                struct.node_host = new HashMap<String,String>(2*_map486.size);
+                String _key487;
+                String _val488;
+                for (int _i489 = 0; _i489 < _map486.size; ++_i489)
                 {
-                  _key371 = iprot.readString();
-                  _val372 = iprot.readString();
-                  struct.node_host.put(_key371, _val372);
+                  _key487 = iprot.readString();
+                  _val488 = iprot.readString();
+                  struct.node_host.put(_key487, _val488);
                 }
                 iprot.readMapEnd();
               }
@@ -698,26 +698,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 3: // EXECUTOR_NODE_PORT
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map374 = iprot.readMapBegin();
-                struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map374.size);
-                List<Long> _key375;
-                NodeInfo _val376;
-                for (int _i377 = 0; _i377 < _map374.size; ++_i377)
+                org.apache.thrift.protocol.TMap _map490 = iprot.readMapBegin();
+                struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map490.size);
+                List<Long> _key491;
+                NodeInfo _val492;
+                for (int _i493 = 0; _i493 < _map490.size; ++_i493)
                 {
                   {
-                    org.apache.thrift.protocol.TList _list378 = iprot.readListBegin();
-                    _key375 = new ArrayList<Long>(_list378.size);
-                    long _elem379;
-                    for (int _i380 = 0; _i380 < _list378.size; ++_i380)
+                    org.apache.thrift.protocol.TList _list494 = iprot.readListBegin();
+                    _key491 = new ArrayList<Long>(_list494.size);
+                    long _elem495;
+                    for (int _i496 = 0; _i496 < _list494.size; ++_i496)
                     {
-                      _elem379 = iprot.readI64();
-                      _key375.add(_elem379);
+                      _elem495 = iprot.readI64();
+                      _key491.add(_elem495);
                     }
                     iprot.readListEnd();
                   }
-                  _val376 = new NodeInfo();
-                  _val376.read(iprot);
-                  struct.executor_node_port.put(_key375, _val376);
+                  _val492 = new NodeInfo();
+                  _val492.read(iprot);
+                  struct.executor_node_port.put(_key491, _val492);
                 }
                 iprot.readMapEnd();
               }
@@ -729,25 +729,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 4: // EXECUTOR_START_TIME_SECS
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map381 = iprot.readMapBegin();
-                struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map381.size);
-                List<Long> _key382;
-                long _val383;
-                for (int _i384 = 0; _i384 < _map381.size; ++_i384)
+                org.apache.thrift.protocol.TMap _map497 = iprot.readMapBegin();
+                struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map497.size);
+                List<Long> _key498;
+                long _val499;
+                for (int _i500 = 0; _i500 < _map497.size; ++_i500)
                 {
                   {
-                    org.apache.thrift.protocol.TList _list385 = iprot.readListBegin();
-                    _key382 = new ArrayList<Long>(_list385.size);
-                    long _elem386;
-                    for (int _i387 = 0; _i387 < _list385.size; ++_i387)
+                    org.apache.thrift.protocol.TList _list501 = iprot.readListBegin();
+                    _key498 = new ArrayList<Long>(_list501.size);
+                    long _elem502;
+                    for (int _i503 = 0; _i503 < _list501.size; ++_i503)
                     {
-                      _elem386 = iprot.readI64();
-                      _key382.add(_elem386);
+                      _elem502 = iprot.readI64();
+                      _key498.add(_elem502);
                     }
                     iprot.readListEnd();
                   }
-                  _val383 = iprot.readI64();
-                  struct.executor_start_time_secs.put(_key382, _val383);
+                  _val499 = iprot.readI64();
+                  struct.executor_start_time_secs.put(_key498, _val499);
                 }
                 iprot.readMapEnd();
               }
@@ -779,10 +779,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size()));
-            for (Map.Entry<String, String> _iter388 : struct.node_host.entrySet())
+            for (Map.Entry<String, String> _iter504 : struct.node_host.entrySet())
             {
-              oprot.writeString(_iter388.getKey());
-              oprot.writeString(_iter388.getValue());
+              oprot.writeString(_iter504.getKey());
+              oprot.writeString(_iter504.getValue());
             }
             oprot.writeMapEnd();
           }
@@ -794,17 +794,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size()));
-            for (Map.Entry<List<Long>, NodeInfo> _iter389 : struct.executor_node_port.entrySet())
+            for (Map.Entry<List<Long>, NodeInfo> _iter505 : struct.executor_node_port.entrySet())
             {
               {
-                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter389.getKey().size()));
-                for (long _iter390 : _iter389.getKey())
+                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter505.getKey().size()));
+                for (long _iter506 : _iter505.getKey())
                 {
-                  oprot.writeI64(_iter390);
+                  oprot.writeI64(_iter506);
                 }
                 oprot.writeListEnd();
               }
-              _iter389.getValue().write(oprot);
+              _iter505.getValue().write(oprot);
             }
             oprot.writeMapEnd();
           }
@@ -816,17 +816,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size()));
-            for (Map.Entry<List<Long>, Long> _iter391 : struct.executor_start_time_secs.entrySet())
+            for (Map.Entry<List<Long>, Long> _iter507 : struct.executor_start_time_secs.entrySet())
             {
               {
-                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter391.getKey().size()));
-                for (long _iter392 : _iter391.getKey())
+                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter507.getKey().size()));
+                for (long _iter508 : _iter507.getKey())
                 {
-                  oprot.writeI64(_iter392);
+                  oprot.writeI64(_iter508);
                 }
                 oprot.writeListEnd();
               }
-              oprot.writeI64(_iter391.getValue());
+              oprot.writeI64(_iter507.getValue());
             }
             oprot.writeMapEnd();
           }
@@ -865,42 +865,42 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       if (struct.is_set_node_host()) {
         {
           oprot.writeI32(struct.node_host.size());
-          for (Map.Entry<String, String> _iter393 : struct.node_host.entrySet())
+          for (Map.Entry<String, String> _iter509 : struct.node_host.entrySet())
           {
-            oprot.writeString(_iter393.getKey());
-            oprot.writeString(_iter393.getValue());
+            oprot.writeString(_iter509.getKey());
+            oprot.writeString(_iter509.getValue());
           }
         }
       }
       if (struct.is_set_executor_node_port()) {
         {
           oprot.writeI32(struct.executor_node_port.size());
-          for (Map.Entry<List<Long>, NodeInfo> _iter394 : struct.executor_node_port.entrySet())
+          for (Map.Entry<List<Long>, NodeInfo> _iter510 : struct.executor_node_port.entrySet())
           {
             {
-              oprot.writeI32(_iter394.getKey().size());
-              for (long _iter395 : _iter394.getKey())
+              oprot.writeI32(_iter510.getKey().size());
+              for (long _iter511 : _iter510.getKey())
               {
-                oprot.writeI64(_iter395);
+                oprot.writeI64(_iter511);
               }
             }
-            _iter394.getValue().write(oprot);
+            _iter510.getValue().write(oprot);
           }
         }
       }
       if (struct.is_set_executor_start_time_secs()) {
         {
           oprot.writeI32(struct.executor_start_time_secs.size());
-          for (Map.Entry<List<Long>, Long> _iter396 : struct.executor_start_time_secs.entrySet())
+          for (Map.Entry<List<Long>, Long> _iter512 : struct.executor_start_time_secs.entrySet())
           {
             {
-              oprot.writeI32(_iter396.getKey().size());
-              for (long _iter397 : _iter396.getKey())
+              oprot.writeI32(_iter512.getKey().size());
+              for (long _iter513 : _iter512.getKey())
               {
-                oprot.writeI64(_iter397);
+                oprot.writeI64(_iter513);
               }
             }
-            oprot.writeI64(_iter396.getValue());
+            oprot.writeI64(_iter512.getValue());
           }
         }
       }
@@ -914,64 +914,64 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       BitSet incoming = iprot.readBitSet(3);
       if (incoming.get(0)) {
         {
-          org.apache.thrift.protocol.TMap _map398 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
-          struct.node_host = new HashMap<String,String>(2*_map398.size);
-          String _key399;
-          String _val400;
-          for (int _i401 = 0; _i401 < _map398.size; ++_i401)
+          org.apache.thrift.protocol.TMap _map514 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+          struct.node_host = new HashMap<String,String>(2*_map514.size);
+          String _key515;
+          String _val516;
+          for (int _i517 = 0; _i517 < _map514.size; ++_i517)
           {
-            _key399 = iprot.readString();
-            _val400 = iprot.readString();
-            struct.node_host.put(_key399, _val400);
+            _key515 = iprot.readString();
+            _val516 = iprot.readString();
+            struct.node_host.put(_key515, _val516);
           }
         }
         struct.set_node_host_isSet(true);
       }
       if (incoming.get(1)) {
         {
-          org.apache.thrift.protocol.TMap _map402 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-          struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map402.size);
-          List<Long> _key403;
-          NodeInfo _val404;
-          for (int _i405 = 0; _i405 < _map402.size; ++_i405)
+          org.apache.thrift.protocol.TMap _map518 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+          struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map518.size);
+          List<Long> _key519;
+          NodeInfo _val520;
+          for (int _i521 = 0; _i521 < _map518.size; ++_i521)
           {
             {
-              org.apache.thrift.protocol.TList _list406 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-              _key403 = new ArrayList<Long>(_list406.size);
-              long _elem407;
-              for (int _i408 = 0; _i408 < _list406.size; ++_i408)
+              org.apache.thrift.protocol.TList _list522 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+              _key519 = new ArrayList<Long>(_list522.size);
+              long _elem523;
+              for (int _i524 = 0; _i524 < _list522.size; ++_i524)
               {
-                _elem407 = iprot.readI64();
-                _key403.add(_elem407);
+                _elem523 = iprot.readI64();
+                _key519.add(_elem523);
               }
             }
-            _val404 = new NodeInfo();
-            _val404.read(iprot);
-            struct.executor_node_port.put(_key403, _val404);
+            _val520 = new NodeInfo();
+            _val520.read(iprot);
+            struct.executor_node_port.put(_key519, _val520);
           }
         }
         struct.set_executor_node_port_isSet(true);
       }
       if (incoming.get(2)) {
         {
-          org.apache.thrift.protocol.TMap _map409 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
-          struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map409.size);
-          List<Long> _key410;
-          long _val411;
-          for (int _i412 = 0; _i412 < _map409.size; ++_i412)
+          org.apache.thrift.protocol.TMap _map525 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+          struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map525.size);
+          List<Long> _key526;
+          long _val527;
+          for (int _i528 = 0; _i528 < _map525.size; ++_i528)
           {
             {
-              org.apache.thrift.protocol.TList _list413 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-              _key410 = new ArrayList<Long>(_list413.size);
-              long _elem414;
-              for (int _i415 = 0; _i415 < _list413.size; ++_i415)
+              org.apache.thrift.protocol.TList _list529 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+              _key526 = new ArrayList<Long>(_list529.size);
+              long _elem530;
+              for (int _i531 = 0; _i531 < _list529.size; ++_i531)
               {
-                _elem414 = iprot.readI64();
-                _key410.add(_elem414);
+                _elem530 = iprot.readI64();
+                _key526.add(_elem530);
               }
             }
-            _val411 = iprot.readI64();
-            struct.executor_start_time_secs.put(_key410, _val411);
+            _val527 = iprot.readI64();
+            struct.executor_start_time_secs.put(_key526, _val527);
           }
         }
         struct.set_executor_start_time_secs_isSet(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
new file mode 100644
index 0000000..386ccfd
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
@@ -0,0 +1,704 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-5-14")
+public class BoltAggregateStats implements org.apache.thrift.TBase<BoltAggregateStats, BoltAggregateStats._Fields>, java.io.Serializable, Cloneable, Comparable<BoltAggregateStats> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BoltAggregateStats");
+
+  private static final org.apache.thrift.protocol.TField EXECUTE_LATENCY_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("execute_latency_ms", org.apache.thrift.protocol.TType.DOUBLE, (short)513);
+  private static final org.apache.thrift.protocol.TField PROCESS_LATENCY_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("process_latency_ms", org.apache.thrift.protocol.TType.DOUBLE, (short)514);
+  private static final org.apache.thrift.protocol.TField EXECUTED_FIELD_DESC = new org.apache.thrift.protocol.TField("executed", org.apache.thrift.protocol.TType.I64, (short)515);
+  private static final org.apache.thrift.protocol.TField CAPACITY_FIELD_DESC = new org.apache.thrift.protocol.TField("capacity", org.apache.thrift.protocol.TType.DOUBLE, (short)516);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new BoltAggregateStatsStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new BoltAggregateStatsTupleSchemeFactory());
+  }
+
+  private double execute_latency_ms; // optional
+  private double process_latency_ms; // optional
+  private long executed; // optional
+  private double capacity; // optional
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    EXECUTE_LATENCY_MS((short)513, "execute_latency_ms"),
+    PROCESS_LATENCY_MS((short)514, "process_latency_ms"),
+    EXECUTED((short)515, "executed"),
+    CAPACITY((short)516, "capacity");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 513: // EXECUTE_LATENCY_MS
+          return EXECUTE_LATENCY_MS;
+        case 514: // PROCESS_LATENCY_MS
+          return PROCESS_LATENCY_MS;
+        case 515: // EXECUTED
+          return EXECUTED;
+        case 516: // CAPACITY
+          return CAPACITY;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __EXECUTE_LATENCY_MS_ISSET_ID = 0;
+  private static final int __PROCESS_LATENCY_MS_ISSET_ID = 1;
+  private static final int __EXECUTED_ISSET_ID = 2;
+  private static final int __CAPACITY_ISSET_ID = 3;
+  private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.EXECUTE_LATENCY_MS,_Fields.PROCESS_LATENCY_MS,_Fields.EXECUTED,_Fields.CAPACITY};
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.EXECUTE_LATENCY_MS, new org.apache.thrift.meta_data.FieldMetaData("execute_latency_ms", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+    tmpMap.put(_Fields.PROCESS_LATENCY_MS, new org.apache.thrift.meta_data.FieldMetaData("process_latency_ms", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+    tmpMap.put(_Fields.EXECUTED, new org.apache.thrift.meta_data.FieldMetaData("executed", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.CAPACITY, new org.apache.thrift.meta_data.FieldMetaData("capacity", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(BoltAggregateStats.class, metaDataMap);
+  }
+
+  public BoltAggregateStats() {
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public BoltAggregateStats(BoltAggregateStats other) {
+    __isset_bitfield = other.__isset_bitfield;
+    this.execute_latency_ms = other.execute_latency_ms;
+    this.process_latency_ms = other.process_latency_ms;
+    this.executed = other.executed;
+    this.capacity = other.capacity;
+  }
+
+  public BoltAggregateStats deepCopy() {
+    return new BoltAggregateStats(this);
+  }
+
+  @Override
+  public void clear() {
+    set_execute_latency_ms_isSet(false);
+    this.execute_latency_ms = 0.0;
+    set_process_latency_ms_isSet(false);
+    this.process_latency_ms = 0.0;
+    set_executed_isSet(false);
+    this.executed = 0;
+    set_capacity_isSet(false);
+    this.capacity = 0.0;
+  }
+
+  public double get_execute_latency_ms() {
+    return this.execute_latency_ms;
+  }
+
+  public void set_execute_latency_ms(double execute_latency_ms) {
+    this.execute_latency_ms = execute_latency_ms;
+    set_execute_latency_ms_isSet(true);
+  }
+
+  public void unset_execute_latency_ms() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __EXECUTE_LATENCY_MS_ISSET_ID);
+  }
+
+  /** Returns true if field execute_latency_ms is set (has been assigned a value) and false otherwise */
+  public boolean is_set_execute_latency_ms() {
+    return EncodingUtils.testBit(__isset_bitfield, __EXECUTE_LATENCY_MS_ISSET_ID);
+  }
+
+  public void set_execute_latency_ms_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __EXECUTE_LATENCY_MS_ISSET_ID, value);
+  }
+
+  public double get_process_latency_ms() {
+    return this.process_latency_ms;
+  }
+
+  public void set_process_latency_ms(double process_latency_ms) {
+    this.process_latency_ms = process_latency_ms;
+    set_process_latency_ms_isSet(true);
+  }
+
+  public void unset_process_latency_ms() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PROCESS_LATENCY_MS_ISSET_ID);
+  }
+
+  /** Returns true if field process_latency_ms is set (has been assigned a value) and false otherwise */
+  public boolean is_set_process_latency_ms() {
+    return EncodingUtils.testBit(__isset_bitfield, __PROCESS_LATENCY_MS_ISSET_ID);
+  }
+
+  public void set_process_latency_ms_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PROCESS_LATENCY_MS_ISSET_ID, value);
+  }
+
+  public long get_executed() {
+    return this.executed;
+  }
+
+  public void set_executed(long executed) {
+    this.executed = executed;
+    set_executed_isSet(true);
+  }
+
+  public void unset_executed() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __EXECUTED_ISSET_ID);
+  }
+
+  /** Returns true if field executed is set (has been assigned a value) and false otherwise */
+  public boolean is_set_executed() {
+    return EncodingUtils.testBit(__isset_bitfield, __EXECUTED_ISSET_ID);
+  }
+
+  public void set_executed_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __EXECUTED_ISSET_ID, value);
+  }
+
+  public double get_capacity() {
+    return this.capacity;
+  }
+
+  public void set_capacity(double capacity) {
+    this.capacity = capacity;
+    set_capacity_isSet(true);
+  }
+
+  public void unset_capacity() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __CAPACITY_ISSET_ID);
+  }
+
+  /** Returns true if field capacity is set (has been assigned a value) and false otherwise */
+  public boolean is_set_capacity() {
+    return EncodingUtils.testBit(__isset_bitfield, __CAPACITY_ISSET_ID);
+  }
+
+  public void set_capacity_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __CAPACITY_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case EXECUTE_LATENCY_MS:
+      if (value == null) {
+        unset_execute_latency_ms();
+      } else {
+        set_execute_latency_ms((Double)value);
+      }
+      break;
+
+    case PROCESS_LATENCY_MS:
+      if (value == null) {
+        unset_process_latency_ms();
+      } else {
+        set_process_latency_ms((Double)value);
+      }
+      break;
+
+    case EXECUTED:
+      if (value == null) {
+        unset_executed();
+      } else {
+        set_executed((Long)value);
+      }
+      break;
+
+    case CAPACITY:
+      if (value == null) {
+        unset_capacity();
+      } else {
+        set_capacity((Double)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case EXECUTE_LATENCY_MS:
+      return Double.valueOf(get_execute_latency_ms());
+
+    case PROCESS_LATENCY_MS:
+      return Double.valueOf(get_process_latency_ms());
+
+    case EXECUTED:
+      return Long.valueOf(get_executed());
+
+    case CAPACITY:
+      return Double.valueOf(get_capacity());
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case EXECUTE_LATENCY_MS:
+      return is_set_execute_latency_ms();
+    case PROCESS_LATENCY_MS:
+      return is_set_process_latency_ms();
+    case EXECUTED:
+      return is_set_executed();
+    case CAPACITY:
+      return is_set_capacity();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof BoltAggregateStats)
+      return this.equals((BoltAggregateStats)that);
+    return false;
+  }
+
+  public boolean equals(BoltAggregateStats that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_execute_latency_ms = true && this.is_set_execute_latency_ms();
+    boolean that_present_execute_latency_ms = true && that.is_set_execute_latency_ms();
+    if (this_present_execute_latency_ms || that_present_execute_latency_ms) {
+      if (!(this_present_execute_latency_ms && that_present_execute_latency_ms))
+        return false;
+      if (this.execute_latency_ms != that.execute_latency_ms)
+        return false;
+    }
+
+    boolean this_present_process_latency_ms = true && this.is_set_process_latency_ms();
+    boolean that_present_process_latency_ms = true && that.is_set_process_latency_ms();
+    if (this_present_process_latency_ms || that_present_process_latency_ms) {
+      if (!(this_present_process_latency_ms && that_present_process_latency_ms))
+        return false;
+      if (this.process_latency_ms != that.process_latency_ms)
+        return false;
+    }
+
+    boolean this_present_executed = true && this.is_set_executed();
+    boolean that_present_executed = true && that.is_set_executed();
+    if (this_present_executed || that_present_executed) {
+      if (!(this_present_executed && that_present_executed))
+        return false;
+      if (this.executed != that.executed)
+        return false;
+    }
+
+    boolean this_present_capacity = true && this.is_set_capacity();
+    boolean that_present_capacity = true && that.is_set_capacity();
+    if (this_present_capacity || that_present_capacity) {
+      if (!(this_present_capacity && that_present_capacity))
+        return false;
+      if (this.capacity != that.capacity)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_execute_latency_ms = true && (is_set_execute_latency_ms());
+    list.add(present_execute_latency_ms);
+    if (present_execute_latency_ms)
+      list.add(execute_latency_ms);
+
+    boolean present_process_latency_ms = true && (is_set_process_latency_ms());
+    list.add(present_process_latency_ms);
+    if (present_process_latency_ms)
+      list.add(process_latency_ms);
+
+    boolean present_executed = true && (is_set_executed());
+    list.add(present_executed);
+    if (present_executed)
+      list.add(executed);
+
+    boolean present_capacity = true && (is_set_capacity());
+    list.add(present_capacity);
+    if (present_capacity)
+      list.add(capacity);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(BoltAggregateStats other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(is_set_execute_latency_ms()).compareTo(other.is_set_execute_latency_ms());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_execute_latency_ms()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.execute_latency_ms, other.execute_latency_ms);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_process_latency_ms()).compareTo(other.is_set_process_latency_ms());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_process_latency_ms()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.process_latency_ms, other.process_latency_ms);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_executed()).compareTo(other.is_set_executed());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_executed()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.executed, other.executed);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_capacity()).compareTo(other.is_set_capacity());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_capacity()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.capacity, other.capacity);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("BoltAggregateStats(");
+    boolean first = true;
+
+    if (is_set_execute_latency_ms()) {
+      sb.append("execute_latency_ms:");
+      sb.append(this.execute_latency_ms);
+      first = false;
+    }
+    if (is_set_process_latency_ms()) {
+      if (!first) sb.append(", ");
+      sb.append("process_latency_ms:");
+      sb.append(this.process_latency_ms);
+      first = false;
+    }
+    if (is_set_executed()) {
+      if (!first) sb.append(", ");
+      sb.append("executed:");
+      sb.append(this.executed);
+      first = false;
+    }
+    if (is_set_capacity()) {
+      if (!first) sb.append(", ");
+      sb.append("capacity:");
+      sb.append(this.capacity);
+      first = false;
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class BoltAggregateStatsStandardSchemeFactory implements SchemeFactory {
+    public BoltAggregateStatsStandardScheme getScheme() {
+      return new BoltAggregateStatsStandardScheme();
+    }
+  }
+
+  private static class BoltAggregateStatsStandardScheme extends StandardScheme<BoltAggregateStats> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 513: // EXECUTE_LATENCY_MS
+            if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
+              struct.execute_latency_ms = iprot.readDouble();
+              struct.set_execute_latency_ms_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 514: // PROCESS_LATENCY_MS
+            if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
+              struct.process_latency_ms = iprot.readDouble();
+              struct.set_process_latency_ms_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 515: // EXECUTED
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.executed = iprot.readI64();
+              struct.set_executed_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 516: // CAPACITY
+            if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
+              struct.capacity = iprot.readDouble();
+              struct.set_capacity_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.is_set_execute_latency_ms()) {
+        oprot.writeFieldBegin(EXECUTE_LATENCY_MS_FIELD_DESC);
+        oprot.writeDouble(struct.execute_latency_ms);
+        oprot.writeFieldEnd();
+      }
+      if (struct.is_set_process_latency_ms()) {
+        oprot.writeFieldBegin(PROCESS_LATENCY_MS_FIELD_DESC);
+        oprot.writeDouble(struct.process_latency_ms);
+        oprot.writeFieldEnd();
+      }
+      if (struct.is_set_executed()) {
+        oprot.writeFieldBegin(EXECUTED_FIELD_DESC);
+        oprot.writeI64(struct.executed);
+        oprot.writeFieldEnd();
+      }
+      if (struct.is_set_capacity()) {
+        oprot.writeFieldBegin(CAPACITY_FIELD_DESC);
+        oprot.writeDouble(struct.capacity);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class BoltAggregateStatsTupleSchemeFactory implements SchemeFactory {
+    public BoltAggregateStatsTupleScheme getScheme() {
+      return new BoltAggregateStatsTupleScheme();
+    }
+  }
+
+  private static class BoltAggregateStatsTupleScheme extends TupleScheme<BoltAggregateStats> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.is_set_execute_latency_ms()) {
+        optionals.set(0);
+      }
+      if (struct.is_set_process_latency_ms()) {
+        optionals.set(1);
+      }
+      if (struct.is_set_executed()) {
+        optionals.set(2);
+      }
+      if (struct.is_set_capacity()) {
+        optionals.set(3);
+      }
+      oprot.writeBitSet(optionals, 4);
+      if (struct.is_set_execute_latency_ms()) {
+        oprot.writeDouble(struct.execute_latency_ms);
+      }
+      if (struct.is_set_process_latency_ms()) {
+        oprot.writeDouble(struct.process_latency_ms);
+      }
+      if (struct.is_set_executed()) {
+        oprot.writeI64(struct.executed);
+      }
+      if (struct.is_set_capacity()) {
+        oprot.writeDouble(struct.capacity);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(4);
+      if (incoming.get(0)) {
+        struct.execute_latency_ms = iprot.readDouble();
+        struct.set_execute_latency_ms_isSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.process_latency_ms = iprot.readDouble();
+        struct.set_process_latency_ms_isSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.executed = iprot.readI64();
+        struct.set_executed_isSet(true);
+      }
+      if (incoming.get(3)) {
+        struct.capacity = iprot.readDouble();
+        struct.set_capacity_isSet(true);
+      }
+    }
+  }
+
+}
+