You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/22 08:34:46 UTC
[7/9] storm git commit: STORM-1994: Add table with per-topology and
worker resource usage and components in (new) supervisor and topology pages
STORM-1994: Add table with per-topology and worker resource usage and components in (new) supervisor and topology pages
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e0bcf27
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e0bcf27
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e0bcf27
Branch: refs/heads/1.x-branch
Commit: 0e0bcf27f4b7787cc3e6886ccbcd5dc55daef771
Parents: ce38849
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Wed Jul 6 14:23:18 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Sun Aug 21 22:31:08 2016 -0500
----------------------------------------------------------------------
docs/STORM-UI-REST-API.md | 121 +-
docs/images/supervisor_page.png | Bin 0 -> 133290 bytes
.../src/clj/org/apache/storm/daemon/nimbus.clj | 296 +-
storm-core/src/clj/org/apache/storm/stats.clj | 68 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 94 +-
.../org/apache/storm/generated/Assignment.java | 244 +-
.../storm/generated/ClusterWorkerHeartbeat.java | 52 +-
.../storm/generated/ComponentPageInfo.java | 220 +-
.../org/apache/storm/generated/Credentials.java | 44 +-
.../jvm/org/apache/storm/generated/HBNodes.java | 32 +-
.../org/apache/storm/generated/HBRecords.java | 36 +-
.../storm/generated/LSApprovedWorkers.java | 44 +-
.../generated/LSSupervisorAssignments.java | 48 +-
.../apache/storm/generated/LSTopoHistory.java | 64 +-
.../storm/generated/LSTopoHistoryList.java | 36 +-
.../storm/generated/LSWorkerHeartbeat.java | 36 +-
.../apache/storm/generated/ListBlobsResult.java | 32 +-
.../apache/storm/generated/LocalAssignment.java | 36 +-
.../apache/storm/generated/LocalStateData.java | 48 +-
.../org/apache/storm/generated/LogConfig.java | 48 +-
.../jvm/org/apache/storm/generated/Nimbus.java | 3486 ++++++++++++------
.../org/apache/storm/generated/NodeInfo.java | 32 +-
.../storm/generated/RebalanceOptions.java | 44 +-
.../storm/generated/SettableBlobMeta.java | 36 +-
.../org/apache/storm/generated/StormBase.java | 92 +-
.../apache/storm/generated/SupervisorInfo.java | 152 +-
.../storm/generated/SupervisorPageInfo.java | 624 ++++
.../storm/generated/TopologyHistoryInfo.java | 32 +-
.../storm/generated/TopologyPageInfo.java | 284 +-
.../apache/storm/generated/WorkerSummary.java | 1880 ++++++++++
.../jvm/org/apache/storm/scheduler/Cluster.java | 217 +-
.../resource/ResourceAwareScheduler.java | 9 +
.../auth/authorizer/SimpleACLAuthorizer.java | 7 +-
storm-core/src/py/storm/Nimbus-remote | 7 +
storm-core/src/py/storm/Nimbus.py | 272 +-
storm-core/src/py/storm/ttypes.py | 1457 ++++++--
storm-core/src/storm.thrift | 25 +
storm-core/src/ui/public/component.html | 8 +
storm-core/src/ui/public/css/style.css | 20 +
storm-core/src/ui/public/js/script.js | 191 +
storm-core/src/ui/public/supervisor.html | 132 +
.../public/templates/index-page-template.html | 4 +-
.../templates/supervisor-page-template.html | 145 +
.../templates/topology-page-template.html | 208 +-
storm-core/src/ui/public/topology.html | 12 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 72 +-
.../test/clj/org/apache/storm/stats_test.clj | 134 +
47 files changed, 8820 insertions(+), 2361 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/docs/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index 884c6d5..340137d 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -125,6 +125,7 @@ Response fields:
|uptimeSeconds| Integer| Shows how long the supervisor is running in seconds|
|slotsTotal| Integer| Total number of available worker slots for this supervisor|
|slotsUsed| Integer| Number of worker slots used on this supervisor|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource information|
|totalMem| Double| Total memory capacity on this supervisor|
|totalCpu| Double| Total CPU capacity on this supervisor|
|usedMem| Double| Used memory capacity on this supervisor|
@@ -207,6 +208,123 @@ Sample response:
}
```
+### /api/v1/supervisor (GET)
+
+Returns summary for a supervisor by id, or all supervisors running on a host.
+
+Examples:
+
+```no-highlight
+ 1. By host: http://ui-daemon-host-name:8080/api/v1/supervisor?host=supervisor-daemon-host-name
+ 2. By id: http://ui-daemon-host-name:8080/api/v1/supervisor?id=f5449110-1daa-43e2-89e3-69917b16dec9-192.168.1.1
+```
+
+Request parameters:
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String. Supervisor id | If specified, respond with the stats for the supervisor with this id and for its workers. Note that when id is specified, the host argument is ignored. |
+|host |String. Host name| If specified, respond with the stats for all supervisors on the host (normally just one) and for their workers|
+|sys |String. Values 1 or 0. Default value 0| Controls whether sys stats are included in the response|
+
+Response fields:
+
+|Field |Value|Description|
+|--- |--- |---
+|supervisors| Array| Array of supervisor summaries|
+|workers| Array| Array of worker summaries |
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource information|
+
+Each supervisor is defined by:
+
+|Field |Value|Description|
+|--- |--- |---
+|id| String | Supervisor's id|
+|host| String| Supervisor's host name|
+|uptime| String| Shows how long the supervisor is running|
+|uptimeSeconds| Integer| Shows how long the supervisor is running in seconds|
+|slotsTotal| Integer| Total number of worker slots for this supervisor|
+|slotsUsed| Integer| Number of worker slots used on this supervisor|
+|totalMem| Double| Total memory capacity on this supervisor|
+|totalCpu| Double| Total CPU capacity on this supervisor|
+|usedMem| Double| Used memory capacity on this supervisor|
+|usedCpu| Double| Used CPU capacity on this supervisor|
+
+Each worker is defined by:
+
+|Field |Value |Description|
+|-------|-------|-----------|
+|supervisorId | String| Supervisor's id|
+|host | String | Worker's host name|
+|port | Integer | Worker's port|
+|topologyId | String | Topology Id|
+|topologyName | String | Topology Name|
+|executorsTotal | Integer | Number of executors used by the topology in this worker|
+|assignedMemOnHeap | Double | Assigned On-Heap Memory by Scheduler (MB)|
+|assignedMemOffHeap | Double | Assigned Off-Heap Memory by Scheduler (MB)|
+|assignedCpu | Double | Assigned CPU by Scheduler (%)|
+|componentNumTasks | Dictionary | Components -> # of executing tasks|
+|uptime| String| Shows how long the worker is running|
+|uptimeSeconds| Integer| Shows how long the worker is running in seconds|
+|workerLogLink | String | Link to worker log viewer page|
+
+Sample response:
+
+```json
+{
+ "supervisors": [{
+ "totalMem": 4096.0,
+ "host":"192.168.10.237",
+ "id":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+ "uptime":"7m 8s",
+ "totalCpu":400.0,
+ "usedCpu":495.0,
+ "usedMem":3432.0,
+ "slotsUsed":2,
+ "version":"0.10.1",
+ "slotsTotal":4,
+ "uptimeSeconds":428
+ }],
+ "schedulerDisplayResource":true,
+ "workers":[{
+ "topologyName":"ras",
+ "topologyId":"ras-4-1460229987",
+ "host":"192.168.10.237",
+ "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+ "assignedMemOnHeap":704.0,
+ "uptime":"2m 47s",
+ "uptimeSeconds":167,
+ "port":6707,
+ "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6707%2Fworker.log",
+ "componentNumTasks": {
+ "word":5
+ },
+ "executorsTotal":8,
+ "assignedCpu":130.0,
+ "assignedMemOffHeap":80.0
+ },
+ {
+ "topologyName":"ras",
+ "topologyId":"ras-4-1460229987",
+ "host":"192.168.10.237",
+ "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+ "assignedMemOnHeap":904.0,
+ "uptime":"2m 53s",
+ "port":6706,
+ "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6706%2Fworker.log",
+ "componentNumTasks":{
+ "exclaim2":2,
+ "exclaim1":3,
+ "word":5
+ },
+ "executorsTotal":10,
+ "uptimeSeconds":173,
+ "assignedCpu":165.0,
+ "assignedMemOffHeap":80.0
+ }]
+}
+```
+
### /api/v1/topology/summary (GET)
Returns summary information for all topologies.
@@ -232,6 +350,7 @@ Response fields:
|assignedMemOffHeap| Double|Assigned Off-Heap Memory by Scheduler (MB)|
|assignedTotalMem| Double|Assigned Total Memory by Scheduler (MB)|
|assignedCpu| Double|Assigned CPU by Scheduler (%)|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource information|
Sample response:
@@ -257,7 +376,7 @@ Sample response:
"assignedTotalMem": 768,
"assignedCpu": 80
}
- ]
+ ],
"schedulerDisplayResource": true
}
```
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/docs/images/supervisor_page.png
----------------------------------------------------------------------
diff --git a/docs/images/supervisor_page.png b/docs/images/supervisor_page.png
new file mode 100644
index 0000000..5133681
Binary files /dev/null and b/docs/images/supervisor_page.png differ
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 29d9f28..c17e2fd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -45,7 +45,7 @@
KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo
ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
- ProfileRequest ProfileAction NodeInfo])
+ ProfileRequest ProfileAction NodeInfo SupervisorPageInfo WorkerSummary WorkerResources])
(:import [org.apache.storm.daemon Shutdownable])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
(:use [org.apache.storm util config log timer zookeeper local-state])
@@ -95,6 +95,7 @@
(defmeter nimbus:num-getTopologyInfoWithOpts-calls)
(defmeter nimbus:num-getTopologyInfo-calls)
(defmeter nimbus:num-getTopologyPageInfo-calls)
+(defmeter nimbus:num-getSupervisorPageInfo-calls)
(defmeter nimbus:num-getComponentPageInfo-calls)
(defmeter nimbus:num-shutdown-calls)
@@ -210,6 +211,7 @@
:id->sched-status (atom {})
:node-id->resources (atom {}) ;;resources of supervisors
:id->resources (atom {}) ;;resources of topologies
+ :id->worker-resources (atom {}) ; resources of workers per topology
:cred-renewers (AuthUtils/GetCredentialRenewers conf)
:topology-history-lock (Object.)
:topo-history-state (nimbus-topo-history-state conf)
@@ -428,7 +430,8 @@
{})
))
-(defn- all-supervisor-info
+;; public for testing
+(defn all-supervisor-info
([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
([storm-cluster-state callback]
(let [supervisor-ids (.supervisors storm-cluster-state callback)]
@@ -738,8 +741,7 @@
all-ports (-> (get all-scheduling-slots sid)
(set/difference dead-ports)
((fn [ports] (map int ports))))
- supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))
- ]]
+ supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))]]
{sid supervisor-details}))]
(merge all-supervisor-details
(into {}
@@ -818,6 +820,9 @@
new-topology->executor->node+port))
+(defrecord TopologyResources [requested-mem-on-heap requested-mem-off-heap requested-cpu
+ assigned-mem-on-heap assigned-mem-off-heap assigned-cpu])
+
;; public so it can be mocked out
(defn compute-new-scheduler-assignments [nimbus existing-assignments topologies scratch-topology-id]
(let [conf (:conf nimbus)
@@ -855,19 +860,77 @@
(apply merge-with set/union))
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
- cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)
- _ (.setStatusMap cluster (deref (:id->sched-status nimbus)))
- ;; call scheduler.schedule to schedule all the topologies
- ;; the new assignments for all the topologies are in the cluster object.
- _ (.schedule (:scheduler nimbus) topologies cluster)
- _ (.setTopologyResourcesMap cluster @(:id->resources nimbus))
- _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
- ;;merge with existing statuses
- _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
- _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
- _ (reset! (:id->resources nimbus) (.getTopologyResourcesMap cluster))]
+ cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)]
+
+ ;; set the status map with existing topology statuses
+ (.setStatusMap cluster (deref (:id->sched-status nimbus)))
+ ;; call scheduler.schedule to schedule all the topologies
+ ;; the new assignments for all the topologies are in the cluster object.
+ (.schedule (:scheduler nimbus) topologies cluster)
+
+ ;;merge with existing statuses
+ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
+ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
+
+ (if-not (conf SCHEDULER-DISPLAY-RESOURCE)
+ (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
+
+ ; Remove both of the swaps below at the first opportunity. This is a hack for non-ras scheduler topology and worker resources.
+ (swap! (:id->resources nimbus) merge (into {} (map (fn [[k v]] [k (->TopologyResources (nth v 0) (nth v 1) (nth v 2)
+ (nth v 3) (nth v 4) (nth v 5))])
+ (.getTopologyResourcesMap cluster))))
+ ; Remove this also at first chance
+ (swap! (:id->worker-resources nimbus) merge
+ (into {} (map (fn [[k v]] [k (map-val #(doto (WorkerResources.)
+ (.set_mem_on_heap (nth % 0))
+ (.set_mem_off_heap (nth % 1))
+ (.set_cpu (nth % 2))) v)])
+ (.getWorkerResourcesMap cluster))))
+
(.getAssignments cluster)))
+(defn get-resources-for-topology [nimbus topo-id]
+ (or (get @(:id->resources nimbus) topo-id)
+ (try
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ topology-details (read-topology-details nimbus topo-id)
+ assigned-resources (->> (.assignment-info storm-cluster-state topo-id nil)
+ :worker->resources
+ (vals)
+ ; Default to [[0 0 0]] if there are no values
+ (#(or % [[0 0 0]]))
+ ; [[on-heap, off-heap, cpu]] -> [[on-heap], [off-heap], [cpu]]
+ (apply map vector)
+ ; [[on-heap], [off-heap], [cpu]] -> [on-heap-sum, off-heap-sum, cpu-sum]
+ (map (partial reduce +)))
+ worker-resources (->TopologyResources (.getTotalRequestedMemOnHeap topology-details)
+ (.getTotalRequestedMemOffHeap topology-details)
+ (.getTotalRequestedCpu topology-details)
+ (nth assigned-resources 0)
+ (nth assigned-resources 1)
+ (nth assigned-resources 2))]
+ (swap! (:id->resources nimbus) assoc topo-id worker-resources)
+ worker-resources)
+ (catch KeyNotFoundException e
+ ; This can happen when a topology is first coming up.
+ ; It's thrown by the blobstore code.
+ (log-error e "Failed to get topology details")
+ (->TopologyResources 0 0 0 0 0 0)))))
+
+(defn- get-worker-resources-for-topology [nimbus topo-id]
+ (or (get @(:id->worker-resources nimbus) topo-id)
+ (try
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ assigned-resources (->> (.assignment-info storm-cluster-state topo-id nil)
+ :worker->resources)
+ worker-resources (into {} (map #(identity {(WorkerSlot. (first (key %)) (second (key %)))
+ (doto (WorkerResources.)
+ (.set_mem_on_heap (nth (val %) 0))
+ (.set_mem_off_heap (nth (val %) 1))
+ (.set_cpu (nth (val %) 2)))}) assigned-resources))]
+ (swap! (:id->worker-resources nimbus) assoc topo-id worker-resources)
+ worker-resources))))
+
(defn changed-executors [executor->node+port new-executor->node+port]
(let [executor->node+port (if executor->node+port (sort executor->node+port) nil)
new-executor->node+port (if new-executor->node+port (sort new-executor->node+port) nil)
@@ -959,6 +1022,10 @@
start-times
worker->resources)}))]
+ (when (not= new-assignments existing-assignments)
+ (log-debug "RESETTING id->resources and id->worker-resources cache!")
+ (reset! (:id->resources nimbus) {})
+ (reset! (:id->worker-resources nimbus) {}))
;; tasks figure out what tasks to talk to by looking at topology at runtime
;; only log/set when there's been a change to the assignment
(doseq [[topology-id assignment] new-assignments
@@ -1026,6 +1093,18 @@
(throw (AlreadyAliveException. (str storm-name " is already active"))))
))
+(defn try-read-storm-conf [conf storm-id blob-store]
+ (try-cause
+ (read-storm-conf-as-nimbus conf storm-id blob-store)
+ (catch KeyNotFoundException e
+ (throw (NotAliveException. (str storm-id))))))
+
+(defn try-read-storm-conf-from-name [conf storm-name nimbus]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ blob-store (:blob-store nimbus)
+ id (get-storm-id storm-cluster-state storm-name)]
+ (try-read-storm-conf conf id blob-store)))
+
(defn check-authorization!
([nimbus storm-name storm-conf operation context]
(let [aclHandler (:authorization-handler nimbus)
@@ -1051,6 +1130,15 @@
([nimbus storm-name storm-conf operation]
(check-authorization! nimbus storm-name storm-conf operation (ReqContext/context))))
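+;; predicate version of check-authorization!: returns false on AuthorizationException instead of throwing (NotAliveException from reading the topology conf still propagates)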
+(defn is-authorized?
+ [nimbus conf blob-store operation topology-id]
+ (let [topology-conf (try-read-storm-conf conf topology-id blob-store)
+ storm-name (topology-conf TOPOLOGY-NAME)]
+ (try (check-authorization! nimbus storm-name topology-conf operation)
+ true
+ (catch AuthorizationException e false))))
+
(defn code-ids [blob-store]
(let [to-id (reify KeyFilter
(filter [this key] (get-id-from-blob-key key)))]
@@ -1355,24 +1443,55 @@
(defmethod blob-sync :local [conf nimbus]
nil)
+(defn make-supervisor-summary
+ [nimbus id info]
+ (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+ sup-sum (SupervisorSummary. (:hostname info)
+ (:uptime-secs info)
+ (count ports)
+ (count (:used-ports info))
+ id)]
+ (.set_total_resources sup-sum (map-val double (:resources-map info)))
+ (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+ (.set_used_mem sup-sum (or used-mem 0))
+ (.set_used_cpu sup-sum (or used-cpu 0)))
+ (when-let [version (:version info)] (.set_version sup-sum version))
+ sup-sum))
+
+(defn user-and-supervisor-topos
+ [nimbus conf blob-store assignments supervisor-id]
+ (let [topo-id->supervisors
+ (into {} (for [[topo-id assignment] assignments]
+ {topo-id (into #{}
+ (map #(first (second %))
+ (:executor->node+port assignment)))}))
+ supervisor-topologies (keys (filter #(get (val %) supervisor-id) topo-id->supervisors))]
+ {:supervisor-topologies supervisor-topologies
+ :user-topologies (into #{} (filter (partial is-authorized? nimbus
+ conf
+ blob-store
+ "getTopology")
+ supervisor-topologies))}))
+
+(defn topology-assignments
+ [storm-cluster-state]
+ (let [assigned-topology-ids (.assignments storm-cluster-state nil)]
+ (into {} (for [tid assigned-topology-ids]
+ {tid (.assignment-info storm-cluster-state tid nil)}))))
+
+(defn get-launch-time-secs
+ [base storm-id]
+ (if base (:launch-time-secs base)
+ (throw
+ (NotAliveException. (str storm-id)))))
+
(defn get-cluster-info [nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
supervisor-infos (all-supervisor-info storm-cluster-state)
;; TODO: need to get the port info about supervisors...
;; in standalone just look at metadata, otherwise just say N/A?
supervisor-summaries (dofor [[id info] supervisor-infos]
- (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
- sup-sum (SupervisorSummary. (:hostname info)
- (:uptime-secs info)
- (count ports)
- (count (:used-ports info))
- id) ]
- (.set_total_resources sup-sum (map-val double (:resources-map info)))
- (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
- (.set_used_mem sup-sum used-mem)
- (.set_used_cpu sup-sum used-cpu))
- (when-let [version (:version info)] (.set_version sup-sum version))
- sup-sum))
+ (make-supervisor-summary nimbus id info))
nimbus-uptime ((:uptime nimbus))
bases (topology-bases storm-cluster-state)
nimbuses (.nimbuses storm-cluster-state)
@@ -1404,13 +1523,13 @@
(extract-status-str base))]
(when-let [owner (:owner base)] (.set_owner topo-summ owner))
(when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
- (when-let [resources (.get @(:id->resources nimbus) id)]
- (.set_requested_memonheap topo-summ (get resources 0))
- (.set_requested_memoffheap topo-summ (get resources 1))
- (.set_requested_cpu topo-summ (get resources 2))
- (.set_assigned_memonheap topo-summ (get resources 3))
- (.set_assigned_memoffheap topo-summ (get resources 4))
- (.set_assigned_cpu topo-summ (get resources 5)))
+ (when-let [resources (get-resources-for-topology nimbus id)]
+ (.set_requested_memonheap topo-summ (:requested-mem-on-heap resources))
+ (.set_requested_memoffheap topo-summ (:requested-mem-off-heap resources))
+ (.set_requested_cpu topo-summ (:requested-cpu resources))
+ (.set_assigned_memonheap topo-summ (:assigned-mem-on-heap resources))
+ (.set_assigned_memoffheap topo-summ (:assigned-mem-off-heap resources))
+ (.set_assigned_cpu topo-summ (:assigned-cpu resources)))
(.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
topo-summ))
ret (ClusterSummary. supervisor-summaries
@@ -1469,9 +1588,7 @@
topology (try-read-storm-topology storm-id blob-store)
task->component (storm-task-info topology topology-conf)
base (.storm-base storm-cluster-state storm-id nil)
- launch-time-secs (if base (:launch-time-secs base)
- (throw
- (NotAliveException. (str storm-id))))
+ launch-time-secs (get-launch-time-secs base storm-id)
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
storm-id))
@@ -1874,13 +1991,13 @@
)]
(when-let [owner (:owner base)] (.set_owner topo-info owner))
(when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
- (when-let [resources (.get @(:id->resources nimbus) storm-id)]
- (.set_requested_memonheap topo-info (get resources 0))
- (.set_requested_memoffheap topo-info (get resources 1))
- (.set_requested_cpu topo-info (get resources 2))
- (.set_assigned_memonheap topo-info (get resources 3))
- (.set_assigned_memoffheap topo-info (get resources 4))
- (.set_assigned_cpu topo-info (get resources 5)))
+ (when-let [resources (get-resources-for-topology nimbus storm-id)]
+ (.set_requested_memonheap topo-info (:requested-mem-on-heap resources))
+ (.set_requested_memoffheap topo-info (:requested-mem-off-heap resources))
+ (.set_requested_cpu topo-info (:requested-cpu resources))
+ (.set_assigned_memonheap topo-info (:assigned-mem-on-heap resources))
+ (.set_assigned_memoffheap topo-info (:assigned-mem-off-heap resources))
+ (.set_assigned_cpu topo-info (:assigned-cpu resources)))
(when-let [component->debug (:component->debug base)]
(.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
(.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
@@ -2046,45 +2163,98 @@
(^TopologyPageInfo getTopologyPageInfo
[this ^String topo-id ^String window ^boolean include-sys?]
(mark! nimbus:num-getTopologyPageInfo-calls)
- (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
-
- exec->node+port (:executor->node+port (:assignment info))
+ (let [topo-info (get-common-topo-info topo-id "getTopologyPageInfo")
+ {:keys [storm-name
+ storm-cluster-state
+ launch-time-secs
+ assignment
+ beats
+ task->component
+ topology
+ base]} topo-info
+ exec->node+port (:executor->node+port assignment)
+ node->host (:node->host assignment)
+ worker->resources (get-worker-resources-for-topology nimbus topo-id)
+ worker-summaries (stats/agg-worker-stats topo-id
+ topo-info
+ worker->resources
+ include-sys?
+ true) ;; this is the topology page, so we know the user is authorized
+
+ exec->node+port (:executor->node+port assignment)
last-err-fn (partial get-last-error
- (:storm-cluster-state info)
+ storm-cluster-state
topo-id)
topo-page-info (stats/agg-topo-execs-stats topo-id
exec->node+port
- (:task->component info)
- (:beats info)
- (:topology info)
+ task->component
+ beats
+ topology
window
include-sys?
last-err-fn)]
- (when-let [owner (:owner (:base info))]
+ (.set_workers topo-page-info worker-summaries)
+ (when-let [owner (:owner base)]
(.set_owner topo-page-info owner))
(when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
(.set_sched_status topo-page-info sched-status))
- (when-let [resources (.get @(:id->resources nimbus) topo-id)]
- (.set_requested_memonheap topo-page-info (get resources 0))
- (.set_requested_memoffheap topo-page-info (get resources 1))
- (.set_requested_cpu topo-page-info (get resources 2))
- (.set_assigned_memonheap topo-page-info (get resources 3))
- (.set_assigned_memoffheap topo-page-info (get resources 4))
- (.set_assigned_cpu topo-page-info (get resources 5)))
+ (when-let [resources (get-resources-for-topology nimbus topo-id)]
+ (.set_requested_memonheap topo-page-info (:requested-mem-on-heap resources))
+ (.set_requested_memoffheap topo-page-info (:requested-mem-off-heap resources))
+ (.set_requested_cpu topo-page-info (:requested-cpu resources))
+ (.set_assigned_memonheap topo-page-info (:assigned-mem-on-heap resources))
+ (.set_assigned_memoffheap topo-page-info (:assigned-mem-off-heap resources))
+ (.set_assigned_cpu topo-page-info (:assigned-cpu resources)))
(doto topo-page-info
- (.set_name (:storm-name info))
- (.set_status (extract-status-str (:base info)))
- (.set_uptime_secs (time-delta (:launch-time-secs info)))
+ (.set_name storm-name)
+ (.set_status (extract-status-str base))
+ (.set_uptime_secs (time-delta launch-time-secs))
(.set_topology_conf (to-json (try-read-storm-conf conf
topo-id (:blob-store nimbus))))
(.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
(when-let [debug-options
- (get-in info [:base :component->debug topo-id])]
+ (get-in topo-info [:base :component->debug topo-id])]
(.set_debug_options
topo-page-info
(converter/thriftify-debugoptions debug-options)))
topo-page-info))
+ (^SupervisorPageInfo getSupervisorPageInfo
+ [this
+ ^String supervisor-id
+ ^String host
+ ^boolean include-sys?]
+ (.mark nimbus:num-getSupervisorPageInfo-calls)
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ supervisor-infos (all-supervisor-info storm-cluster-state)
+ host->supervisor-id (reverse-map (map-val :hostname supervisor-infos))
+ supervisor-ids (if (nil? supervisor-id)
+ (get host->supervisor-id host)
+ [supervisor-id])
+ page-info (SupervisorPageInfo.)]
+ (doseq [sid supervisor-ids]
+ (let [supervisor-info (get supervisor-infos sid)
+ sup-sum (make-supervisor-summary nimbus sid supervisor-info)
+ _ (.add_to_supervisor_summaries page-info sup-sum)
+ topo-id->assignments (topology-assignments storm-cluster-state)
+ {:keys [user-topologies
+ supervisor-topologies]} (user-and-supervisor-topos nimbus
+ conf
+ blob-store
+ topo-id->assignments
+ sid)]
+ (doseq [storm-id supervisor-topologies]
+ (let [topo-info (get-common-topo-info storm-id "getSupervisorPageInfo")
+ worker->resources (get-worker-resources-for-topology nimbus storm-id)]
+ (doseq [worker-summary (stats/agg-worker-stats storm-id
+ topo-info
+ worker->resources
+ include-sys?
+ (get user-topologies storm-id)
+ sid)]
+ (.add_to_worker_summaries page-info worker-summary))))))
+ page-info))
+
(^ComponentPageInfo getComponentPageInfo
[this
^String topo-id
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj
index 26a4eb4..17d0219 100644
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@ -21,9 +21,11 @@
ExecutorSpecificStats SpoutStats BoltStats ErrorInfo
SupervisorSummary CommonAggregateStats ComponentAggregateStats
ComponentPageInfo ComponentType BoltAggregateStats
- ExecutorAggregateStats SpecificAggregateStats
- SpoutAggregateStats TopologyPageInfo TopologyStats])
+ ExecutorAggregateStats WorkerSummary SpecificAggregateStats
+ SpoutAggregateStats TopologyPageInfo TopologyStats
+ WorkerResources])
(:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm.scheduler WorkerSlot])
(:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
(:use [org.apache.storm log util])
(:use [clojure.math.numeric-tower :only [ceil]]))
@@ -256,7 +258,6 @@
(.get_failed stats)
(.get_complete_ms_avg stats)])
-
(defn clojurify-executor-stats
[^ExecutorStats stats]
(let [ specific-stats (.get_specific stats)
@@ -1002,15 +1003,62 @@
window->complete-latency)
(.set_window_to_acked window->acked)
(.set_window_to_failed window->failed))
- topo-page-info (doto (TopologyPageInfo. topology-id)
- (.set_num_tasks num-tasks)
- (.set_num_workers num-workers)
- (.set_num_executors num-executors)
- (.set_id_to_spout_agg_stats spout-agg-stats)
- (.set_id_to_bolt_agg_stats bolt-agg-stats)
- (.set_topology_stats topology-stats))]
+ topo-page-info (doto (TopologyPageInfo. topology-id)
+ (.set_num_tasks num-tasks)
+ (.set_num_workers num-workers)
+ (.set_num_executors num-executors)
+ (.set_id_to_spout_agg_stats spout-agg-stats)
+ (.set_id_to_bolt_agg_stats bolt-agg-stats)
+ (.set_topology_stats topology-stats))]
topo-page-info))
+(defn agg-worker-stats
+ "Aggregate statistics per worker for a topology, optionally restricted to the workers of a single supervisor."
+ ([storm-id topo-info worker->resources include-sys? user-authorized]
+ (agg-worker-stats storm-id topo-info worker->resources include-sys? user-authorized nil))
+ ([storm-id topo-info worker->resources include-sys? user-authorized filter-supervisor]
+ (let [{:keys [storm-name
+ assignment
+ beats
+ task->component]} topo-info
+ exec->node+port (:executor->node+port assignment)
+ node->host (:node->host assignment)
+ all-node+port->exec (reverse-map exec->node+port)
+ node+port->exec (if (nil? filter-supervisor)
+ all-node+port->exec
+ (filter #(= filter-supervisor (ffirst %)) all-node+port->exec))
+ handle-sys-components-fn (mk-include-sys-fn include-sys?)]
+ (dofor [[[node port] executors] node+port->exec]
+ (let [executor-tasks (map #(range (first %) (inc (last %))) executors)
+ worker-beats (vals (select-keys beats executors))
+ not-null-worker-beat (first (filter identity worker-beats))
+ worker-uptime (or (:uptime not-null-worker-beat) 0)
+ ;; list of components per executor ((c1 c2 c3) (c4) (c5))
+ ;; if the executor was running only system components, an empty list for that executor is possible
+ components-per-executor (for [tasks executor-tasks]
+ (filter handle-sys-components-fn (map #(get task->component %) tasks)))
+ component->num-tasks (frequencies (flatten components-per-executor))
+ num-executors (count executors)
+ default-worker-resources (WorkerResources.)
+ resources (if (nil? worker->resources)
+ default-worker-resources
+ (or (.get worker->resources (WorkerSlot. node port))
+ default-worker-resources))
+ worker-summary (doto
+ (WorkerSummary.)
+ (.set_host (node->host node))
+ (.set_uptime_secs worker-uptime)
+ (.set_supervisor_id node)
+ (.set_port port)
+ (.set_topology_id storm-id)
+ (.set_topology_name storm-name)
+ (.set_num_executors num-executors)
+ (.set_assigned_memonheap (.get_mem_on_heap resources))
+ (.set_assigned_memoffheap (.get_mem_off_heap resources))
+ (.set_assigned_cpu (.get_cpu resources)))]
+ (if user-authorized (.set_component_to_num_tasks worker-summary component->num-tasks))
+ worker-summary)))))
+
(defn agg-topo-execs-stats
"Aggregate various executor statistics for a topology from the given
heartbeats."
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 8b59aab..4cb01f9 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -38,7 +38,7 @@
TopologyStats CommonAggregateStats ComponentAggregateStats
ComponentType BoltAggregateStats SpoutAggregateStats
ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
- LogConfig LogLevel LogLevelAction])
+ LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary])
(:import [org.apache.storm.security.auth AuthUtils ReqContext])
(:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
(:import [org.apache.storm.security.auth AuthUtils])
@@ -64,6 +64,7 @@
(defmeter ui:num-cluster-configuration-http-requests)
(defmeter ui:num-cluster-summary-http-requests)
(defmeter ui:num-nimbus-summary-http-requests)
+(defmeter ui:num-supervisor-http-requests)
(defmeter ui:num-supervisor-summary-http-requests)
(defmeter ui:num-all-topologies-summary-http-requests)
(defmeter ui:num-topology-page-http-requests)
@@ -410,26 +411,77 @@
"nimbusUpTime" (pretty-uptime-sec uptime)
"nimbusUpTimeSeconds" uptime}))})))
+(defn worker-summary-to-json ; Renders one WorkerSummary as the string-keyed map returned by the UI REST handlers.
+ [secure? ^WorkerSummary worker-summary] ; secure? is only forwarded to worker-log-link below
+ (let [host (.get_host worker-summary)
+ port (.get_port worker-summary)
+ topology-id (.get_topology_id worker-summary)
+ uptime-secs (.get_uptime_secs worker-summary)] ; bound once because each of these feeds more than one entry below
+ {"supervisorId" (.get_supervisor_id worker-summary)
+ "host" host
+ "port" port
+ "topologyId" topology-id
+ "topologyName" (.get_topology_name worker-summary)
+ "executorsTotal" (.get_num_executors worker-summary)
+ "assignedMemOnHeap" (.get_assigned_memonheap worker-summary)
+ "assignedMemOffHeap" (.get_assigned_memoffheap worker-summary)
+ "assignedCpu" (.get_assigned_cpu worker-summary)
+ "componentNumTasks" (.get_component_to_num_tasks worker-summary) ; NOTE(review): the stats.clj side of this commit sets this only when user-authorized, so it may be unset here
+ "uptime" (pretty-uptime-sec uptime-secs) ; same value as "uptimeSeconds", passed through pretty-uptime-sec
+ "uptimeSeconds" uptime-secs
+ "workerLogLink" (worker-log-link host port topology-id secure?)}))
+
+(defn supervisor-summary-to-json ; Renders one supervisor summary as the string-keyed map returned by the UI REST handlers.
+ [summary] ; not type-hinted; supervisor-summary calls this with a ^SupervisorSummary
+ (let [slotsTotal (.get_num_workers summary)
+ slotsUsed (.get_num_used_workers summary)
+ slotsFree (max (- slotsTotal slotsUsed) 0) ; clamped so a used count above the total never reports a negative value
+ totalMem (get (.get_total_resources summary) Config/SUPERVISOR_MEMORY_CAPACITY_MB) ; NOTE(review): get returns nil if the key is absent, and (- nil ...) below would throw -- confirm the key is always present
+ totalCpu (get (.get_total_resources summary) Config/SUPERVISOR_CPU_CAPACITY) ; same nil caveat as totalMem
+ usedMem (.get_used_mem summary)
+ usedCpu (.get_used_cpu summary)
+ availMem (max (- totalMem usedMem) 0) ; clamped at 0, like slotsFree
+ availCpu (max (- totalCpu usedCpu) 0)] ; clamped at 0, like slotsFree
+ {"id" (.get_supervisor_id summary)
+ "host" (.get_host summary)
+ "uptime" (pretty-uptime-sec (.get_uptime_secs summary)) ; same value as "uptimeSeconds", passed through pretty-uptime-sec
+ "uptimeSeconds" (.get_uptime_secs summary)
+ "slotsTotal" slotsTotal
+ "slotsUsed" slotsUsed
+ "slotsFree" slotsFree
+ "totalMem" totalMem
+ "totalCpu" totalCpu
+ "usedMem" usedMem
+ "usedCpu" usedCpu
+ "logLink" (supervisor-log-link (.get_host summary))
+ "availMem" availMem
+ "availCpu" availCpu
+ "version" (.get_version summary)}))
+
+(defn supervisor-page-info ; Builds the map for the supervisor page: supervisor summaries, their workers, and the display-resource flag.
+ ([supervisor-id host include-sys? secure?] ; 4-arity: fetch SupervisorPageInfo from nimbus, then delegate to the 2-arity
+ (thrift/with-configured-nimbus-connection
+ nimbus (supervisor-page-info (.getSupervisorPageInfo ^Nimbus$Client nimbus
+ supervisor-id
+ host
+ include-sys?) secure?)))
+ ([^SupervisorPageInfo supervisor-page-info secure?] ; 2-arity: converts an already-fetched struct; the parameter shadows the function name inside this body
+ ;; ask nimbus to return supervisor workers + any details user is allowed
+ ;; access on a per-topology basis (i.e. components)
+ (let [supervisors-json (map supervisor-summary-to-json (.get_supervisor_summaries supervisor-page-info))]
+ {"supervisors" supervisors-json
+ "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)
+ "workers" (into [] (for [^WorkerSummary worker-summary (.get_worker_summaries supervisor-page-info)] ; into [] realizes the lazy for as a vector
+ (worker-summary-to-json secure? worker-summary)))})))
+
(defn supervisor-summary
([]
(thrift/with-configured-nimbus-connection nimbus
(supervisor-summary
(.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
([summs]
- {"supervisors"
- (for [^SupervisorSummary s summs]
- {"id" (.get_supervisor_id s)
- "host" (.get_host s)
- "uptime" (pretty-uptime-sec (.get_uptime_secs s))
- "uptimeSeconds" (.get_uptime_secs s)
- "slotsTotal" (.get_num_workers s)
- "slotsUsed" (.get_num_used_workers s)
- "totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
- "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
- "usedMem" (.get_used_mem s)
- "usedCpu" (.get_used_cpu s)
- "logLink" (supervisor-log-link (.get_host s))
- "version" (.get_version s)})
+ {"supervisors" (for [^SupervisorSummary s summs]
+ (supervisor-summary-to-json s))
"schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
(defn all-topologies-summary
@@ -588,6 +640,8 @@
"assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
"assignedCpu" (.get_assigned_cpu topo-info)
"topologyStats" topo-stats
+ "workers" (map (partial worker-summary-to-json secure?)
+ (.get_workers topo-info))
"spouts" (map (partial comp-agg-stats-json id secure?)
(.get_id_to_spout_agg_stats topo-info))
"bolts" (map (partial comp-agg-stats-json id secure?)
@@ -1046,6 +1100,16 @@
(assert-authorized-user "getClusterInfo")
(json-response (assoc (supervisor-summary)
"logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m)))
+ (GET "/api/v1/supervisor" [:as {:keys [cookies servlet-request scheme]} & m]
+ (.mark ui:num-supervisor-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getSupervisorPageInfo")
+ ;; the supervisor endpoint takes an id and/or a host query parameter;
+ ;; if both the id and host are provided, the id wins
+ (let [id (:id m)
+ host (:host m)]
+ (json-response (assoc (supervisor-page-info id host (check-include-sys? (:sys m)) (= scheme :https))
+ "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m))))
(GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
(mark! ui:num-all-topologies-summary-http-requests)
(populate-context! servlet-request)
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
index c7a3f8a..90b7516 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
@@ -787,15 +787,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 2: // NODE_HOST
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map548 = iprot.readMapBegin();
- struct.node_host = new HashMap<String,String>(2*_map548.size);
- String _key549;
- String _val550;
- for (int _i551 = 0; _i551 < _map548.size; ++_i551)
+ org.apache.thrift.protocol.TMap _map582 = iprot.readMapBegin();
+ struct.node_host = new HashMap<String,String>(2*_map582.size);
+ String _key583;
+ String _val584;
+ for (int _i585 = 0; _i585 < _map582.size; ++_i585)
{
- _key549 = iprot.readString();
- _val550 = iprot.readString();
- struct.node_host.put(_key549, _val550);
+ _key583 = iprot.readString();
+ _val584 = iprot.readString();
+ struct.node_host.put(_key583, _val584);
}
iprot.readMapEnd();
}
@@ -807,26 +807,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 3: // EXECUTOR_NODE_PORT
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map552 = iprot.readMapBegin();
- struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map552.size);
- List<Long> _key553;
- NodeInfo _val554;
- for (int _i555 = 0; _i555 < _map552.size; ++_i555)
+ org.apache.thrift.protocol.TMap _map586 = iprot.readMapBegin();
+ struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map586.size);
+ List<Long> _key587;
+ NodeInfo _val588;
+ for (int _i589 = 0; _i589 < _map586.size; ++_i589)
{
{
- org.apache.thrift.protocol.TList _list556 = iprot.readListBegin();
- _key553 = new ArrayList<Long>(_list556.size);
- long _elem557;
- for (int _i558 = 0; _i558 < _list556.size; ++_i558)
+ org.apache.thrift.protocol.TList _list590 = iprot.readListBegin();
+ _key587 = new ArrayList<Long>(_list590.size);
+ long _elem591;
+ for (int _i592 = 0; _i592 < _list590.size; ++_i592)
{
- _elem557 = iprot.readI64();
- _key553.add(_elem557);
+ _elem591 = iprot.readI64();
+ _key587.add(_elem591);
}
iprot.readListEnd();
}
- _val554 = new NodeInfo();
- _val554.read(iprot);
- struct.executor_node_port.put(_key553, _val554);
+ _val588 = new NodeInfo();
+ _val588.read(iprot);
+ struct.executor_node_port.put(_key587, _val588);
}
iprot.readMapEnd();
}
@@ -838,25 +838,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 4: // EXECUTOR_START_TIME_SECS
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map559 = iprot.readMapBegin();
- struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map559.size);
- List<Long> _key560;
- long _val561;
- for (int _i562 = 0; _i562 < _map559.size; ++_i562)
+ org.apache.thrift.protocol.TMap _map593 = iprot.readMapBegin();
+ struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map593.size);
+ List<Long> _key594;
+ long _val595;
+ for (int _i596 = 0; _i596 < _map593.size; ++_i596)
{
{
- org.apache.thrift.protocol.TList _list563 = iprot.readListBegin();
- _key560 = new ArrayList<Long>(_list563.size);
- long _elem564;
- for (int _i565 = 0; _i565 < _list563.size; ++_i565)
+ org.apache.thrift.protocol.TList _list597 = iprot.readListBegin();
+ _key594 = new ArrayList<Long>(_list597.size);
+ long _elem598;
+ for (int _i599 = 0; _i599 < _list597.size; ++_i599)
{
- _elem564 = iprot.readI64();
- _key560.add(_elem564);
+ _elem598 = iprot.readI64();
+ _key594.add(_elem598);
}
iprot.readListEnd();
}
- _val561 = iprot.readI64();
- struct.executor_start_time_secs.put(_key560, _val561);
+ _val595 = iprot.readI64();
+ struct.executor_start_time_secs.put(_key594, _val595);
}
iprot.readMapEnd();
}
@@ -868,17 +868,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 5: // WORKER_RESOURCES
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map566 = iprot.readMapBegin();
- struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map566.size);
- NodeInfo _key567;
- WorkerResources _val568;
- for (int _i569 = 0; _i569 < _map566.size; ++_i569)
+ org.apache.thrift.protocol.TMap _map600 = iprot.readMapBegin();
+ struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map600.size);
+ NodeInfo _key601;
+ WorkerResources _val602;
+ for (int _i603 = 0; _i603 < _map600.size; ++_i603)
{
- _key567 = new NodeInfo();
- _key567.read(iprot);
- _val568 = new WorkerResources();
- _val568.read(iprot);
- struct.worker_resources.put(_key567, _val568);
+ _key601 = new NodeInfo();
+ _key601.read(iprot);
+ _val602 = new WorkerResources();
+ _val602.read(iprot);
+ struct.worker_resources.put(_key601, _val602);
}
iprot.readMapEnd();
}
@@ -910,10 +910,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size()));
- for (Map.Entry<String, String> _iter570 : struct.node_host.entrySet())
+ for (Map.Entry<String, String> _iter604 : struct.node_host.entrySet())
{
- oprot.writeString(_iter570.getKey());
- oprot.writeString(_iter570.getValue());
+ oprot.writeString(_iter604.getKey());
+ oprot.writeString(_iter604.getValue());
}
oprot.writeMapEnd();
}
@@ -925,17 +925,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size()));
- for (Map.Entry<List<Long>, NodeInfo> _iter571 : struct.executor_node_port.entrySet())
+ for (Map.Entry<List<Long>, NodeInfo> _iter605 : struct.executor_node_port.entrySet())
{
{
- oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter571.getKey().size()));
- for (long _iter572 : _iter571.getKey())
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter605.getKey().size()));
+ for (long _iter606 : _iter605.getKey())
{
- oprot.writeI64(_iter572);
+ oprot.writeI64(_iter606);
}
oprot.writeListEnd();
}
- _iter571.getValue().write(oprot);
+ _iter605.getValue().write(oprot);
}
oprot.writeMapEnd();
}
@@ -947,17 +947,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size()));
- for (Map.Entry<List<Long>, Long> _iter573 : struct.executor_start_time_secs.entrySet())
+ for (Map.Entry<List<Long>, Long> _iter607 : struct.executor_start_time_secs.entrySet())
{
{
- oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter573.getKey().size()));
- for (long _iter574 : _iter573.getKey())
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter607.getKey().size()));
+ for (long _iter608 : _iter607.getKey())
{
- oprot.writeI64(_iter574);
+ oprot.writeI64(_iter608);
}
oprot.writeListEnd();
}
- oprot.writeI64(_iter573.getValue());
+ oprot.writeI64(_iter607.getValue());
}
oprot.writeMapEnd();
}
@@ -969,10 +969,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(WORKER_RESOURCES_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.worker_resources.size()));
- for (Map.Entry<NodeInfo, WorkerResources> _iter575 : struct.worker_resources.entrySet())
+ for (Map.Entry<NodeInfo, WorkerResources> _iter609 : struct.worker_resources.entrySet())
{
- _iter575.getKey().write(oprot);
- _iter575.getValue().write(oprot);
+ _iter609.getKey().write(oprot);
+ _iter609.getValue().write(oprot);
}
oprot.writeMapEnd();
}
@@ -1014,52 +1014,52 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
if (struct.is_set_node_host()) {
{
oprot.writeI32(struct.node_host.size());
- for (Map.Entry<String, String> _iter576 : struct.node_host.entrySet())
+ for (Map.Entry<String, String> _iter610 : struct.node_host.entrySet())
{
- oprot.writeString(_iter576.getKey());
- oprot.writeString(_iter576.getValue());
+ oprot.writeString(_iter610.getKey());
+ oprot.writeString(_iter610.getValue());
}
}
}
if (struct.is_set_executor_node_port()) {
{
oprot.writeI32(struct.executor_node_port.size());
- for (Map.Entry<List<Long>, NodeInfo> _iter577 : struct.executor_node_port.entrySet())
+ for (Map.Entry<List<Long>, NodeInfo> _iter611 : struct.executor_node_port.entrySet())
{
{
- oprot.writeI32(_iter577.getKey().size());
- for (long _iter578 : _iter577.getKey())
+ oprot.writeI32(_iter611.getKey().size());
+ for (long _iter612 : _iter611.getKey())
{
- oprot.writeI64(_iter578);
+ oprot.writeI64(_iter612);
}
}
- _iter577.getValue().write(oprot);
+ _iter611.getValue().write(oprot);
}
}
}
if (struct.is_set_executor_start_time_secs()) {
{
oprot.writeI32(struct.executor_start_time_secs.size());
- for (Map.Entry<List<Long>, Long> _iter579 : struct.executor_start_time_secs.entrySet())
+ for (Map.Entry<List<Long>, Long> _iter613 : struct.executor_start_time_secs.entrySet())
{
{
- oprot.writeI32(_iter579.getKey().size());
- for (long _iter580 : _iter579.getKey())
+ oprot.writeI32(_iter613.getKey().size());
+ for (long _iter614 : _iter613.getKey())
{
- oprot.writeI64(_iter580);
+ oprot.writeI64(_iter614);
}
}
- oprot.writeI64(_iter579.getValue());
+ oprot.writeI64(_iter613.getValue());
}
}
}
if (struct.is_set_worker_resources()) {
{
oprot.writeI32(struct.worker_resources.size());
- for (Map.Entry<NodeInfo, WorkerResources> _iter581 : struct.worker_resources.entrySet())
+ for (Map.Entry<NodeInfo, WorkerResources> _iter615 : struct.worker_resources.entrySet())
{
- _iter581.getKey().write(oprot);
- _iter581.getValue().write(oprot);
+ _iter615.getKey().write(oprot);
+ _iter615.getValue().write(oprot);
}
}
}
@@ -1073,81 +1073,81 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
BitSet incoming = iprot.readBitSet(4);
if (incoming.get(0)) {
{
- org.apache.thrift.protocol.TMap _map582 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
- struct.node_host = new HashMap<String,String>(2*_map582.size);
- String _key583;
- String _val584;
- for (int _i585 = 0; _i585 < _map582.size; ++_i585)
+ org.apache.thrift.protocol.TMap _map616 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.node_host = new HashMap<String,String>(2*_map616.size);
+ String _key617;
+ String _val618;
+ for (int _i619 = 0; _i619 < _map616.size; ++_i619)
{
- _key583 = iprot.readString();
- _val584 = iprot.readString();
- struct.node_host.put(_key583, _val584);
+ _key617 = iprot.readString();
+ _val618 = iprot.readString();
+ struct.node_host.put(_key617, _val618);
}
}
struct.set_node_host_isSet(true);
}
if (incoming.get(1)) {
{
- org.apache.thrift.protocol.TMap _map586 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
- struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map586.size);
- List<Long> _key587;
- NodeInfo _val588;
- for (int _i589 = 0; _i589 < _map586.size; ++_i589)
+ org.apache.thrift.protocol.TMap _map620 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map620.size);
+ List<Long> _key621;
+ NodeInfo _val622;
+ for (int _i623 = 0; _i623 < _map620.size; ++_i623)
{
{
- org.apache.thrift.protocol.TList _list590 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _key587 = new ArrayList<Long>(_list590.size);
- long _elem591;
- for (int _i592 = 0; _i592 < _list590.size; ++_i592)
+ org.apache.thrift.protocol.TList _list624 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _key621 = new ArrayList<Long>(_list624.size);
+ long _elem625;
+ for (int _i626 = 0; _i626 < _list624.size; ++_i626)
{
- _elem591 = iprot.readI64();
- _key587.add(_elem591);
+ _elem625 = iprot.readI64();
+ _key621.add(_elem625);
}
}
- _val588 = new NodeInfo();
- _val588.read(iprot);
- struct.executor_node_port.put(_key587, _val588);
+ _val622 = new NodeInfo();
+ _val622.read(iprot);
+ struct.executor_node_port.put(_key621, _val622);
}
}
struct.set_executor_node_port_isSet(true);
}
if (incoming.get(2)) {
{
- org.apache.thrift.protocol.TMap _map593 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
- struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map593.size);
- List<Long> _key594;
- long _val595;
- for (int _i596 = 0; _i596 < _map593.size; ++_i596)
+ org.apache.thrift.protocol.TMap _map627 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map627.size);
+ List<Long> _key628;
+ long _val629;
+ for (int _i630 = 0; _i630 < _map627.size; ++_i630)
{
{
- org.apache.thrift.protocol.TList _list597 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _key594 = new ArrayList<Long>(_list597.size);
- long _elem598;
- for (int _i599 = 0; _i599 < _list597.size; ++_i599)
+ org.apache.thrift.protocol.TList _list631 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _key628 = new ArrayList<Long>(_list631.size);
+ long _elem632;
+ for (int _i633 = 0; _i633 < _list631.size; ++_i633)
{
- _elem598 = iprot.readI64();
- _key594.add(_elem598);
+ _elem632 = iprot.readI64();
+ _key628.add(_elem632);
}
}
- _val595 = iprot.readI64();
- struct.executor_start_time_secs.put(_key594, _val595);
+ _val629 = iprot.readI64();
+ struct.executor_start_time_secs.put(_key628, _val629);
}
}
struct.set_executor_start_time_secs_isSet(true);
}
if (incoming.get(3)) {
{
- org.apache.thrift.protocol.TMap _map600 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
- struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map600.size);
- NodeInfo _key601;
- WorkerResources _val602;
- for (int _i603 = 0; _i603 < _map600.size; ++_i603)
+ org.apache.thrift.protocol.TMap _map634 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map634.size);
+ NodeInfo _key635;
+ WorkerResources _val636;
+ for (int _i637 = 0; _i637 < _map634.size; ++_i637)
{
- _key601 = new NodeInfo();
- _key601.read(iprot);
- _val602 = new WorkerResources();
- _val602.read(iprot);
- struct.worker_resources.put(_key601, _val602);
+ _key635 = new NodeInfo();
+ _key635.read(iprot);
+ _val636 = new WorkerResources();
+ _val636.read(iprot);
+ struct.worker_resources.put(_key635, _val636);
}
}
struct.set_worker_resources_isSet(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java b/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
index 8585a7d..59c0894 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java
@@ -635,17 +635,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
case 2: // EXECUTOR_STATS
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map624 = iprot.readMapBegin();
- struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map624.size);
- ExecutorInfo _key625;
- ExecutorStats _val626;
- for (int _i627 = 0; _i627 < _map624.size; ++_i627)
+ org.apache.thrift.protocol.TMap _map658 = iprot.readMapBegin();
+ struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map658.size);
+ ExecutorInfo _key659;
+ ExecutorStats _val660;
+ for (int _i661 = 0; _i661 < _map658.size; ++_i661)
{
- _key625 = new ExecutorInfo();
- _key625.read(iprot);
- _val626 = new ExecutorStats();
- _val626.read(iprot);
- struct.executor_stats.put(_key625, _val626);
+ _key659 = new ExecutorInfo();
+ _key659.read(iprot);
+ _val660 = new ExecutorStats();
+ _val660.read(iprot);
+ struct.executor_stats.put(_key659, _val660);
}
iprot.readMapEnd();
}
@@ -692,10 +692,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
oprot.writeFieldBegin(EXECUTOR_STATS_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.executor_stats.size()));
- for (Map.Entry<ExecutorInfo, ExecutorStats> _iter628 : struct.executor_stats.entrySet())
+ for (Map.Entry<ExecutorInfo, ExecutorStats> _iter662 : struct.executor_stats.entrySet())
{
- _iter628.getKey().write(oprot);
- _iter628.getValue().write(oprot);
+ _iter662.getKey().write(oprot);
+ _iter662.getValue().write(oprot);
}
oprot.writeMapEnd();
}
@@ -727,10 +727,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
oprot.writeString(struct.storm_id);
{
oprot.writeI32(struct.executor_stats.size());
- for (Map.Entry<ExecutorInfo, ExecutorStats> _iter629 : struct.executor_stats.entrySet())
+ for (Map.Entry<ExecutorInfo, ExecutorStats> _iter663 : struct.executor_stats.entrySet())
{
- _iter629.getKey().write(oprot);
- _iter629.getValue().write(oprot);
+ _iter663.getKey().write(oprot);
+ _iter663.getValue().write(oprot);
}
}
oprot.writeI32(struct.time_secs);
@@ -743,17 +743,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo
struct.storm_id = iprot.readString();
struct.set_storm_id_isSet(true);
{
- org.apache.thrift.protocol.TMap _map630 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
- struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map630.size);
- ExecutorInfo _key631;
- ExecutorStats _val632;
- for (int _i633 = 0; _i633 < _map630.size; ++_i633)
+ org.apache.thrift.protocol.TMap _map664 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map664.size);
+ ExecutorInfo _key665;
+ ExecutorStats _val666;
+ for (int _i667 = 0; _i667 < _map664.size; ++_i667)
{
- _key631 = new ExecutorInfo();
- _key631.read(iprot);
- _val632 = new ExecutorStats();
- _val632.read(iprot);
- struct.executor_stats.put(_key631, _val632);
+ _key665 = new ExecutorInfo();
+ _key665.read(iprot);
+ _val666 = new ExecutorStats();
+ _val666.read(iprot);
+ struct.executor_stats.put(_key665, _val666);
}
}
struct.set_executor_stats_isSet(true);