You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/13 05:01:36 UTC
[2/5] storm git commit: STORM-1723 Introduce ClusterMetricsConsumer
http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0786824..2c78ee9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -52,12 +52,16 @@
(:require [org.apache.storm [cluster :as cluster]
[converter :as converter]
[stats :as stats]])
+ (:require [org.apache.storm.ui.core :as ui])
(:require [clojure.set :as set])
(:import [org.apache.storm.daemon.common StormBase Assignment])
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm config])
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
- (:import [org.apache.storm.utils VersionInfo])
+ (:import [org.apache.storm.utils VersionInfo]
+ (org.apache.storm.metric ClusterMetricsConsumerExecutor)
+ (org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo)
+ (org.apache.storm Config))
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:require [metrics.meters :refer [defmeter mark!]])
@@ -164,6 +168,13 @@
(catch Exception e
(log-warn-error e "Ingoring exception, Could not initialize " (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN)))))))
+(defn mk-cluster-metrics-consumer-executors
+  "Builds one ClusterMetricsConsumerExecutor per entry registered under
+  STORM-CLUSTER-METRICS-CONSUMER-REGISTER in storm-conf. Each entry is read
+  for its \"class\" and \"argument\" keys. Returns a vector, which is empty
+  when nothing is registered."
+  [storm-conf]
+  ;; mapv (not map): the executors are Java objects that are stored in the
+  ;; nimbus data map and later `.prepare`d, so build them eagerly rather than
+  ;; on first traversal of a lazy seq.
+  (mapv
+    (fn [consumer]
+      (ClusterMetricsConsumerExecutor. (get consumer "class")
+                                       (get consumer "argument")))
+    (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)))
+
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
@@ -203,6 +214,7 @@
:topo-history-state (nimbus-topo-history-state conf)
:nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
:nimbus-topology-action-notifier (create-tology-action-notifier conf)
+ :cluster-consumer-executors (mk-cluster-metrics-consumer-executors conf)
}))
(defn inbox [nimbus]
@@ -1355,12 +1367,43 @@
(defmethod blob-sync :local [conf nimbus]
nil)
-(defserverfn service-handler [conf inimbus]
- (.prepare inimbus conf (master-inimbus-dir conf))
- (log-message "Starting Nimbus with conf " conf)
- (let [nimbus (nimbus-data conf inimbus)
- blob-store (:blob-store nimbus)
- principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
+(defn extract-cluster-metrics
+  "Converts a ClusterSummary into cluster-level metrics. Returns a map with
+  :cluster-info (an IClusterMetricsConsumer$ClusterInfo built from the current
+  time in milliseconds) and :data-points (a lazy seq holding one DataPoint per
+  selected key of the UI cluster summary)."
+  [^ClusterSummary summ]
+  (let [metric-keys ["supervisors" "topologies" "slotsTotal" "slotsUsed" "slotsFree"
+                     "executorsTotal" "tasksTotal"]
+        selected (select-keys (ui/cluster-summary summ "nimbus") metric-keys)]
+    {:cluster-info (IClusterMetricsConsumer$ClusterInfo. (System/currentTimeMillis))
+     :data-points (for [[metric-name metric-value] selected]
+                    (DataPoint. metric-name metric-value))}))
+
+(defn extract-supervisors-metrics
+  "Converts the supervisors of a ClusterSummary into per-supervisor metrics.
+  Returns a lazy seq of maps, one per entry under \"supervisors\" in the UI
+  supervisor summary. Each map has :supervisor-info (an
+  IClusterMetricsConsumer$SupervisorInfo built from the entry's \"host\",
+  \"id\" and the current time in milliseconds) and :data-points (a lazy seq
+  of DataPoint for the selected slot / memory / CPU keys)."
+  [^ClusterSummary summ]
+  (let [metric-keys ["slotsTotal" "slotsUsed" "totalMem" "totalCpu"
+                     "usedMem" "usedCpu"]
+        summary-map (ui/supervisor-summary (.get_supervisors summ))
+        per-supervisor (summary-map "supervisors")]
+    (for [sup per-supervisor]
+      {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
+                          (sup "host")
+                          (sup "id")
+                          (System/currentTimeMillis))
+       :data-points (for [[metric-name metric-value] (select-keys sup metric-keys)]
+                      (DataPoint. metric-name metric-value))})))
+
+(defn send-cluster-metrics-to-executors
+  "Fetches the current ClusterSummary from nimbus-service, extracts the
+  cluster-level and per-supervisor metrics from it, and passes them to every
+  executor in (:cluster-consumer-executors nimbus) via .handleDataPoints."
+  [nimbus-service nimbus]
+  (let [summary (.getClusterInfo nimbus-service)
+        {:keys [cluster-info data-points]} (extract-cluster-metrics summary)
+        per-supervisor (extract-supervisors-metrics summary)]
+    (dofor [executor (:cluster-consumer-executors nimbus)]
+      (do
+        ;; cluster-wide data points first, then one call per supervisor
+        (.handleDataPoints executor cluster-info data-points)
+        (dofor [sup-metrics per-supervisor]
+          (.handleDataPoints executor
+                             (:supervisor-info sup-metrics)
+                             (:data-points sup-metrics)))))))
+
+(defn mk-reified-nimbus [nimbus conf blob-store]
+ (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
admin-users (or (.get conf NIMBUS-ADMINS) [])
get-common-topo-info
(fn [^String storm-id operation]
@@ -1397,6 +1440,746 @@
(doto (ErrorInfo. (:error e) (:time-secs e))
(.set_host (:host e))
(.set_port (:port e)))))]
+ (reify Nimbus$Iface
+ (^void submitTopologyWithOpts
+ [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
+ ^SubmitOptions submitOptions]
+ (try
+ (mark! nimbus:num-submitTopologyWithOpts-calls)
+ (is-leader nimbus)
+ (assert (not-nil? submitOptions))
+ (validate-topology-name! storm-name)
+ (check-authorization! nimbus storm-name nil "submitTopology")
+ (check-storm-active! nimbus storm-name false)
+ (let [topo-conf (from-json serializedConf)]
+ (try
+ (validate-configs-with-schemas topo-conf)
+ (catch IllegalArgumentException ex
+ (throw (InvalidTopologyException. (.getMessage ex)))))
+ (.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus)
+ storm-name
+ topo-conf
+ topology))
+ (swap! (:submitted-count nimbus) inc)
+ (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
+ credentials (.get_creds submitOptions)
+ credentials (when credentials (.get_creds credentials))
+ topo-conf (from-json serializedConf)
+ storm-conf-submitted (normalize-conf
+ conf
+ (-> topo-conf
+ (assoc STORM-ID storm-id)
+ (assoc TOPOLOGY-NAME storm-name))
+ topology)
+ req (ReqContext/context)
+ principal (.principal req)
+ submitter-principal (if principal (.toString principal))
+ submitter-user (.toLocal principal-to-local principal)
+ system-user (System/getProperty "user.name")
+ topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
+ storm-conf (-> storm-conf-submitted
+ (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal ""))
+ (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as
+ (assoc TOPOLOGY-USERS topo-acl)
+ (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL)))
+ storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf)
+ storm-conf
+ (dissoc storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
+ total-storm-conf (merge conf storm-conf)
+ topology (normalize-topology total-storm-conf topology)
+ storm-cluster-state (:storm-cluster-state nimbus)]
+ (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
+ (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
+ (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
+ (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
+ (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
+ (validate-topology-size topo-conf conf topology)
+ (when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
+ (not (Utils/isZkAuthenticationConfiguredTopology storm-conf)))
+ (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
+ (log-message "Received topology submission for "
+ storm-name
+ " with conf "
+ (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
+ ;; lock protects against multiple topologies being submitted at once and
+ ;; cleanup thread killing topology in b/w assignment and starting the topology
+ (locking (:submit-lock nimbus)
+ (check-storm-active! nimbus storm-name false)
+ ;;cred-update-lock is not needed here because creds are being added for the first time.
+ (.set-credentials! storm-cluster-state storm-id credentials storm-conf)
+ (log-message "uploadedJar " uploadedJarLocation)
+ (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
+ (wait-for-desired-code-replication nimbus total-storm-conf storm-id)
+ (.setup-heartbeats! storm-cluster-state storm-id)
+ (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
+ (.setup-backpressure! storm-cluster-state storm-id))
+ (notify-topology-action-listener nimbus storm-name "submitTopology")
+ (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
+ TopologyInitialStatus/ACTIVE :active}]
+ (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))))
+ (catch Throwable e
+ (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
+ (throw e))))
+
+ (^void submitTopology
+ [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
+ (mark! nimbus:num-submitTopology-calls)
+ (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
+ (SubmitOptions. TopologyInitialStatus/ACTIVE)))
+
+ (^void killTopology [this ^String name]
+ (mark! nimbus:num-killTopology-calls)
+ (.killTopologyWithOpts this name (KillOptions.)))
+
+ (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
+ (mark! nimbus:num-killTopologyWithOpts-calls)
+ (check-storm-active! nimbus storm-name true)
+ (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+ storm-id (topology-conf STORM-ID)
+ operation "killTopology"]
+ (check-authorization! nimbus storm-name topology-conf operation)
+ (let [wait-amt (if (.is_set_wait_secs options)
+ (.get_wait_secs options)
+ )]
+ (transition-name! nimbus storm-name [:kill wait-amt] true)
+ (notify-topology-action-listener nimbus storm-name operation))
+ (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
+ nimbus topology-conf)))
+
+ (^void rebalance [this ^String storm-name ^RebalanceOptions options]
+ (mark! nimbus:num-rebalance-calls)
+ (check-storm-active! nimbus storm-name true)
+ (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+ operation "rebalance"]
+ (check-authorization! nimbus storm-name topology-conf operation)
+ (let [wait-amt (if (.is_set_wait_secs options)
+ (.get_wait_secs options))
+ num-workers (if (.is_set_num_workers options)
+ (.get_num_workers options))
+ executor-overrides (if (.is_set_num_executors options)
+ (.get_num_executors options)
+ {})]
+ (doseq [[c num-executors] executor-overrides]
+ (when (<= num-executors 0)
+ (throw (InvalidTopologyException. "Number of executors must be greater than 0"))
+ ))
+ (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
+
+ (notify-topology-action-listener nimbus storm-name operation))))
+
+ (activate [this storm-name]
+ (mark! nimbus:num-activate-calls)
+ (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+ operation "activate"]
+ (check-authorization! nimbus storm-name topology-conf operation)
+ (transition-name! nimbus storm-name :activate true)
+ (notify-topology-action-listener nimbus storm-name operation)))
+
+ (deactivate [this storm-name]
+ (mark! nimbus:num-deactivate-calls)
+ (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+ operation "deactivate"]
+ (check-authorization! nimbus storm-name topology-conf operation)
+ (transition-name! nimbus storm-name :inactivate true)
+ (notify-topology-action-listener nimbus storm-name operation)))
+
+ (debug [this storm-name component-id enable? samplingPct]
+ (mark! nimbus:num-debug-calls)
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ storm-id (get-storm-id storm-cluster-state storm-name)
+ topology-conf (try-read-storm-conf conf storm-id blob-store)
+ ;; make sure samplingPct is within bounds.
+ spct (Math/max (Math/min samplingPct 100.0) 0.0)
+ ;; while disabling we retain the sampling pct.
+ debug-options (if enable? {:enable enable? :samplingpct spct} {:enable enable?})
+ storm-base-updates (assoc {} :component->debug (if (empty? component-id)
+ {storm-id debug-options}
+ {component-id debug-options}))]
+ (check-authorization! nimbus storm-name topology-conf "debug")
+ (when-not storm-id
+ (throw (NotAliveException. storm-name)))
+ (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'"
+ (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'")))
+ (locking (:submit-lock nimbus)
+ (.update-storm! storm-cluster-state storm-id storm-base-updates))))
+
+ (^void setWorkerProfiler
+ [this ^String id ^ProfileRequest profileRequest]
+ (mark! nimbus:num-setWorkerProfiler-calls)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+ storm-name (topology-conf TOPOLOGY-NAME)
+ _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
+ storm-cluster-state (:storm-cluster-state nimbus)]
+ (.set-worker-profile-request storm-cluster-state id profileRequest)))
+
+ (^List getComponentPendingProfileActions
+ [this ^String id ^String component_id ^ProfileAction action]
+ (mark! nimbus:num-getComponentPendingProfileActions-calls)
+ (let [info (get-common-topo-info id "getComponentPendingProfileActions")
+ storm-cluster-state (:storm-cluster-state info)
+ task->component (:task->component info)
+ {:keys [executor->node+port node->host]} (:assignment info)
+ executor->host+port (map-val (fn [[node port]]
+ [(node->host node) port])
+ executor->node+port)
+ nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
+ all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
+ latest-profile-actions (remove nil? (map (fn [nodeInfo]
+ (->> all-pending-actions-for-topology
+ (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %)))
+ (= (:port nodeInfo) (first (.get_port (.get_nodeInfo %))))))
+ (filter #(= action (.get_action %)))
+ (sort-by #(.get_time_stamp %) >)
+ first))
+ nodeinfos))]
+ (log-message "Latest profile actions for topology " id " component " component_id " " (pr-str latest-profile-actions))
+ latest-profile-actions))
+
+ (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
+ (mark! nimbus:num-setLogConfig-calls)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+ storm-name (topology-conf TOPOLOGY-NAME)
+ _ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
+ storm-cluster-state (:storm-cluster-state nimbus)
+ merged-log-config (or (.topology-log-config storm-cluster-state id nil) (LogConfig.))
+ named-loggers (.get_named_logger_level merged-log-config)]
+ (doseq [[_ level] named-loggers]
+ (.set_action level LogLevelAction/UNCHANGED))
+ (doseq [[logger-name log-config] (.get_named_logger_level log-config-msg)]
+ (let [action (.get_action log-config)]
+ (if (clojure.string/blank? logger-name)
+ (throw (RuntimeException. "Named loggers need a valid name. Use ROOT for the root logger")))
+ (condp = action
+ LogLevelAction/UPDATE
+ (do (set-logger-timeouts log-config)
+ (.put_to_named_logger_level merged-log-config logger-name log-config))
+ LogLevelAction/REMOVE
+ (let [named-loggers (.get_named_logger_level merged-log-config)]
+ (if (and (not (nil? named-loggers))
+ (.containsKey named-loggers logger-name))
+ (.remove named-loggers logger-name))))))
+ (log-message "Setting log config for " storm-name ":" merged-log-config)
+ (.set-topology-log-config! storm-cluster-state id merged-log-config)))
+
+ (uploadNewCredentials [this storm-name credentials]
+ (mark! nimbus:num-uploadNewCredentials-calls)
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ storm-id (get-storm-id storm-cluster-state storm-name)
+ topology-conf (try-read-storm-conf conf storm-id blob-store)
+ creds (when credentials (.get_creds credentials))]
+ (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
+ (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf))))
+
+ (beginFileUpload [this]
+ (mark! nimbus:num-beginFileUpload-calls)
+ (check-authorization! nimbus nil nil "fileUpload")
+ (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
+ (.put (:uploaders nimbus)
+ fileloc
+ (Channels/newChannel (FileOutputStream. fileloc)))
+ (log-message "Uploading file from client to " fileloc)
+ fileloc
+ ))
+
+ (^void uploadChunk [this ^String location ^ByteBuffer chunk]
+ (mark! nimbus:num-uploadChunk-calls)
+ (check-authorization! nimbus nil nil "fileUpload")
+ (let [uploaders (:uploaders nimbus)
+ ^WritableByteChannel channel (.get uploaders location)]
+ (when-not channel
+ (throw (RuntimeException.
+ "File for that location does not exist (or timed out)")))
+ (.write channel chunk)
+ (.put uploaders location channel)
+ ))
+
+ (^void finishFileUpload [this ^String location]
+ (mark! nimbus:num-finishFileUpload-calls)
+ (check-authorization! nimbus nil nil "fileUpload")
+ (let [uploaders (:uploaders nimbus)
+ ^WritableByteChannel channel (.get uploaders location)]
+ (when-not channel
+ (throw (RuntimeException.
+ "File for that location does not exist (or timed out)")))
+ (.close channel)
+ (log-message "Finished uploading file from client: " location)
+ (.remove uploaders location)
+ ))
+
+ (^String beginFileDownload
+ [this ^String file]
+ (mark! nimbus:num-beginFileDownload-calls)
+ (check-authorization! nimbus nil nil "fileDownload")
+ (let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file nil)
+ ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES)
+ (int 65536)))
+ id (uuid)]
+ (.put (:downloaders nimbus) id is)
+ id))
+
+      ;; Returns the next chunk of the download registered under `id` by
+      ;; beginFileDownload, wrapped in a ByteBuffer. Throws RuntimeException
+      ;; when no stream is registered for `id`. An empty chunk ends the
+      ;; download: the stream is closed and its entry removed.
+      (^ByteBuffer downloadChunk [this ^String id]
+        (mark! nimbus:num-downloadChunk-calls)
+        (check-authorization! nimbus nil nil "fileDownload")
+        (let [downloaders (:downloaders nimbus)
+              ;; beginFileDownload stores BufferInputStream instances in
+              ;; :downloaders, so hint with that class; the previous
+              ;; BufferFileInputStream hint did not match what is stored.
+              ^BufferInputStream is (.get downloaders id)]
+          (when-not is
+            (throw (RuntimeException.
+                     "Could not find input stream for that id")))
+          (let [ret (.read is)]
+            ;; NOTE(review): re-putting presumably refreshes the entry's
+            ;; expiry in the downloaders map -- confirm against its type.
+            (.put downloaders id is)
+            (when (empty? ret)
+              ;; release the underlying stream, as downloadBlobChunk does
+              (.close is)
+              (.remove downloaders id))
+            (ByteBuffer/wrap ret))))
+
+ (^String getNimbusConf [this]
+ (mark! nimbus:num-getNimbusConf-calls)
+ (check-authorization! nimbus nil nil "getNimbusConf")
+ (to-json (:conf nimbus)))
+
+ (^LogConfig getLogConfig [this ^String id]
+ (mark! nimbus:num-getLogConfig-calls)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+ storm-name (topology-conf TOPOLOGY-NAME)
+ _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
+ storm-cluster-state (:storm-cluster-state nimbus)
+ log-config (.topology-log-config storm-cluster-state id nil)]
+ (if log-config log-config (LogConfig.))))
+
+ (^String getTopologyConf [this ^String id]
+ (mark! nimbus:num-getTopologyConf-calls)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+ storm-name (topology-conf TOPOLOGY-NAME)]
+ (check-authorization! nimbus storm-name topology-conf "getTopologyConf")
+ (to-json topology-conf)))
+
+ (^StormTopology getTopology [this ^String id]
+ (mark! nimbus:num-getTopology-calls)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+ storm-name (topology-conf TOPOLOGY-NAME)]
+ (check-authorization! nimbus storm-name topology-conf "getTopology")
+ (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
+
+ (^StormTopology getUserTopology [this ^String id]
+ (mark! nimbus:num-getUserTopology-calls)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+ storm-name (topology-conf TOPOLOGY-NAME)]
+ (check-authorization! nimbus storm-name topology-conf "getUserTopology")
+ (try-read-storm-topology id blob-store)))
+
+ (^ClusterSummary getClusterInfo [this]
+ (mark! nimbus:num-getClusterInfo-calls)
+ (check-authorization! nimbus nil nil "getClusterInfo")
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ supervisor-infos (all-supervisor-info storm-cluster-state)
+ ;; TODO: need to get the port info about supervisors...
+ ;; in standalone just look at metadata, otherwise just say N/A?
+ supervisor-summaries (dofor [[id info] supervisor-infos]
+ (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+ sup-sum (SupervisorSummary. (:hostname info)
+ (:uptime-secs info)
+ (count ports)
+ (count (:used-ports info))
+ id) ]
+ (.set_total_resources sup-sum (map-val double (:resources-map info)))
+ (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+ (.set_used_mem sup-sum used-mem)
+ (.set_used_cpu sup-sum used-cpu))
+ (when-let [version (:version info)] (.set_version sup-sum version))
+ sup-sum))
+ nimbus-uptime ((:uptime nimbus))
+ bases (topology-bases storm-cluster-state)
+ nimbuses (.nimbuses storm-cluster-state)
+
+ ;;update the isLeader field for each nimbus summary
+ _ (let [leader (.getLeader (:leader-elector nimbus))
+ leader-host (.getHost leader)
+ leader-port (.getPort leader)]
+ (doseq [nimbus-summary nimbuses]
+ (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
+ (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+
+ topology-summaries (dofor [[id base] bases :when base]
+ (let [assignment (.assignment-info storm-cluster-state id nil)
+ topo-summ (TopologySummary. id
+ (:storm-name base)
+ (->> (:executor->node+port assignment)
+ keys
+ (mapcat executor-id->tasks)
+ count)
+ (->> (:executor->node+port assignment)
+ keys
+ count)
+ (->> (:executor->node+port assignment)
+ vals
+ set
+ count)
+ (time-delta (:launch-time-secs base))
+ (extract-status-str base))]
+ (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+ (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+ (when-let [resources (.get @(:id->resources nimbus) id)]
+ (.set_requested_memonheap topo-summ (get resources 0))
+ (.set_requested_memoffheap topo-summ (get resources 1))
+ (.set_requested_cpu topo-summ (get resources 2))
+ (.set_assigned_memonheap topo-summ (get resources 3))
+ (.set_assigned_memoffheap topo-summ (get resources 4))
+ (.set_assigned_cpu topo-summ (get resources 5)))
+ (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+ topo-summ))
+ ret (ClusterSummary. supervisor-summaries
+ topology-summaries
+ nimbuses)
+ _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
+ ret))
+
+ (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
+ (mark! nimbus:num-getTopologyInfoWithOpts-calls)
+ (let [{:keys [storm-name
+ storm-cluster-state
+ all-components
+ launch-time-secs
+ assignment
+ beats
+ task->component
+ base]} (get-common-topo-info storm-id "getTopologyInfo")
+ num-err-choice (or (.get_num_err_choice options)
+ NumErrorsChoice/ALL)
+ errors-fn (condp = num-err-choice
+ NumErrorsChoice/NONE (fn [& _] ()) ;; empty list only
+ NumErrorsChoice/ONE (comp #(remove nil? %)
+ list
+ get-last-error)
+ NumErrorsChoice/ALL get-errors
+ ;; Default
+ (do
+ (log-warn "Got invalid NumErrorsChoice '"
+ num-err-choice
+ "'")
+ get-errors))
+ errors (->> all-components
+ (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)]))
+ (into {}))
+ executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
+ (let [host (-> assignment :node->host (get node))
+ heartbeat (get beats executor)
+ stats (:stats heartbeat)
+ stats (if stats
+ (stats/thriftify-executor-stats stats))]
+ (doto
+ (ExecutorSummary. (thriftify-executor-id executor)
+ (-> executor first task->component)
+ host
+ port
+ (nil-to-zero (:uptime heartbeat)))
+ (.set_stats stats))
+ ))
+ topo-info (TopologyInfo. storm-id
+ storm-name
+ (time-delta launch-time-secs)
+ executor-summaries
+ (extract-status-str base)
+ errors
+ )]
+ (when-let [owner (:owner base)] (.set_owner topo-info owner))
+ (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
+ (when-let [resources (.get @(:id->resources nimbus) storm-id)]
+ (.set_requested_memonheap topo-info (get resources 0))
+ (.set_requested_memoffheap topo-info (get resources 1))
+ (.set_requested_cpu topo-info (get resources 2))
+ (.set_assigned_memonheap topo-info (get resources 3))
+ (.set_assigned_memoffheap topo-info (get resources 4))
+ (.set_assigned_cpu topo-info (get resources 5)))
+ (when-let [component->debug (:component->debug base)]
+ (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
+ (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+ topo-info))
+
+ (^TopologyInfo getTopologyInfo [this ^String topology-id]
+ (mark! nimbus:num-getTopologyInfo-calls)
+ (.getTopologyInfoWithOpts this
+ topology-id
+ (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
+
+ (^String beginCreateBlob [this
+ ^String blob-key
+ ^SettableBlobMeta blob-meta]
+ (let [session-id (uuid)]
+ (.put (:blob-uploaders nimbus)
+ session-id
+ (.createBlob (:blob-store nimbus) blob-key blob-meta (get-subject)))
+ (log-message "Created blob for " blob-key
+ " with session id " session-id)
+ (str session-id)))
+
+ (^String beginUpdateBlob [this ^String blob-key]
+ (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus)
+ blob-key (get-subject))]
+ (let [session-id (uuid)]
+ (.put (:blob-uploaders nimbus) session-id os)
+ (log-message "Created upload session for " blob-key
+ " with id " session-id)
+ (str session-id))))
+
+ (^void createStateInZookeeper [this ^String blob-key]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ blob-store (:blob-store nimbus)
+ nimbus-host-port-info (:nimbus-host-port-info nimbus)
+ conf (:conf nimbus)]
+ (if (instance? LocalFsBlobStore blob-store)
+ (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
+ (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
+
+ (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
+ (let [uploaders (:blob-uploaders nimbus)]
+ (if-let [^AtomicOutputStream os (.get uploaders session)]
+ (let [chunk-array (.array blob-chunk)
+ remaining (.remaining blob-chunk)
+ array-offset (.arrayOffset blob-chunk)
+ position (.position blob-chunk)]
+ (.write os chunk-array (+ array-offset position) remaining)
+ (.put uploaders session os))
+ (throw-runtime "Blob for session "
+ session
+ " does not exist (or timed out)"))))
+
+ (^void finishBlobUpload [this ^String session]
+ (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
+ (do
+ (.close os)
+ (log-message "Finished uploading blob for session "
+ session
+ ". Closing session.")
+ (.remove (:blob-uploaders nimbus) session))
+ (throw-runtime "Blob for session "
+ session
+ " does not exist (or timed out)")))
+
+ (^void cancelBlobUpload [this ^String session]
+ (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
+ (do
+ (.cancel os)
+ (log-message "Canceled uploading blob for session "
+ session
+ ". Closing session.")
+ (.remove (:blob-uploaders nimbus) session))
+ (throw-runtime "Blob for session "
+ session
+ " does not exist (or timed out)")))
+
+ (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
+ (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus)
+ blob-key (get-subject))]
+ ret))
+
+ (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
+ (->> (ReqContext/context)
+ (.subject)
+ (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
+
+ (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
+ (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus)
+ blob-key (get-subject))]
+ (let [session-id (uuid)
+ ret (BeginDownloadResult. (.getVersion is) (str session-id))]
+ (.set_data_size ret (.getFileLength is))
+ (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
+ (log-message "Created download session for " blob-key
+ " with id " session-id)
+ ret)))
+
+      ;; Returns the next chunk of the blob download registered under
+      ;; `session`, wrapped in a ByteBuffer. Throws RuntimeException when no
+      ;; stream is registered for `session`. An empty chunk ends the
+      ;; download: the stream is closed and its entry removed.
+      (^ByteBuffer downloadBlobChunk [this ^String session]
+        (let [downloaders (:blob-downloaders nimbus)
+              ^BufferInputStream is (.get downloaders session)]
+          (when-not is
+            ;; Build the message with str: RuntimeException has no
+            ;; (String, String) constructor, so passing `session` as a second
+            ;; argument would be treated as the Throwable cause and fail.
+            (throw (RuntimeException.
+                     (str "Could not find input stream for session " session))))
+          (let [ret (.read is)]
+            ;; NOTE(review): re-putting presumably refreshes the entry's
+            ;; expiry in the downloaders map -- confirm against its type.
+            (.put downloaders session is)
+            (when (empty? ret)
+              (.close is)
+              (.remove downloaders session))
+            (log-debug "Sending " (alength ret) " bytes")
+            (ByteBuffer/wrap ret))))
+
+ (^void deleteBlob [this ^String blob-key]
+ (let [subject (->> (ReqContext/context)
+ (.subject))]
+ (.deleteBlob (:blob-store nimbus) blob-key subject)
+ (when (instance? LocalFsBlobStore blob-store)
+ (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
+ (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
+ (log-message "Deleted blob for key " blob-key)))
+
+ (^ListBlobsResult listBlobs [this ^String session]
+ (let [listers (:blob-listers nimbus)
+ ^Iterator keys-it (if (clojure.string/blank? session)
+ (.listKeys (:blob-store nimbus))
+ (.get listers session))
+ _ (or keys-it (throw-runtime "Blob list for session "
+ session
+ " does not exist (or timed out)"))
+
+ ;; Create a new session id if the user gave an empty session string.
+ ;; This is the use case when the user wishes to list blobs
+ ;; starting from the beginning.
+ session (if (clojure.string/blank? session)
+ (let [new-session (uuid)]
+ (log-message "Creating new session for downloading list " new-session)
+ new-session)
+ session)]
+ (if-not (.hasNext keys-it)
+ (do
+ (.remove listers session)
+ (log-message "No more blobs to list for session " session)
+ ;; A blank result communicates that there are no more blobs.
+ (ListBlobsResult. (ArrayList. 0) (str session)))
+ (let [^List list-chunk (->> keys-it
+ (iterator-seq)
+ (take 100) ;; Limit to next 100 keys
+ (ArrayList.))]
+ (log-message session " downloading " (.size list-chunk) " entries")
+ (.put listers session keys-it)
+ (ListBlobsResult. list-chunk (str session))))))
+
+ (^int getBlobReplication [this ^String blob-key]
+ (->> (ReqContext/context)
+ (.subject)
+ (.getBlobReplication (:blob-store nimbus) blob-key)))
+
+ (^int updateBlobReplication [this ^String blob-key ^int replication]
+ (->> (ReqContext/context)
+ (.subject)
+ (.updateBlobReplication (:blob-store nimbus) blob-key replication)))
+
+ (^TopologyPageInfo getTopologyPageInfo
+ [this ^String topo-id ^String window ^boolean include-sys?]
+ (mark! nimbus:num-getTopologyPageInfo-calls)
+ (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
+
+ exec->node+port (:executor->node+port (:assignment info))
+ last-err-fn (partial get-last-error
+ (:storm-cluster-state info)
+ topo-id)
+ topo-page-info (stats/agg-topo-execs-stats topo-id
+ exec->node+port
+ (:task->component info)
+ (:beats info)
+ (:topology info)
+ window
+ include-sys?
+ last-err-fn)]
+ (when-let [owner (:owner (:base info))]
+ (.set_owner topo-page-info owner))
+ (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
+ (.set_sched_status topo-page-info sched-status))
+ (when-let [resources (.get @(:id->resources nimbus) topo-id)]
+ (.set_requested_memonheap topo-page-info (get resources 0))
+ (.set_requested_memoffheap topo-page-info (get resources 1))
+ (.set_requested_cpu topo-page-info (get resources 2))
+ (.set_assigned_memonheap topo-page-info (get resources 3))
+ (.set_assigned_memoffheap topo-page-info (get resources 4))
+ (.set_assigned_cpu topo-page-info (get resources 5)))
+ (doto topo-page-info
+ (.set_name (:storm-name info))
+ (.set_status (extract-status-str (:base info)))
+ (.set_uptime_secs (time-delta (:launch-time-secs info)))
+ (.set_topology_conf (to-json (try-read-storm-conf conf
+ topo-id (:blob-store nimbus))))
+ (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
+ (when-let [debug-options
+ (get-in info [:base :component->debug topo-id])]
+ (.set_debug_options
+ topo-page-info
+ (converter/thriftify-debugoptions debug-options)))
+ topo-page-info))
+
+ (^ComponentPageInfo getComponentPageInfo
+ [this
+ ^String topo-id
+ ^String component-id
+ ^String window
+ ^boolean include-sys?]
+ (mark! nimbus:num-getComponentPageInfo-calls)
+ (let [info (get-common-topo-info topo-id "getComponentPageInfo")
+ {:keys [executor->node+port node->host]} (:assignment info)
+ executor->host+port (map-val (fn [[node port]]
+ [(node->host node) port])
+ executor->node+port)
+ comp-page-info (stats/agg-comp-execs-stats executor->host+port
+ (:task->component info)
+ (:beats info)
+ window
+ include-sys?
+ topo-id
+ (:topology info)
+ component-id)]
+ (doto comp-page-info
+ (.set_topology_name (:storm-name info))
+ (.set_errors (get-errors (:storm-cluster-state info)
+ topo-id
+ component-id))
+ (.set_topology_status (extract-status-str (:base info))))
+ (when-let [debug-options
+ (get-in info [:base :component->debug component-id])]
+ (.set_debug_options
+ comp-page-info
+ (converter/thriftify-debugoptions debug-options)))
+ ;; Add the event logger details
+ (let [component->tasks (reverse-map (:task->component info))]
+ (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID)
+ (let [eventlogger-tasks (sort (get component->tasks
+ EVENTLOGGER-COMPONENT-ID))
+ ;; Find the task the events from this component route to.
+ task-index (mod (TupleUtils/listHashCode [component-id])
+ (count eventlogger-tasks))
+ task-id (nth eventlogger-tasks task-index)
+ eventlogger-exec (first (filter (fn [[start stop]]
+ (between? task-id start stop))
+ (keys executor->host+port)))
+ [host port] (get executor->host+port eventlogger-exec)]
+ (if (and host port)
+ (doto comp-page-info
+ (.set_eventlog_host host)
+ (.set_eventlog_port port))))))
+ comp-page-info))
+
+ (^TopologyHistoryInfo getTopologyHistory [this ^String user]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ bases (topology-bases storm-cluster-state)
+ assigned-topology-ids (.assignments storm-cluster-state nil)
+ user-group-match-fn (fn [topo-id user conf]
+ (let [topology-conf (try-read-storm-conf conf topo-id (:blob-store nimbus))
+ groups (get-topo-logs-groups topology-conf)]
+ (or (nil? user)
+ (some #(= % user) admin-users)
+ (does-users-group-intersect? user groups conf)
+ (some #(= % user) (get-topo-logs-users topology-conf)))))
+ active-ids-for-user (filter #(user-group-match-fn % user (:conf nimbus)) assigned-topology-ids)
+ topo-history-list (read-topology-history nimbus user admin-users)]
+ (TopologyHistoryInfo. (distinct (concat active-ids-for-user topo-history-list)))))
+
+ Shutdownable
+ (shutdown [this]
+ (mark! nimbus:num-shutdown-calls)
+ (log-message "Shutting down master")
+ (cancel-timer (:timer nimbus))
+ (.disconnect (:storm-cluster-state nimbus))
+ (.cleanup (:downloaders nimbus))
+ (.cleanup (:uploaders nimbus))
+ (.shutdown (:blob-store nimbus))
+ (.close (:leader-elector nimbus))
+ (when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus)))
+ (log-message "Shut down master"))
+ DaemonCommon
+ (waiting? [this]
+ (timer-waiting? (:timer nimbus))))))
+
+(defserverfn service-handler [conf inimbus]
+ (.prepare inimbus conf (master-inimbus-dir conf))
+ (log-message "Starting Nimbus with conf " conf)
+ (let [nimbus (nimbus-data conf inimbus)
+ blob-store (:blob-store nimbus)]
(.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
;add to nimbuses
@@ -1415,6 +2198,9 @@
(.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
(setup-blobstore nimbus))
+ (doseq [consumer (:cluster-consumer-executors nimbus)]
+ (.prepare consumer))
+
(when (is-leader nimbus :throw-exception false)
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup)))
@@ -1457,740 +2243,16 @@
(start-metrics-reporters conf)
- (reify Nimbus$Iface
- (^void submitTopologyWithOpts
- [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
- ^SubmitOptions submitOptions]
- (try
- (mark! nimbus:num-submitTopologyWithOpts-calls)
- (is-leader nimbus)
- (assert (not-nil? submitOptions))
- (validate-topology-name! storm-name)
- (check-authorization! nimbus storm-name nil "submitTopology")
- (check-storm-active! nimbus storm-name false)
- (let [topo-conf (from-json serializedConf)]
- (try
- (validate-configs-with-schemas topo-conf)
- (catch IllegalArgumentException ex
- (throw (InvalidTopologyException. (.getMessage ex)))))
- (.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus)
- storm-name
- topo-conf
- topology))
- (swap! (:submitted-count nimbus) inc)
- (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
- credentials (.get_creds submitOptions)
- credentials (when credentials (.get_creds credentials))
- topo-conf (from-json serializedConf)
- storm-conf-submitted (normalize-conf
- conf
- (-> topo-conf
- (assoc STORM-ID storm-id)
- (assoc TOPOLOGY-NAME storm-name))
- topology)
- req (ReqContext/context)
- principal (.principal req)
- submitter-principal (if principal (.toString principal))
- submitter-user (.toLocal principal-to-local principal)
- system-user (System/getProperty "user.name")
- topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
- storm-conf (-> storm-conf-submitted
- (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal ""))
- (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as
- (assoc TOPOLOGY-USERS topo-acl)
- (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL)))
- storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf)
- storm-conf
- (dissoc storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
- total-storm-conf (merge conf storm-conf)
- topology (normalize-topology total-storm-conf topology)
- storm-cluster-state (:storm-cluster-state nimbus)]
- (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
- (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
- (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
- (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
- (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
- (validate-topology-size topo-conf conf topology)
- (when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
- (not (Utils/isZkAuthenticationConfiguredTopology storm-conf)))
- (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
- (log-message "Received topology submission for "
- storm-name
- " with conf "
- (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
- ;; lock protects against multiple topologies being submitted at once and
- ;; cleanup thread killing topology in b/w assignment and starting the topology
- (locking (:submit-lock nimbus)
- (check-storm-active! nimbus storm-name false)
- ;;cred-update-lock is not needed here because creds are being added for the first time.
- (.set-credentials! storm-cluster-state storm-id credentials storm-conf)
- (log-message "uploadedJar " uploadedJarLocation)
- (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
- (wait-for-desired-code-replication nimbus total-storm-conf storm-id)
- (.setup-heartbeats! storm-cluster-state storm-id)
- (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
- (.setup-backpressure! storm-cluster-state storm-id))
- (notify-topology-action-listener nimbus storm-name "submitTopology")
- (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
- TopologyInitialStatus/ACTIVE :active}]
- (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))))
- (catch Throwable e
- (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
- (throw e))))
-
- (^void submitTopology
- [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
- (mark! nimbus:num-submitTopology-calls)
- (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
- (SubmitOptions. TopologyInitialStatus/ACTIVE)))
-
- (^void killTopology [this ^String name]
- (mark! nimbus:num-killTopology-calls)
- (.killTopologyWithOpts this name (KillOptions.)))
-
- (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
- (mark! nimbus:num-killTopologyWithOpts-calls)
- (check-storm-active! nimbus storm-name true)
- (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
- storm-id (topology-conf STORM-ID)
- operation "killTopology"]
- (check-authorization! nimbus storm-name topology-conf operation)
- (let [wait-amt (if (.is_set_wait_secs options)
- (.get_wait_secs options)
- )]
- (transition-name! nimbus storm-name [:kill wait-amt] true)
- (notify-topology-action-listener nimbus storm-name operation))
- (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
- nimbus topology-conf)))
-
- (^void rebalance [this ^String storm-name ^RebalanceOptions options]
- (mark! nimbus:num-rebalance-calls)
- (check-storm-active! nimbus storm-name true)
- (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
- operation "rebalance"]
- (check-authorization! nimbus storm-name topology-conf operation)
- (let [wait-amt (if (.is_set_wait_secs options)
- (.get_wait_secs options))
- num-workers (if (.is_set_num_workers options)
- (.get_num_workers options))
- executor-overrides (if (.is_set_num_executors options)
- (.get_num_executors options)
- {})]
- (doseq [[c num-executors] executor-overrides]
- (when (<= num-executors 0)
- (throw (InvalidTopologyException. "Number of executors must be greater than 0"))
- ))
- (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
-
- (notify-topology-action-listener nimbus storm-name operation))))
-
- (activate [this storm-name]
- (mark! nimbus:num-activate-calls)
- (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
- operation "activate"]
- (check-authorization! nimbus storm-name topology-conf operation)
- (transition-name! nimbus storm-name :activate true)
- (notify-topology-action-listener nimbus storm-name operation)))
-
- (deactivate [this storm-name]
- (mark! nimbus:num-deactivate-calls)
- (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
- operation "deactivate"]
- (check-authorization! nimbus storm-name topology-conf operation)
- (transition-name! nimbus storm-name :inactivate true)
- (notify-topology-action-listener nimbus storm-name operation)))
-
- (debug [this storm-name component-id enable? samplingPct]
- (mark! nimbus:num-debug-calls)
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)
- topology-conf (try-read-storm-conf conf storm-id blob-store)
- ;; make sure samplingPct is within bounds.
- spct (Math/max (Math/min samplingPct 100.0) 0.0)
- ;; while disabling we retain the sampling pct.
- debug-options (if enable? {:enable enable? :samplingpct spct} {:enable enable?})
- storm-base-updates (assoc {} :component->debug (if (empty? component-id)
- {storm-id debug-options}
- {component-id debug-options}))]
- (check-authorization! nimbus storm-name topology-conf "debug")
- (when-not storm-id
- (throw (NotAliveException. storm-name)))
- (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'"
- (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'")))
- (locking (:submit-lock nimbus)
- (.update-storm! storm-cluster-state storm-id storm-base-updates))))
-
- (^void setWorkerProfiler
- [this ^String id ^ProfileRequest profileRequest]
- (mark! nimbus:num-setWorkerProfiler-calls)
- (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
- storm-name (topology-conf TOPOLOGY-NAME)
- _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
- storm-cluster-state (:storm-cluster-state nimbus)]
- (.set-worker-profile-request storm-cluster-state id profileRequest)))
-
- (^List getComponentPendingProfileActions
- [this ^String id ^String component_id ^ProfileAction action]
- (mark! nimbus:num-getComponentPendingProfileActions-calls)
- (let [info (get-common-topo-info id "getComponentPendingProfileActions")
- storm-cluster-state (:storm-cluster-state info)
- task->component (:task->component info)
- {:keys [executor->node+port node->host]} (:assignment info)
- executor->host+port (map-val (fn [[node port]]
- [(node->host node) port])
- executor->node+port)
- nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
- all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
- latest-profile-actions (remove nil? (map (fn [nodeInfo]
- (->> all-pending-actions-for-topology
- (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %)))
- (= (:port nodeInfo) (first (.get_port (.get_nodeInfo %))))))
- (filter #(= action (.get_action %)))
- (sort-by #(.get_time_stamp %) >)
- first))
- nodeinfos))]
- (log-message "Latest profile actions for topology " id " component " component_id " " (pr-str latest-profile-actions))
- latest-profile-actions))
-
- (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
- (mark! nimbus:num-setLogConfig-calls)
- (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
- storm-name (topology-conf TOPOLOGY-NAME)
- _ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
- storm-cluster-state (:storm-cluster-state nimbus)
- merged-log-config (or (.topology-log-config storm-cluster-state id nil) (LogConfig.))
- named-loggers (.get_named_logger_level merged-log-config)]
- (doseq [[_ level] named-loggers]
- (.set_action level LogLevelAction/UNCHANGED))
- (doseq [[logger-name log-config] (.get_named_logger_level log-config-msg)]
- (let [action (.get_action log-config)]
- (if (clojure.string/blank? logger-name)
- (throw (RuntimeException. "Named loggers need a valid name. Use ROOT for the root logger")))
- (condp = action
- LogLevelAction/UPDATE
- (do (set-logger-timeouts log-config)
- (.put_to_named_logger_level merged-log-config logger-name log-config))
- LogLevelAction/REMOVE
- (let [named-loggers (.get_named_logger_level merged-log-config)]
- (if (and (not (nil? named-loggers))
- (.containsKey named-loggers logger-name))
- (.remove named-loggers logger-name))))))
- (log-message "Setting log config for " storm-name ":" merged-log-config)
- (.set-topology-log-config! storm-cluster-state id merged-log-config)))
-
- (uploadNewCredentials [this storm-name credentials]
- (mark! nimbus:num-uploadNewCredentials-calls)
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)
- topology-conf (try-read-storm-conf conf storm-id blob-store)
- creds (when credentials (.get_creds credentials))]
- (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
- (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf))))
-
- (beginFileUpload [this]
- (mark! nimbus:num-beginFileUpload-calls)
- (check-authorization! nimbus nil nil "fileUpload")
- (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
- (.put (:uploaders nimbus)
- fileloc
- (Channels/newChannel (FileOutputStream. fileloc)))
- (log-message "Uploading file from client to " fileloc)
- fileloc
- ))
-
- (^void uploadChunk [this ^String location ^ByteBuffer chunk]
- (mark! nimbus:num-uploadChunk-calls)
- (check-authorization! nimbus nil nil "fileUpload")
- (let [uploaders (:uploaders nimbus)
- ^WritableByteChannel channel (.get uploaders location)]
- (when-not channel
- (throw (RuntimeException.
- "File for that location does not exist (or timed out)")))
- (.write channel chunk)
- (.put uploaders location channel)
- ))
-
- (^void finishFileUpload [this ^String location]
- (mark! nimbus:num-finishFileUpload-calls)
- (check-authorization! nimbus nil nil "fileUpload")
- (let [uploaders (:uploaders nimbus)
- ^WritableByteChannel channel (.get uploaders location)]
- (when-not channel
- (throw (RuntimeException.
- "File for that location does not exist (or timed out)")))
- (.close channel)
- (log-message "Finished uploading file from client: " location)
- (.remove uploaders location)
- ))
-
- (^String beginFileDownload
- [this ^String file]
- (mark! nimbus:num-beginFileDownload-calls)
- (check-authorization! nimbus nil nil "fileDownload")
- (let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file nil)
- ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES)
- (int 65536)))
- id (uuid)]
- (.put (:downloaders nimbus) id is)
- id))
-
- (^ByteBuffer downloadChunk [this ^String id]
- (mark! nimbus:num-downloadChunk-calls)
- (check-authorization! nimbus nil nil "fileDownload")
- (let [downloaders (:downloaders nimbus)
- ^BufferFileInputStream is (.get downloaders id)]
- (when-not is
- (throw (RuntimeException.
- "Could not find input stream for that id")))
- (let [ret (.read is)]
- (.put downloaders id is)
- (when (empty? ret)
- (.remove downloaders id))
- (ByteBuffer/wrap ret)
- )))
-
- (^String getNimbusConf [this]
- (mark! nimbus:num-getNimbusConf-calls)
- (check-authorization! nimbus nil nil "getNimbusConf")
- (to-json (:conf nimbus)))
-
- (^LogConfig getLogConfig [this ^String id]
- (mark! nimbus:num-getLogConfig-calls)
- (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
- storm-name (topology-conf TOPOLOGY-NAME)
- _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
- storm-cluster-state (:storm-cluster-state nimbus)
- log-config (.topology-log-config storm-cluster-state id nil)]
- (if log-config log-config (LogConfig.))))
-
- (^String getTopologyConf [this ^String id]
- (mark! nimbus:num-getTopologyConf-calls)
- (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
- storm-name (topology-conf TOPOLOGY-NAME)]
- (check-authorization! nimbus storm-name topology-conf "getTopologyConf")
- (to-json topology-conf)))
-
- (^StormTopology getTopology [this ^String id]
- (mark! nimbus:num-getTopology-calls)
- (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
- storm-name (topology-conf TOPOLOGY-NAME)]
- (check-authorization! nimbus storm-name topology-conf "getTopology")
- (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
-
- (^StormTopology getUserTopology [this ^String id]
- (mark! nimbus:num-getUserTopology-calls)
- (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
- storm-name (topology-conf TOPOLOGY-NAME)]
- (check-authorization! nimbus storm-name topology-conf "getUserTopology")
- (try-read-storm-topology id blob-store)))
-
- (^ClusterSummary getClusterInfo [this]
- (mark! nimbus:num-getClusterInfo-calls)
- (check-authorization! nimbus nil nil "getClusterInfo")
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- supervisor-infos (all-supervisor-info storm-cluster-state)
- ;; TODO: need to get the port info about supervisors...
- ;; in standalone just look at metadata, otherwise just say N/A?
- supervisor-summaries (dofor [[id info] supervisor-infos]
- (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
- sup-sum (SupervisorSummary. (:hostname info)
- (:uptime-secs info)
- (count ports)
- (count (:used-ports info))
- id) ]
- (.set_total_resources sup-sum (map-val double (:resources-map info)))
- (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
- (.set_used_mem sup-sum used-mem)
- (.set_used_cpu sup-sum used-cpu))
- (when-let [version (:version info)] (.set_version sup-sum version))
- sup-sum))
- nimbus-uptime ((:uptime nimbus))
- bases (topology-bases storm-cluster-state)
- nimbuses (.nimbuses storm-cluster-state)
-
- ;;update the isLeader field for each nimbus summary
- _ (let [leader (.getLeader (:leader-elector nimbus))
- leader-host (.getHost leader)
- leader-port (.getPort leader)]
- (doseq [nimbus-summary nimbuses]
- (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
- (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
-
- topology-summaries (dofor [[id base] bases :when base]
- (let [assignment (.assignment-info storm-cluster-state id nil)
- topo-summ (TopologySummary. id
- (:storm-name base)
- (->> (:executor->node+port assignment)
- keys
- (mapcat executor-id->tasks)
- count)
- (->> (:executor->node+port assignment)
- keys
- count)
- (->> (:executor->node+port assignment)
- vals
- set
- count)
- (time-delta (:launch-time-secs base))
- (extract-status-str base))]
- (when-let [owner (:owner base)] (.set_owner topo-summ owner))
- (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
- (when-let [resources (.get @(:id->resources nimbus) id)]
- (.set_requested_memonheap topo-summ (get resources 0))
- (.set_requested_memoffheap topo-summ (get resources 1))
- (.set_requested_cpu topo-summ (get resources 2))
- (.set_assigned_memonheap topo-summ (get resources 3))
- (.set_assigned_memoffheap topo-summ (get resources 4))
- (.set_assigned_cpu topo-summ (get resources 5)))
- (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
- topo-summ))
- ret (ClusterSummary. supervisor-summaries
- topology-summaries
- nimbuses)
- _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
- ret))
-
- (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
- (mark! nimbus:num-getTopologyInfoWithOpts-calls)
- (let [{:keys [storm-name
- storm-cluster-state
- all-components
- launch-time-secs
- assignment
- beats
- task->component
- base]} (get-common-topo-info storm-id "getTopologyInfo")
- num-err-choice (or (.get_num_err_choice options)
- NumErrorsChoice/ALL)
- errors-fn (condp = num-err-choice
- NumErrorsChoice/NONE (fn [& _] ()) ;; empty list only
- NumErrorsChoice/ONE (comp #(remove nil? %)
- list
- get-last-error)
- NumErrorsChoice/ALL get-errors
- ;; Default
- (do
- (log-warn "Got invalid NumErrorsChoice '"
- num-err-choice
- "'")
- get-errors))
- errors (->> all-components
- (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)]))
- (into {}))
- executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
- (let [host (-> assignment :node->host (get node))
- heartbeat (get beats executor)
- stats (:stats heartbeat)
- stats (if stats
- (stats/thriftify-executor-stats stats))]
- (doto
- (ExecutorSummary. (thriftify-executor-id executor)
- (-> executor first task->component)
- host
- port
- (nil-to-zero (:uptime heartbeat)))
- (.set_stats stats))
- ))
- topo-info (TopologyInfo. storm-id
- storm-name
- (time-delta launch-time-secs)
- executor-summaries
- (extract-status-str base)
- errors
- )]
- (when-let [owner (:owner base)] (.set_owner topo-info owner))
- (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
- (when-let [resources (.get @(:id->resources nimbus) storm-id)]
- (.set_requested_memonheap topo-info (get resources 0))
- (.set_requested_memoffheap topo-info (get resources 1))
- (.set_requested_cpu topo-info (get resources 2))
- (.set_assigned_memonheap topo-info (get resources 3))
- (.set_assigned_memoffheap topo-info (get resources 4))
- (.set_assigned_cpu topo-info (get resources 5)))
- (when-let [component->debug (:component->debug base)]
- (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
- (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
- topo-info))
-
- (^TopologyInfo getTopologyInfo [this ^String topology-id]
- (mark! nimbus:num-getTopologyInfo-calls)
- (.getTopologyInfoWithOpts this
- topology-id
- (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
-
- (^String beginCreateBlob [this
- ^String blob-key
- ^SettableBlobMeta blob-meta]
- (let [session-id (uuid)]
- (.put (:blob-uploaders nimbus)
- session-id
- (.createBlob (:blob-store nimbus) blob-key blob-meta (get-subject)))
- (log-message "Created blob for " blob-key
- " with session id " session-id)
- (str session-id)))
-
- (^String beginUpdateBlob [this ^String blob-key]
- (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus)
- blob-key (get-subject))]
- (let [session-id (uuid)]
- (.put (:blob-uploaders nimbus) session-id os)
- (log-message "Created upload session for " blob-key
- " with id " session-id)
- (str session-id))))
-
- (^void createStateInZookeeper [this ^String blob-key]
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- blob-store (:blob-store nimbus)
- nimbus-host-port-info (:nimbus-host-port-info nimbus)
- conf (:conf nimbus)]
- (if (instance? LocalFsBlobStore blob-store)
- (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
- (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
-
- (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
- (let [uploaders (:blob-uploaders nimbus)]
- (if-let [^AtomicOutputStream os (.get uploaders session)]
- (let [chunk-array (.array blob-chunk)
- remaining (.remaining blob-chunk)
- array-offset (.arrayOffset blob-chunk)
- position (.position blob-chunk)]
- (.write os chunk-array (+ array-offset position) remaining)
- (.put uploaders session os))
- (throw-runtime "Blob for session "
- session
- " does not exist (or timed out)"))))
-
- (^void finishBlobUpload [this ^String session]
- (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
- (do
- (.close os)
- (log-message "Finished uploading blob for session "
- session
- ". Closing session.")
- (.remove (:blob-uploaders nimbus) session))
- (throw-runtime "Blob for session "
- session
- " does not exist (or timed out)")))
-
- (^void cancelBlobUpload [this ^String session]
- (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
- (do
- (.cancel os)
- (log-message "Canceled uploading blob for session "
- session
- ". Closing session.")
- (.remove (:blob-uploaders nimbus) session))
- (throw-runtime "Blob for session "
- session
- " does not exist (or timed out)")))
-
- (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
- (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus)
- blob-key (get-subject))]
- ret))
-
- (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
- (->> (ReqContext/context)
- (.subject)
- (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
-
- (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
- (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus)
- blob-key (get-subject))]
- (let [session-id (uuid)
- ret (BeginDownloadResult. (.getVersion is) (str session-id))]
- (.set_data_size ret (.getFileLength is))
- (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
- (log-message "Created download session for " blob-key
- " with id " session-id)
- ret)))
-
- (^ByteBuffer downloadBlobChunk [this ^String session]
- (let [downloaders (:blob-downloaders nimbus)
- ^BufferInputStream is (.get downloaders session)]
- (when-not is
- (throw (RuntimeException.
- "Could not find input stream for session " session)))
- (let [ret (.read is)]
- (.put downloaders session is)
- (when (empty? ret)
- (.close is)
- (.remove downloaders session))
- (log-debug "Sending " (alength ret) " bytes")
- (ByteBuffer/wrap ret))))
-
- (^void deleteBlob [this ^String blob-key]
- (let [subject (->> (ReqContext/context)
- (.subject))]
- (.deleteBlob (:blob-store nimbus) blob-key subject)
- (when (instance? LocalFsBlobStore blob-store)
- (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
- (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
- (log-message "Deleted blob for key " blob-key)))
-
- (^ListBlobsResult listBlobs [this ^String session]
- (let [listers (:blob-listers nimbus)
- ^Iterator keys-it (if (clojure.string/blank? session)
- (.listKeys (:blob-store nimbus))
- (.get listers session))
- _ (or keys-it (throw-runtime "Blob list for session "
- session
- " does not exist (or timed out)"))
-
- ;; Create a new session id if the user gave an empty session string.
- ;; This is the use case when the user wishes to list blobs
- ;; starting from the beginning.
- session (if (clojure.string/blank? session)
- (let [new-session (uuid)]
- (log-message "Creating new session for downloading list " new-session)
- new-session)
- session)]
- (if-not (.hasNext keys-it)
- (do
- (.remove listers session)
- (log-message "No more blobs to list for session " session)
- ;; A blank result communicates that there are no more blobs.
- (ListBlobsResult. (ArrayList. 0) (str session)))
- (let [^List list-chunk (->> keys-it
- (iterator-seq)
- (take 100) ;; Limit to next 100 keys
- (ArrayList.))]
- (log-message session " downloading " (.size list-chunk) " entries")
- (.put listers session keys-it)
- (ListBlobsResult. list-chunk (str session))))))
-
- (^int getBlobReplication [this ^String blob-key]
- (->> (ReqContext/context)
- (.subject)
- (.getBlobReplication (:blob-store nimbus) blob-key)))
-
- (^int updateBlobReplication [this ^String blob-key ^int replication]
- (->> (ReqContext/context)
- (.subject)
- (.updateBlobReplication (:blob-store nimbus) blob-key replication)))
-
- (^TopologyPageInfo getTopologyPageInfo
- [this ^String topo-id ^String window ^boolean include-sys?]
- (mark! nimbus:num-getTopologyPageInfo-calls)
- (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
-
- exec->node+port (:executor->node+port (:assignment info))
- last-err-fn (partial get-last-error
- (:storm-cluster-state info)
- topo-id)
- topo-page-info (stats/agg-topo-execs-stats topo-id
- exec->node+port
- (:task->component info)
- (:beats info)
- (:topology info)
- window
- include-sys?
- last-err-fn)]
- (when-let [owner (:owner (:base info))]
- (.set_owner topo-page-info owner))
- (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
- (.set_sched_status topo-page-info sched-status))
- (when-let [resources (.get @(:id->resources nimbus) topo-id)]
- (.set_requested_memonheap topo-page-info (get resources 0))
- (.set_requested_memoffheap topo-page-info (get resources 1))
- (.set_requested_cpu topo-page-info (get resources 2))
- (.set_assigned_memonheap topo-page-info (get resources 3))
- (.set_assigned_memoffheap topo-page-info (get resources 4))
- (.set_assigned_cpu topo-page-info (get resources 5)))
- (doto topo-page-info
- (.set_name (:storm-name info))
- (.set_status (extract-status-str (:base info)))
- (.set_uptime_secs (time-delta (:launch-time-secs info)))
- (.set_topology_conf (to-json (try-read-storm-conf conf
- topo-id (:blob-store nimbus))))
- (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
- (when-let [debug-options
- (get-in info [:base :component->debug topo-id])]
- (.set_debug_options
- topo-page-info
- (converter/thriftify-debugoptions debug-options)))
- topo-page-info))
-
- (^ComponentPageInfo getComponentPageInfo
- [this
- ^String topo-id
- ^String component-id
- ^String window
- ^boolean include-sys?]
- (mark! nimbus:num-getComponentPageInfo-calls)
- (let [info (get-common-topo-info topo-id "getComponentPageInfo")
- {:keys [executor->node+port node->host]} (:assignment info)
- executor->host+port (map-val (fn [[node port]]
- [(node->host node) port])
- executor->node+port)
- comp-page-info (stats/agg-comp-execs-stats executor->host+port
- (:task->component info)
- (:beats info)
- window
- include-sys?
- topo-id
- (:topology info)
- component-id)]
- (doto comp-page-info
- (.set_topology_name (:storm-name info))
- (.set_errors (get-errors (:storm-cluster-state info)
- topo-id
- component-id))
- (.set_topology_status (extract-status-str (:base info))))
- (when-let [debug-options
- (get-in info [:base :component->debug component-id])]
- (.set_debug_options
- comp-page-info
- (converter/thriftify-debugoptions debug-options)))
- ;; Add the event logger details
- (let [component->tasks (reverse-map (:task->component info))]
- (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID)
- (let [eventlogger-tasks (sort (get component->tasks
- EVENTLOGGER-COMPONENT-ID))
- ;; Find the task the events from this component route to.
- task-index (mod (TupleUtils/listHashCode [component-id])
- (count eventlogger-tasks))
- task-id (nth eventlogger-tasks task-index)
- eventlogger-exec (first (filter (fn [[start stop]]
-
<TRUNCATED>