You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/05 23:00:42 UTC
[10/37] storm git commit: Aggregate topo stats on nimbus, not ui
http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index c440043..e5fb708 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -21,7 +21,7 @@
ring.middleware.multipart-params)
(:use [ring.middleware.json :only [wrap-json-params]])
(:use [hiccup core page-helpers])
- (:use [backtype.storm config util log])
+ (:use [backtype.storm config util log stats])
(:use [backtype.storm.ui helpers])
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
@@ -31,7 +31,10 @@
ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
ErrorInfo ClusterSummary SupervisorSummary TopologySummary
Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
- KillOptions GetInfoOptions NumErrorsChoice])
+ KillOptions GetInfoOptions NumErrorsChoice TopologyPageInfo
+ TopologyStats CommonAggregateStats ComponentAggregateStats
+ ComponentType BoltAggregateStats SpoutAggregateStats
+ ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo])
(:import [backtype.storm.security.auth AuthUtils ReqContext])
(:import [backtype.storm.generated AuthorizationException])
(:import [backtype.storm.security.auth AuthUtils])
@@ -87,94 +90,10 @@
(throw (AuthorizationException.
(str "UI request '" op "' for '" user "' user is not authorized")))))))))
-(defn get-filled-stats
- [summs]
- (->> summs
- (map #(.get_stats ^ExecutorSummary %))
- (filter not-nil?)))
-
-(defn component-type
- "Returns the component type (either :bolt or :spout) for a given
- topology and component id. Returns nil if not found."
- [^StormTopology topology id]
- (let [bolts (.get_bolts topology)
- spouts (.get_spouts topology)]
- (cond
- (.containsKey bolts id) :bolt
- (.containsKey spouts id) :spout)))
-
(defn executor-summary-type
[topology ^ExecutorSummary s]
(component-type topology (.get_component_id s)))
-(defn add-pairs
- ([] [0 0])
- ([[a1 a2] [b1 b2]]
- [(+ a1 b1) (+ a2 b2)]))
-
-(defn expand-averages
- [avg counts]
- (let [avg (clojurify-structure avg)
- counts (clojurify-structure counts)]
- (into {}
- (for [[slice streams] counts]
- [slice
- (into {}
- (for [[stream c] streams]
- [stream
- [(* c (get-in avg [slice stream]))
- c]]
- ))]))))
-
-(defn expand-averages-seq
- [average-seq counts-seq]
- (->> (map vector average-seq counts-seq)
- (map #(apply expand-averages %))
- (apply merge-with (fn [s1 s2] (merge-with add-pairs s1 s2)))))
-
-(defn- val-avg
- [[t c]]
- (if (= t 0) 0
- (double (/ t c))))
-
-(defn aggregate-averages
- [average-seq counts-seq]
- (->> (expand-averages-seq average-seq counts-seq)
- (map-val
- (fn [s]
- (map-val val-avg s)))))
-
-(defn aggregate-counts
- [counts-seq]
- (->> counts-seq
- (map clojurify-structure)
- (apply merge-with
- (fn [s1 s2]
- (merge-with + s1 s2)))))
-
-(defn aggregate-avg-streams
- [avg counts]
- (let [expanded (expand-averages avg counts)]
- (->> expanded
- (map-val #(reduce add-pairs (vals %)))
- (map-val val-avg))))
-
-(defn aggregate-count-streams
- [stats]
- (->> stats
- (map-val #(reduce + (vals %)))))
-
-(defn aggregate-common-stats
- [stats-seq]
- {:emitted (aggregate-counts (map #(.get_emitted ^ExecutorStats %) stats-seq))
- :transferred (aggregate-counts (map #(.get_transferred ^ExecutorStats %) stats-seq))})
-
-(defn mk-include-sys-fn
- [include-sys?]
- (if include-sys?
- (fn [_] true)
- (fn [stream] (and (string? stream) (not (system-id? stream))))))
-
(defn is-ack-stream
[stream]
(let [acker-streams
@@ -183,80 +102,6 @@
ACKER-FAIL-STREAM-ID]]
(every? #(not= %1 stream) acker-streams)))
-(defn pre-process
- [stream-summary include-sys?]
- (let [filter-fn (mk-include-sys-fn include-sys?)
- emitted (:emitted stream-summary)
- emitted (into {} (for [[window stat] emitted]
- {window (filter-key filter-fn stat)}))
- transferred (:transferred stream-summary)
- transferred (into {} (for [[window stat] transferred]
- {window (filter-key filter-fn stat)}))
- stream-summary (-> stream-summary (dissoc :emitted) (assoc :emitted emitted))
- stream-summary (-> stream-summary (dissoc :transferred) (assoc :transferred transferred))]
- stream-summary))
-
-(defn aggregate-bolt-stats
- [stats-seq include-sys?]
- (let [stats-seq (collectify stats-seq)]
- (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
- {:acked
- (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
- stats-seq))
- :failed
- (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_failed)
- stats-seq))
- :executed
- (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
- stats-seq))
- :process-latencies
- (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_process_ms_avg)
- stats-seq)
- (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
- stats-seq))
- :execute-latencies
- (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg)
- stats-seq)
- (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
- stats-seq))})))
-
-(defn aggregate-spout-stats
- [stats-seq include-sys?]
- (let [stats-seq (collectify stats-seq)]
- (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
- {:acked
- (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
- stats-seq))
- :failed
- (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_spout get_failed)
- stats-seq))
- :complete-latencies
- (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_spout get_complete_ms_avg)
- stats-seq)
- (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
- stats-seq))})))
-
-(defn aggregate-bolt-streams
- [stats]
- {:acked (aggregate-count-streams (:acked stats))
- :failed (aggregate-count-streams (:failed stats))
- :emitted (aggregate-count-streams (:emitted stats))
- :transferred (aggregate-count-streams (:transferred stats))
- :process-latencies (aggregate-avg-streams (:process-latencies stats)
- (:acked stats))
- :executed (aggregate-count-streams (:executed stats))
- :execute-latencies (aggregate-avg-streams (:execute-latencies stats)
- (:executed stats))})
-
-(defn aggregate-spout-streams
- [stats]
- {:acked (aggregate-count-streams (:acked stats))
- :failed (aggregate-count-streams (:failed stats))
- :emitted (aggregate-count-streams (:emitted stats))
- :transferred (aggregate-count-streams (:transferred stats))
- :complete-latencies (aggregate-avg-streams (:complete-latencies stats)
- (:acked stats))})
-
(defn spout-summary?
[topology s]
(= :spout (executor-summary-type topology s)))
@@ -270,57 +115,11 @@
(let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
(into (sorted-map) ret )))
-(defn error-subset
- [error-str]
- (apply str (take 200 error-str)))
-
-(defn most-recent-error
- [errors-list]
- (let [error (->> errors-list
- (sort-by #(.get_error_time_secs ^ErrorInfo %))
- reverse
- first)]
- error))
-
-(defn component-task-summs
- [^TopologyInfo summ topology id]
- (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
- bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
- spout-comp-summs (group-by-comp spout-summs)
- bolt-comp-summs (group-by-comp bolt-summs)
- ret (if (contains? spout-comp-summs id)
- (spout-comp-summs id)
- (bolt-comp-summs id))]
- (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
-
(defn worker-log-link [host port topology-id]
(let [fname (logs-filename topology-id port)]
(url-format (str "http://%s:%s/log?file=%s")
host (*STORM-CONF* LOGVIEWER-PORT) fname)))
-(defn compute-executor-capacity
- [^ExecutorSummary e]
- (let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-bolt-stats true)
- (aggregate-bolt-streams)
- swap-map-order
- (get "600")))
- uptime (nil-to-zero (.get_uptime_secs e))
- window (if (< uptime 600) uptime 600)
- executed (-> stats :executed nil-to-zero)
- latency (-> stats :execute-latencies nil-to-zero)]
- (if (> window 0)
- (div (* executed latency) (* 1000 window)))))
-
-(defn compute-bolt-capacity
- [executors]
- (->> executors
- (map compute-executor-capacity)
- (map nil-to-zero)
- (apply max)))
-
(defn get-error-time
[error]
(if error
@@ -333,10 +132,9 @@
""))
(defn get-error-port
- [error error-host top-id]
+ [error]
(if error
- (.get_port ^ErrorInfo error)
- ""))
+ (.get_port ^ErrorInfo error)))
(defn get-error-host
[error]
@@ -344,41 +142,6 @@
(.get_host ^ErrorInfo error)
""))
-(defn spout-streams-stats
- [summs include-sys?]
- (let [stats-seq (get-filled-stats summs)]
- (aggregate-spout-streams
- (aggregate-spout-stats
- stats-seq include-sys?))))
-
-(defn bolt-streams-stats
- [summs include-sys?]
- (let [stats-seq (get-filled-stats summs)]
- (aggregate-bolt-streams
- (aggregate-bolt-stats
- stats-seq include-sys?))))
-
-(defn total-aggregate-stats
- [spout-summs bolt-summs include-sys?]
- (let [spout-stats (get-filled-stats spout-summs)
- bolt-stats (get-filled-stats bolt-summs)
- agg-spout-stats (-> spout-stats
- (aggregate-spout-stats include-sys?)
- aggregate-spout-streams)
- agg-bolt-stats (-> bolt-stats
- (aggregate-bolt-stats include-sys?)
- aggregate-bolt-streams)]
- (merge-with
- (fn [s1 s2]
- (merge-with + s1 s2))
- (select-keys
- agg-bolt-stats
- ;; Include only keys that will be used. We want to count acked and
- ;; failed only for the "tuple trees," so we do not include those keys
- ;; from the bolt executors.
- [:emitted :transferred])
- agg-spout-stats)))
-
(defn stats-times
[stats-map]
(sort-by #(Integer/parseInt %)
@@ -393,16 +156,6 @@
"All time"
(pretty-uptime-sec window)))
-(defn topology-action-button
- [id name action command is-wait default-wait enabled]
- [:input {:type "button"
- :value action
- (if enabled :enabled :disabled) ""
- :onclick (str "confirmAction('"
- (StringEscapeUtils/escapeJavaScript id) "', '"
- (StringEscapeUtils/escapeJavaScript name) "', '"
- command "', " is-wait ", " default-wait ")")}])
-
(defn sanitize-stream-name
[name]
(let [sym-regex #"(?![A-Za-z_\-:\.])."]
@@ -496,9 +249,7 @@
spout-comp-summs (group-by-comp spout-summs)
bolt-comp-summs (group-by-comp bolt-summs)
bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
- bolt-comp-summs)
- topology-conf (from-json
- (.getTopologyConf ^Nimbus$Client nimbus id))]
+ bolt-comp-summs)]
(visualization-data
(merge (hashmap-to-persistent spouts)
(hashmap-to-persistent bolts))
@@ -604,170 +355,140 @@
"executorsTotal" (.get_num_executors t)
"schedulerInfo" (.get_sched_status t)})}))
-(defn topology-stats [id window stats]
+(defn topology-stats [window stats]
(let [times (stats-times (:emitted stats))
display-map (into {} (for [t times] [t pretty-uptime-sec]))
display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
+ (for [w (concat times [":all-time"])
+ :let [disp ((display-map w) w)]]
{"windowPretty" disp
- "window" k
- "emitted" (get-in stats [:emitted k])
- "transferred" (get-in stats [:transferred k])
- "completeLatency" (float-str (get-in stats [:complete-latencies k]))
- "acked" (get-in stats [:acked k])
- "failed" (get-in stats [:failed k])})))
-
-(defn spout-comp [top-id summ-map errors window include-sys?]
- (for [[id summs] summ-map
- :let [stats-seq (get-filled-stats summs)
- stats (aggregate-spout-streams
- (aggregate-spout-stats
- stats-seq include-sys?))
- last-error (most-recent-error (get errors id))
- error-host (get-error-host last-error)
- error-port (get-error-port last-error error-host top-id)]]
- {"spoutId" id
- "encodedSpoutId" (url-encode id)
- "executors" (count summs)
- "tasks" (sum-tasks summs)
- "emitted" (get-in stats [:emitted window])
- "transferred" (get-in stats [:transferred window])
- "completeLatency" (float-str (get-in stats [:complete-latencies window]))
- "acked" (get-in stats [:acked window])
- "failed" (get-in stats [:failed window])
- "errorHost" error-host
- "errorPort" error-port
- "errorWorkerLogLink" (worker-log-link error-host error-port top-id)
- "errorLapsedSecs" (get-error-time last-error)
- "lastError" (get-error-data last-error)}))
-
-(defn bolt-comp [top-id summ-map errors window include-sys?]
- (for [[id summs] summ-map
- :let [stats-seq (get-filled-stats summs)
- stats (aggregate-bolt-streams
- (aggregate-bolt-stats
- stats-seq include-sys?))
- last-error (most-recent-error (get errors id))
- error-host (get-error-host last-error)
- error-port (get-error-port last-error error-host top-id)]]
- {"boltId" id
- "encodedBoltId" (url-encode id)
- "executors" (count summs)
- "tasks" (sum-tasks summs)
- "emitted" (get-in stats [:emitted window])
- "transferred" (get-in stats [:transferred window])
- "capacity" (float-str (nil-to-zero (compute-bolt-capacity summs)))
- "executeLatency" (float-str (get-in stats [:execute-latencies window]))
- "executed" (get-in stats [:executed window])
- "processLatency" (float-str (get-in stats [:process-latencies window]))
- "acked" (get-in stats [:acked window])
- "failed" (get-in stats [:failed window])
- "errorHost" error-host
- "errorPort" error-port
- "errorWorkerLogLink" (worker-log-link error-host error-port top-id)
- "errorLapsedSecs" (get-error-time last-error)
- "lastError" (get-error-data last-error)}))
-
-(defn topology-summary [^TopologyInfo summ]
- (let [executors (.get_executors summ)
- workers (set (for [^ExecutorSummary e executors]
- [(.get_host e) (.get_port e)]))]
- {"id" (.get_id summ)
- "encodedId" (url-encode (.get_id summ))
- "owner" (.get_owner summ)
- "name" (.get_name summ)
- "status" (.get_status summ)
- "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
- "tasksTotal" (sum-tasks executors)
- "workersTotal" (count workers)
- "executorsTotal" (count executors)
- "schedulerInfo" (.get_sched_status summ)}))
-
-(defn spout-summary-json [topology-id id stats window]
- (let [times (stats-times (:emitted stats))
- display-map (into {} (for [t times] [t pretty-uptime-sec]))
- display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- {"windowPretty" disp
- "window" k
- "emitted" (get-in stats [:emitted k])
- "transferred" (get-in stats [:transferred k])
- "completeLatency" (float-str (get-in stats [:complete-latencies k]))
- "acked" (get-in stats [:acked k])
- "failed" (get-in stats [:failed k])})))
-
-(defn topology-page [id window include-sys? user]
+ "window" w
+ "emitted" (get-in stats [:emitted w])
+ "transferred" (get-in stats [:transferred w])
+ "completeLatency" (float-str (get-in stats [:complete-latencies w]))
+ "acked" (get-in stats [:acked w])
+ "failed" (get-in stats [:failed w])})))
+
+(defn build-visualization [id window include-sys? user]
(with-nimbus nimbus
(let [window (if window window ":all-time")
- window-hint (window-hint window)
- summ (->> (doto
- (GetInfoOptions.)
- (.set_num_err_choice NumErrorsChoice/ONE))
- (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
- topology (.getTopology ^Nimbus$Client nimbus id)
- topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
- spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
- bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
- spout-comp-summs (group-by-comp spout-summs)
- bolt-comp-summs (group-by-comp bolt-summs)
- bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
- name (.get_name summ)
- status (.get_status summ)
- msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
- spouts (.get_spouts topology)
- bolts (.get_bolts topology)
- visualizer-data (visualization-data (merge (hashmap-to-persistent spouts)
- (hashmap-to-persistent bolts))
- spout-comp-summs
- bolt-comp-summs
+ topology-info (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/ONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus
+ id))
+ storm-topology (.getTopology ^Nimbus$Client nimbus id)
+ spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info))
+ bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info))
+ spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries)
+ bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries)
+ bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries)
+ id->spout-spec (.get_spouts storm-topology)
+ id->bolt (.get_bolts storm-topology)
+ visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec)
+ (hashmap-to-persistent id->bolt))
+ spout-comp-id->executor-summaries
+ bolt-comp-id->executor-summaries
window
id)]
+ {"visualizationTable" (stream-boxes visualizer-data)})))
+
+(defn- get-error-json
+  "Builds the last-error portion of a component's JSON from `error-info`:
+  the results of get-error-data, get-error-host, get-error-port and
+  get-error-time, plus a worker log link built from that host and port
+  and `topo-id`."
+  [topo-id error-info]
+  (let [err-host (get-error-host error-info)
+        err-port (get-error-port error-info)
+        log-link (worker-log-link err-host err-port topo-id)]
+    {"lastError" (get-error-data error-info)
+     "errorHost" err-host
+     "errorPort" err-port
+     "errorLapsedSecs" (get-error-time error-info)
+     "errorWorkerLogLink" log-link}))
+
+(defn- common-agg-stats-json
+  "Returns the JSON map of the counters read from a CommonAggregateStats:
+  executor and task counts plus emitted, transferred, acked and failed."
+  [^CommonAggregateStats common-stats]
+  (let [^CommonAggregateStats cs common-stats]
+    (array-map
+     "executors" (.get_num_executors cs)
+     "tasks" (.get_num_tasks cs)
+     "emitted" (.get_emitted cs)
+     "transferred" (.get_transferred cs)
+     "acked" (.get_acked cs)
+     "failed" (.get_failed cs))))
+
+(defmulti comp-agg-stats-json
+  "Returns a JSON representation of aggregated statistics."
+  (fn [_topo-id [_comp-id ^ComponentAggregateStats agg]]
+    (.get_type agg)))
+
+;; Spout entry: the common counters and last-error fields, plus the spout id
+;; (raw and URL-encoded) and the complete latency passed through float-str.
+(defmethod comp-agg-stats-json ComponentType/SPOUT
+  [topo-id [id ^ComponentAggregateStats s]]
+  (let [common (.get_common_stats s)
+        ^SpoutAggregateStats spout (.get_spout (.get_specific_stats s))
+        spout-fields {"spoutId" id
+                      "encodedSpoutId" (url-encode id)
+                      "completeLatency" (float-str (.get_complete_latency_ms spout))}]
+    (merge (common-agg-stats-json common)
+           (get-error-json topo-id (.get_last_error s))
+           spout-fields)))
+
+;; Bolt entry: the common counters and last-error fields, plus the bolt id
+;; (raw and URL-encoded), capacity, executed count and both latencies.
+(defmethod comp-agg-stats-json ComponentType/BOLT
+  [topo-id [id ^ComponentAggregateStats s]]
+  (let [common (.get_common_stats s)
+        ^BoltAggregateStats bolt (.get_bolt (.get_specific_stats s))
+        bolt-fields {"boltId" id
+                     "encodedBoltId" (url-encode id)
+                     "capacity" (float-str (.get_capacity bolt))
+                     "executeLatency" (float-str (.get_execute_latency_ms bolt))
+                     "executed" (.get_executed bolt)
+                     "processLatency" (float-str (.get_process_latency_ms bolt))}]
+    (merge (common-agg-stats-json common)
+           (get-error-json topo-id (.get_last_error s))
+           bolt-fields)))
+
+(defn- unpack-topology-page-info
+  "Converts a TopologyPageInfo into the JSON-ready map for the topology page:
+  id and summary fields, the per-window rows produced by topology-stats, one
+  entry per spout and per bolt, and the conf as returned by
+  .get_topology_conf."
+  [^TopologyPageInfo topo-info window]
+  (let [topo-id (.get_id topo-info)
+        ^TopologyStats raw-stats (.get_topology_stats topo-info)
+        ;; stat keyword -> window -> number, the shape topology-stats reads
+        windowed-stats (topology-stats
+                        window
+                        {:emitted (.get_window_to_emitted raw-stats)
+                         :transferred (.get_window_to_transferred raw-stats)
+                         :complete-latencies (.get_window_to_complete_latencies_ms raw-stats)
+                         :acked (.get_window_to_acked raw-stats)
+                         :failed (.get_window_to_failed raw-stats)})
+        component-json (partial comp-agg-stats-json topo-id)]
+    {"id" topo-id
+     "encodedId" (url-encode topo-id)
+     "owner" (.get_owner topo-info)
+     "name" (.get_name topo-info)
+     "status" (.get_status topo-info)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs topo-info))
+     "tasksTotal" (.get_num_tasks topo-info)
+     "workersTotal" (.get_num_workers topo-info)
+     "executorsTotal" (.get_num_executors topo-info)
+     "schedulerInfo" (.get_sched_status topo-info)
+     "topologyStats" windowed-stats
+     "spouts" (map component-json (.get_id_to_spout_agg_stats topo-info))
+     "bolts" (map component-json (.get_id_to_bolt_agg_stats topo-info))
+     "configuration" (.get_topology_conf topo-info)}))
+
+(defn topology-page [id window include-sys? user]
+ (with-nimbus nimbus
+ (let [window (or window ":all-time")
+ window-hint (window-hint window)
+ topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus
+ id
+ window
+ include-sys?)
+ topology-conf (from-json (.get_topology_conf topo-page-info))
+ msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
(merge
- (topology-summary summ)
+ (unpack-topology-page-info topo-page-info window)
{"user" user
"window" window
"windowHint" window-hint
"msgTimeout" msg-timeout
- "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
- "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys?)
- "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys?)
"configuration" topology-conf
- "visualizationTable" (stream-boxes visualizer-data)}))))
-
-(defn spout-output-stats
- [stream-summary window]
- (let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
- (for [[s stats] (stream-summary window)]
- {"stream" s
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))
- "completeLatency" (float-str (:complete-latencies stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))})))
-
-(defn spout-executor-stats
- [topology-id executors window include-sys?]
- (for [^ExecutorSummary e executors
- :let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-spout-stats include-sys?)
- aggregate-spout-streams
- swap-map-order
- (get window)))]]
- {"id" (pretty-executor-info (.get_executor_info e))
- "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
- "uptime" (pretty-uptime-sec (.get_uptime_secs e))
- "host" (.get_host e)
- "port" (.get_port e)
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))
- "completeLatency" (float-str (:complete-latencies stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))
- "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)}))
+ "visualizationTable" []}))))
(defn component-errors
[errors-list topology-id]
@@ -783,128 +504,180 @@
"errorLapsedSecs" (get-error-time e)
"error" (.get_error e)})}))
-(defn spout-stats
- [window ^TopologyInfo topology-info component executors include-sys?]
- (let [window-hint (str " (" (window-hint window) ")")
- stats (get-filled-stats executors)
- stream-summary (-> stats (aggregate-spout-stats include-sys?))
- summary (-> stream-summary aggregate-spout-streams)]
- {"spoutSummary" (spout-summary-json
- (.get_id topology-info) component summary window)
- "outputStats" (spout-output-stats stream-summary window)
- "executorStats" (spout-executor-stats (.get_id topology-info)
- executors window include-sys?)}))
-
-(defn bolt-summary
- [topology-id id stats window]
- (let [times (stats-times (:emitted stats))
- display-map (into {} (for [t times] [t pretty-uptime-sec]))
- display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- {"window" k
- "windowPretty" disp
- "emitted" (get-in stats [:emitted k])
- "transferred" (get-in stats [:transferred k])
- "executeLatency" (float-str (get-in stats [:execute-latencies k]))
- "executed" (get-in stats [:executed k])
- "processLatency" (float-str (get-in stats [:process-latencies k]))
- "acked" (get-in stats [:acked k])
- "failed" (get-in stats [:failed k])})))
-
-(defn bolt-output-stats
- [stream-summary window]
- (let [stream-summary (-> stream-summary
- swap-map-order
- (get window)
- (select-keys [:emitted :transferred])
- swap-map-order)]
- (for [[s stats] stream-summary]
- {"stream" s
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))})))
-
-(defn bolt-input-stats
- [stream-summary window]
- (let [stream-summary
- (-> stream-summary
- swap-map-order
- (get window)
- (select-keys [:acked :failed :process-latencies
- :executed :execute-latencies])
- swap-map-order)]
- (for [[^GlobalStreamId s stats] stream-summary]
- {"component" (.get_componentId s)
- "encodedComponent" (url-encode (.get_componentId s))
- "stream" (.get_streamId s)
- "executeLatency" (float-str (:execute-latencies stats))
- "processLatency" (float-str (:process-latencies stats))
- "executed" (nil-to-zero (:executed stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))})))
-
-(defn bolt-executor-stats
- [topology-id executors window include-sys?]
- (for [^ExecutorSummary e executors
- :let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-bolt-stats include-sys?)
- (aggregate-bolt-streams)
- swap-map-order
- (get window)))]]
- {"id" (pretty-executor-info (.get_executor_info e))
- "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
- "uptime" (pretty-uptime-sec (.get_uptime_secs e))
- "host" (.get_host e)
- "port" (.get_port e)
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))
- "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
- "executeLatency" (float-str (:execute-latencies stats))
- "executed" (nil-to-zero (:executed stats))
- "processLatency" (float-str (:process-latencies stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))
- "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)}))
-
-(defn bolt-stats
- [window ^TopologyInfo topology-info component executors include-sys?]
- (let [window-hint (str " (" (window-hint window) ")")
- stats (get-filled-stats executors)
- stream-summary (-> stats (aggregate-bolt-stats include-sys?))
- summary (-> stream-summary aggregate-bolt-streams)]
- {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
- "inputStats" (bolt-input-stats stream-summary window)
- "outputStats" (bolt-output-stats stream-summary window)
- "executorStats" (bolt-executor-stats
- (.get_id topology-info) executors window include-sys?)}))
+(defmulti unpack-comp-agg-stat
+  "Unpacks a [window ComponentAggregateStats] pair into a JSON-ready map,
+  dispatching on the component type of the stats."
+  (fn [[_window ^ComponentAggregateStats agg]]
+    (.get_type agg)))
+
+;; Bolt row for one window: common counters plus execute/process latency,
+;; executed count and capacity.
+(defmethod unpack-comp-agg-stat ComponentType/BOLT
+  [[window ^ComponentAggregateStats s]]
+  (let [^CommonAggregateStats common (.get_common_stats s)
+        ^BoltAggregateStats bolt (.. s get_specific_stats get_bolt)]
+    {"window" window
+     "windowPretty" (window-hint window)
+     "emitted" (.get_emitted common)
+     "transferred" (.get_transferred common)
+     "acked" (.get_acked common)
+     "failed" (.get_failed common)
+     "executeLatency" (float-str (.get_execute_latency_ms bolt))
+     "processLatency" (float-str (.get_process_latency_ms bolt))
+     "executed" (.get_executed bolt)
+     "capacity" (float-str (.get_capacity bolt))}))
+
+;; Spout row for one window: common counters plus complete latency.
+(defmethod unpack-comp-agg-stat ComponentType/SPOUT
+  [[window ^ComponentAggregateStats s]]
+  (let [^CommonAggregateStats common (.get_common_stats s)
+        ^SpoutAggregateStats spout (.. s get_specific_stats get_spout)]
+    {"window" window
+     "windowPretty" (window-hint window)
+     "emitted" (.get_emitted common)
+     "transferred" (.get_transferred common)
+     "acked" (.get_acked common)
+     "failed" (.get_failed common)
+     "completeLatency" (float-str (.get_complete_latency_ms spout))}))
+
+(defn- unpack-bolt-input-stat
+  "Unpacks a [GlobalStreamId ComponentAggregateStats] pair into the JSON map
+  for one bolt input: source component (raw and URL-encoded), stream id,
+  latencies, and executed/acked/failed counts with nil replaced by zero."
+  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
+  (let [source-comp (.get_componentId s)
+        ^CommonAggregateStats common (.get_common_stats stats)
+        ^BoltAggregateStats bolt (.. stats get_specific_stats get_bolt)]
+    {"component" source-comp
+     "encodedComponentId" (url-encode source-comp)
+     "stream" (.get_streamId s)
+     "executeLatency" (float-str (.get_execute_latency_ms bolt))
+     "processLatency" (float-str (.get_process_latency_ms bolt))
+     "executed" (nil-to-zero (.get_executed bolt))
+     "acked" (nil-to-zero (.get_acked common))
+     "failed" (nil-to-zero (.get_failed common))}))
+
+(defmulti unpack-comp-output-stat
+  "Unpacks a [stream-id ComponentAggregateStats] pair into a JSON-ready map,
+  dispatching on the component type of the stats."
+  (fn [[_stream-id ^ComponentAggregateStats agg]]
+    (.get_type agg)))
+
+;; Bolt output stream: only emitted and transferred, nil replaced by zero.
+(defmethod unpack-comp-output-stat ComponentType/BOLT
+  [[stream-id ^ComponentAggregateStats stats]]
+  (let [^CommonAggregateStats common (.get_common_stats stats)]
+    {"stream" stream-id
+     "emitted" (nil-to-zero (.get_emitted common))
+     "transferred" (nil-to-zero (.get_transferred common))}))
+
+;; Spout output stream: emitted/transferred plus complete latency and the
+;; acked/failed counts, nil counts replaced by zero.
+(defmethod unpack-comp-output-stat ComponentType/SPOUT
+  [[stream-id ^ComponentAggregateStats stats]]
+  (let [^CommonAggregateStats common (.get_common_stats stats)
+        ^SpoutAggregateStats spout (.. stats get_specific_stats get_spout)]
+    {"stream" stream-id
+     "emitted" (nil-to-zero (.get_emitted common))
+     "transferred" (nil-to-zero (.get_transferred common))
+     "completeLatency" (float-str (.get_complete_latency_ms spout))
+     "acked" (nil-to-zero (.get_acked common))
+     "failed" (nil-to-zero (.get_failed common))}))
+
+(defmulti unpack-comp-exec-stat
+  "Unpacks one ExecutorAggregateStats into a JSON-ready map for the given
+  topology id, dispatching on the component type of the stats it carries."
+  ;; The second argument is an ExecutorAggregateStats (its .get_stats yields
+  ;; the ComponentAggregateStats whose type is dispatched on), so it is hinted
+  ;; as such instead of being hinted as one type and re-hinted as another.
+  (fn [_topology-id ^ExecutorAggregateStats eas]
+    (.get_type (.get_stats eas))))
+
+;; Bolt executor row: executor id (raw and URL-encoded), uptime, host/port,
+;; common counters, bolt-specific capacity/latencies/executed, and a link to
+;; the worker log for this host, port and topology.
+(defmethod unpack-comp-exec-stat ComponentType/BOLT
+  [topology-id ^ExecutorAggregateStats eas]
+  (let [^ExecutorSummary exec-summ (.get_exec_summary eas)
+        ^ComponentAggregateStats agg (.get_stats eas)
+        ^CommonAggregateStats common (.get_common_stats agg)
+        ^BoltAggregateStats bolt (.. agg get_specific_stats get_bolt)
+        exec-id (pretty-executor-info (.get_executor_info exec-summ))
+        worker-host (.get_host exec-summ)
+        worker-port (.get_port exec-summ)]
+    {"id" exec-id
+     "encodedId" (url-encode exec-id)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs exec-summ))
+     "host" worker-host
+     "port" worker-port
+     "emitted" (nil-to-zero (.get_emitted common))
+     "transferred" (nil-to-zero (.get_transferred common))
+     "capacity" (float-str (nil-to-zero (.get_capacity bolt)))
+     "executeLatency" (float-str (.get_execute_latency_ms bolt))
+     "executed" (nil-to-zero (.get_executed bolt))
+     "processLatency" (float-str (.get_process_latency_ms bolt))
+     "acked" (nil-to-zero (.get_acked common))
+     "failed" (nil-to-zero (.get_failed common))
+     "workerLogLink" (worker-log-link worker-host worker-port topology-id)}))
+
+;; Spout executor row: executor id (raw and URL-encoded), uptime, host/port,
+;; common counters, the spout's complete latency, and a link to the worker
+;; log for this host, port and topology.
+(defmethod unpack-comp-exec-stat ComponentType/SPOUT
+  [topology-id ^ExecutorAggregateStats eas]
+  (let [^ExecutorSummary exec-summ (.get_exec_summary eas)
+        ^ComponentAggregateStats agg (.get_stats eas)
+        ^CommonAggregateStats common (.get_common_stats agg)
+        ^SpoutAggregateStats spout (.. agg get_specific_stats get_spout)
+        exec-id (pretty-executor-info (.get_executor_info exec-summ))
+        worker-host (.get_host exec-summ)
+        worker-port (.get_port exec-summ)]
+    {"id" exec-id
+     "encodedId" (url-encode exec-id)
+     "uptime" (pretty-uptime-sec (.get_uptime_secs exec-summ))
+     "host" worker-host
+     "port" worker-port
+     "emitted" (nil-to-zero (.get_emitted common))
+     "transferred" (nil-to-zero (.get_transferred common))
+     "completeLatency" (float-str (.get_complete_latency_ms spout))
+     "acked" (nil-to-zero (.get_acked common))
+     "failed" (nil-to-zero (.get_failed common))
+     "workerLogLink" (worker-log-link worker-host worker-port topology-id)}))
+
+(defmulti unpack-component-page-info
+  "Unpacks component-specific info to clojure data structures"
+  (fn [^ComponentPageInfo page-info & _more]
+    (.get_component_type page-info)))
+
+;; Bolt page: per-window stats, per-input stats, per-output-stream stats and
+;; per-executor stats, merged with the result of component-errors.
+(defmethod unpack-component-page-info ComponentType/BOLT
+  [^ComponentPageInfo info topology-id window include-sys?]
+  (let [exec-stat (partial unpack-comp-exec-stat topology-id)
+        stats-json {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
+                    "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
+                    "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
+                    "executorStats" (map exec-stat (.get_exec_stats info))}]
+    (merge stats-json
+           (component-errors (.get_errors info) topology-id))))
+
+;; Spout page: per-window stats, per-output-stream stats and per-executor
+;; stats, merged with the result of component-errors.
+(defmethod unpack-component-page-info ComponentType/SPOUT
+  [^ComponentPageInfo info topology-id window include-sys?]
+  (let [exec-stat (partial unpack-comp-exec-stat topology-id)
+        stats-json {"spoutStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
+                    "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
+                    "executorStats" (map exec-stat (.get_exec_stats info))}]
+    (merge stats-json
+           (component-errors (.get_errors info) topology-id))))
(defn component-page
+ "Returns the JSON-ready map for one component's page. Fetches the
+ ComponentPageInfo for `component` in `topology-id` from Nimbus (for the
+ given `window`, defaulting to \":all-time\", and `include-sys?` flag),
+ unpacks it, and adds the user, ids, counts, window and component type."
[topology-id component window include-sys? user]
(with-nimbus nimbus
- (let [window (if window window ":all-time")
- summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
- topology (.getTopology ^Nimbus$Client nimbus topology-id)
- topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))
- type (component-type topology component)
- summs (component-task-summs summ topology component)
- spec (cond (= type :spout) (spout-stats window summ component summs include-sys?)
- (= type :bolt) (bolt-stats window summ component summs include-sys?))
- errors (component-errors (get (.get_errors summ) component) topology-id)]
- (merge
- {"user" user
- "id" component
- "encodedId" (url-encode component)
- "name" (.get_name summ)
- "executors" (count summs)
- "tasks" (sum-tasks summs)
- "topologyId" topology-id
- "encodedTopologyId" (url-encode topology-id)
- "window" window
- "componentType" (name type)
- "windowHint" (window-hint window)}
- spec errors))))
+ ;; Only getComponentPageInfo is called here: nothing on this page reads the
+ ;; topology conf, so no separate getTopologyConf round-trip is made.
+ (let [window (or window ":all-time")
+ window-hint (window-hint window)
+ comp-page-info (.getComponentPageInfo ^Nimbus$Client nimbus
+ topology-id
+ component
+ window
+ include-sys?)]
+ ;; The page-level keys are assoc'd onto the unpacked info, so they take
+ ;; precedence over any same-named key produced by the unpacking.
+ (assoc
+ (unpack-component-page-info comp-page-info
+ topology-id
+ window
+ include-sys?)
+ "user" user
+ "id" component
+ "encodedId" (url-encode component)
+ "name" (.get_topology_name comp-page-info)
+ "executors" (.get_num_executors comp-page-info)
+ "tasks" (.get_num_tasks comp-page-info)
+ "topologyId" topology-id
+ "encodedTopologyId" (url-encode topology-id)
+ "window" window
+ "componentType" (-> comp-page-info .get_component_type str lower-case)
+ "windowHint" window-hint))))
(defn topology-config [topology-id]
(with-nimbus nimbus
@@ -955,6 +728,11 @@
(let [user (.getUserName http-creds-handler servlet-request)]
(assert-authorized-user servlet-request "getTopology" (topology-config id))
(json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
+ (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
+ (assert-authorized-user servlet-request "getTopology" (topology-config id))
+ (let [id (url-decode id)
+ user (.getUserName http-creds-handler servlet-request)]
+ (json-response (build-visualization id (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
(GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
(assert-authorized-user servlet-request "getTopology" (topology-config id))
(json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))
http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
index 70ef179..a5ff810 100644
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ b/storm-core/src/clj/backtype/storm/ui/helpers.clj
@@ -96,40 +96,10 @@
)]
])
-(defn float-str [n]
- (if n
- (format "%.3f" (float n))
- "0"
- ))
-
-(defn swap-map-order [m]
- (->> m
- (map (fn [[k v]]
- (into
- {}
- (for [[k2 v2] v]
- [k2 {k v2}]
- ))
- ))
- (apply merge-with merge)
- ))
-
(defn url-format [fmt & args]
(String/format fmt
(to-array (map #(url-encode (str %)) args))))
-(defn to-tasks [^ExecutorInfo e]
- (let [start (.get_task_start e)
- end (.get_task_end e)]
- (range start (inc end))
- ))
-
-(defn sum-tasks [executors]
- (reduce + (->> executors
- (map #(.get_executor_info ^ExecutorSummary %))
- (map to-tasks)
- (map count))))
-
(defn pretty-executor-info [^ExecutorInfo e]
(str "[" (.get_task_start e) "-" (.get_task_end e) "]"))
http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/jvm/backtype/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Assignment.java b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
index 2e6e8a3..bb74a78 100644
--- a/storm-core/src/jvm/backtype/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-3-5")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-5-14")
public class Assignment implements org.apache.thrift.TBase<Assignment, Assignment._Fields>, java.io.Serializable, Cloneable, Comparable<Assignment> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Assignment");
@@ -678,15 +678,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 2: // NODE_HOST
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map370 = iprot.readMapBegin();
- struct.node_host = new HashMap<String,String>(2*_map370.size);
- String _key371;
- String _val372;
- for (int _i373 = 0; _i373 < _map370.size; ++_i373)
+ org.apache.thrift.protocol.TMap _map486 = iprot.readMapBegin();
+ struct.node_host = new HashMap<String,String>(2*_map486.size);
+ String _key487;
+ String _val488;
+ for (int _i489 = 0; _i489 < _map486.size; ++_i489)
{
- _key371 = iprot.readString();
- _val372 = iprot.readString();
- struct.node_host.put(_key371, _val372);
+ _key487 = iprot.readString();
+ _val488 = iprot.readString();
+ struct.node_host.put(_key487, _val488);
}
iprot.readMapEnd();
}
@@ -698,26 +698,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 3: // EXECUTOR_NODE_PORT
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map374 = iprot.readMapBegin();
- struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map374.size);
- List<Long> _key375;
- NodeInfo _val376;
- for (int _i377 = 0; _i377 < _map374.size; ++_i377)
+ org.apache.thrift.protocol.TMap _map490 = iprot.readMapBegin();
+ struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map490.size);
+ List<Long> _key491;
+ NodeInfo _val492;
+ for (int _i493 = 0; _i493 < _map490.size; ++_i493)
{
{
- org.apache.thrift.protocol.TList _list378 = iprot.readListBegin();
- _key375 = new ArrayList<Long>(_list378.size);
- long _elem379;
- for (int _i380 = 0; _i380 < _list378.size; ++_i380)
+ org.apache.thrift.protocol.TList _list494 = iprot.readListBegin();
+ _key491 = new ArrayList<Long>(_list494.size);
+ long _elem495;
+ for (int _i496 = 0; _i496 < _list494.size; ++_i496)
{
- _elem379 = iprot.readI64();
- _key375.add(_elem379);
+ _elem495 = iprot.readI64();
+ _key491.add(_elem495);
}
iprot.readListEnd();
}
- _val376 = new NodeInfo();
- _val376.read(iprot);
- struct.executor_node_port.put(_key375, _val376);
+ _val492 = new NodeInfo();
+ _val492.read(iprot);
+ struct.executor_node_port.put(_key491, _val492);
}
iprot.readMapEnd();
}
@@ -729,25 +729,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 4: // EXECUTOR_START_TIME_SECS
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map381 = iprot.readMapBegin();
- struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map381.size);
- List<Long> _key382;
- long _val383;
- for (int _i384 = 0; _i384 < _map381.size; ++_i384)
+ org.apache.thrift.protocol.TMap _map497 = iprot.readMapBegin();
+ struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map497.size);
+ List<Long> _key498;
+ long _val499;
+ for (int _i500 = 0; _i500 < _map497.size; ++_i500)
{
{
- org.apache.thrift.protocol.TList _list385 = iprot.readListBegin();
- _key382 = new ArrayList<Long>(_list385.size);
- long _elem386;
- for (int _i387 = 0; _i387 < _list385.size; ++_i387)
+ org.apache.thrift.protocol.TList _list501 = iprot.readListBegin();
+ _key498 = new ArrayList<Long>(_list501.size);
+ long _elem502;
+ for (int _i503 = 0; _i503 < _list501.size; ++_i503)
{
- _elem386 = iprot.readI64();
- _key382.add(_elem386);
+ _elem502 = iprot.readI64();
+ _key498.add(_elem502);
}
iprot.readListEnd();
}
- _val383 = iprot.readI64();
- struct.executor_start_time_secs.put(_key382, _val383);
+ _val499 = iprot.readI64();
+ struct.executor_start_time_secs.put(_key498, _val499);
}
iprot.readMapEnd();
}
@@ -779,10 +779,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size()));
- for (Map.Entry<String, String> _iter388 : struct.node_host.entrySet())
+ for (Map.Entry<String, String> _iter504 : struct.node_host.entrySet())
{
- oprot.writeString(_iter388.getKey());
- oprot.writeString(_iter388.getValue());
+ oprot.writeString(_iter504.getKey());
+ oprot.writeString(_iter504.getValue());
}
oprot.writeMapEnd();
}
@@ -794,17 +794,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size()));
- for (Map.Entry<List<Long>, NodeInfo> _iter389 : struct.executor_node_port.entrySet())
+ for (Map.Entry<List<Long>, NodeInfo> _iter505 : struct.executor_node_port.entrySet())
{
{
- oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter389.getKey().size()));
- for (long _iter390 : _iter389.getKey())
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter505.getKey().size()));
+ for (long _iter506 : _iter505.getKey())
{
- oprot.writeI64(_iter390);
+ oprot.writeI64(_iter506);
}
oprot.writeListEnd();
}
- _iter389.getValue().write(oprot);
+ _iter505.getValue().write(oprot);
}
oprot.writeMapEnd();
}
@@ -816,17 +816,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size()));
- for (Map.Entry<List<Long>, Long> _iter391 : struct.executor_start_time_secs.entrySet())
+ for (Map.Entry<List<Long>, Long> _iter507 : struct.executor_start_time_secs.entrySet())
{
{
- oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter391.getKey().size()));
- for (long _iter392 : _iter391.getKey())
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter507.getKey().size()));
+ for (long _iter508 : _iter507.getKey())
{
- oprot.writeI64(_iter392);
+ oprot.writeI64(_iter508);
}
oprot.writeListEnd();
}
- oprot.writeI64(_iter391.getValue());
+ oprot.writeI64(_iter507.getValue());
}
oprot.writeMapEnd();
}
@@ -865,42 +865,42 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
if (struct.is_set_node_host()) {
{
oprot.writeI32(struct.node_host.size());
- for (Map.Entry<String, String> _iter393 : struct.node_host.entrySet())
+ for (Map.Entry<String, String> _iter509 : struct.node_host.entrySet())
{
- oprot.writeString(_iter393.getKey());
- oprot.writeString(_iter393.getValue());
+ oprot.writeString(_iter509.getKey());
+ oprot.writeString(_iter509.getValue());
}
}
}
if (struct.is_set_executor_node_port()) {
{
oprot.writeI32(struct.executor_node_port.size());
- for (Map.Entry<List<Long>, NodeInfo> _iter394 : struct.executor_node_port.entrySet())
+ for (Map.Entry<List<Long>, NodeInfo> _iter510 : struct.executor_node_port.entrySet())
{
{
- oprot.writeI32(_iter394.getKey().size());
- for (long _iter395 : _iter394.getKey())
+ oprot.writeI32(_iter510.getKey().size());
+ for (long _iter511 : _iter510.getKey())
{
- oprot.writeI64(_iter395);
+ oprot.writeI64(_iter511);
}
}
- _iter394.getValue().write(oprot);
+ _iter510.getValue().write(oprot);
}
}
}
if (struct.is_set_executor_start_time_secs()) {
{
oprot.writeI32(struct.executor_start_time_secs.size());
- for (Map.Entry<List<Long>, Long> _iter396 : struct.executor_start_time_secs.entrySet())
+ for (Map.Entry<List<Long>, Long> _iter512 : struct.executor_start_time_secs.entrySet())
{
{
- oprot.writeI32(_iter396.getKey().size());
- for (long _iter397 : _iter396.getKey())
+ oprot.writeI32(_iter512.getKey().size());
+ for (long _iter513 : _iter512.getKey())
{
- oprot.writeI64(_iter397);
+ oprot.writeI64(_iter513);
}
}
- oprot.writeI64(_iter396.getValue());
+ oprot.writeI64(_iter512.getValue());
}
}
}
@@ -914,64 +914,64 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
{
- org.apache.thrift.protocol.TMap _map398 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
- struct.node_host = new HashMap<String,String>(2*_map398.size);
- String _key399;
- String _val400;
- for (int _i401 = 0; _i401 < _map398.size; ++_i401)
+ org.apache.thrift.protocol.TMap _map514 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.node_host = new HashMap<String,String>(2*_map514.size);
+ String _key515;
+ String _val516;
+ for (int _i517 = 0; _i517 < _map514.size; ++_i517)
{
- _key399 = iprot.readString();
- _val400 = iprot.readString();
- struct.node_host.put(_key399, _val400);
+ _key515 = iprot.readString();
+ _val516 = iprot.readString();
+ struct.node_host.put(_key515, _val516);
}
}
struct.set_node_host_isSet(true);
}
if (incoming.get(1)) {
{
- org.apache.thrift.protocol.TMap _map402 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
- struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map402.size);
- List<Long> _key403;
- NodeInfo _val404;
- for (int _i405 = 0; _i405 < _map402.size; ++_i405)
+ org.apache.thrift.protocol.TMap _map518 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map518.size);
+ List<Long> _key519;
+ NodeInfo _val520;
+ for (int _i521 = 0; _i521 < _map518.size; ++_i521)
{
{
- org.apache.thrift.protocol.TList _list406 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _key403 = new ArrayList<Long>(_list406.size);
- long _elem407;
- for (int _i408 = 0; _i408 < _list406.size; ++_i408)
+ org.apache.thrift.protocol.TList _list522 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _key519 = new ArrayList<Long>(_list522.size);
+ long _elem523;
+ for (int _i524 = 0; _i524 < _list522.size; ++_i524)
{
- _elem407 = iprot.readI64();
- _key403.add(_elem407);
+ _elem523 = iprot.readI64();
+ _key519.add(_elem523);
}
}
- _val404 = new NodeInfo();
- _val404.read(iprot);
- struct.executor_node_port.put(_key403, _val404);
+ _val520 = new NodeInfo();
+ _val520.read(iprot);
+ struct.executor_node_port.put(_key519, _val520);
}
}
struct.set_executor_node_port_isSet(true);
}
if (incoming.get(2)) {
{
- org.apache.thrift.protocol.TMap _map409 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
- struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map409.size);
- List<Long> _key410;
- long _val411;
- for (int _i412 = 0; _i412 < _map409.size; ++_i412)
+ org.apache.thrift.protocol.TMap _map525 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map525.size);
+ List<Long> _key526;
+ long _val527;
+ for (int _i528 = 0; _i528 < _map525.size; ++_i528)
{
{
- org.apache.thrift.protocol.TList _list413 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _key410 = new ArrayList<Long>(_list413.size);
- long _elem414;
- for (int _i415 = 0; _i415 < _list413.size; ++_i415)
+ org.apache.thrift.protocol.TList _list529 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _key526 = new ArrayList<Long>(_list529.size);
+ long _elem530;
+ for (int _i531 = 0; _i531 < _list529.size; ++_i531)
{
- _elem414 = iprot.readI64();
- _key410.add(_elem414);
+ _elem530 = iprot.readI64();
+ _key526.add(_elem530);
}
}
- _val411 = iprot.readI64();
- struct.executor_start_time_secs.put(_key410, _val411);
+ _val527 = iprot.readI64();
+ struct.executor_start_time_secs.put(_key526, _val527);
}
}
struct.set_executor_start_time_secs_isSet(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/a16b50c9/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
new file mode 100644
index 0000000..386ccfd
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
@@ -0,0 +1,704 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-5-14")
+public class BoltAggregateStats implements org.apache.thrift.TBase<BoltAggregateStats, BoltAggregateStats._Fields>, java.io.Serializable, Cloneable, Comparable<BoltAggregateStats> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BoltAggregateStats");
+
+ private static final org.apache.thrift.protocol.TField EXECUTE_LATENCY_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("execute_latency_ms", org.apache.thrift.protocol.TType.DOUBLE, (short)513);
+ private static final org.apache.thrift.protocol.TField PROCESS_LATENCY_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("process_latency_ms", org.apache.thrift.protocol.TType.DOUBLE, (short)514);
+ private static final org.apache.thrift.protocol.TField EXECUTED_FIELD_DESC = new org.apache.thrift.protocol.TField("executed", org.apache.thrift.protocol.TType.I64, (short)515);
+ private static final org.apache.thrift.protocol.TField CAPACITY_FIELD_DESC = new org.apache.thrift.protocol.TField("capacity", org.apache.thrift.protocol.TType.DOUBLE, (short)516);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new BoltAggregateStatsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new BoltAggregateStatsTupleSchemeFactory());
+ }
+
+ private double execute_latency_ms; // optional
+ private double process_latency_ms; // optional
+ private long executed; // optional
+ private double capacity; // optional
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ EXECUTE_LATENCY_MS((short)513, "execute_latency_ms"),
+ PROCESS_LATENCY_MS((short)514, "process_latency_ms"),
+ EXECUTED((short)515, "executed"),
+ CAPACITY((short)516, "capacity");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 513: // EXECUTE_LATENCY_MS
+ return EXECUTE_LATENCY_MS;
+ case 514: // PROCESS_LATENCY_MS
+ return PROCESS_LATENCY_MS;
+ case 515: // EXECUTED
+ return EXECUTED;
+ case 516: // CAPACITY
+ return CAPACITY;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __EXECUTE_LATENCY_MS_ISSET_ID = 0;
+ private static final int __PROCESS_LATENCY_MS_ISSET_ID = 1;
+ private static final int __EXECUTED_ISSET_ID = 2;
+ private static final int __CAPACITY_ISSET_ID = 3;
+ private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.EXECUTE_LATENCY_MS,_Fields.PROCESS_LATENCY_MS,_Fields.EXECUTED,_Fields.CAPACITY};
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.EXECUTE_LATENCY_MS, new org.apache.thrift.meta_data.FieldMetaData("execute_latency_ms", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+ tmpMap.put(_Fields.PROCESS_LATENCY_MS, new org.apache.thrift.meta_data.FieldMetaData("process_latency_ms", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+ tmpMap.put(_Fields.EXECUTED, new org.apache.thrift.meta_data.FieldMetaData("executed", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.CAPACITY, new org.apache.thrift.meta_data.FieldMetaData("capacity", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(BoltAggregateStats.class, metaDataMap);
+ }
+
+ public BoltAggregateStats() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public BoltAggregateStats(BoltAggregateStats other) {
+ __isset_bitfield = other.__isset_bitfield;
+ this.execute_latency_ms = other.execute_latency_ms;
+ this.process_latency_ms = other.process_latency_ms;
+ this.executed = other.executed;
+ this.capacity = other.capacity;
+ }
+
+ public BoltAggregateStats deepCopy() {
+ return new BoltAggregateStats(this);
+ }
+
+ @Override
+ public void clear() {
+ set_execute_latency_ms_isSet(false);
+ this.execute_latency_ms = 0.0;
+ set_process_latency_ms_isSet(false);
+ this.process_latency_ms = 0.0;
+ set_executed_isSet(false);
+ this.executed = 0;
+ set_capacity_isSet(false);
+ this.capacity = 0.0;
+ }
+
+ public double get_execute_latency_ms() {
+ return this.execute_latency_ms;
+ }
+
+ public void set_execute_latency_ms(double execute_latency_ms) {
+ this.execute_latency_ms = execute_latency_ms;
+ set_execute_latency_ms_isSet(true);
+ }
+
+ public void unset_execute_latency_ms() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __EXECUTE_LATENCY_MS_ISSET_ID);
+ }
+
+ /** Returns true if field execute_latency_ms is set (has been assigned a value) and false otherwise */
+ public boolean is_set_execute_latency_ms() {
+ return EncodingUtils.testBit(__isset_bitfield, __EXECUTE_LATENCY_MS_ISSET_ID);
+ }
+
+ public void set_execute_latency_ms_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __EXECUTE_LATENCY_MS_ISSET_ID, value);
+ }
+
+ public double get_process_latency_ms() {
+ return this.process_latency_ms;
+ }
+
+ public void set_process_latency_ms(double process_latency_ms) {
+ this.process_latency_ms = process_latency_ms;
+ set_process_latency_ms_isSet(true);
+ }
+
+ public void unset_process_latency_ms() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PROCESS_LATENCY_MS_ISSET_ID);
+ }
+
+ /** Returns true if field process_latency_ms is set (has been assigned a value) and false otherwise */
+ public boolean is_set_process_latency_ms() {
+ return EncodingUtils.testBit(__isset_bitfield, __PROCESS_LATENCY_MS_ISSET_ID);
+ }
+
+ public void set_process_latency_ms_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PROCESS_LATENCY_MS_ISSET_ID, value);
+ }
+
+ public long get_executed() {
+ return this.executed;
+ }
+
+ public void set_executed(long executed) {
+ this.executed = executed;
+ set_executed_isSet(true);
+ }
+
+ public void unset_executed() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __EXECUTED_ISSET_ID);
+ }
+
+ /** Returns true if field executed is set (has been assigned a value) and false otherwise */
+ public boolean is_set_executed() {
+ return EncodingUtils.testBit(__isset_bitfield, __EXECUTED_ISSET_ID);
+ }
+
+ public void set_executed_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __EXECUTED_ISSET_ID, value);
+ }
+
+ public double get_capacity() {
+ return this.capacity;
+ }
+
+ public void set_capacity(double capacity) {
+ this.capacity = capacity;
+ set_capacity_isSet(true);
+ }
+
+ public void unset_capacity() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __CAPACITY_ISSET_ID);
+ }
+
+ /** Returns true if field capacity is set (has been assigned a value) and false otherwise */
+ public boolean is_set_capacity() {
+ return EncodingUtils.testBit(__isset_bitfield, __CAPACITY_ISSET_ID);
+ }
+
+ public void set_capacity_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __CAPACITY_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case EXECUTE_LATENCY_MS:
+ if (value == null) {
+ unset_execute_latency_ms();
+ } else {
+ set_execute_latency_ms((Double)value);
+ }
+ break;
+
+ case PROCESS_LATENCY_MS:
+ if (value == null) {
+ unset_process_latency_ms();
+ } else {
+ set_process_latency_ms((Double)value);
+ }
+ break;
+
+ case EXECUTED:
+ if (value == null) {
+ unset_executed();
+ } else {
+ set_executed((Long)value);
+ }
+ break;
+
+ case CAPACITY:
+ if (value == null) {
+ unset_capacity();
+ } else {
+ set_capacity((Double)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case EXECUTE_LATENCY_MS:
+ return Double.valueOf(get_execute_latency_ms());
+
+ case PROCESS_LATENCY_MS:
+ return Double.valueOf(get_process_latency_ms());
+
+ case EXECUTED:
+ return Long.valueOf(get_executed());
+
+ case CAPACITY:
+ return Double.valueOf(get_capacity());
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case EXECUTE_LATENCY_MS:
+ return is_set_execute_latency_ms();
+ case PROCESS_LATENCY_MS:
+ return is_set_process_latency_ms();
+ case EXECUTED:
+ return is_set_executed();
+ case CAPACITY:
+ return is_set_capacity();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof BoltAggregateStats)
+ return this.equals((BoltAggregateStats)that);
+ return false;
+ }
+
+ public boolean equals(BoltAggregateStats that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_execute_latency_ms = true && this.is_set_execute_latency_ms();
+ boolean that_present_execute_latency_ms = true && that.is_set_execute_latency_ms();
+ if (this_present_execute_latency_ms || that_present_execute_latency_ms) {
+ if (!(this_present_execute_latency_ms && that_present_execute_latency_ms))
+ return false;
+ if (this.execute_latency_ms != that.execute_latency_ms)
+ return false;
+ }
+
+ boolean this_present_process_latency_ms = true && this.is_set_process_latency_ms();
+ boolean that_present_process_latency_ms = true && that.is_set_process_latency_ms();
+ if (this_present_process_latency_ms || that_present_process_latency_ms) {
+ if (!(this_present_process_latency_ms && that_present_process_latency_ms))
+ return false;
+ if (this.process_latency_ms != that.process_latency_ms)
+ return false;
+ }
+
+ boolean this_present_executed = true && this.is_set_executed();
+ boolean that_present_executed = true && that.is_set_executed();
+ if (this_present_executed || that_present_executed) {
+ if (!(this_present_executed && that_present_executed))
+ return false;
+ if (this.executed != that.executed)
+ return false;
+ }
+
+ boolean this_present_capacity = true && this.is_set_capacity();
+ boolean that_present_capacity = true && that.is_set_capacity();
+ if (this_present_capacity || that_present_capacity) {
+ if (!(this_present_capacity && that_present_capacity))
+ return false;
+ if (this.capacity != that.capacity)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_execute_latency_ms = true && (is_set_execute_latency_ms());
+ list.add(present_execute_latency_ms);
+ if (present_execute_latency_ms)
+ list.add(execute_latency_ms);
+
+ boolean present_process_latency_ms = true && (is_set_process_latency_ms());
+ list.add(present_process_latency_ms);
+ if (present_process_latency_ms)
+ list.add(process_latency_ms);
+
+ boolean present_executed = true && (is_set_executed());
+ list.add(present_executed);
+ if (present_executed)
+ list.add(executed);
+
+ boolean present_capacity = true && (is_set_capacity());
+ list.add(present_capacity);
+ if (present_capacity)
+ list.add(capacity);
+
+ return list.hashCode();
+ }
+
+ /**
+  * Orders this struct against {@code other}.
+  *
+  * Instances of different runtime classes are ordered by class name. Otherwise
+  * the fields are compared one at a time (execute_latency_ms,
+  * process_latency_ms, executed, capacity): first by whether the field is set
+  * (unset orders before set), then, when this struct has the field set, by value.
+  * The first non-zero result is returned.
+  *
+  * @param other the struct to compare against
+  * @return a negative, zero, or positive value per the {@code Comparable} contract
+  */
+ @Override
+ public int compareTo(BoltAggregateStats other) {
+   final Class<?> ownType = getClass();
+   final Class<?> otherType = other.getClass();
+   if (!ownType.equals(otherType)) {
+     return ownType.getName().compareTo(otherType.getName());
+   }
+
+   int cmp;
+
+   // execute_latency_ms
+   cmp = Boolean.valueOf(is_set_execute_latency_ms()).compareTo(other.is_set_execute_latency_ms());
+   if (cmp != 0) {
+     return cmp;
+   }
+   if (is_set_execute_latency_ms()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.execute_latency_ms, other.execute_latency_ms);
+     if (cmp != 0) {
+       return cmp;
+     }
+   }
+
+   // process_latency_ms
+   cmp = Boolean.valueOf(is_set_process_latency_ms()).compareTo(other.is_set_process_latency_ms());
+   if (cmp != 0) {
+     return cmp;
+   }
+   if (is_set_process_latency_ms()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.process_latency_ms, other.process_latency_ms);
+     if (cmp != 0) {
+       return cmp;
+     }
+   }
+
+   // executed
+   cmp = Boolean.valueOf(is_set_executed()).compareTo(other.is_set_executed());
+   if (cmp != 0) {
+     return cmp;
+   }
+   if (is_set_executed()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.executed, other.executed);
+     if (cmp != 0) {
+       return cmp;
+     }
+   }
+
+   // capacity is the last field, so its value comparison decides the result outright.
+   cmp = Boolean.valueOf(is_set_capacity()).compareTo(other.is_set_capacity());
+   if (cmp != 0) {
+     return cmp;
+   }
+   if (is_set_capacity()) {
+     return org.apache.thrift.TBaseHelper.compareTo(this.capacity, other.capacity);
+   }
+   return 0;
+ }
+
+ /**
+  * Resolves a Thrift field id to its {@code _Fields} constant by delegating to
+  * {@code _Fields.findByThriftId}.
+  *
+  * @param fieldId the Thrift field id to look up
+  * @return whatever {@code _Fields.findByThriftId} returns for that id
+  */
+ public _Fields fieldForId(int fieldId) {
+   final _Fields match = _Fields.findByThriftId(fieldId);
+   return match;
+ }
+
+ /**
+  * Populates this struct from {@code iprot}, using the scheme that the
+  * {@code schemes} registry associates with the protocol's scheme class.
+  *
+  * @param iprot the protocol to read from
+  * @throws org.apache.thrift.TException propagated from the selected scheme's read
+  */
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+   final SchemeFactory factory = schemes.get(iprot.getScheme());
+   factory.getScheme().read(iprot, this);
+ }
+
+ /**
+  * Serializes this struct to {@code oprot}, using the scheme that the
+  * {@code schemes} registry associates with the protocol's scheme class.
+  *
+  * @param oprot the protocol to write to
+  * @throws org.apache.thrift.TException propagated from the selected scheme's write
+  */
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+   final SchemeFactory factory = schemes.get(oprot.getScheme());
+   factory.getScheme().write(oprot, this);
+ }
+
+ /**
+  * Renders the struct as {@code BoltAggregateStats(name:value, ...)}, listing
+  * only the fields that are currently set, separated by ", ".
+  */
+ @Override
+ public String toString() {
+   final StringBuilder out = new StringBuilder("BoltAggregateStats(");
+   // Empty until the first field has been emitted, then ", ".
+   String separator = "";
+
+   if (is_set_execute_latency_ms()) {
+     out.append(separator).append("execute_latency_ms:").append(this.execute_latency_ms);
+     separator = ", ";
+   }
+   if (is_set_process_latency_ms()) {
+     out.append(separator).append("process_latency_ms:").append(this.process_latency_ms);
+     separator = ", ";
+   }
+   if (is_set_executed()) {
+     out.append(separator).append("executed:").append(this.executed);
+     separator = ", ";
+   }
+   if (is_set_capacity()) {
+     out.append(separator).append("capacity:").append(this.capacity);
+   }
+
+   out.append(")");
+   return out.toString();
+ }
+
+ /**
+  * Validation hook called by the standard scheme after reading and before
+  * writing a struct. The body contains no checks, so as written it never
+  * rejects a struct.
+  *
+  * @throws org.apache.thrift.TException declared by the signature; nothing in this body throws it
+  */
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ /** Factory that returns a new {@code BoltAggregateStatsStandardScheme} on every call. */
+ private static class BoltAggregateStatsStandardSchemeFactory implements SchemeFactory {
+   public BoltAggregateStatsStandardScheme getScheme() {
+     final BoltAggregateStatsStandardScheme scheme = new BoltAggregateStatsStandardScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Field-tagged (standard scheme) reader and writer for {@code BoltAggregateStats}.
+  * Fields are identified on the wire by id: 513 execute_latency_ms,
+  * 514 process_latency_ms, 515 executed, 516 capacity.
+  */
+ private static class BoltAggregateStatsStandardScheme extends StandardScheme<BoltAggregateStats> {
+
+   /**
+    * Reads fields until a STOP marker is seen. A field is stored only when both
+    * its id and its wire type match one of the known fields; anything else is skipped.
+    * Calls {@code struct.validate()} once the struct has been fully read.
+    */
+   public void read(org.apache.thrift.protocol.TProtocol iprot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+     iprot.readStructBegin();
+     for (org.apache.thrift.protocol.TField field = iprot.readFieldBegin();
+          field.type != org.apache.thrift.protocol.TType.STOP;
+          field = iprot.readFieldBegin()) {
+       final byte wireType = field.type;
+       if (field.id == 513 && wireType == org.apache.thrift.protocol.TType.DOUBLE) {
+         // EXECUTE_LATENCY_MS
+         struct.execute_latency_ms = iprot.readDouble();
+         struct.set_execute_latency_ms_isSet(true);
+       } else if (field.id == 514 && wireType == org.apache.thrift.protocol.TType.DOUBLE) {
+         // PROCESS_LATENCY_MS
+         struct.process_latency_ms = iprot.readDouble();
+         struct.set_process_latency_ms_isSet(true);
+       } else if (field.id == 515 && wireType == org.apache.thrift.protocol.TType.I64) {
+         // EXECUTED
+         struct.executed = iprot.readI64();
+         struct.set_executed_isSet(true);
+       } else if (field.id == 516 && wireType == org.apache.thrift.protocol.TType.DOUBLE) {
+         // CAPACITY
+         struct.capacity = iprot.readDouble();
+         struct.set_capacity_isSet(true);
+       } else {
+         // Unrecognised id, or a known id carrying an unexpected wire type.
+         org.apache.thrift.protocol.TProtocolUtil.skip(iprot, wireType);
+       }
+       iprot.readFieldEnd();
+     }
+     iprot.readStructEnd();
+     struct.validate();
+   }
+
+   /**
+    * Calls {@code struct.validate()}, then writes each field that is set,
+    * in id order, followed by the field-stop marker.
+    */
+   public void write(org.apache.thrift.protocol.TProtocol oprot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+     struct.validate();
+
+     oprot.writeStructBegin(STRUCT_DESC);
+
+     if (struct.is_set_execute_latency_ms()) {
+       oprot.writeFieldBegin(EXECUTE_LATENCY_MS_FIELD_DESC);
+       oprot.writeDouble(struct.execute_latency_ms);
+       oprot.writeFieldEnd();
+     }
+
+     if (struct.is_set_process_latency_ms()) {
+       oprot.writeFieldBegin(PROCESS_LATENCY_MS_FIELD_DESC);
+       oprot.writeDouble(struct.process_latency_ms);
+       oprot.writeFieldEnd();
+     }
+
+     if (struct.is_set_executed()) {
+       oprot.writeFieldBegin(EXECUTED_FIELD_DESC);
+       oprot.writeI64(struct.executed);
+       oprot.writeFieldEnd();
+     }
+
+     if (struct.is_set_capacity()) {
+       oprot.writeFieldBegin(CAPACITY_FIELD_DESC);
+       oprot.writeDouble(struct.capacity);
+       oprot.writeFieldEnd();
+     }
+
+     oprot.writeFieldStop();
+     oprot.writeStructEnd();
+   }
+
+ }
+
+ /** Factory that returns a new {@code BoltAggregateStatsTupleScheme} on every call. */
+ private static class BoltAggregateStatsTupleSchemeFactory implements SchemeFactory {
+   public BoltAggregateStatsTupleScheme getScheme() {
+     final BoltAggregateStatsTupleScheme scheme = new BoltAggregateStatsTupleScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Tuple-scheme reader and writer for {@code BoltAggregateStats}. A 4-bit
+  * presence set comes first (bit 0 execute_latency_ms, bit 1 process_latency_ms,
+  * bit 2 executed, bit 3 capacity), followed by the values of the fields whose
+  * bits are set, in that same order.
+  */
+ private static class BoltAggregateStatsTupleScheme extends TupleScheme<BoltAggregateStats> {
+
+   /** Writes the presence bits, then the value of every field that is set. */
+   @Override
+   public void write(org.apache.thrift.protocol.TProtocol prot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+
+     final BitSet present = new BitSet();
+     present.set(0, struct.is_set_execute_latency_ms());
+     present.set(1, struct.is_set_process_latency_ms());
+     present.set(2, struct.is_set_executed());
+     present.set(3, struct.is_set_capacity());
+     tuple.writeBitSet(present, 4);
+
+     if (struct.is_set_execute_latency_ms()) {
+       tuple.writeDouble(struct.execute_latency_ms);
+     }
+     if (struct.is_set_process_latency_ms()) {
+       tuple.writeDouble(struct.process_latency_ms);
+     }
+     if (struct.is_set_executed()) {
+       tuple.writeI64(struct.executed);
+     }
+     if (struct.is_set_capacity()) {
+       tuple.writeDouble(struct.capacity);
+     }
+   }
+
+   /** Reads the presence bits, then reads and marks as set each field whose bit is on. */
+   @Override
+   public void read(org.apache.thrift.protocol.TProtocol prot, BoltAggregateStats struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+     final BitSet present = tuple.readBitSet(4);
+
+     if (present.get(0)) {
+       struct.execute_latency_ms = tuple.readDouble();
+       struct.set_execute_latency_ms_isSet(true);
+     }
+     if (present.get(1)) {
+       struct.process_latency_ms = tuple.readDouble();
+       struct.set_process_latency_ms_isSet(true);
+     }
+     if (present.get(2)) {
+       struct.executed = tuple.readI64();
+       struct.set_executed_isSet(true);
+     }
+     if (present.get(3)) {
+       struct.capacity = tuple.readDouble();
+       struct.set_capacity_isSet(true);
+     }
+   }
+ }
+
+}
+