You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/13 05:01:36 UTC

[2/5] storm git commit: STORM-1723 Introduce ClusterMetricsConsumer

http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0786824..2c78ee9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -52,12 +52,16 @@
   (:require [org.apache.storm [cluster :as cluster]
                             [converter :as converter]
                             [stats :as stats]])
+  (:require [org.apache.storm.ui.core :as ui])
   (:require [clojure.set :as set])
   (:import [org.apache.storm.daemon.common StormBase Assignment])
   (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm config])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
-  (:import [org.apache.storm.utils VersionInfo])
+  (:import [org.apache.storm.utils VersionInfo]
+           (org.apache.storm.metric ClusterMetricsConsumerExecutor)
+           (org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo)
+           (org.apache.storm Config))
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
   (:require [metrics.meters :refer [defmeter mark!]])
@@ -164,6 +168,13 @@
         (catch Exception e
           (log-warn-error e "Ingoring exception, Could not initialize " (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN)))))))
 
+(defn mk-cluster-metrics-consumer-executors [storm-conf]
+  (map
+    (fn [consumer]
+      (ClusterMetricsConsumerExecutor. (get consumer "class")
+                                       (get consumer "argument")))
+    (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)))
+
 (defn nimbus-data [conf inimbus]
   (let [forced-scheduler (.getForcedScheduler inimbus)]
     {:conf conf
@@ -203,6 +214,7 @@
      :topo-history-state (nimbus-topo-history-state conf)
      :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
      :nimbus-topology-action-notifier (create-tology-action-notifier conf)
+     :cluster-consumer-executors (mk-cluster-metrics-consumer-executors conf)
      }))
 
 (defn inbox [nimbus]
@@ -1355,12 +1367,43 @@
 (defmethod blob-sync :local [conf nimbus]
   nil)
 
-(defserverfn service-handler [conf inimbus]
-  (.prepare inimbus conf (master-inimbus-dir conf))
-  (log-message "Starting Nimbus with conf " conf)
-  (let [nimbus (nimbus-data conf inimbus)
-        blob-store (:blob-store nimbus)
-        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
+(defn extract-cluster-metrics [^ClusterSummary summ]
+  (let [cluster-summ (ui/cluster-summary summ "nimbus")]
+    {:cluster-info (IClusterMetricsConsumer$ClusterInfo. (System/currentTimeMillis))
+     :data-points  (map
+                     (fn [[k v]] (DataPoint. k v))
+                     (select-keys cluster-summ ["supervisors" "topologies" "slotsTotal" "slotsUsed" "slotsFree"
+                                                "executorsTotal" "tasksTotal"]))}))
+
+(defn extract-supervisors-metrics [^ClusterSummary summ]
+  (let [sups (.get_supervisors summ)
+        supervisors-summ ((ui/supervisor-summary sups) "supervisors")]
+    (map (fn [supervisor-summ]
+           {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
+                               (supervisor-summ "host")
+                               (supervisor-summ "id")
+                               (System/currentTimeMillis))
+            :data-points     (map
+                               (fn [[k v]] (DataPoint. k v))
+                               (select-keys supervisor-summ ["slotsTotal" "slotsUsed" "totalMem" "totalCpu"
+                                                             "usedMem" "usedCpu"]))})
+         supervisors-summ)))
+
+(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
+  (let [cluster-summary (.getClusterInfo nimbus-service)
+        cluster-metrics (extract-cluster-metrics cluster-summary)
+        supervisors-metrics (extract-supervisors-metrics cluster-summary)]
+    (dofor
+      [consumer-executor (:cluster-consumer-executors nimbus)]
+      (do
+        (.handleDataPoints consumer-executor (:cluster-info cluster-metrics) (:data-points cluster-metrics))
+        (dofor
+          [supervisor-metrics supervisors-metrics]
+          (do
+          (.handleDataPoints consumer-executor (:supervisor-info supervisor-metrics) (:data-points supervisor-metrics))))))))
+
+(defn mk-reified-nimbus [nimbus conf blob-store]
+  (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
         admin-users (or (.get conf NIMBUS-ADMINS) [])
         get-common-topo-info
           (fn [^String storm-id operation]
@@ -1397,6 +1440,746 @@
                            (doto (ErrorInfo. (:error e) (:time-secs e))
                              (.set_host (:host e))
                              (.set_port (:port e)))))]
+  (reify Nimbus$Iface
+    (^void submitTopologyWithOpts
+      [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
+       ^SubmitOptions submitOptions]
+      (try
+        (mark! nimbus:num-submitTopologyWithOpts-calls)
+        (is-leader nimbus)
+        (assert (not-nil? submitOptions))
+        (validate-topology-name! storm-name)
+        (check-authorization! nimbus storm-name nil "submitTopology")
+        (check-storm-active! nimbus storm-name false)
+        (let [topo-conf (from-json serializedConf)]
+          (try
+            (validate-configs-with-schemas topo-conf)
+            (catch IllegalArgumentException ex
+              (throw (InvalidTopologyException. (.getMessage ex)))))
+          (.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus)
+                     storm-name
+                     topo-conf
+                     topology))
+        (swap! (:submitted-count nimbus) inc)
+        (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
+              credentials (.get_creds submitOptions)
+              credentials (when credentials (.get_creds credentials))
+              topo-conf (from-json serializedConf)
+              storm-conf-submitted (normalize-conf
+                                     conf
+                                     (-> topo-conf
+                                         (assoc STORM-ID storm-id)
+                                         (assoc TOPOLOGY-NAME storm-name))
+                                     topology)
+              req (ReqContext/context)
+              principal (.principal req)
+              submitter-principal (if principal (.toString principal))
+              submitter-user (.toLocal principal-to-local principal)
+              system-user (System/getProperty "user.name")
+              topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
+              storm-conf (-> storm-conf-submitted
+                             (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal ""))
+                             (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as
+                             (assoc TOPOLOGY-USERS topo-acl)
+                             (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL)))
+              storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf)
+                           storm-conf
+                           (dissoc storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
+              total-storm-conf (merge conf storm-conf)
+              topology (normalize-topology total-storm-conf topology)
+              storm-cluster-state (:storm-cluster-state nimbus)]
+          (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
+                              (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
+          (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
+            (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
+          (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
+          (validate-topology-size topo-conf conf topology)
+          (when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
+                     (not (Utils/isZkAuthenticationConfiguredTopology storm-conf)))
+            (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
+          (log-message "Received topology submission for "
+                       storm-name
+                       " with conf "
+                       (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
+          ;; lock protects against multiple topologies being submitted at once and
+          ;; cleanup thread killing topology in b/w assignment and starting the topology
+          (locking (:submit-lock nimbus)
+            (check-storm-active! nimbus storm-name false)
+            ;;cred-update-lock is not needed here because creds are being added for the first time.
+            (.set-credentials! storm-cluster-state storm-id credentials storm-conf)
+            (log-message "uploadedJar " uploadedJarLocation)
+            (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
+            (wait-for-desired-code-replication nimbus total-storm-conf storm-id)
+            (.setup-heartbeats! storm-cluster-state storm-id)
+            (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
+              (.setup-backpressure! storm-cluster-state storm-id))
+            (notify-topology-action-listener nimbus storm-name "submitTopology")
+            (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
+                                            TopologyInitialStatus/ACTIVE :active}]
+              (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))))
+        (catch Throwable e
+          (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
+          (throw e))))
+
+    (^void submitTopology
+      [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
+      (mark! nimbus:num-submitTopology-calls)
+      (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
+                               (SubmitOptions. TopologyInitialStatus/ACTIVE)))
+
+    (^void killTopology [this ^String name]
+      (mark! nimbus:num-killTopology-calls)
+      (.killTopologyWithOpts this name (KillOptions.)))
+
+    (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
+      (mark! nimbus:num-killTopologyWithOpts-calls)
+      (check-storm-active! nimbus storm-name true)
+      (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+            storm-id (topology-conf STORM-ID)
+            operation "killTopology"]
+        (check-authorization! nimbus storm-name topology-conf operation)
+        (let [wait-amt (if (.is_set_wait_secs options)
+                         (.get_wait_secs options)
+                         )]
+          (transition-name! nimbus storm-name [:kill wait-amt] true)
+          (notify-topology-action-listener nimbus storm-name operation))
+        (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
+                                     nimbus topology-conf)))
+
+    (^void rebalance [this ^String storm-name ^RebalanceOptions options]
+      (mark! nimbus:num-rebalance-calls)
+      (check-storm-active! nimbus storm-name true)
+      (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+            operation "rebalance"]
+        (check-authorization! nimbus storm-name topology-conf operation)
+        (let [wait-amt (if (.is_set_wait_secs options)
+                         (.get_wait_secs options))
+              num-workers (if (.is_set_num_workers options)
+                            (.get_num_workers options))
+              executor-overrides (if (.is_set_num_executors options)
+                                   (.get_num_executors options)
+                                   {})]
+          (doseq [[c num-executors] executor-overrides]
+            (when (<= num-executors 0)
+              (throw (InvalidTopologyException. "Number of executors must be greater than 0"))
+              ))
+          (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
+
+          (notify-topology-action-listener nimbus storm-name operation))))
+
+    (activate [this storm-name]
+      (mark! nimbus:num-activate-calls)
+      (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+            operation "activate"]
+        (check-authorization! nimbus storm-name topology-conf operation)
+        (transition-name! nimbus storm-name :activate true)
+        (notify-topology-action-listener nimbus storm-name operation)))
+
+    (deactivate [this storm-name]
+      (mark! nimbus:num-deactivate-calls)
+      (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+            operation "deactivate"]
+        (check-authorization! nimbus storm-name topology-conf operation)
+        (transition-name! nimbus storm-name :inactivate true)
+        (notify-topology-action-listener nimbus storm-name operation)))
+
+    (debug [this storm-name component-id enable? samplingPct]
+      (mark! nimbus:num-debug-calls)
+      (let [storm-cluster-state (:storm-cluster-state nimbus)
+            storm-id (get-storm-id storm-cluster-state storm-name)
+            topology-conf (try-read-storm-conf conf storm-id blob-store)
+            ;; make sure samplingPct is within bounds.
+            spct (Math/max (Math/min samplingPct 100.0) 0.0)
+            ;; while disabling we retain the sampling pct.
+            debug-options (if enable? {:enable enable? :samplingpct spct} {:enable enable?})
+            storm-base-updates (assoc {} :component->debug (if (empty? component-id)
+                                                             {storm-id debug-options}
+                                                             {component-id debug-options}))]
+        (check-authorization! nimbus storm-name topology-conf "debug")
+        (when-not storm-id
+          (throw (NotAliveException. storm-name)))
+        (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'"
+                     (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'")))
+        (locking (:submit-lock nimbus)
+          (.update-storm! storm-cluster-state storm-id storm-base-updates))))
+
+    (^void setWorkerProfiler
+      [this ^String id ^ProfileRequest profileRequest]
+      (mark! nimbus:num-setWorkerProfiler-calls)
+      (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+            storm-name (topology-conf TOPOLOGY-NAME)
+            _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
+            storm-cluster-state (:storm-cluster-state nimbus)]
+        (.set-worker-profile-request storm-cluster-state id profileRequest)))
+
+    (^List getComponentPendingProfileActions
+      [this ^String id ^String component_id ^ProfileAction action]
+      (mark! nimbus:num-getComponentPendingProfileActions-calls)
+      (let [info (get-common-topo-info id "getComponentPendingProfileActions")
+            storm-cluster-state (:storm-cluster-state info)
+            task->component (:task->component info)
+            {:keys [executor->node+port node->host]} (:assignment info)
+            executor->host+port (map-val (fn [[node port]]
+                                           [(node->host node) port])
+                                         executor->node+port)
+            nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
+            all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
+            latest-profile-actions (remove nil? (map (fn [nodeInfo]
+                                                       (->> all-pending-actions-for-topology
+                                                            (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %)))
+                                                                          (= (:port nodeInfo) (first (.get_port (.get_nodeInfo  %))))))
+                                                            (filter #(= action (.get_action %)))
+                                                            (sort-by #(.get_time_stamp %) >)
+                                                            first))
+                                                     nodeinfos))]
+        (log-message "Latest profile actions for topology " id " component " component_id " " (pr-str latest-profile-actions))
+        latest-profile-actions))
+
+    (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
+      (mark! nimbus:num-setLogConfig-calls)
+      (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+            storm-name (topology-conf TOPOLOGY-NAME)
+            _ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
+            storm-cluster-state (:storm-cluster-state nimbus)
+            merged-log-config (or (.topology-log-config storm-cluster-state id nil) (LogConfig.))
+            named-loggers (.get_named_logger_level merged-log-config)]
+        (doseq [[_ level] named-loggers]
+          (.set_action level LogLevelAction/UNCHANGED))
+        (doseq [[logger-name log-config] (.get_named_logger_level log-config-msg)]
+          (let [action (.get_action log-config)]
+            (if (clojure.string/blank? logger-name)
+              (throw (RuntimeException. "Named loggers need a valid name. Use ROOT for the root logger")))
+            (condp = action
+              LogLevelAction/UPDATE
+              (do (set-logger-timeouts log-config)
+                  (.put_to_named_logger_level merged-log-config logger-name log-config))
+              LogLevelAction/REMOVE
+              (let [named-loggers (.get_named_logger_level merged-log-config)]
+                (if (and (not (nil? named-loggers))
+                         (.containsKey named-loggers logger-name))
+                  (.remove named-loggers logger-name))))))
+        (log-message "Setting log config for " storm-name ":" merged-log-config)
+        (.set-topology-log-config! storm-cluster-state id merged-log-config)))
+
+    (uploadNewCredentials [this storm-name credentials]
+      (mark! nimbus:num-uploadNewCredentials-calls)
+      (let [storm-cluster-state (:storm-cluster-state nimbus)
+            storm-id (get-storm-id storm-cluster-state storm-name)
+            topology-conf (try-read-storm-conf conf storm-id blob-store)
+            creds (when credentials (.get_creds credentials))]
+        (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
+        (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf))))
+
+    (beginFileUpload [this]
+      (mark! nimbus:num-beginFileUpload-calls)
+      (check-authorization! nimbus nil nil "fileUpload")
+      (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
+        (.put (:uploaders nimbus)
+              fileloc
+              (Channels/newChannel (FileOutputStream. fileloc)))
+        (log-message "Uploading file from client to " fileloc)
+        fileloc
+        ))
+
+    (^void uploadChunk [this ^String location ^ByteBuffer chunk]
+      (mark! nimbus:num-uploadChunk-calls)
+      (check-authorization! nimbus nil nil "fileUpload")
+      (let [uploaders (:uploaders nimbus)
+            ^WritableByteChannel channel (.get uploaders location)]
+        (when-not channel
+          (throw (RuntimeException.
+                   "File for that location does not exist (or timed out)")))
+        (.write channel chunk)
+        (.put uploaders location channel)
+        ))
+
+    (^void finishFileUpload [this ^String location]
+      (mark! nimbus:num-finishFileUpload-calls)
+      (check-authorization! nimbus nil nil "fileUpload")
+      (let [uploaders (:uploaders nimbus)
+            ^WritableByteChannel channel (.get uploaders location)]
+        (when-not channel
+          (throw (RuntimeException.
+                   "File for that location does not exist (or timed out)")))
+        (.close channel)
+        (log-message "Finished uploading file from client: " location)
+        (.remove uploaders location)
+        ))
+
+    (^String beginFileDownload
+      [this ^String file]
+      (mark! nimbus:num-beginFileDownload-calls)
+      (check-authorization! nimbus nil nil "fileDownload")
+      (let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file nil)
+                                   ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES)
+                                                          (int 65536)))
+            id (uuid)]
+        (.put (:downloaders nimbus) id is)
+        id))
+
+    (^ByteBuffer downloadChunk [this ^String id]
+      (mark! nimbus:num-downloadChunk-calls)
+      (check-authorization! nimbus nil nil "fileDownload")
+      (let [downloaders (:downloaders nimbus)
+            ^BufferFileInputStream is (.get downloaders id)]
+        (when-not is
+          (throw (RuntimeException.
+                   "Could not find input stream for that id")))
+        (let [ret (.read is)]
+          (.put downloaders id is)
+          (when (empty? ret)
+            (.remove downloaders id))
+          (ByteBuffer/wrap ret)
+          )))
+
+    (^String getNimbusConf [this]
+      (mark! nimbus:num-getNimbusConf-calls)
+      (check-authorization! nimbus nil nil "getNimbusConf")
+      (to-json (:conf nimbus)))
+
+    (^LogConfig getLogConfig [this ^String id]
+      (mark! nimbus:num-getLogConfig-calls)
+      (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+            storm-name (topology-conf TOPOLOGY-NAME)
+            _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
+            storm-cluster-state (:storm-cluster-state nimbus)
+            log-config (.topology-log-config storm-cluster-state id nil)]
+        (if log-config log-config (LogConfig.))))
+
+    (^String getTopologyConf [this ^String id]
+      (mark! nimbus:num-getTopologyConf-calls)
+      (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+            storm-name (topology-conf TOPOLOGY-NAME)]
+        (check-authorization! nimbus storm-name topology-conf "getTopologyConf")
+        (to-json topology-conf)))
+
+    (^StormTopology getTopology [this ^String id]
+      (mark! nimbus:num-getTopology-calls)
+      (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+            storm-name (topology-conf TOPOLOGY-NAME)]
+        (check-authorization! nimbus storm-name topology-conf "getTopology")
+        (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
+
+    (^StormTopology getUserTopology [this ^String id]
+      (mark! nimbus:num-getUserTopology-calls)
+      (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
+            storm-name (topology-conf TOPOLOGY-NAME)]
+        (check-authorization! nimbus storm-name topology-conf "getUserTopology")
+        (try-read-storm-topology id blob-store)))
+
+    (^ClusterSummary getClusterInfo [this]
+      (mark! nimbus:num-getClusterInfo-calls)
+      (check-authorization! nimbus nil nil "getClusterInfo")
+      (let [storm-cluster-state (:storm-cluster-state nimbus)
+            supervisor-infos (all-supervisor-info storm-cluster-state)
+            ;; TODO: need to get the port info about supervisors...
+            ;; in standalone just look at metadata, otherwise just say N/A?
+            supervisor-summaries (dofor [[id info] supervisor-infos]
+                                        (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+                                              sup-sum (SupervisorSummary. (:hostname info)
+                                                                          (:uptime-secs info)
+                                                                          (count ports)
+                                                                          (count (:used-ports info))
+                                                                          id) ]
+                                          (.set_total_resources sup-sum (map-val double (:resources-map info)))
+                                          (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+                                            (.set_used_mem sup-sum used-mem)
+                                            (.set_used_cpu sup-sum used-cpu))
+                                          (when-let [version (:version info)] (.set_version sup-sum version))
+                                          sup-sum))
+            nimbus-uptime ((:uptime nimbus))
+            bases (topology-bases storm-cluster-state)
+            nimbuses (.nimbuses storm-cluster-state)
+
+            ;;update the isLeader field for each nimbus summary
+            _ (let [leader (.getLeader (:leader-elector nimbus))
+                    leader-host (.getHost leader)
+                    leader-port (.getPort leader)]
+                (doseq [nimbus-summary nimbuses]
+                  (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
+                  (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+
+            topology-summaries (dofor [[id base] bases :when base]
+                                      (let [assignment (.assignment-info storm-cluster-state id nil)
+                                            topo-summ (TopologySummary. id
+                                                                        (:storm-name base)
+                                                                        (->> (:executor->node+port assignment)
+                                                                             keys
+                                                                             (mapcat executor-id->tasks)
+                                                                             count)
+                                                                        (->> (:executor->node+port assignment)
+                                                                             keys
+                                                                             count)
+                                                                        (->> (:executor->node+port assignment)
+                                                                             vals
+                                                                             set
+                                                                             count)
+                                                                        (time-delta (:launch-time-secs base))
+                                                                        (extract-status-str base))]
+                                        (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+                                        (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+                                        (when-let [resources (.get @(:id->resources nimbus) id)]
+                                          (.set_requested_memonheap topo-summ (get resources 0))
+                                          (.set_requested_memoffheap topo-summ (get resources 1))
+                                          (.set_requested_cpu topo-summ (get resources 2))
+                                          (.set_assigned_memonheap topo-summ (get resources 3))
+                                          (.set_assigned_memoffheap topo-summ (get resources 4))
+                                          (.set_assigned_cpu topo-summ (get resources 5)))
+                                        (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+                                        topo-summ))
+            ret (ClusterSummary. supervisor-summaries
+                                 topology-summaries
+                                 nimbuses)
+            _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
+        ret))
+
+    (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
+      (mark! nimbus:num-getTopologyInfoWithOpts-calls)
+      (let [{:keys [storm-name
+                    storm-cluster-state
+                    all-components
+                    launch-time-secs
+                    assignment
+                    beats
+                    task->component
+                    base]} (get-common-topo-info storm-id "getTopologyInfo")
+            num-err-choice (or (.get_num_err_choice options)
+                               NumErrorsChoice/ALL)
+            errors-fn (condp = num-err-choice
+                        NumErrorsChoice/NONE (fn [& _] ()) ;; empty list only
+                        NumErrorsChoice/ONE (comp #(remove nil? %)
+                                                  list
+                                                  get-last-error)
+                        NumErrorsChoice/ALL get-errors
+                        ;; Default
+                        (do
+                          (log-warn "Got invalid NumErrorsChoice '"
+                                    num-err-choice
+                                    "'")
+                          get-errors))
+            errors (->> all-components
+                        (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)]))
+                        (into {}))
+            executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
+                                      (let [host (-> assignment :node->host (get node))
+                                            heartbeat (get beats executor)
+                                            stats (:stats heartbeat)
+                                            stats (if stats
+                                                    (stats/thriftify-executor-stats stats))]
+                                        (doto
+                                          (ExecutorSummary. (thriftify-executor-id executor)
+                                                            (-> executor first task->component)
+                                                            host
+                                                            port
+                                                            (nil-to-zero (:uptime heartbeat)))
+                                          (.set_stats stats))
+                                        ))
+            topo-info  (TopologyInfo. storm-id
+                                      storm-name
+                                      (time-delta launch-time-secs)
+                                      executor-summaries
+                                      (extract-status-str base)
+                                      errors
+                                      )]
+        (when-let [owner (:owner base)] (.set_owner topo-info owner))
+        (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
+        (when-let [resources (.get @(:id->resources nimbus) storm-id)]
+          (.set_requested_memonheap topo-info (get resources 0))
+          (.set_requested_memoffheap topo-info (get resources 1))
+          (.set_requested_cpu topo-info (get resources 2))
+          (.set_assigned_memonheap topo-info (get resources 3))
+          (.set_assigned_memoffheap topo-info (get resources 4))
+          (.set_assigned_cpu topo-info (get resources 5)))
+        (when-let [component->debug (:component->debug base)]
+          (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
+        (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+        topo-info))
+
+    (^TopologyInfo getTopologyInfo [this ^String topology-id]
+      (mark! nimbus:num-getTopologyInfo-calls)
+      (.getTopologyInfoWithOpts this
+                                topology-id
+                                (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
+
+    (^String beginCreateBlob [this
+                              ^String blob-key
+                              ^SettableBlobMeta blob-meta]
+      (let [session-id (uuid)]
+        (.put (:blob-uploaders nimbus)
+              session-id
+              (.createBlob (:blob-store nimbus) blob-key blob-meta (get-subject)))
+        (log-message "Created blob for " blob-key
+                     " with session id " session-id)
+        (str session-id)))
+
+    (^String beginUpdateBlob [this ^String blob-key]
+      (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus)
+                                                blob-key (get-subject))]
+        (let [session-id (uuid)]
+          (.put (:blob-uploaders nimbus) session-id os)
+          (log-message "Created upload session for " blob-key
+                       " with id " session-id)
+          (str session-id))))
+
+    (^void createStateInZookeeper [this ^String blob-key]
+      (let [storm-cluster-state (:storm-cluster-state nimbus)
+            blob-store (:blob-store nimbus)
+            nimbus-host-port-info (:nimbus-host-port-info nimbus)
+            conf (:conf nimbus)]
+        (if (instance? LocalFsBlobStore blob-store)
+          (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
+        (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
+
+    (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
+      (let [uploaders (:blob-uploaders nimbus)]
+        (if-let [^AtomicOutputStream os (.get uploaders session)]
+          (let [chunk-array (.array blob-chunk)
+                remaining (.remaining blob-chunk)
+                array-offset (.arrayOffset blob-chunk)
+                position (.position blob-chunk)]
+            (.write os chunk-array (+ array-offset position) remaining)
+            (.put uploaders session os))
+          (throw-runtime "Blob for session "
+                         session
+                         " does not exist (or timed out)"))))
+
+    (^void finishBlobUpload [this ^String session]
+      (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
+        (do
+          (.close os)
+          (log-message "Finished uploading blob for session "
+                       session
+                       ". Closing session.")
+          (.remove (:blob-uploaders nimbus) session))
+        (throw-runtime "Blob for session "
+                       session
+                       " does not exist (or timed out)")))
+
+    (^void cancelBlobUpload [this ^String session]
+      (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
+        (do
+          (.cancel os)
+          (log-message "Canceled uploading blob for session "
+                       session
+                       ". Closing session.")
+          (.remove (:blob-uploaders nimbus) session))
+        (throw-runtime "Blob for session "
+                       session
+                       " does not exist (or timed out)")))
+
+    (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
+      (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus)
+                                                blob-key (get-subject))]
+        ret))
+
+    (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
+      (->> (ReqContext/context)
+           (.subject)
+           (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
+
+    (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
+      (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus)
+                                              blob-key (get-subject))]
+        (let [session-id (uuid)
+              ret (BeginDownloadResult. (.getVersion is) (str session-id))]
+          (.set_data_size ret (.getFileLength is))
+          (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
+          (log-message "Created download session for " blob-key
+                       " with id " session-id)
+          ret)))
+
+    (^ByteBuffer downloadBlobChunk [this ^String session]
+      (let [downloaders (:blob-downloaders nimbus)
+            ^BufferInputStream is (.get downloaders session)]
+        (when-not is
+          (throw (RuntimeException.
+                   "Could not find input stream for session " session)))
+        (let [ret (.read is)]
+          (.put downloaders session is)
+          (when (empty? ret)
+            (.close is)
+            (.remove downloaders session))
+          (log-debug "Sending " (alength ret) " bytes")
+          (ByteBuffer/wrap ret))))
+
+    (^void deleteBlob [this ^String blob-key]
+      (let [subject (->> (ReqContext/context)
+                         (.subject))]
+        (.deleteBlob (:blob-store nimbus) blob-key subject)
+        (when (instance? LocalFsBlobStore blob-store)
+          (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
+          (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
+        (log-message "Deleted blob for key " blob-key)))
+
+    (^ListBlobsResult listBlobs [this ^String session]
+      (let [listers (:blob-listers nimbus)
+            ^Iterator keys-it (if (clojure.string/blank? session)
+                                (.listKeys (:blob-store nimbus))
+                                (.get listers session))
+            _ (or keys-it (throw-runtime "Blob list for session "
+                                         session
+                                         " does not exist (or timed out)"))
+
+            ;; Create a new session id if the user gave an empty session string.
+            ;; This is the use case when the user wishes to list blobs
+            ;; starting from the beginning.
+            session (if (clojure.string/blank? session)
+                      (let [new-session (uuid)]
+                        (log-message "Creating new session for downloading list " new-session)
+                        new-session)
+                      session)]
+        (if-not (.hasNext keys-it)
+          (do
+            (.remove listers session)
+            (log-message "No more blobs to list for session " session)
+            ;; A blank result communicates that there are no more blobs.
+            (ListBlobsResult. (ArrayList. 0) (str session)))
+          (let [^List list-chunk (->> keys-it
+                                      (iterator-seq)
+                                      (take 100) ;; Limit to next 100 keys
+                                      (ArrayList.))]
+            (log-message session " downloading " (.size list-chunk) " entries")
+            (.put listers session keys-it)
+            (ListBlobsResult. list-chunk (str session))))))
+
+    (^int getBlobReplication [this ^String blob-key]
+      (->> (ReqContext/context)
+           (.subject)
+           (.getBlobReplication (:blob-store nimbus) blob-key)))
+
+    (^int updateBlobReplication [this ^String blob-key ^int replication]
+      (->> (ReqContext/context)
+           (.subject)
+           (.updateBlobReplication (:blob-store nimbus) blob-key replication)))
+
+    (^TopologyPageInfo getTopologyPageInfo
+      [this ^String topo-id ^String window ^boolean include-sys?]
+      (mark! nimbus:num-getTopologyPageInfo-calls)
+      (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
+
+            exec->node+port (:executor->node+port (:assignment info))
+            last-err-fn (partial get-last-error
+                                 (:storm-cluster-state info)
+                                 topo-id)
+            topo-page-info (stats/agg-topo-execs-stats topo-id
+                                                       exec->node+port
+                                                       (:task->component info)
+                                                       (:beats info)
+                                                       (:topology info)
+                                                       window
+                                                       include-sys?
+                                                       last-err-fn)]
+        (when-let [owner (:owner (:base info))]
+          (.set_owner topo-page-info owner))
+        (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
+          (.set_sched_status topo-page-info sched-status))
+        (when-let [resources (.get @(:id->resources nimbus) topo-id)]
+          (.set_requested_memonheap topo-page-info (get resources 0))
+          (.set_requested_memoffheap topo-page-info (get resources 1))
+          (.set_requested_cpu topo-page-info (get resources 2))
+          (.set_assigned_memonheap topo-page-info (get resources 3))
+          (.set_assigned_memoffheap topo-page-info (get resources 4))
+          (.set_assigned_cpu topo-page-info (get resources 5)))
+        (doto topo-page-info
+          (.set_name (:storm-name info))
+          (.set_status (extract-status-str (:base info)))
+          (.set_uptime_secs (time-delta (:launch-time-secs info)))
+          (.set_topology_conf (to-json (try-read-storm-conf conf
+                                                            topo-id (:blob-store nimbus))))
+          (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
+        (when-let [debug-options
+                   (get-in info [:base :component->debug topo-id])]
+          (.set_debug_options
+            topo-page-info
+            (converter/thriftify-debugoptions debug-options)))
+        topo-page-info))
+
+    (^ComponentPageInfo getComponentPageInfo
+      [this
+       ^String topo-id
+       ^String component-id
+       ^String window
+       ^boolean include-sys?]
+      (mark! nimbus:num-getComponentPageInfo-calls)
+      (let [info (get-common-topo-info topo-id "getComponentPageInfo")
+            {:keys [executor->node+port node->host]} (:assignment info)
+            executor->host+port (map-val (fn [[node port]]
+                                           [(node->host node) port])
+                                         executor->node+port)
+            comp-page-info (stats/agg-comp-execs-stats executor->host+port
+                                                       (:task->component info)
+                                                       (:beats info)
+                                                       window
+                                                       include-sys?
+                                                       topo-id
+                                                       (:topology info)
+                                                       component-id)]
+        (doto comp-page-info
+          (.set_topology_name (:storm-name info))
+          (.set_errors (get-errors (:storm-cluster-state info)
+                                   topo-id
+                                   component-id))
+          (.set_topology_status (extract-status-str (:base info))))
+        (when-let [debug-options
+                   (get-in info [:base :component->debug component-id])]
+          (.set_debug_options
+            comp-page-info
+            (converter/thriftify-debugoptions debug-options)))
+        ;; Add the event logger details
+        (let [component->tasks (reverse-map (:task->component info))]
+          (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID)
+            (let [eventlogger-tasks (sort (get component->tasks
+                                               EVENTLOGGER-COMPONENT-ID))
+                  ;; Find the task the events from this component route to.
+                  task-index (mod (TupleUtils/listHashCode [component-id])
+                                  (count eventlogger-tasks))
+                  task-id (nth eventlogger-tasks task-index)
+                  eventlogger-exec (first (filter (fn [[start stop]]
+                                                    (between? task-id start stop))
+                                                  (keys executor->host+port)))
+                  [host port] (get executor->host+port eventlogger-exec)]
+              (if (and host port)
+                (doto comp-page-info
+                  (.set_eventlog_host host)
+                  (.set_eventlog_port port))))))
+        comp-page-info))
+
+    (^TopologyHistoryInfo getTopologyHistory [this ^String user]
+      (let [storm-cluster-state (:storm-cluster-state nimbus)
+            bases (topology-bases storm-cluster-state)
+            assigned-topology-ids (.assignments storm-cluster-state nil)
+            user-group-match-fn (fn [topo-id user conf]
+                                  (let [topology-conf (try-read-storm-conf conf topo-id (:blob-store nimbus))
+                                        groups (get-topo-logs-groups topology-conf)]
+                                    (or (nil? user)
+                                        (some #(= % user) admin-users)
+                                        (does-users-group-intersect? user groups conf)
+                                        (some #(= % user) (get-topo-logs-users topology-conf)))))
+            active-ids-for-user (filter #(user-group-match-fn % user (:conf nimbus)) assigned-topology-ids)
+            topo-history-list (read-topology-history nimbus user admin-users)]
+        (TopologyHistoryInfo. (distinct (concat active-ids-for-user topo-history-list)))))
+
+    Shutdownable
+    (shutdown [this]
+      (mark! nimbus:num-shutdown-calls)
+      (log-message "Shutting down master")
+      (cancel-timer (:timer nimbus))
+      (.disconnect (:storm-cluster-state nimbus))
+      (.cleanup (:downloaders nimbus))
+      (.cleanup (:uploaders nimbus))
+      (.shutdown (:blob-store nimbus))
+      (.close (:leader-elector nimbus))
+      (when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus)))
+      (log-message "Shut down master"))
+    DaemonCommon
+    (waiting? [this]
+      (timer-waiting? (:timer nimbus))))))
+
+(defserverfn service-handler [conf inimbus]
+  (.prepare inimbus conf (master-inimbus-dir conf))
+  (log-message "Starting Nimbus with conf " conf)
+  (let [nimbus (nimbus-data conf inimbus)
+        blob-store (:blob-store nimbus)]
     (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
 
     ;add to nimbuses
@@ -1415,6 +2198,9 @@
       (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
       (setup-blobstore nimbus))
 
+    (doseq [consumer (:cluster-consumer-executors nimbus)]
+      (.prepare consumer))
+
     (when (is-leader nimbus :throw-exception false)
       (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
         (transition! nimbus storm-id :startup)))
@@ -1457,740 +2243,16 @@
 
     (start-metrics-reporters conf)
 
-    (reify Nimbus$Iface
-      (^void submitTopologyWithOpts
-        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
-         ^SubmitOptions submitOptions]
-        (try
-          (mark! nimbus:num-submitTopologyWithOpts-calls)
-          (is-leader nimbus)
-          (assert (not-nil? submitOptions))
-          (validate-topology-name! storm-name)
-          (check-authorization! nimbus storm-name nil "submitTopology")
-          (check-storm-active! nimbus storm-name false)
-          (let [topo-conf (from-json serializedConf)]
-            (try
-              (validate-configs-with-schemas topo-conf)
-              (catch IllegalArgumentException ex
-                (throw (InvalidTopologyException. (.getMessage ex)))))
-            (.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus)
-                       storm-name
-                       topo-conf
-                       topology))
-          (swap! (:submitted-count nimbus) inc)
-          (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
-                credentials (.get_creds submitOptions)
-                credentials (when credentials (.get_creds credentials))
-                topo-conf (from-json serializedConf)
-                storm-conf-submitted (normalize-conf
-                            conf
-                            (-> topo-conf
-                              (assoc STORM-ID storm-id)
-                              (assoc TOPOLOGY-NAME storm-name))
-                            topology)
-                req (ReqContext/context)
-                principal (.principal req)
-                submitter-principal (if principal (.toString principal))
-                submitter-user (.toLocal principal-to-local principal)
-                system-user (System/getProperty "user.name")
-                topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
-                storm-conf (-> storm-conf-submitted
-                               (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal ""))
-                               (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as
-                               (assoc TOPOLOGY-USERS topo-acl)
-                               (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL)))
-                storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf)
-                                storm-conf
-                                (dissoc storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
-                total-storm-conf (merge conf storm-conf)
-                topology (normalize-topology total-storm-conf topology)
-                storm-cluster-state (:storm-cluster-state nimbus)]
-            (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
-              (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
-            (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
-              (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
-            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
-            (validate-topology-size topo-conf conf topology)
-            (when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
-                       (not (Utils/isZkAuthenticationConfiguredTopology storm-conf)))
-                (throw (IllegalArgumentException. "The cluster is configured for zookeeper authentication, but no payload was provided.")))
-            (log-message "Received topology submission for "
-                         storm-name
-                         " with conf "
-                         (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
-            ;; lock protects against multiple topologies being submitted at once and
-            ;; cleanup thread killing topology in b/w assignment and starting the topology
-            (locking (:submit-lock nimbus)
-              (check-storm-active! nimbus storm-name false)
-              ;;cred-update-lock is not needed here because creds are being added for the first time.
-              (.set-credentials! storm-cluster-state storm-id credentials storm-conf)
-              (log-message "uploadedJar " uploadedJarLocation)
-              (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
-              (wait-for-desired-code-replication nimbus total-storm-conf storm-id)
-              (.setup-heartbeats! storm-cluster-state storm-id)
-              (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
-                (.setup-backpressure! storm-cluster-state storm-id))
-              (notify-topology-action-listener nimbus storm-name "submitTopology")
-              (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
-                                              TopologyInitialStatus/ACTIVE :active}]
-                (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))))
-          (catch Throwable e
-            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
-            (throw e))))
-
-      (^void submitTopology
-        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
-        (mark! nimbus:num-submitTopology-calls)
-        (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
-                                 (SubmitOptions. TopologyInitialStatus/ACTIVE)))
-
-      (^void killTopology [this ^String name]
-        (mark! nimbus:num-killTopology-calls)
-        (.killTopologyWithOpts this name (KillOptions.)))
-
-      (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
-        (mark! nimbus:num-killTopologyWithOpts-calls)
-        (check-storm-active! nimbus storm-name true)
-        (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
-              storm-id (topology-conf STORM-ID)
-              operation "killTopology"]
-          (check-authorization! nimbus storm-name topology-conf operation)
-          (let [wait-amt (if (.is_set_wait_secs options)
-                           (.get_wait_secs options)
-                           )]
-            (transition-name! nimbus storm-name [:kill wait-amt] true)
-            (notify-topology-action-listener nimbus storm-name operation))
-          (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
-            nimbus topology-conf)))
-
-      (^void rebalance [this ^String storm-name ^RebalanceOptions options]
-        (mark! nimbus:num-rebalance-calls)
-        (check-storm-active! nimbus storm-name true)
-        (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
-              operation "rebalance"]
-          (check-authorization! nimbus storm-name topology-conf operation)
-          (let [wait-amt (if (.is_set_wait_secs options)
-                           (.get_wait_secs options))
-                num-workers (if (.is_set_num_workers options)
-                              (.get_num_workers options))
-                executor-overrides (if (.is_set_num_executors options)
-                                     (.get_num_executors options)
-                                     {})]
-            (doseq [[c num-executors] executor-overrides]
-              (when (<= num-executors 0)
-                (throw (InvalidTopologyException. "Number of executors must be greater than 0"))
-                ))
-            (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
-
-            (notify-topology-action-listener nimbus storm-name operation))))
-
-      (activate [this storm-name]
-        (mark! nimbus:num-activate-calls)
-        (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
-              operation "activate"]
-          (check-authorization! nimbus storm-name topology-conf operation)
-          (transition-name! nimbus storm-name :activate true)
-          (notify-topology-action-listener nimbus storm-name operation)))
-
-      (deactivate [this storm-name]
-        (mark! nimbus:num-deactivate-calls)
-        (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
-              operation "deactivate"]
-          (check-authorization! nimbus storm-name topology-conf operation)
-          (transition-name! nimbus storm-name :inactivate true)
-          (notify-topology-action-listener nimbus storm-name operation)))
-
-      (debug [this storm-name component-id enable? samplingPct]
-        (mark! nimbus:num-debug-calls)
-        (let [storm-cluster-state (:storm-cluster-state nimbus)
-              storm-id (get-storm-id storm-cluster-state storm-name)
-              topology-conf (try-read-storm-conf conf storm-id blob-store)
-              ;; make sure samplingPct is within bounds.
-              spct (Math/max (Math/min samplingPct 100.0) 0.0)
-              ;; while disabling we retain the sampling pct.
-              debug-options (if enable? {:enable enable? :samplingpct spct} {:enable enable?})
-              storm-base-updates (assoc {} :component->debug (if (empty? component-id)
-                                                               {storm-id debug-options}
-                                                               {component-id debug-options}))]
-          (check-authorization! nimbus storm-name topology-conf "debug")
-          (when-not storm-id
-            (throw (NotAliveException. storm-name)))
-          (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'"
-            (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'")))
-          (locking (:submit-lock nimbus)
-            (.update-storm! storm-cluster-state storm-id storm-base-updates))))
-
-      (^void setWorkerProfiler
-        [this ^String id ^ProfileRequest profileRequest]
-        (mark! nimbus:num-setWorkerProfiler-calls)
-        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
-              storm-name (topology-conf TOPOLOGY-NAME)
-              _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
-              storm-cluster-state (:storm-cluster-state nimbus)]
-          (.set-worker-profile-request storm-cluster-state id profileRequest)))
-
-      (^List getComponentPendingProfileActions
-        [this ^String id ^String component_id ^ProfileAction action]
-        (mark! nimbus:num-getComponentPendingProfileActions-calls)
-        (let [info (get-common-topo-info id "getComponentPendingProfileActions")
-              storm-cluster-state (:storm-cluster-state info)
-              task->component (:task->component info)
-              {:keys [executor->node+port node->host]} (:assignment info)
-              executor->host+port (map-val (fn [[node port]]
-                                             [(node->host node) port])
-                                    executor->node+port)
-              nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
-              all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
-              latest-profile-actions (remove nil? (map (fn [nodeInfo]
-                                                         (->> all-pending-actions-for-topology
-                                                              (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %)))
-                                                                         (= (:port nodeInfo) (first (.get_port (.get_nodeInfo  %))))))
-                                                              (filter #(= action (.get_action %)))
-                                                              (sort-by #(.get_time_stamp %) >)
-                                                              first))
-                                                    nodeinfos))]
-          (log-message "Latest profile actions for topology " id " component " component_id " " (pr-str latest-profile-actions))
-          latest-profile-actions))
-
-      (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
-        (mark! nimbus:num-setLogConfig-calls)
-        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
-              storm-name (topology-conf TOPOLOGY-NAME)
-              _ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
-              storm-cluster-state (:storm-cluster-state nimbus)
-              merged-log-config (or (.topology-log-config storm-cluster-state id nil) (LogConfig.))
-              named-loggers (.get_named_logger_level merged-log-config)]
-            (doseq [[_ level] named-loggers]
-              (.set_action level LogLevelAction/UNCHANGED))
-            (doseq [[logger-name log-config] (.get_named_logger_level log-config-msg)]
-              (let [action (.get_action log-config)]
-                (if (clojure.string/blank? logger-name)
-                  (throw (RuntimeException. "Named loggers need a valid name. Use ROOT for the root logger")))
-                (condp = action
-                  LogLevelAction/UPDATE
-                    (do (set-logger-timeouts log-config)
-                          (.put_to_named_logger_level merged-log-config logger-name log-config))
-                  LogLevelAction/REMOVE
-                    (let [named-loggers (.get_named_logger_level merged-log-config)]
-                      (if (and (not (nil? named-loggers))
-                               (.containsKey named-loggers logger-name))
-                        (.remove named-loggers logger-name))))))
-            (log-message "Setting log config for " storm-name ":" merged-log-config)
-            (.set-topology-log-config! storm-cluster-state id merged-log-config)))
-
-      (uploadNewCredentials [this storm-name credentials]
-        (mark! nimbus:num-uploadNewCredentials-calls)
-        (let [storm-cluster-state (:storm-cluster-state nimbus)
-              storm-id (get-storm-id storm-cluster-state storm-name)
-              topology-conf (try-read-storm-conf conf storm-id blob-store)
-              creds (when credentials (.get_creds credentials))]
-          (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
-          (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf))))
-
-      (beginFileUpload [this]
-        (mark! nimbus:num-beginFileUpload-calls)
-        (check-authorization! nimbus nil nil "fileUpload")
-        (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
-          (.put (:uploaders nimbus)
-                fileloc
-                (Channels/newChannel (FileOutputStream. fileloc)))
-          (log-message "Uploading file from client to " fileloc)
-          fileloc
-          ))
-
-      (^void uploadChunk [this ^String location ^ByteBuffer chunk]
-        (mark! nimbus:num-uploadChunk-calls)
-        (check-authorization! nimbus nil nil "fileUpload")
-        (let [uploaders (:uploaders nimbus)
-              ^WritableByteChannel channel (.get uploaders location)]
-          (when-not channel
-            (throw (RuntimeException.
-                    "File for that location does not exist (or timed out)")))
-          (.write channel chunk)
-          (.put uploaders location channel)
-          ))
-
-      (^void finishFileUpload [this ^String location]
-        (mark! nimbus:num-finishFileUpload-calls)
-        (check-authorization! nimbus nil nil "fileUpload")
-        (let [uploaders (:uploaders nimbus)
-              ^WritableByteChannel channel (.get uploaders location)]
-          (when-not channel
-            (throw (RuntimeException.
-                    "File for that location does not exist (or timed out)")))
-          (.close channel)
-          (log-message "Finished uploading file from client: " location)
-          (.remove uploaders location)
-          ))
-
-      (^String beginFileDownload
-        [this ^String file]
-        (mark! nimbus:num-beginFileDownload-calls)
-        (check-authorization! nimbus nil nil "fileDownload")
-        (let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file nil)
-              ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES)
-              (int 65536)))
-              id (uuid)]
-          (.put (:downloaders nimbus) id is)
-          id))
-
-      (^ByteBuffer downloadChunk [this ^String id]
-        (mark! nimbus:num-downloadChunk-calls)
-        (check-authorization! nimbus nil nil "fileDownload")
-        (let [downloaders (:downloaders nimbus)
-              ^BufferFileInputStream is (.get downloaders id)]
-          (when-not is
-            (throw (RuntimeException.
-                    "Could not find input stream for that id")))
-          (let [ret (.read is)]
-            (.put downloaders id is)
-            (when (empty? ret)
-              (.remove downloaders id))
-            (ByteBuffer/wrap ret)
-            )))
-
-      (^String getNimbusConf [this]
-        (mark! nimbus:num-getNimbusConf-calls)
-        (check-authorization! nimbus nil nil "getNimbusConf")
-        (to-json (:conf nimbus)))
-
-      (^LogConfig getLogConfig [this ^String id]
-        (mark! nimbus:num-getLogConfig-calls)
-        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
-              storm-name (topology-conf TOPOLOGY-NAME)
-              _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
-             storm-cluster-state (:storm-cluster-state nimbus)
-             log-config (.topology-log-config storm-cluster-state id nil)]
-           (if log-config log-config (LogConfig.))))
-
-      (^String getTopologyConf [this ^String id]
-        (mark! nimbus:num-getTopologyConf-calls)
-        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
-              storm-name (topology-conf TOPOLOGY-NAME)]
-              (check-authorization! nimbus storm-name topology-conf "getTopologyConf")
-              (to-json topology-conf)))
-
-      (^StormTopology getTopology [this ^String id]
-        (mark! nimbus:num-getTopology-calls)
-        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
-              storm-name (topology-conf TOPOLOGY-NAME)]
-              (check-authorization! nimbus storm-name topology-conf "getTopology")
-              (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
-
-      (^StormTopology getUserTopology [this ^String id]
-        (mark! nimbus:num-getUserTopology-calls)
-        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
-              storm-name (topology-conf TOPOLOGY-NAME)]
-              (check-authorization! nimbus storm-name topology-conf "getUserTopology")
-              (try-read-storm-topology id blob-store)))
-
-      (^ClusterSummary getClusterInfo [this]
-        (mark! nimbus:num-getClusterInfo-calls)
-        (check-authorization! nimbus nil nil "getClusterInfo")
-        (let [storm-cluster-state (:storm-cluster-state nimbus)
-              supervisor-infos (all-supervisor-info storm-cluster-state)
-              ;; TODO: need to get the port info about supervisors...
-              ;; in standalone just look at metadata, otherwise just say N/A?
-              supervisor-summaries (dofor [[id info] supervisor-infos]
-                                     (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
-                                           sup-sum (SupervisorSummary. (:hostname info)
-                                                     (:uptime-secs info)
-                                                     (count ports)
-                                                     (count (:used-ports info))
-                                                     id) ]
-                                       (.set_total_resources sup-sum (map-val double (:resources-map info)))
-                                       (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
-                                         (.set_used_mem sup-sum used-mem)
-                                         (.set_used_cpu sup-sum used-cpu))
-                                       (when-let [version (:version info)] (.set_version sup-sum version))
-                                       sup-sum))
-              nimbus-uptime ((:uptime nimbus))
-              bases (topology-bases storm-cluster-state)
-              nimbuses (.nimbuses storm-cluster-state)
-
-              ;;update the isLeader field for each nimbus summary
-              _ (let [leader (.getLeader (:leader-elector nimbus))
-                      leader-host (.getHost leader)
-                      leader-port (.getPort leader)]
-                  (doseq [nimbus-summary nimbuses]
-                    (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
-                    (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
-
-              topology-summaries (dofor [[id base] bases :when base]
-                                   (let [assignment (.assignment-info storm-cluster-state id nil)
-                                         topo-summ (TopologySummary. id
-                                                     (:storm-name base)
-                                                     (->> (:executor->node+port assignment)
-                                                       keys
-                                                       (mapcat executor-id->tasks)
-                                                       count)
-                                                     (->> (:executor->node+port assignment)
-                                                       keys
-                                                       count)
-                                                     (->> (:executor->node+port assignment)
-                                                       vals
-                                                       set
-                                                       count)
-                                                     (time-delta (:launch-time-secs base))
-                                                     (extract-status-str base))]
-                                     (when-let [owner (:owner base)] (.set_owner topo-summ owner))
-                                     (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
-                                     (when-let [resources (.get @(:id->resources nimbus) id)]
-                                       (.set_requested_memonheap topo-summ (get resources 0))
-                                       (.set_requested_memoffheap topo-summ (get resources 1))
-                                       (.set_requested_cpu topo-summ (get resources 2))
-                                       (.set_assigned_memonheap topo-summ (get resources 3))
-                                       (.set_assigned_memoffheap topo-summ (get resources 4))
-                                       (.set_assigned_cpu topo-summ (get resources 5)))
-                                     (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
-                                     topo-summ))
-              ret (ClusterSummary. supervisor-summaries
-                                   topology-summaries
-                                   nimbuses)
-              _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
-              ret))
-
-      (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
-        (mark! nimbus:num-getTopologyInfoWithOpts-calls)
-        (let [{:keys [storm-name
-                      storm-cluster-state
-                      all-components
-                      launch-time-secs
-                      assignment
-                      beats
-                      task->component
-                      base]} (get-common-topo-info storm-id "getTopologyInfo")
-              num-err-choice (or (.get_num_err_choice options)
-                                 NumErrorsChoice/ALL)
-              errors-fn (condp = num-err-choice
-                          NumErrorsChoice/NONE (fn [& _] ()) ;; empty list only
-                          NumErrorsChoice/ONE (comp #(remove nil? %)
-                                                    list
-                                                    get-last-error)
-                          NumErrorsChoice/ALL get-errors
-                          ;; Default
-                          (do
-                            (log-warn "Got invalid NumErrorsChoice '"
-                                      num-err-choice
-                                      "'")
-                            get-errors))
-              errors (->> all-components
-                          (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)]))
-                          (into {}))
-              executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
-                                        (let [host (-> assignment :node->host (get node))
-                                              heartbeat (get beats executor)
-                                              stats (:stats heartbeat)
-                                              stats (if stats
-                                                      (stats/thriftify-executor-stats stats))]
-                                          (doto
-                                              (ExecutorSummary. (thriftify-executor-id executor)
-                                                                (-> executor first task->component)
-                                                                host
-                                                                port
-                                                                (nil-to-zero (:uptime heartbeat)))
-                                            (.set_stats stats))
-                                          ))
-              topo-info  (TopologyInfo. storm-id
-                           storm-name
-                           (time-delta launch-time-secs)
-                           executor-summaries
-                           (extract-status-str base)
-                           errors
-                           )]
-            (when-let [owner (:owner base)] (.set_owner topo-info owner))
-            (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
-            (when-let [resources (.get @(:id->resources nimbus) storm-id)]
-              (.set_requested_memonheap topo-info (get resources 0))
-              (.set_requested_memoffheap topo-info (get resources 1))
-              (.set_requested_cpu topo-info (get resources 2))
-              (.set_assigned_memonheap topo-info (get resources 3))
-              (.set_assigned_memoffheap topo-info (get resources 4))
-              (.set_assigned_cpu topo-info (get resources 5)))
-            (when-let [component->debug (:component->debug base)]
-              (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
-            (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
-          topo-info))
-
-      (^TopologyInfo getTopologyInfo [this ^String topology-id]
-        (mark! nimbus:num-getTopologyInfo-calls)
-        (.getTopologyInfoWithOpts this
-                                  topology-id
-                                  (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
-
-      (^String beginCreateBlob [this
-                                ^String blob-key
-                                ^SettableBlobMeta blob-meta]
-        (let [session-id (uuid)]
-          (.put (:blob-uploaders nimbus)
-            session-id
-            (.createBlob (:blob-store nimbus) blob-key blob-meta (get-subject)))
-          (log-message "Created blob for " blob-key
-            " with session id " session-id)
-          (str session-id)))
-
-      (^String beginUpdateBlob [this ^String blob-key]
-        (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus)
-                                       blob-key (get-subject))]
-          (let [session-id (uuid)]
-            (.put (:blob-uploaders nimbus) session-id os)
-            (log-message "Created upload session for " blob-key
-              " with id " session-id)
-            (str session-id))))
-
-      (^void createStateInZookeeper [this ^String blob-key]
-        (let [storm-cluster-state (:storm-cluster-state nimbus)
-              blob-store (:blob-store nimbus)
-              nimbus-host-port-info (:nimbus-host-port-info nimbus)
-              conf (:conf nimbus)]
-          (if (instance? LocalFsBlobStore blob-store)
-              (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
-          (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
-
-      (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
-        (let [uploaders (:blob-uploaders nimbus)]
-          (if-let [^AtomicOutputStream os (.get uploaders session)]
-            (let [chunk-array (.array blob-chunk)
-                  remaining (.remaining blob-chunk)
-                  array-offset (.arrayOffset blob-chunk)
-                  position (.position blob-chunk)]
-              (.write os chunk-array (+ array-offset position) remaining)
-              (.put uploaders session os))
-            (throw-runtime "Blob for session "
-              session
-              " does not exist (or timed out)"))))
-
-      (^void finishBlobUpload [this ^String session]
-        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
-          (do
-            (.close os)
-            (log-message "Finished uploading blob for session "
-              session
-              ". Closing session.")
-            (.remove (:blob-uploaders nimbus) session))
-          (throw-runtime "Blob for session "
-            session
-            " does not exist (or timed out)")))
-
-      (^void cancelBlobUpload [this ^String session]
-        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
-          (do
-            (.cancel os)
-            (log-message "Canceled uploading blob for session "
-              session
-              ". Closing session.")
-            (.remove (:blob-uploaders nimbus) session))
-          (throw-runtime "Blob for session "
-            session
-            " does not exist (or timed out)")))
-
-      (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
-        (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus)
-                                      blob-key (get-subject))]
-          ret))
-
-      (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
-        (->> (ReqContext/context)
-          (.subject)
-          (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
-
-      (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
-        (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus)
-                                        blob-key (get-subject))]
-          (let [session-id (uuid)
-                ret (BeginDownloadResult. (.getVersion is) (str session-id))]
-            (.set_data_size ret (.getFileLength is))
-            (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
-            (log-message "Created download session for " blob-key
-              " with id " session-id)
-            ret)))
-
-      (^ByteBuffer downloadBlobChunk [this ^String session]
-        (let [downloaders (:blob-downloaders nimbus)
-              ^BufferInputStream is (.get downloaders session)]
-          (when-not is
-            (throw (RuntimeException.
-                     "Could not find input stream for session " session)))
-          (let [ret (.read is)]
-            (.put downloaders session is)
-            (when (empty? ret)
-              (.close is)
-              (.remove downloaders session))
-            (log-debug "Sending " (alength ret) " bytes")
-            (ByteBuffer/wrap ret))))
-
-      (^void deleteBlob [this ^String blob-key]
-        (let [subject (->> (ReqContext/context)
-                           (.subject))]
-          (.deleteBlob (:blob-store nimbus) blob-key subject)
-          (when (instance? LocalFsBlobStore blob-store)
-            (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
-            (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
-          (log-message "Deleted blob for key " blob-key)))
-
-      (^ListBlobsResult listBlobs [this ^String session]
-        (let [listers (:blob-listers nimbus)
-              ^Iterator keys-it (if (clojure.string/blank? session)
-                                  (.listKeys (:blob-store nimbus))
-                                  (.get listers session))
-              _ (or keys-it (throw-runtime "Blob list for session "
-                              session
-                              " does not exist (or timed out)"))
-
-              ;; Create a new session id if the user gave an empty session string.
-              ;; This is the use case when the user wishes to list blobs
-              ;; starting from the beginning.
-              session (if (clojure.string/blank? session)
-                        (let [new-session (uuid)]
-                          (log-message "Creating new session for downloading list " new-session)
-                          new-session)
-                        session)]
-          (if-not (.hasNext keys-it)
-            (do
-              (.remove listers session)
-              (log-message "No more blobs to list for session " session)
-              ;; A blank result communicates that there are no more blobs.
-              (ListBlobsResult. (ArrayList. 0) (str session)))
-            (let [^List list-chunk (->> keys-it
-                                     (iterator-seq)
-                                     (take 100) ;; Limit to next 100 keys
-                                     (ArrayList.))]
-              (log-message session " downloading " (.size list-chunk) " entries")
-              (.put listers session keys-it)
-              (ListBlobsResult. list-chunk (str session))))))
-
-      (^int getBlobReplication [this ^String blob-key]
-        (->> (ReqContext/context)
-          (.subject)
-          (.getBlobReplication (:blob-store nimbus) blob-key)))
-
-      (^int updateBlobReplication [this ^String blob-key ^int replication]
-        (->> (ReqContext/context)
-          (.subject)
-          (.updateBlobReplication (:blob-store nimbus) blob-key replication)))
-
-      (^TopologyPageInfo getTopologyPageInfo
-        [this ^String topo-id ^String window ^boolean include-sys?]
-        (mark! nimbus:num-getTopologyPageInfo-calls)
-        (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
-
-              exec->node+port (:executor->node+port (:assignment info))
-              last-err-fn (partial get-last-error
-                                   (:storm-cluster-state info)
-                                   topo-id)
-              topo-page-info (stats/agg-topo-execs-stats topo-id
-                                                         exec->node+port
-                                                         (:task->component info)
-                                                         (:beats info)
-                                                         (:topology info)
-                                                         window
-                                                         include-sys?
-                                                         last-err-fn)]
-          (when-let [owner (:owner (:base info))]
-            (.set_owner topo-page-info owner))
-          (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
-            (.set_sched_status topo-page-info sched-status))
-          (when-let [resources (.get @(:id->resources nimbus) topo-id)]
-            (.set_requested_memonheap topo-page-info (get resources 0))
-            (.set_requested_memoffheap topo-page-info (get resources 1))
-            (.set_requested_cpu topo-page-info (get resources 2))
-            (.set_assigned_memonheap topo-page-info (get resources 3))
-            (.set_assigned_memoffheap topo-page-info (get resources 4))
-            (.set_assigned_cpu topo-page-info (get resources 5)))
-          (doto topo-page-info
-            (.set_name (:storm-name info))
-            (.set_status (extract-status-str (:base info)))
-            (.set_uptime_secs (time-delta (:launch-time-secs info)))
-            (.set_topology_conf (to-json (try-read-storm-conf conf
-                                                              topo-id (:blob-store nimbus))))
-            (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
-          (when-let [debug-options
-                     (get-in info [:base :component->debug topo-id])]
-            (.set_debug_options
-              topo-page-info
-              (converter/thriftify-debugoptions debug-options)))
-          topo-page-info))
-
-      (^ComponentPageInfo getComponentPageInfo
-        [this
-         ^String topo-id
-         ^String component-id
-         ^String window
-         ^boolean include-sys?]
-        (mark! nimbus:num-getComponentPageInfo-calls)
-        (let [info (get-common-topo-info topo-id "getComponentPageInfo")
-              {:keys [executor->node+port node->host]} (:assignment info)
-              executor->host+port (map-val (fn [[node port]]
-                                             [(node->host node) port])
-                                           executor->node+port)
-              comp-page-info (stats/agg-comp-execs-stats executor->host+port
-                                                         (:task->component info)
-                                                         (:beats info)
-                                                         window
-                                                         include-sys?
-                                                         topo-id
-                                                         (:topology info)
-                                                         component-id)]
-          (doto comp-page-info
-            (.set_topology_name (:storm-name info))
-            (.set_errors (get-errors (:storm-cluster-state info)
-                                     topo-id
-                                     component-id))
-            (.set_topology_status (extract-status-str (:base info))))
-          (when-let [debug-options
-                     (get-in info [:base :component->debug component-id])]
-            (.set_debug_options
-              comp-page-info
-              (converter/thriftify-debugoptions debug-options)))
-          ;; Add the event logger details
-          (let [component->tasks (reverse-map (:task->component info))]
-            (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID)
-              (let [eventlogger-tasks (sort (get component->tasks
-                                                 EVENTLOGGER-COMPONENT-ID))
-                    ;; Find the task the events from this component route to.
-                    task-index (mod (TupleUtils/listHashCode [component-id])
-                                    (count eventlogger-tasks))
-                    task-id (nth eventlogger-tasks task-index)
-                    eventlogger-exec (first (filter (fn [[start stop]]
-                                

<TRUNCATED>