Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/22 08:34:46 UTC

[7/9] storm git commit: STORM-1994: Add table with per-topology and worker resource usage and components in (new) supervisor and topology pages

STORM-1994: Add table with per-topology and worker resource usage and components in (new) supervisor and topology pages


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e0bcf27
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e0bcf27
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e0bcf27

Branch: refs/heads/1.x-branch
Commit: 0e0bcf27f4b7787cc3e6886ccbcd5dc55daef771
Parents: ce38849
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Wed Jul 6 14:23:18 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Sun Aug 21 22:31:08 2016 -0500

----------------------------------------------------------------------
 docs/STORM-UI-REST-API.md                       |  121 +-
 docs/images/supervisor_page.png                 |  Bin 0 -> 133290 bytes
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  296 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |   68 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   94 +-
 .../org/apache/storm/generated/Assignment.java  |  244 +-
 .../storm/generated/ClusterWorkerHeartbeat.java |   52 +-
 .../storm/generated/ComponentPageInfo.java      |  220 +-
 .../org/apache/storm/generated/Credentials.java |   44 +-
 .../jvm/org/apache/storm/generated/HBNodes.java |   32 +-
 .../org/apache/storm/generated/HBRecords.java   |   36 +-
 .../storm/generated/LSApprovedWorkers.java      |   44 +-
 .../generated/LSSupervisorAssignments.java      |   48 +-
 .../apache/storm/generated/LSTopoHistory.java   |   64 +-
 .../storm/generated/LSTopoHistoryList.java      |   36 +-
 .../storm/generated/LSWorkerHeartbeat.java      |   36 +-
 .../apache/storm/generated/ListBlobsResult.java |   32 +-
 .../apache/storm/generated/LocalAssignment.java |   36 +-
 .../apache/storm/generated/LocalStateData.java  |   48 +-
 .../org/apache/storm/generated/LogConfig.java   |   48 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  | 3486 ++++++++++++------
 .../org/apache/storm/generated/NodeInfo.java    |   32 +-
 .../storm/generated/RebalanceOptions.java       |   44 +-
 .../storm/generated/SettableBlobMeta.java       |   36 +-
 .../org/apache/storm/generated/StormBase.java   |   92 +-
 .../apache/storm/generated/SupervisorInfo.java  |  152 +-
 .../storm/generated/SupervisorPageInfo.java     |  624 ++++
 .../storm/generated/TopologyHistoryInfo.java    |   32 +-
 .../storm/generated/TopologyPageInfo.java       |  284 +-
 .../apache/storm/generated/WorkerSummary.java   | 1880 ++++++++++
 .../jvm/org/apache/storm/scheduler/Cluster.java |  217 +-
 .../resource/ResourceAwareScheduler.java        |    9 +
 .../auth/authorizer/SimpleACLAuthorizer.java    |    7 +-
 storm-core/src/py/storm/Nimbus-remote           |    7 +
 storm-core/src/py/storm/Nimbus.py               |  272 +-
 storm-core/src/py/storm/ttypes.py               | 1457 ++++++--
 storm-core/src/storm.thrift                     |   25 +
 storm-core/src/ui/public/component.html         |    8 +
 storm-core/src/ui/public/css/style.css          |   20 +
 storm-core/src/ui/public/js/script.js           |  191 +
 storm-core/src/ui/public/supervisor.html        |  132 +
 .../public/templates/index-page-template.html   |    4 +-
 .../templates/supervisor-page-template.html     |  145 +
 .../templates/topology-page-template.html       |  208 +-
 storm-core/src/ui/public/topology.html          |   12 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   72 +-
 .../test/clj/org/apache/storm/stats_test.clj    |  134 +
 47 files changed, 8820 insertions(+), 2361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/docs/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index 884c6d5..340137d 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -125,6 +125,7 @@ Response fields:
 |uptimeSeconds| Integer| Shows how long the supervisor has been running, in seconds|
 |slotsTotal| Integer| Total number of available worker slots for this supervisor|
 |slotsUsed| Integer| Number of worker slots used on this supervisor|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource information|
 |totalMem| Double| Total memory capacity on this supervisor|
 |totalCpu| Double| Total CPU capacity on this supervisor|
 |usedMem| Double| Used memory capacity on this supervisor|
@@ -207,6 +208,123 @@ Sample response:
 }
 ```
 
+### /api/v1/supervisor (GET)
+
+Returns a summary for a supervisor by id, or for all supervisors running on a given host.
+
+Examples:
+
+```no-highlight
+ 1. By host: http://ui-daemon-host-name:8080/api/v1/supervisor?host=supervisor-daemon-host-name
+ 2. By id: http://ui-daemon-host-name:8080/api/v1/supervisor?id=f5449110-1daa-43e2-89e3-69917b16dec9-192.168.1.1
+```
+
+Request parameters:
+
+|Parameter |Value   |Description  |
+|----------|--------|-------------|
+|id        |String. Supervisor id | If specified, respond with the supervisor and worker stats for that id. Note that when id is specified, the host parameter is ignored. |
+|host      |String. Host name| If specified, respond with stats for all supervisors and workers on that host (normally just one supervisor)|
+|sys       |String. Values 1 or 0. Default value 0| Controls whether system stats are included in the response|
+
+Response fields:
+
+|Field  |Value|Description|
+|---|---|---|
+|supervisors| Array| Array of supervisor summaries|
+|workers| Array| Array of worker summaries |
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource information|
+
+Each supervisor is defined by:
+
+|Field  |Value|Description|
+|---|---|---|
+|id| String | Supervisor's id|
+|host| String| Supervisor's host name|
+|uptime| String| Shows how long the supervisor has been running|
+|uptimeSeconds| Integer| Shows how long the supervisor has been running, in seconds|
+|slotsTotal| Integer| Total number of worker slots for this supervisor|
+|slotsUsed| Integer| Number of worker slots used on this supervisor|
+|totalMem| Double| Total memory capacity on this supervisor|
+|totalCpu| Double| Total CPU capacity on this supervisor|
+|usedMem| Double| Used memory capacity on this supervisor|
+|usedCpu| Double| Used CPU capacity on this supervisor|
+
+Each worker is defined by:
+
+|Field  |Value  |Description|
+|-------|-------|-----------|
+|supervisorId | String| Supervisor's id|
+|host | String | Worker's host name|
+|port | Integer | Worker's port|
+|topologyId | String | Topology Id|
+|topologyName | String | Topology Name|
+|executorsTotal | Integer | Number of executors used by the topology in this worker|
+|assignedMemOnHeap | Double | Assigned On-Heap Memory by Scheduler (MB)|
+|assignedMemOffHeap | Double | Assigned Off-Heap Memory by Scheduler (MB)|
+|assignedCpu | Number | Assigned CPU by Scheduler (%)| 
+|componentNumTasks | Dictionary | Map of component name -> number of executing tasks in this worker|
+|uptime| String| Shows how long the worker has been running|
+|uptimeSeconds| Integer| Shows how long the worker has been running, in seconds|
+|workerLogLink | String | Link to worker log viewer page|
+
+Sample response:
+
+```json
+{
+    "supervisors": [{ 
+        "totalMem": 4096.0, 
+        "host":"192.168.10.237",
+        "id":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "uptime":"7m 8s",
+        "totalCpu":400.0,
+        "usedCpu":495.0,
+        "usedMem":3432.0,
+        "slotsUsed":2,
+        "version":"0.10.1",
+        "slotsTotal":4,
+        "uptimeSeconds":428
+    }],
+    "schedulerDisplayResource":true,
+    "workers":[{
+        "topologyName":"ras",
+        "topologyId":"ras-4-1460229987",
+        "host":"192.168.10.237",
+        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "assignedMemOnHeap":704.0,
+        "uptime":"2m 47s",
+        "uptimeSeconds":167,
+        "port":6707,
+        "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6707%2Fworker.log",
+        "componentNumTasks": {
+            "word":5
+        },
+        "executorsTotal":8,
+        "assignedCpu":130.0,
+        "assignedMemOffHeap":80.0
+    },
+    {
+        "topologyName":"ras",
+        "topologyId":"ras-4-1460229987",
+        "host":"192.168.10.237",
+        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "assignedMemOnHeap":904.0,
+        "uptime":"2m 53s",
+        "port":6706,
+        "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6706%2Fworker.log",
+        "componentNumTasks":{
+            "exclaim2":2,
+            "exclaim1":3,
+            "word":5
+        },
+        "executorsTotal":10,
+        "uptimeSeconds":173,
+        "assignedCpu":165.0,
+        "assignedMemOffHeap":80.0
+    }]
+}
+```
+
 ### /api/v1/topology/summary (GET)
 
 Returns summary information for all topologies.
@@ -232,6 +350,7 @@ Response fields:
 |assignedMemOffHeap| Double|Assigned Off-Heap Memory by Scheduler (MB)|
 |assignedTotalMem| Double|Assigned Total Memory by Scheduler (MB)|
 |assignedCpu| Double|Assigned CPU by Scheduler (%)|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource information|
 
 Sample response:
 
@@ -257,7 +376,7 @@ Sample response:
             "assignedTotalMem": 768,
             "assignedCpu": 80
         }
-    ]
+    ],
     "schedulerDisplayResource": true
 }
 ```

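For readers who want to poke at the new endpoint, a minimal REPL sketch follows. It assumes the clj-http client library (with cheshire on the classpath for `:as :json`) and the invented host names from the examples above; it is not part of the patch.

```clojure
(require '[clj-http.client :as http])

;; Query by host; per the parameter table above, an id parameter would
;; take precedence over host if both were supplied.
(let [resp (http/get "http://ui-daemon-host-name:8080/api/v1/supervisor"
                     {:query-params {"host" "supervisor-daemon-host-name"}
                      :as :json})]
  (map #(select-keys % [:id :slotsUsed :usedMem :usedCpu])
       (get-in resp [:body :supervisors])))
```
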
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/docs/images/supervisor_page.png
----------------------------------------------------------------------
diff --git a/docs/images/supervisor_page.png b/docs/images/supervisor_page.png
new file mode 100644
index 0000000..5133681
Binary files /dev/null and b/docs/images/supervisor_page.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 29d9f28..c17e2fd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -45,7 +45,7 @@
             KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo
             ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
             BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
-            ProfileRequest ProfileAction NodeInfo])
+            ProfileRequest ProfileAction NodeInfo SupervisorPageInfo WorkerSummary WorkerResources])
   (:import [org.apache.storm.daemon Shutdownable])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
   (:use [org.apache.storm util config log timer zookeeper local-state])
@@ -95,6 +95,7 @@
 (defmeter nimbus:num-getTopologyInfoWithOpts-calls)
 (defmeter nimbus:num-getTopologyInfo-calls)
 (defmeter nimbus:num-getTopologyPageInfo-calls)
+(defmeter nimbus:num-getSupervisorPageInfo-calls)
 (defmeter nimbus:num-getComponentPageInfo-calls)
 (defmeter nimbus:num-shutdown-calls)
 
@@ -210,6 +211,7 @@
      :id->sched-status (atom {})
      :node-id->resources (atom {}) ;;resources of supervisors
      :id->resources (atom {}) ;;resources of topologies
+     :id->worker-resources (atom {}) ; resources of workers per topology
      :cred-renewers (AuthUtils/GetCredentialRenewers conf)
      :topology-history-lock (Object.)
      :topo-history-state (nimbus-topo-history-state conf)
@@ -428,7 +430,8 @@
       {})
     ))
 
-(defn- all-supervisor-info
+;; public for testing
+(defn all-supervisor-info
   ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
   ([storm-cluster-state callback]
      (let [supervisor-ids (.supervisors storm-cluster-state callback)]
@@ -738,8 +741,7 @@
                                                     all-ports (-> (get all-scheduling-slots sid)
                                                                   (set/difference dead-ports)
                                                                   ((fn [ports] (map int ports))))
-                                                    supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))
-                                                    ]]
+                                                    supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))]]
                                           {sid supervisor-details}))]
     (merge all-supervisor-details
            (into {}
@@ -818,6 +820,9 @@
 
     new-topology->executor->node+port))
 
+(defrecord TopologyResources [requested-mem-on-heap requested-mem-off-heap requested-cpu
+                              assigned-mem-on-heap assigned-mem-off-heap assigned-cpu])
+ 
 ;; public so it can be mocked out
 (defn compute-new-scheduler-assignments [nimbus existing-assignments topologies scratch-topology-id]
   (let [conf (:conf nimbus)
@@ -855,19 +860,77 @@
                                   (apply merge-with set/union))
 
         supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
-        cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)
-        _ (.setStatusMap cluster (deref (:id->sched-status nimbus)))
-        ;; call scheduler.schedule to schedule all the topologies
-        ;; the new assignments for all the topologies are in the cluster object.
-        _ (.schedule (:scheduler nimbus) topologies cluster)
-        _ (.setTopologyResourcesMap cluster @(:id->resources nimbus))
-        _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
-        ;;merge with existing statuses
-        _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
-        _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
-        _ (reset! (:id->resources nimbus) (.getTopologyResourcesMap cluster))]
+        cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)]
+
+    ;; set the status map with existing topology statuses
+    (.setStatusMap cluster (deref (:id->sched-status nimbus)))
+    ;; call scheduler.schedule to schedule all the topologies
+    ;; the new assignments for all the topologies are in the cluster object.
+    (.schedule (:scheduler nimbus) topologies cluster)
+
+    ;;merge with existing statuses
+    (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
+    (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
+
+    (if-not (conf SCHEDULER-DISPLAY-RESOURCE) 
+      (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
+
+    ; Remove both of the swaps below at the first opportunity. This is a hack to provide topology and worker resources for non-RAS schedulers.
+    (swap! (:id->resources nimbus) merge (into {} (map (fn [[k v]] [k (->TopologyResources (nth v 0) (nth v 1) (nth v 2)
+                                                                                           (nth v 3) (nth v 4) (nth v 5))])
+                                                       (.getTopologyResourcesMap cluster))))
+    ; Likewise, remove this at the first opportunity
+    (swap! (:id->worker-resources nimbus) merge 
+           (into {} (map (fn [[k v]] [k (map-val #(doto (WorkerResources.)
+                                                        (.set_mem_on_heap (nth % 0))
+                                                        (.set_mem_off_heap (nth % 1))
+                                                        (.set_cpu (nth % 2))) v)])
+                         (.getWorkerResourcesMap cluster))))
+
     (.getAssignments cluster)))
 
+(defn get-resources-for-topology [nimbus topo-id]
+  (or (get @(:id->resources nimbus) topo-id)
+      (try
+        (let [storm-cluster-state (:storm-cluster-state nimbus)
+              topology-details (read-topology-details nimbus topo-id)
+              assigned-resources (->> (.assignment-info storm-cluster-state topo-id nil)
+                                      :worker->resources
+                                      (vals)
+                                        ; Default to [[0 0 0]] if there are no values
+                                      (#(or % [[0 0 0]]))
+                                        ; [[on-heap, off-heap, cpu]] -> [[on-heap], [off-heap], [cpu]]
+                                      (apply map vector)
+                                        ; [[on-heap], [off-heap], [cpu]] -> [on-heap-sum, off-heap-sum, cpu-sum]
+                                      (map (partial reduce +)))
+              worker-resources (->TopologyResources (.getTotalRequestedMemOnHeap topology-details)
+                                                    (.getTotalRequestedMemOffHeap topology-details)
+                                                    (.getTotalRequestedCpu topology-details)
+                                                    (nth assigned-resources 0)
+                                                    (nth assigned-resources 1)
+                                                    (nth assigned-resources 2))]
+          (swap! (:id->resources nimbus) assoc topo-id worker-resources)
+          worker-resources)
+        (catch KeyNotFoundException e
+          ; This can happen when a topology is first coming up.
+          ; It's thrown by the blobstore code.
+          (log-error e "Failed to get topology details")
+          (->TopologyResources 0 0 0 0 0 0)))))
+
+(defn- get-worker-resources-for-topology [nimbus topo-id]
+  (or (get @(:id->worker-resources nimbus) topo-id)
+      (try
+        (let [storm-cluster-state (:storm-cluster-state nimbus)
+              assigned-resources (->> (.assignment-info storm-cluster-state topo-id nil)
+                                      :worker->resources)
+              worker-resources (into {} (map #(identity {(WorkerSlot. (first (key %)) (second (key %)))  
+                                                         (doto (WorkerResources.)
+                                                             (.set_mem_on_heap (nth (val %) 0))
+                                                             (.set_mem_off_heap (nth (val %) 1))
+                                                             (.set_cpu (nth (val %) 2)))}) assigned-resources))]
+          (swap! (:id->worker-resources nimbus) assoc topo-id worker-resources)
+          worker-resources))))
+          
 (defn changed-executors [executor->node+port new-executor->node+port]
   (let [executor->node+port (if executor->node+port (sort executor->node+port) nil)
         new-executor->node+port (if new-executor->node+port (sort new-executor->node+port) nil)
@@ -959,6 +1022,10 @@
                                                  start-times
                                                  worker->resources)}))]
 
+    (when (not= new-assignments existing-assignments)
+      (log-debug "RESETTING id->resources and id->worker-resources cache!")
+      (reset! (:id->resources nimbus) {})
+      (reset! (:id->worker-resources nimbus) {}))
     ;; tasks figure out what tasks to talk to by looking at topology at runtime
     ;; only log/set when there's been a change to the assignment
     (doseq [[topology-id assignment] new-assignments
@@ -1026,6 +1093,18 @@
       (throw (AlreadyAliveException. (str storm-name " is already active"))))
     ))
 
+(defn try-read-storm-conf [conf storm-id blob-store]
+  (try-cause
+    (read-storm-conf-as-nimbus conf storm-id blob-store)
+    (catch KeyNotFoundException e
+       (throw (NotAliveException. (str storm-id))))))
+
+(defn try-read-storm-conf-from-name [conf storm-name nimbus]
+  (let [storm-cluster-state (:storm-cluster-state nimbus)
+        blob-store (:blob-store nimbus)
+        id (get-storm-id storm-cluster-state storm-name)]
+   (try-read-storm-conf conf id blob-store)))
+
 (defn check-authorization!
   ([nimbus storm-name storm-conf operation context]
      (let [aclHandler (:authorization-handler nimbus)
@@ -1051,6 +1130,15 @@
   ([nimbus storm-name storm-conf operation]
      (check-authorization! nimbus storm-name storm-conf operation (ReqContext/context))))
 
+;; no-throw version of check-authorization!
+(defn is-authorized?
+  [nimbus conf blob-store operation topology-id]
+  (let [topology-conf (try-read-storm-conf conf topology-id blob-store)
+        storm-name (topology-conf TOPOLOGY-NAME)]
+    (try (check-authorization! nimbus storm-name topology-conf operation)
+         true
+      (catch AuthorizationException e false))))
+
 (defn code-ids [blob-store]
   (let [to-id (reify KeyFilter
                 (filter [this key] (get-id-from-blob-key key)))]
@@ -1355,24 +1443,55 @@
 (defmethod blob-sync :local [conf nimbus]
   nil)
 
+(defn make-supervisor-summary
+  [nimbus id info]
+  (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+        sup-sum (SupervisorSummary. (:hostname info)
+                                    (:uptime-secs info)
+                                    (count ports)
+                                    (count (:used-ports info))
+                                    id)]
+    (.set_total_resources sup-sum (map-val double (:resources-map info)))
+    (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+      (.set_used_mem sup-sum (or used-mem 0))
+      (.set_used_cpu sup-sum (or used-cpu 0)))
+    (when-let [version (:version info)] (.set_version sup-sum version))
+    sup-sum))
+
+(defn user-and-supervisor-topos
+  [nimbus conf blob-store assignments supervisor-id]
+  (let [topo-id->supervisors 
+          (into {} (for [[topo-id assignment] assignments] 
+                     {topo-id (into #{} 
+                                    (map #(first (second %)) 
+                                         (:executor->node+port assignment)))}))
+        supervisor-topologies (keys (filter #(get (val %) supervisor-id) topo-id->supervisors))]
+    {:supervisor-topologies supervisor-topologies
+     :user-topologies (into #{} (filter (partial is-authorized? nimbus
+                                                 conf
+                                                 blob-store
+                                                 "getTopology")
+                                        supervisor-topologies))}))
+
+(defn topology-assignments 
+  [storm-cluster-state]
+  (let [assigned-topology-ids (.assignments storm-cluster-state nil)]
+    (into {} (for [tid assigned-topology-ids]
+               {tid (.assignment-info storm-cluster-state tid nil)}))))
+
+(defn get-launch-time-secs 
+  [base storm-id]
+  (if base (:launch-time-secs base)
+    (throw
+      (NotAliveException. (str storm-id)))))
+
 (defn get-cluster-info [nimbus]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         supervisor-infos (all-supervisor-info storm-cluster-state)
         ;; TODO: need to get the port info about supervisors...
         ;; in standalone just look at metadata, otherwise just say N/A?
         supervisor-summaries (dofor [[id info] supervisor-infos]
-                                    (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
-                                          sup-sum (SupervisorSummary. (:hostname info)
-                                                                      (:uptime-secs info)
-                                                                      (count ports)
-                                                                      (count (:used-ports info))
-                                                                      id) ]
-                                      (.set_total_resources sup-sum (map-val double (:resources-map info)))
-                                      (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
-                                        (.set_used_mem sup-sum used-mem)
-                                        (.set_used_cpu sup-sum used-cpu))
-                                      (when-let [version (:version info)] (.set_version sup-sum version))
-                                      sup-sum))
+                                    (make-supervisor-summary nimbus id info))
         nimbus-uptime ((:uptime nimbus))
         bases (topology-bases storm-cluster-state)
         nimbuses (.nimbuses storm-cluster-state)
@@ -1404,13 +1523,13 @@
                                                                     (extract-status-str base))]
                                     (when-let [owner (:owner base)] (.set_owner topo-summ owner))
                                     (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
-                                    (when-let [resources (.get @(:id->resources nimbus) id)]
-                                      (.set_requested_memonheap topo-summ (get resources 0))
-                                      (.set_requested_memoffheap topo-summ (get resources 1))
-                                      (.set_requested_cpu topo-summ (get resources 2))
-                                      (.set_assigned_memonheap topo-summ (get resources 3))
-                                      (.set_assigned_memoffheap topo-summ (get resources 4))
-                                      (.set_assigned_cpu topo-summ (get resources 5)))
+                                    (when-let [resources (get-resources-for-topology nimbus id)]
+                                      (.set_requested_memonheap topo-summ (:requested-mem-on-heap resources))
+                                      (.set_requested_memoffheap topo-summ (:requested-mem-off-heap resources))
+                                      (.set_requested_cpu topo-summ (:requested-cpu resources))
+                                      (.set_assigned_memonheap topo-summ (:assigned-mem-on-heap resources))
+                                      (.set_assigned_memoffheap topo-summ (:assigned-mem-off-heap resources))
+                                      (.set_assigned_cpu topo-summ (:assigned-cpu resources)))
                                     (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
                                     topo-summ))
         ret (ClusterSummary. supervisor-summaries
@@ -1469,9 +1588,7 @@
                   topology (try-read-storm-topology storm-id blob-store)
                   task->component (storm-task-info topology topology-conf)
                   base (.storm-base storm-cluster-state storm-id nil)
-                  launch-time-secs (if base (:launch-time-secs base)
-                                     (throw
-                                       (NotAliveException. (str storm-id))))
+                  launch-time-secs (get-launch-time-secs base storm-id)
                   assignment (.assignment-info storm-cluster-state storm-id nil)
                   beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
                                                  storm-id))
@@ -1874,13 +1991,13 @@
                                       )]
         (when-let [owner (:owner base)] (.set_owner topo-info owner))
         (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
-        (when-let [resources (.get @(:id->resources nimbus) storm-id)]
-          (.set_requested_memonheap topo-info (get resources 0))
-          (.set_requested_memoffheap topo-info (get resources 1))
-          (.set_requested_cpu topo-info (get resources 2))
-          (.set_assigned_memonheap topo-info (get resources 3))
-          (.set_assigned_memoffheap topo-info (get resources 4))
-          (.set_assigned_cpu topo-info (get resources 5)))
+        (when-let [resources (get-resources-for-topology nimbus storm-id)]
+          (.set_requested_memonheap topo-info (:requested-mem-on-heap resources))
+          (.set_requested_memoffheap topo-info (:requested-mem-off-heap resources))
+          (.set_requested_cpu topo-info (:requested-cpu resources))
+          (.set_assigned_memonheap topo-info (:assigned-mem-on-heap resources))
+          (.set_assigned_memoffheap topo-info (:assigned-mem-off-heap resources))
+          (.set_assigned_cpu topo-info (:assigned-cpu resources)))
         (when-let [component->debug (:component->debug base)]
           (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
         (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
@@ -2046,45 +2163,98 @@
     (^TopologyPageInfo getTopologyPageInfo
       [this ^String topo-id ^String window ^boolean include-sys?]
       (mark! nimbus:num-getTopologyPageInfo-calls)
-      (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
-
-            exec->node+port (:executor->node+port (:assignment info))
+      (let [topo-info (get-common-topo-info topo-id "getTopologyPageInfo")
+            {:keys [storm-name
+                    storm-cluster-state
+                    launch-time-secs
+                    assignment
+                    beats
+                    task->component
+                    topology
+                    base]} topo-info
+            exec->node+port (:executor->node+port assignment)
+            node->host (:node->host assignment)
+            worker->resources (get-worker-resources-for-topology nimbus topo-id)
+            worker-summaries (stats/agg-worker-stats topo-id 
+                                                     topo-info
+                                                     worker->resources
+                                                     include-sys?
+                                                     true)  ;; this is the topology page, so we know the user is authorized 
+
+            exec->node+port (:executor->node+port assignment)
             last-err-fn (partial get-last-error
-                                 (:storm-cluster-state info)
+                                 storm-cluster-state
                                  topo-id)
             topo-page-info (stats/agg-topo-execs-stats topo-id
                                                        exec->node+port
-                                                       (:task->component info)
-                                                       (:beats info)
-                                                       (:topology info)
+                                                       task->component
+                                                       beats
+                                                       topology
                                                        window
                                                        include-sys?
                                                        last-err-fn)]
-        (when-let [owner (:owner (:base info))]
+        (.set_workers topo-page-info worker-summaries)
+        (when-let [owner (:owner base)]
           (.set_owner topo-page-info owner))
         (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
           (.set_sched_status topo-page-info sched-status))
-        (when-let [resources (.get @(:id->resources nimbus) topo-id)]
-          (.set_requested_memonheap topo-page-info (get resources 0))
-          (.set_requested_memoffheap topo-page-info (get resources 1))
-          (.set_requested_cpu topo-page-info (get resources 2))
-          (.set_assigned_memonheap topo-page-info (get resources 3))
-          (.set_assigned_memoffheap topo-page-info (get resources 4))
-          (.set_assigned_cpu topo-page-info (get resources 5)))
+        (when-let [resources (get-resources-for-topology nimbus topo-id)]
+          (.set_requested_memonheap topo-page-info (:requested-mem-on-heap resources))
+          (.set_requested_memoffheap topo-page-info (:requested-mem-off-heap resources))
+          (.set_requested_cpu topo-page-info (:requested-cpu resources))
+          (.set_assigned_memonheap topo-page-info (:assigned-mem-on-heap resources))
+          (.set_assigned_memoffheap topo-page-info (:assigned-mem-off-heap resources))
+          (.set_assigned_cpu topo-page-info (:assigned-cpu resources)))
         (doto topo-page-info
-          (.set_name (:storm-name info))
-          (.set_status (extract-status-str (:base info)))
-          (.set_uptime_secs (time-delta (:launch-time-secs info)))
+          (.set_name storm-name)
+          (.set_status (extract-status-str base))
+          (.set_uptime_secs (time-delta launch-time-secs))
           (.set_topology_conf (to-json (try-read-storm-conf conf
                                                             topo-id (:blob-store nimbus))))
           (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
         (when-let [debug-options
-                   (get-in info [:base :component->debug topo-id])]
+                   (get-in topo-info [:base :component->debug topo-id])]
           (.set_debug_options
             topo-page-info
             (converter/thriftify-debugoptions debug-options)))
         topo-page-info))
 
+    (^SupervisorPageInfo getSupervisorPageInfo
+      [this
+       ^String supervisor-id
+       ^String host 
+       ^boolean include-sys?]
+      (.mark nimbus:num-getSupervisorPageInfo-calls)
+      (let [storm-cluster-state (:storm-cluster-state nimbus)
+            supervisor-infos (all-supervisor-info storm-cluster-state)
+            host->supervisor-id (reverse-map (map-val :hostname supervisor-infos))
+            supervisor-ids (if (nil? supervisor-id)
+                              (get host->supervisor-id host)
+                              [supervisor-id])
+            page-info (SupervisorPageInfo.)]
+            (doseq [sid supervisor-ids]
+              (let [supervisor-info (get supervisor-infos sid)
+                    sup-sum (make-supervisor-summary nimbus sid supervisor-info)
+                    _ (.add_to_supervisor_summaries page-info sup-sum)
+                    topo-id->assignments (topology-assignments storm-cluster-state)
+                    {:keys [user-topologies 
+                            supervisor-topologies]} (user-and-supervisor-topos nimbus
+                                                                               conf
+                                                                               blob-store
+                                                                               topo-id->assignments 
+                                                                               sid)]
+                (doseq [storm-id supervisor-topologies]
+                    (let [topo-info (get-common-topo-info storm-id "getSupervisorPageInfo")
+                          worker->resources (get-worker-resources-for-topology nimbus storm-id)]
+                      (doseq [worker-summary (stats/agg-worker-stats storm-id 
+                                                                     topo-info
+                                                                     worker->resources
+                                                                     include-sys?
+                                                                     (get user-topologies storm-id)
+                                                                     sid)]
+                        (.add_to_worker_summaries page-info worker-summary)))))) 
+            page-info))
+
     (^ComponentPageInfo getComponentPageInfo
       [this
        ^String topo-id

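The transpose-and-sum pipeline in get-resources-for-topology above is compact enough to misread, so here is a self-contained sketch of the same idiom. The worker slots and numbers are invented for illustration; only the shape matches the patch.

```clojure
;; Sums per-worker [on-heap off-heap cpu] triples into topology totals.
(defn sum-assigned-resources [worker->resources]
  (->> (vals worker->resources)
       (#(or (seq %) [[0 0 0]]))   ; default when nothing is assigned yet
       (apply map vector)          ; ([h1 o1 c1] [h2 o2 c2]) -> ([h1 h2] [o1 o2] [c1 c2])
       (map (partial reduce +))))  ; -> (h-sum o-sum c-sum)

(sum-assigned-resources {["node1" 6707] [704.0 80.0 130.0]
                         ["node1" 6706] [904.0 80.0 165.0]})
;; => (1608.0 160.0 295.0)
```
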
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj
index 26a4eb4..17d0219 100644
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@ -21,9 +21,11 @@
             ExecutorSpecificStats SpoutStats BoltStats ErrorInfo
             SupervisorSummary CommonAggregateStats ComponentAggregateStats
             ComponentPageInfo ComponentType BoltAggregateStats
-            ExecutorAggregateStats SpecificAggregateStats
-            SpoutAggregateStats TopologyPageInfo TopologyStats])
+            ExecutorAggregateStats WorkerSummary SpecificAggregateStats
+            SpoutAggregateStats TopologyPageInfo TopologyStats
+            WorkerResources])
   (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.scheduler WorkerSlot])
   (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
   (:use [org.apache.storm log util])
   (:use [clojure.math.numeric-tower :only [ceil]]))
@@ -256,7 +258,6 @@
    (.get_failed stats)
    (.get_complete_ms_avg stats)])
 
-
 (defn clojurify-executor-stats
   [^ExecutorStats stats]
   (let [ specific-stats (.get_specific stats)
@@ -1002,15 +1003,62 @@
                            window->complete-latency)
                          (.set_window_to_acked window->acked)
                          (.set_window_to_failed window->failed))
-      topo-page-info (doto (TopologyPageInfo. topology-id)
-                       (.set_num_tasks num-tasks)
-                       (.set_num_workers num-workers)
-                       (.set_num_executors num-executors)
-                       (.set_id_to_spout_agg_stats spout-agg-stats)
-                       (.set_id_to_bolt_agg_stats bolt-agg-stats)
-                       (.set_topology_stats topology-stats))]
+        topo-page-info (doto (TopologyPageInfo. topology-id)
+                         (.set_num_tasks num-tasks)
+                         (.set_num_workers num-workers)
+                         (.set_num_executors num-executors)
+                         (.set_id_to_spout_agg_stats spout-agg-stats)
+                         (.set_id_to_bolt_agg_stats bolt-agg-stats)
+                         (.set_topology_stats topology-stats))]
     topo-page-info))
 
+(defn agg-worker-stats
+  "Aggregate statistics per worker for a topology, optionally filtered to the workers on a single supervisor."
+  ([storm-id topo-info worker->resources include-sys? user-authorized]
+    (agg-worker-stats storm-id topo-info worker->resources include-sys? user-authorized nil))
+  ([storm-id topo-info worker->resources include-sys? user-authorized filter-supervisor]
+    (let [{:keys [storm-name
+                  assignment
+                  beats
+                  task->component]} topo-info
+          exec->node+port (:executor->node+port assignment)
+          node->host (:node->host assignment)
+          all-node+port->exec (reverse-map exec->node+port)
+          node+port->exec (if (nil? filter-supervisor) 
+                            all-node+port->exec 
+                            (filter #(= filter-supervisor (ffirst %)) all-node+port->exec))
+          handle-sys-components-fn (mk-include-sys-fn include-sys?)]
+      (dofor [[[node port] executors] node+port->exec]
+        (let [executor-tasks (map #(range (first %) (inc (last %))) executors)
+              worker-beats (vals (select-keys beats executors))
+              not-null-worker-beat (first (filter identity worker-beats))
+              worker-uptime (or (:uptime not-null-worker-beat) 0)
+              ;; list of components per executor ((c1 c2 c3) (c4) (c5))
+              ;; if the executor was running only system components, an empty list for that executor is possible
+              components-per-executor (for [tasks executor-tasks] 
+                                        (filter handle-sys-components-fn (map #(get task->component %) tasks)))
+              component->num-tasks (frequencies (flatten components-per-executor))
+              num-executors (count executors)
+              default-worker-resources (WorkerResources.)
+              resources (if (nil? worker->resources) 
+                            default-worker-resources 
+                            (or (.get worker->resources (WorkerSlot. node port)) 
+                                default-worker-resources))
+              worker-summary (doto (WorkerSummary.)
+                               (.set_host (node->host node))
+                               (.set_uptime_secs worker-uptime)
+                               (.set_supervisor_id node)
+                               (.set_port port)
+                               (.set_topology_id storm-id)
+                               (.set_topology_name storm-name)
+                               (.set_num_executors num-executors)
+                               (.set_assigned_memonheap (.get_mem_on_heap resources))
+                               (.set_assigned_memoffheap (.get_mem_off_heap resources))
+                               (.set_assigned_cpu (.get_cpu resources)))]
+          (when user-authorized (.set_component_to_num_tasks worker-summary component->num-tasks))
+          worker-summary)))))
+
 (defn agg-topo-execs-stats
   "Aggregate various executor statistics for a topology from the given
   heartbeats."

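The per-worker component tally in agg-worker-stats rests on two small idioms: expanding [first-task last-task] executor ranges into task ids, and counting component occurrences with frequencies. A stripped-down sketch, with invented task ids and component names:

```clojure
(let [task->component {1 "word" 2 "word" 3 "exclaim1" 4 "exclaim1" 5 "exclaim2"}
      executors [[1 2] [3 5]]                   ; [first-task last-task] pairs
      executor-tasks (map #(range (first %) (inc (last %))) executors)
      components-per-executor (for [tasks executor-tasks]
                                (map task->component tasks))]
  (frequencies (flatten components-per-executor)))
;; => {"word" 2, "exclaim1" 2, "exclaim2" 1}
```
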
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 8b59aab..4cb01f9 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -38,7 +38,7 @@
             TopologyStats CommonAggregateStats ComponentAggregateStats
             ComponentType BoltAggregateStats SpoutAggregateStats
             ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
-            LogConfig LogLevel LogLevelAction])
+            LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary])
   (:import [org.apache.storm.security.auth AuthUtils ReqContext])
   (:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
   (:import [org.apache.storm.security.auth AuthUtils])
@@ -64,6 +64,7 @@
 (defmeter ui:num-cluster-configuration-http-requests)
 (defmeter ui:num-cluster-summary-http-requests)
 (defmeter ui:num-nimbus-summary-http-requests)
+(defmeter ui:num-supervisor-http-requests)
 (defmeter ui:num-supervisor-summary-http-requests)
 (defmeter ui:num-all-topologies-summary-http-requests)
 (defmeter ui:num-topology-page-http-requests)
@@ -410,26 +411,77 @@
           "nimbusUpTime" (pretty-uptime-sec uptime)
           "nimbusUpTimeSeconds" uptime}))})))
 
+(defn worker-summary-to-json
+  [secure? ^WorkerSummary worker-summary]
+  (let [host (.get_host worker-summary)
+        port (.get_port worker-summary)
+        topology-id (.get_topology_id worker-summary)
+        uptime-secs (.get_uptime_secs worker-summary)]
+    {"supervisorId" (.get_supervisor_id worker-summary)
+     "host" host
+     "port" port
+     "topologyId" topology-id
+     "topologyName" (.get_topology_name worker-summary)
+     "executorsTotal" (.get_num_executors worker-summary)
+     "assignedMemOnHeap" (.get_assigned_memonheap worker-summary)
+     "assignedMemOffHeap" (.get_assigned_memoffheap worker-summary)
+     "assignedCpu" (.get_assigned_cpu worker-summary)
+     "componentNumTasks" (.get_component_to_num_tasks worker-summary)
+     "uptime" (pretty-uptime-sec uptime-secs)
+     "uptimeSeconds" uptime-secs
+     "workerLogLink" (worker-log-link host port topology-id secure?)}))
+
+(defn supervisor-summary-to-json 
+  [summary]
+  (let [slotsTotal (.get_num_workers summary)
+        slotsUsed (.get_num_used_workers summary)
+        slotsFree (max (- slotsTotal slotsUsed) 0)
+        totalMem (get (.get_total_resources summary) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+        totalCpu (get (.get_total_resources summary) Config/SUPERVISOR_CPU_CAPACITY)
+        usedMem (.get_used_mem summary)
+        usedCpu (.get_used_cpu summary)
+        availMem (max (- totalMem usedMem) 0)
+        availCpu (max (- totalCpu usedCpu) 0)]
+  {"id" (.get_supervisor_id summary)
+   "host" (.get_host summary)
+   "uptime" (pretty-uptime-sec (.get_uptime_secs summary))
+   "uptimeSeconds" (.get_uptime_secs summary)
+   "slotsTotal" slotsTotal
+   "slotsUsed" slotsUsed
+   "slotsFree" slotsFree
+   "totalMem" totalMem
+   "totalCpu" totalCpu
+   "usedMem" usedMem
+   "usedCpu" usedCpu
+   "logLink" (supervisor-log-link (.get_host summary))
+   "availMem" availMem
+   "availCpu" availCpu
+   "version" (.get_version summary)}))
+
+(defn supervisor-page-info
+  ([supervisor-id host include-sys? secure?]
+     (thrift/with-configured-nimbus-connection 
+        nimbus (supervisor-page-info (.getSupervisorPageInfo ^Nimbus$Client nimbus
+                                                      supervisor-id
+                                                      host
+                                                      include-sys?) secure?)))
+  ([^SupervisorPageInfo supervisor-page-info secure?]
+    ;; ask nimbus to return supervisor workers plus any details the user is
+    ;; allowed to access on a per-topology basis (i.e. components)
+    (let [supervisors-json (map supervisor-summary-to-json (.get_supervisor_summaries supervisor-page-info))]
+      {"supervisors" supervisors-json
+       "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)
+       "workers" (into [] (for [^WorkerSummary worker-summary (.get_worker_summaries supervisor-page-info)]
+                            (worker-summary-to-json secure? worker-summary)))})))
+
 (defn supervisor-summary
   ([]
    (thrift/with-configured-nimbus-connection nimbus
                 (supervisor-summary
                   (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
   ([summs]
-   {"supervisors"
-    (for [^SupervisorSummary s summs]
-      {"id" (.get_supervisor_id s)
-       "host" (.get_host s)
-       "uptime" (pretty-uptime-sec (.get_uptime_secs s))
-       "uptimeSeconds" (.get_uptime_secs s)
-       "slotsTotal" (.get_num_workers s)
-       "slotsUsed" (.get_num_used_workers s)
-       "totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
-       "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
-       "usedMem" (.get_used_mem s)
-       "usedCpu" (.get_used_cpu s)
-       "logLink" (supervisor-log-link (.get_host s))
-       "version" (.get_version s)})
+   {"supervisors" (for [^SupervisorSummary s summs]
+                    (supervisor-summary-to-json s))
     "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
 
 (defn all-topologies-summary
@@ -588,6 +640,8 @@
      "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
      "assignedCpu" (.get_assigned_cpu topo-info)
      "topologyStats" topo-stats
+     "workers"  (map (partial worker-summary-to-json secure?)
+                     (.get_workers topo-info))
      "spouts" (map (partial comp-agg-stats-json id secure?)
                    (.get_id_to_spout_agg_stats topo-info))
      "bolts" (map (partial comp-agg-stats-json id secure?)
@@ -1046,6 +1100,16 @@
     (assert-authorized-user "getClusterInfo")
     (json-response (assoc (supervisor-summary)
                      "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m)))
+  (GET "/api/v1/supervisor" [:as {:keys [cookies servlet-request scheme]} & m]
+    (.mark ui:num-supervisor-http-requests)
+    (populate-context! servlet-request)
+    (assert-authorized-user "getSupervisorPageInfo")
+    ;; /api/v1/supervisor takes either an id or a host query parameter (technically both);
+    ;; if both id and host are provided, the id wins
+    (let [id (:id m)
+          host (:host m)]
+      (json-response (assoc (supervisor-page-info id host (check-include-sys? (:sys m)) (= scheme :https))
+                            "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m))))
   (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
     (mark! ui:num-all-topologies-summary-http-requests)
     (populate-context! servlet-request)

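The id-over-host precedence noted in the route handler is decided on the nimbus side in getSupervisorPageInfo. The decision itself reduces to the following sketch; the host->supervisor-id map and the ids are invented.

```clojure
;; When id is supplied the host parameter is ignored; otherwise every
;; supervisor id registered on the host is returned.
(defn select-supervisor-ids [host->supervisor-id {:keys [id host]}]
  (if (nil? id)
    (get host->supervisor-id host)
    [id]))

(select-supervisor-ids {"host-a" ["sup-1" "sup-2"]} {:id "sup-9" :host "host-a"})
;; => ["sup-9"]
(select-supervisor-ids {"host-a" ["sup-1" "sup-2"]} {:host "host-a"})
;; => ["sup-1" "sup-2"]
```
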
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
index c7a3f8a..90b7516 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
@@ -787,15 +787,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 2: // NODE_HOST
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map548 = iprot.readMapBegin();
-                struct.node_host = new HashMap<String,String>(2*_map548.size);
-                String _key549;
-                String _val550;
-                for (int _i551 = 0; _i551 < _map548.size; ++_i551)
+                org.apache.thrift.protocol.TMap _map582 = iprot.readMapBegin();
+                struct.node_host = new HashMap<String,String>(2*_map582.size);
+                String _key583;
+                String _val584;
+                for (int _i585 = 0; _i585 < _map582.size; ++_i585)
                 {
-                  _key549 = iprot.readString();
-                  _val550 = iprot.readString();
-                  struct.node_host.put(_key549, _val550);
+                  _key583 = iprot.readString();
+                  _val584 = iprot.readString();
+                  struct.node_host.put(_key583, _val584);
                 }
                 iprot.readMapEnd();
               }
@@ -807,26 +807,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 3: // EXECUTOR_NODE_PORT
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map552 = iprot.readMapBegin();
-                struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map552.size);
-                List<Long> _key553;
-                NodeInfo _val554;
-                for (int _i555 = 0; _i555 < _map552.size; ++_i555)
+                org.apache.thrift.protocol.TMap _map586 = iprot.readMapBegin();
+                struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map586.size);
+                List<Long> _key587;
+                NodeInfo _val588;
+                for (int _i589 = 0; _i589 < _map586.size; ++_i589)
                 {
                   {
-                    org.apache.thrift.protocol.TList _list556 = iprot.readListBegin();
-                    _key553 = new ArrayList<Long>(_list556.size);
-                    long _elem557;
-                    for (int _i558 = 0; _i558 < _list556.size; ++_i558)
+                    org.apache.thrift.protocol.TList _list590 = iprot.readListBegin();
+                    _key587 = new ArrayList<Long>(_list590.size);
+                    long _elem591;
+                    for (int _i592 = 0; _i592 < _list590.size; ++_i592)
                     {
-                      _elem557 = iprot.readI64();
-                      _key553.add(_elem557);
+                      _elem591 = iprot.readI64();
+                      _key587.add(_elem591);
                     }
                     iprot.readListEnd();
                   }
-                  _val554 = new NodeInfo();
-                  _val554.read(iprot);
-                  struct.executor_node_port.put(_key553, _val554);
+                  _val588 = new NodeInfo();
+                  _val588.read(iprot);
+                  struct.executor_node_port.put(_key587, _val588);
                 }
                 iprot.readMapEnd();
               }
@@ -838,25 +838,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 4: // EXECUTOR_START_TIME_SECS
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map559 = iprot.readMapBegin();
-                struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map559.size);
-                List<Long> _key560;
-                long _val561;
-                for (int _i562 = 0; _i562 < _map559.size; ++_i562)
+                org.apache.thrift.protocol.TMap _map593 = iprot.readMapBegin();
+                struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map593.size);
+                List<Long> _key594;
+                long _val595;
+                for (int _i596 = 0; _i596 < _map593.size; ++_i596)
                 {
                   {
-                    org.apache.thrift.protocol.TList _list563 = iprot.readListBegin();
-                    _key560 = new ArrayList<Long>(_list563.size);
-                    long _elem564;
-                    for (int _i565 = 0; _i565 < _list563.size; ++_i565)
+                    org.apache.thrift.protocol.TList _list597 = iprot.readListBegin();
+                    _key594 = new ArrayList<Long>(_list597.size);
+                    long _elem598;
+                    for (int _i599 = 0; _i599 < _list597.size; ++_i599)
                     {
-                      _elem564 = iprot.readI64();
-                      _key560.add(_elem564);
+                      _elem598 = iprot.readI64();
+                      _key594.add(_elem598);
                     }
                     iprot.readListEnd();
                   }
-                  _val561 = iprot.readI64();
-                  struct.executor_start_time_secs.put(_key560, _val561);
+                  _val595 = iprot.readI64();
+                  struct.executor_start_time_secs.put(_key594, _val595);
                 }
                 iprot.readMapEnd();
               }
@@ -868,17 +868,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 5: // WORKER_RESOURCES
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map566 = iprot.readMapBegin();
-                struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map566.size);
-                NodeInfo _key567;
-                WorkerResources _val568;
-                for (int _i569 = 0; _i569 < _map566.size; ++_i569)
+                org.apache.thrift.protocol.TMap _map600 = iprot.readMapBegin();
+                struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map600.size);
+                NodeInfo _key601;
+                WorkerResources _val602;
+                for (int _i603 = 0; _i603 < _map600.size; ++_i603)
                 {
-                  _key567 = new NodeInfo();
-                  _key567.read(iprot);
-                  _val568 = new WorkerResources();
-                  _val568.read(iprot);
-                  struct.worker_resources.put(_key567, _val568);
+                  _key601 = new NodeInfo();
+                  _key601.read(iprot);
+                  _val602 = new WorkerResources();
+                  _val602.read(iprot);
+                  struct.worker_resources.put(_key601, _val602);
                 }
                 iprot.readMapEnd();
               }
@@ -910,10 +910,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size()));
-            for (Map.Entry<String, String> _iter570 : struct.node_host.entrySet())
+            for (Map.Entry<String, String> _iter604 : struct.node_host.entrySet())
             {
-              oprot.writeString(_iter570.getKey());
-              oprot.writeString(_iter570.getValue());
+              oprot.writeString(_iter604.getKey());
+              oprot.writeString(_iter604.getValue());
             }
             oprot.writeMapEnd();
           }
@@ -925,17 +925,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size()));
-            for (Map.Entry<List<Long>, NodeInfo> _iter571 : struct.executor_node_port.entrySet())
+            for (Map.Entry<List<Long>, NodeInfo> _iter605 : struct.executor_node_port.entrySet())
             {
               {
-                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter571.getKey().size()));
-                for (long _iter572 : _iter571.getKey())
+                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter605.getKey().size()));
+                for (long _iter606 : _iter605.getKey())
                 {
-                  oprot.writeI64(_iter572);
+                  oprot.writeI64(_iter606);
                 }
                 oprot.writeListEnd();
               }
-              _iter571.getValue().write(oprot);
+              _iter605.getValue().write(oprot);
             }
             oprot.writeMapEnd();
           }
@@ -947,17 +947,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size()));
-            for (Map.Entry<List<Long>, Long> _iter573 : struct.executor_start_time_secs.entrySet())
+            for (Map.Entry<List<Long>, Long> _iter607 : struct.executor_start_time_secs.entrySet())
             {
               {
-                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter573.getKey().size()));
-                for (long _iter574 : _iter573.getKey())
+                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter607.getKey().size()));
+                for (long _iter608 : _iter607.getKey())
                 {
-                  oprot.writeI64(_iter574);
+                  oprot.writeI64(_iter608);
                 }
                 oprot.writeListEnd();
               }
-              oprot.writeI64(_iter573.getValue());
+              oprot.writeI64(_iter607.getValue());
             }
             oprot.writeMapEnd();
           }
@@ -969,10 +969,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(WORKER_RESOURCES_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.worker_resources.size()));
-            for (Map.Entry<NodeInfo, WorkerResources> _iter575 : struct.worker_resources.entrySet())
+            for (Map.Entry<NodeInfo, WorkerResources> _iter609 : struct.worker_resources.entrySet())
             {
-              _iter575.getKey().write(oprot);
-              _iter575.getValue().write(oprot);
+              _iter609.getKey().write(oprot);
+              _iter609.getValue().write(oprot);
             }
             oprot.writeMapEnd();
           }
@@ -1014,52 +1014,52 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       if (struct.is_set_node_host()) {
         {
           oprot.writeI32(struct.node_host.size());
-          for (Map.Entry<String, String> _iter576 : struct.node_host.entrySet())
+          for (Map.Entry<String, String> _iter610 : struct.node_host.entrySet())
           {
-            oprot.writeString(_iter576.getKey());
-            oprot.writeString(_iter576.getValue());
+            oprot.writeString(_iter610.getKey());
+            oprot.writeString(_iter610.getValue());
           }
         }
       }
       if (struct.is_set_executor_node_port()) {
         {
           oprot.writeI32(struct.executor_node_port.size());
-          for (Map.Entry<List<Long>, NodeInfo> _iter577 : struct.executor_node_port.entrySet())
+          for (Map.Entry<List<Long>, NodeInfo> _iter611 : struct.executor_node_port.entrySet())
           {
             {
-              oprot.writeI32(_iter577.getKey().size());
-              for (long _iter578 : _iter577.getKey())
+              oprot.writeI32(_iter611.getKey().size());
+              for (long _iter612 : _iter611.getKey())
               {
-                oprot.writeI64(_iter578);
+                oprot.writeI64(_iter612);
               }
             }
-            _iter577.getValue().write(oprot);
+            _iter611.getValue().write(oprot);
           }
         }
       }
       if (struct.is_set_executor_start_time_secs()) {
         {
           oprot.writeI32(struct.executor_start_time_secs.size());
-          for (Map.Entry<List<Long>, Long> _iter579 : struct.executor_start_time_secs.entrySet())
+          for (Map.Entry<List<Long>, Long> _iter613 : struct.executor_start_time_secs.entrySet())
           {
             {
-              oprot.writeI32(_iter579.getKey().size());
-              for (long _iter580 : _iter579.getKey())
+              oprot.writeI32(_iter613.getKey().size());
+              for (long _iter614 : _iter613.getKey())
               {
-                oprot.writeI64(_iter580);
+                oprot.writeI64(_iter614);
               }
             }
-            oprot.writeI64(_iter579.getValue());
+            oprot.writeI64(_iter613.getValue());
           }
         }
       }
       if (struct.is_set_worker_resources()) {
         {
           oprot.writeI32(struct.worker_resources.size());
-          for (Map.Entry<NodeInfo, WorkerResources> _iter581 : struct.worker_resources.entrySet())
+          for (Map.Entry<NodeInfo, WorkerResources> _iter615 : struct.worker_resources.entrySet())
           {
-            _iter581.getKey().write(oprot);
-            _iter581.getValue().write(oprot);
+            _iter615.getKey().write(oprot);
+            _iter615.getValue().write(oprot);
           }
         }
       }
@@ -1073,81 +1073,81 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       BitSet incoming = iprot.readBitSet(4);
       if (incoming.get(0)) {
         {
-          org.apache.thrift.protocol.TMap _map582 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
-          struct.node_host = new HashMap<String,String>(2*_map582.size);
-          String _key583;
-          String _val584;
-          for (int _i585 = 0; _i585 < _map582.size; ++_i585)
+          org.apache.thrift.protocol.TMap _map616 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+          struct.node_host = new HashMap<String,String>(2*_map616.size);
+          String _key617;
+          String _val618;
+          for (int _i619 = 0; _i619 < _map616.size; ++_i619)
           {
-            _key583 = iprot.readString();
-            _val584 = iprot.readString();
-            struct.node_host.put(_key583, _val584);
+            _key617 = iprot.readString();
+            _val618 = iprot.readString();
+            struct.node_host.put(_key617, _val618);
           }
         }
         struct.set_node_host_isSet(true);
       }
       if (incoming.get(1)) {
         {
-          org.apache.thrift.protocol.TMap _map586 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-          struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map586.size);
-          List<Long> _key587;
-          NodeInfo _val588;
-          for (int _i589 = 0; _i589 < _map586.size; ++_i589)
+          org.apache.thrift.protocol.TMap _map620 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+          struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map620.size);
+          List<Long> _key621;
+          NodeInfo _val622;
+          for (int _i623 = 0; _i623 < _map620.size; ++_i623)
           {
             {
-              org.apache.thrift.protocol.TList _list590 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-              _key587 = new ArrayList<Long>(_list590.size);
-              long _elem591;
-              for (int _i592 = 0; _i592 < _list590.size; ++_i592)
+              org.apache.thrift.protocol.TList _list624 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+              _key621 = new ArrayList<Long>(_list624.size);
+              long _elem625;
+              for (int _i626 = 0; _i626 < _list624.size; ++_i626)
               {
-                _elem591 = iprot.readI64();
-                _key587.add(_elem591);
+                _elem625 = iprot.readI64();
+                _key621.add(_elem625);
               }
             }
-            _val588 = new NodeInfo();
-            _val588.read(iprot);
-            struct.executor_node_port.put(_key587, _val588);
+            _val622 = new NodeInfo();
+            _val622.read(iprot);
+            struct.executor_node_port.put(_key621, _val622);
           }
         }
         struct.set_executor_node_port_isSet(true);
       }
       if (incoming.get(2)) {
         {
-          org.apache.thrift.protocol.TMap _map593 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
-          struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map593.size);
-          List<Long> _key594;
-          long _val595;
-          for (int _i596 = 0; _i596 < _map593.size; ++_i596)
+          org.apache.thrift.protocol.TMap _map627 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+          struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map627.size);
+          List<Long> _key628;
+          long _val629;
+          for (int _i630 = 0; _i630 < _map627.size; ++_i630)
           {
             {
-              org.apache.thrift.protocol.TList _list597 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-              _key594 = new ArrayList<Long>(_list597.size);
-              long _elem598;
-              for (int _i599 = 0; _i599 < _list597.size; ++_i599)
+              org.apache.thrift.protocol.TList _list631 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+              _key628 = new ArrayList<Long>(_list631.size);
+              long _elem632;
+              for (int _i633 = 0; _i633 < _list631.size; ++_i633)
               {
-                _elem598 = iprot.readI64();
-                _key594.add(_elem598);
+                _elem632 = iprot.readI64();
+                _key628.add(_elem632);
               }
             }
-            _val595 = iprot.readI64();
-            struct.executor_start_time_secs.put(_key594, _val595);
+            _val629 = iprot.readI64();
+            struct.executor_start_time_secs.put(_key628, _val629);
           }
         }
         struct.set_executor_start_time_secs_isSet(true);
       }
       if (incoming.get(3)) {
         {
-          org.apache.thrift.protocol.TMap _map600 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-          struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map600.size);
-          NodeInfo _key601;
-          WorkerResources _val602;
-          for (int _i603 = 0; _i603 < _map600.size; ++_i603)
+          org.apache.thrift.protocol.TMap _map634 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+          struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map634.size);
+          NodeInfo _key635;
+          WorkerResources _val636;
+          for (int _i637 = 0; _i637 < _map634.size; ++_i637)
           {
-            _key601 = new NodeInfo();
-            _key601.read(iprot);
-            _val602 = new WorkerResources();
-            _val602.read(iprot);
-            struct.worker_resources.put(_key601, _val602);
+            _key635 = new NodeInfo();
+            _key635.read(iprot);
+            _val636 = new WorkerResources();
+            _val636.read(iprot);
+            struct.worker_resources.put(_key635, _val636);
           }
         }
         struct.set_worker_resources_isSet(true);
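
The Assignment.java hunks above are purely mechanical: regenerating the Thrift bindings after new structs were added to storm.thrift shifts the compiler's temporary-variable counters (_map559 becomes _map593, _iter575 becomes _iter609, and so on) without changing behavior. For readers unfamiliar with the generated pattern, here is a minimal hand-written sketch of the standard-scheme map read that these hunks implement; the class and method names are illustrative only and do not appear in the commit:

import java.util.HashMap;
import java.util.Map;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TMap;
import org.apache.thrift.protocol.TProtocol;

public class StandardSchemeMapReadSketch {
    // Same shape as the node_host hunks above: read the map header, pre-size
    // the HashMap at 2 * size (the generator's rehash-avoidance trick), loop
    // over the declared entry count, then close the container.
    static Map<String, String> readStringMap(TProtocol iprot) throws TException {
        TMap header = iprot.readMapBegin();
        Map<String, String> result = new HashMap<>(2 * header.size);
        for (int i = 0; i < header.size; ++i) {
            String key = iprot.readString();
            String val = iprot.readString();
            result.put(key, val);
        }
        iprot.readMapEnd();
        return result;
    }
}

The nested-list hunks (executor_node_port, executor_start_time_secs) follow the same recipe, with an inner readListBegin/readListEnd loop building each List<Long> key.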

http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java b/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
index 8585a7d..59c0894 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
@@ -635,17 +635,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
           case 2: // EXECUTOR_STATS
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map624 = iprot.readMapBegin();
-                struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map624.size);
-                ExecutorInfo _key625;
-                ExecutorStats _val626;
-                for (int _i627 = 0; _i627 < _map624.size; ++_i627)
+                org.apache.thrift.protocol.TMap _map658 = iprot.readMapBegin();
+                struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map658.size);
+                ExecutorInfo _key659;
+                ExecutorStats _val660;
+                for (int _i661 = 0; _i661 < _map658.size; ++_i661)
                 {
-                  _key625 = new ExecutorInfo();
-                  _key625.read(iprot);
-                  _val626 = new ExecutorStats();
-                  _val626.read(iprot);
-                  struct.executor_stats.put(_key625, _val626);
+                  _key659 = new ExecutorInfo();
+                  _key659.read(iprot);
+                  _val660 = new ExecutorStats();
+                  _val660.read(iprot);
+                  struct.executor_stats.put(_key659, _val660);
                 }
                 iprot.readMapEnd();
               }
@@ -692,10 +692,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
         oprot.writeFieldBegin(EXECUTOR_STATS_FIELD_DESC);
         {
           oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.executor_stats.size()));
-          for (Map.Entry<ExecutorInfo, ExecutorStats> _iter628 : struct.executor_stats.entrySet())
+          for (Map.Entry<ExecutorInfo, ExecutorStats> _iter662 : struct.executor_stats.entrySet())
           {
-            _iter628.getKey().write(oprot);
-            _iter628.getValue().write(oprot);
+            _iter662.getKey().write(oprot);
+            _iter662.getValue().write(oprot);
           }
           oprot.writeMapEnd();
         }
@@ -727,10 +727,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       oprot.writeString(struct.storm_id);
       {
         oprot.writeI32(struct.executor_stats.size());
-        for (Map.Entry<ExecutorInfo, ExecutorStats> _iter629 : struct.executor_stats.entrySet())
+        for (Map.Entry<ExecutorInfo, ExecutorStats> _iter663 : struct.executor_stats.entrySet())
         {
-          _iter629.getKey().write(oprot);
-          _iter629.getValue().write(oprot);
+          _iter663.getKey().write(oprot);
+          _iter663.getValue().write(oprot);
         }
       }
       oprot.writeI32(struct.time_secs);
@@ -743,17 +743,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
       struct.storm_id = iprot.readString();
       struct.set_storm_id_isSet(true);
       {
-        org.apache.thrift.protocol.TMap _map630 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-        struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map630.size);
-        ExecutorInfo _key631;
-        ExecutorStats _val632;
-        for (int _i633 = 0; _i633 < _map630.size; ++_i633)
+        org.apache.thrift.protocol.TMap _map664 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+        struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map664.size);
+        ExecutorInfo _key665;
+        ExecutorStats _val666;
+        for (int _i667 = 0; _i667 < _map664.size; ++_i667)
         {
-          _key631 = new ExecutorInfo();
-          _key631.read(iprot);
-          _val632 = new ExecutorStats();
-          _val632.read(iprot);
-          struct.executor_stats.put(_key631, _val632);
+          _key665 = new ExecutorInfo();
+          _key665.read(iprot);
+          _val666 = new ExecutorStats();
+          _val666.read(iprot);
+          struct.executor_stats.put(_key665, _val666);
         }
       }
       struct.set_executor_stats_isSet(true);
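
The tuple-scheme hunks (those starting from readBitSet) shift the same counters. As a rough sketch of that pattern, assuming libthrift's TTupleProtocol as used by the generated code: optional fields are flagged in a leading bitset, and the map header is rebuilt from a bare i32 count plus the statically known element types, since the tuple encoding never puts container type information on the wire. Again, the names below are illustrative, not from the commit:

import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TMap;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TType;

public class TupleSchemeMapReadSketch {
    // Same shape as the readBitSet(4) hunks in Assignment.java: check the
    // bitset before touching each optional field, then reconstruct the TMap
    // header locally from the raw entry count.
    static Map<String, String> readOptionalStringMap(TTupleProtocol iprot) throws TException {
        BitSet incoming = iprot.readBitSet(1); // this sketch has one optional field
        if (!incoming.get(0)) {
            return null; // field was not written by the sender
        }
        TMap header = new TMap(TType.STRING, TType.STRING, iprot.readI32());
        Map<String, String> result = new HashMap<>(2 * header.size);
        for (int i = 0; i < header.size; ++i) {
            result.put(iprot.readString(), iprot.readString());
        }
        return result; // generated tuple-scheme code has no readMapEnd() to match
    }
}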