You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:20 UTC

[24/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatability

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
deleted file mode 100644
index 61ddfa9..0000000
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ /dev/null
@@ -1,1273 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.ui.core
-  (:use compojure.core)
-  (:use [clojure.java.shell :only [sh]])
-  (:use ring.middleware.reload
-        ring.middleware.multipart-params)
-  (:use [ring.middleware.json :only [wrap-json-params]])
-  (:use [hiccup core page-helpers])
-  (:use [backtype.storm config util log stats zookeeper converter])
-  (:use [backtype.storm.ui helpers])
-  (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
-                                              ACKER-FAIL-STREAM-ID mk-authorization-handler
-                                              start-metrics-reporters]]])
-  (:import [backtype.storm.utils Utils]
-           [backtype.storm.generated NimbusSummary])
-  (:use [clojure.string :only [blank? lower-case trim split]])
-  (:import [backtype.storm.generated ExecutorSpecificStats
-            ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
-            ErrorInfo ClusterSummary SupervisorSummary TopologySummary
-            Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
-            KillOptions GetInfoOptions NumErrorsChoice DebugOptions TopologyPageInfo
-            TopologyStats CommonAggregateStats ComponentAggregateStats
-            ComponentType BoltAggregateStats SpoutAggregateStats
-            ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
-            LogConfig LogLevel LogLevelAction])
-  (:import [backtype.storm.security.auth AuthUtils ReqContext])
-  (:import [backtype.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
-  (:import [backtype.storm.security.auth AuthUtils])
-  (:import [backtype.storm.utils VersionInfo])
-  (:import [backtype.storm Config])
-  (:import [java.io File])
-  (:require [compojure.route :as route]
-            [compojure.handler :as handler]
-            [ring.util.response :as resp]
-            [backtype.storm [thrift :as thrift]])
-  (:require [metrics.meters :refer [defmeter mark!]])
-  (:import [org.apache.commons.lang StringEscapeUtils])
-  (:import [org.apache.logging.log4j Level])
-  (:gen-class))
-
-(def ^:dynamic *STORM-CONF* (read-storm-config))
-(def ^:dynamic *UI-ACL-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*))
-(def ^:dynamic *UI-IMPERSONATION-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*))
-(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
-(def STORM-VERSION (VersionInfo/getVersion))
-
-(defmeter ui:num-cluster-configuration-http-requests)
-(defmeter ui:num-cluster-summary-http-requests)
-(defmeter ui:num-nimbus-summary-http-requests)
-(defmeter ui:num-supervisor-summary-http-requests)
-(defmeter ui:num-all-topologies-summary-http-requests)
-(defmeter ui:num-topology-page-http-requests)
-(defmeter ui:num-build-visualization-http-requests)
-(defmeter ui:num-mk-visualization-data-http-requests)
-(defmeter ui:num-component-page-http-requests)
-(defmeter ui:num-log-config-http-requests)
-(defmeter ui:num-activate-topology-http-requests)
-(defmeter ui:num-deactivate-topology-http-requests)
-(defmeter ui:num-debug-topology-http-requests)
-(defmeter ui:num-component-op-response-http-requests)
-(defmeter ui:num-topology-op-response-http-requests)
-(defmeter ui:num-topology-op-response-http-requests)
-(defmeter ui:num-topology-op-response-http-requests)
-(defmeter ui:num-main-page-http-requests)
-
-(defn assert-authorized-user
-  ([op]
-    (assert-authorized-user op nil))
-  ([op topology-conf]
-    (let [context (ReqContext/context)]
-      (if (.isImpersonating context)
-        (if *UI-IMPERSONATION-HANDLER*
-            (if-not (.permit *UI-IMPERSONATION-HANDLER* context op topology-conf)
-              (let [principal (.principal context)
-                    real-principal (.realPrincipal context)
-                    user (if principal (.getName principal) "unknown")
-                    real-user (if real-principal (.getName real-principal) "unknown")
-                    remote-address (.remoteAddress context)]
-                (throw (AuthorizationException.
-                         (str "user '" real-user "' is not authorized to impersonate user '" user "' from host '" remote-address "'. Please
-                         see SECURITY.MD to learn how to configure impersonation ACL.")))))
-          (log-warn " principal " (.realPrincipal context) " is trying to impersonate " (.principal context) " but "
-            NIMBUS-IMPERSONATION-AUTHORIZER " has no authorizer configured. This is a potential security hole.
-            Please see SECURITY.MD to learn how to configure an impersonation authorizer.")))
-
-      (if *UI-ACL-HANDLER*
-       (if-not (.permit *UI-ACL-HANDLER* context op topology-conf)
-         (let [principal (.principal context)
-               user (if principal (.getName principal) "unknown")]
-           (throw (AuthorizationException.
-                   (str "UI request '" op "' for '" user "' user is not authorized")))))))))
-
-
-(defn assert-authorized-profiler-action
-  [op]
-  (if-not (*STORM-CONF* WORKER-PROFILER-ENABLED)
-    (throw (AuthorizationException.
-             (str "UI request for profiler action '" op "' is disabled.")))))
-
-
-(defn executor-summary-type
-  [topology ^ExecutorSummary s]
-  (component-type topology (.get_component_id s)))
-
-(defn is-ack-stream
-  [stream]
-  (let [acker-streams
-        [ACKER-INIT-STREAM-ID
-         ACKER-ACK-STREAM-ID
-         ACKER-FAIL-STREAM-ID]]
-    (every? #(not= %1 stream) acker-streams)))
-
-(defn spout-summary?
-  [topology s]
-  (= :spout (executor-summary-type topology s)))
-
-(defn bolt-summary?
-  [topology s]
-  (= :bolt (executor-summary-type topology s)))
-
-(defn group-by-comp
-  [summs]
-  (let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
-    (into (sorted-map) ret )))
-
-(defn logviewer-link [host fname secure?]
-  (if (and secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT))
-    (url-format "https://%s:%s/log?file=%s"
-      host
-      (*STORM-CONF* LOGVIEWER-HTTPS-PORT)
-      fname)
-    (url-format "http://%s:%s/log?file=%s"
-      host
-      (*STORM-CONF* LOGVIEWER-PORT)
-      fname)))
-
-(defn event-log-link
-  [topology-id component-id host port secure?]
-  (logviewer-link host (event-logs-filename topology-id port) secure?))
-
-(defn worker-log-link [host port topology-id secure?]
-  (let [fname (logs-filename topology-id port)]
-    (logviewer-link host fname secure?)))
-
-(defn nimbus-log-link [host port]
-  (url-format "http://%s:%s/daemonlog?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
-
-(defn get-error-time
-  [error]
-  (if error
-    (time-delta (.get_error_time_secs ^ErrorInfo error))))
-
-(defn get-error-data
-  [error]
-  (if error
-    (error-subset (.get_error ^ErrorInfo error))
-    ""))
-
-(defn get-error-port
-  [error]
-  (if error
-    (.get_port ^ErrorInfo error)
-    ""))
-
-(defn get-error-host
-  [error]
-  (if error
-    (.get_host ^ErrorInfo error)
-    ""))
-
-(defn get-error-time
-  [error]
-  (if error
-    (.get_error_time_secs ^ErrorInfo error)
-    ""))
-
-(defn worker-dump-link [host port topology-id]
-  (url-format "http://%s:%s/dumps/%s/%s"
-              (url-encode host)
-              (*STORM-CONF* LOGVIEWER-PORT)
-              (url-encode topology-id)
-              (str (url-encode host) ":" (url-encode port))))
-
-(defn stats-times
-  [stats-map]
-  (sort-by #(Integer/parseInt %)
-           (-> stats-map
-               clojurify-structure
-               (dissoc ":all-time")
-               keys)))
-
-(defn window-hint
-  [window]
-  (if (= window ":all-time")
-    "All time"
-    (pretty-uptime-sec window)))
-
-(defn sanitize-stream-name
-  [name]
-  (let [sym-regex #"(?![A-Za-z_\-:\.])."]
-    (str
-     (if (re-find #"^[A-Za-z]" name)
-       (clojure.string/replace name sym-regex "_")
-       (clojure.string/replace (str \s name) sym-regex "_"))
-     (hash name))))
-
-(defn sanitize-transferred
-  [transferred]
-  (into {}
-        (for [[time, stream-map] transferred]
-          [time, (into {}
-                       (for [[stream, trans] stream-map]
-                         [(sanitize-stream-name stream), trans]))])))
-
-(defn visualization-data
-  [spout-bolt spout-comp-summs bolt-comp-summs window storm-id]
-  (let [components (for [[id spec] spout-bolt]
-            [id
-             (let [inputs (.get_inputs (.get_common spec))
-                   bolt-summs (get bolt-comp-summs id)
-                   spout-summs (get spout-comp-summs id)
-                   bolt-cap (if bolt-summs
-                              (compute-bolt-capacity bolt-summs)
-                              0)]
-               {:type (if bolt-summs "bolt" "spout")
-                :capacity bolt-cap
-                :latency (if bolt-summs
-                           (get-in
-                             (bolt-streams-stats bolt-summs true)
-                             [:process-latencies window])
-                           (get-in
-                             (spout-streams-stats spout-summs true)
-                             [:complete-latencies window]))
-                :transferred (or
-                               (get-in
-                                 (spout-streams-stats spout-summs true)
-                                 [:transferred window])
-                               (get-in
-                                 (bolt-streams-stats bolt-summs true)
-                                 [:transferred window]))
-                :stats (let [mapfn (fn [dat]
-                                     (map (fn [^ExecutorSummary summ]
-                                            {:host (.get_host summ)
-                                             :port (.get_port summ)
-                                             :uptime_secs (.get_uptime_secs summ)
-                                             :transferred (if-let [stats (.get_stats summ)]
-                                                            (sanitize-transferred (.get_transferred stats)))})
-                                          dat))]
-                         (if bolt-summs
-                           (mapfn bolt-summs)
-                           (mapfn spout-summs)))
-                :link (url-format "/component.html?id=%s&topology_id=%s" id storm-id)
-                :inputs (for [[global-stream-id group] inputs]
-                          {:component (.get_componentId global-stream-id)
-                           :stream (.get_streamId global-stream-id)
-                           :sani-stream (sanitize-stream-name (.get_streamId global-stream-id))
-                           :grouping (clojure.core/name (thrift/grouping-type group))})})])]
-    (into {} (doall components))))
-
-(defn stream-boxes [datmap]
-  (let [filter-fn (mk-include-sys-fn true)
-        streams
-        (vec (doall (distinct
-                     (apply concat
-                            (for [[k v] datmap]
-                              (for [m (get v :inputs)]
-                                {:stream (get m :stream)
-                                 :sani-stream (get m :sani-stream)
-                                 :checked (is-ack-stream (get m :stream))}))))))]
-    (map (fn [row]
-           {:row row}) (partition 4 4 nil streams))))
-
-(defn- get-topology-info
-  ([^Nimbus$Client nimbus id]
-    (.getTopologyInfo nimbus id))
-  ([^Nimbus$Client nimbus id options]
-    (.getTopologyInfoWithOpts nimbus id options)))
-
-(defn mk-visualization-data
-  [id window include-sys?]
-  (thrift/with-configured-nimbus-connection
-    nimbus
-    (let [window (if window window ":all-time")
-          topology (.getTopology ^Nimbus$Client nimbus id)
-          spouts (.get_spouts topology)
-          bolts (.get_bolts topology)
-          summ (->> (doto
-                      (GetInfoOptions.)
-                      (.set_num_err_choice NumErrorsChoice/NONE))
-                    (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-          execs (.get_executors summ)
-          spout-summs (filter (partial spout-summary? topology) execs)
-          bolt-summs (filter (partial bolt-summary? topology) execs)
-          spout-comp-summs (group-by-comp spout-summs)
-          bolt-comp-summs (group-by-comp bolt-summs)
-          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
-                                      bolt-comp-summs)]
-      (visualization-data
-       (merge (hashmap-to-persistent spouts)
-              (hashmap-to-persistent bolts))
-       spout-comp-summs bolt-comp-summs window id))))
-
-(defn validate-tplg-submit-params [params]
-  (let [tplg-jar-file (params :topologyJar)
-        tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))]
-    (cond
-     (nil? tplg-jar-file) {:valid false :error "missing topology jar file"}
-     (nil? tplg-config) {:valid false :error "missing topology config"}
-     (nil? (tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"}
-     :else {:valid true})))
-
-(defn run-tplg-submit-cmd [tplg-jar-file tplg-config user]
-  (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass")))
-        tplg-main-class-args (if (not-nil? tplg-config) (tplg-config "topologyMainClassArgs"))
-        storm-home (System/getProperty "storm.home")
-        storm-conf-dir (str storm-home file-path-separator "conf")
-        storm-log-dir (if (not-nil? (*STORM-CONF* "storm.log.dir")) (*STORM-CONF* "storm.log.dir")
-                          (str storm-home file-path-separator "logs"))
-        storm-libs (str storm-home file-path-separator "lib" file-path-separator "*")
-        java-cmd (str (System/getProperty "java.home") file-path-separator "bin" file-path-separator "java")
-        storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm")
-        tplg-cmd-response (apply sh
-                            (flatten
-                              [storm-cmd "jar" tplg-jar-file tplg-main-class
-                                (if (not-nil? tplg-main-class-args) tplg-main-class-args [])
-                                (if (not= user "unknown") (str "-c storm.doAsUser=" user) [])]))]
-    (log-message "tplg-cmd-response " tplg-cmd-response)
-    (cond
-     (= (tplg-cmd-response :exit) 0) {"status" "success"}
-     (and (not= (tplg-cmd-response :exit) 0)
-          (not-nil? (re-find #"already exists on cluster" (tplg-cmd-response :err)))) {"status" "failed" "error" "Topology with the same name exists in cluster"}
-          (not= (tplg-cmd-response :exit) 0) {"status" "failed" "error" (clojure.string/trim-newline (tplg-cmd-response :err))}
-          :else {"status" "success" "response" "topology deployed"}
-          )))
-
-(defn cluster-configuration []
-  (thrift/with-configured-nimbus-connection nimbus
-    (.getNimbusConf ^Nimbus$Client nimbus)))
-
-(defn topology-history-info
-  ([user]
-    (thrift/with-configured-nimbus-connection nimbus
-      (topology-history-info (.getTopologyHistory ^Nimbus$Client nimbus user) user)))
-  ([history user]
-    {"topo-history"
-     (into [] (.get_topo_ids history))}))
-
-(defn cluster-summary
-  ([user]
-     (thrift/with-configured-nimbus-connection nimbus
-        (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus) user)))
-  ([^ClusterSummary summ user]
-     (let [sups (.get_supervisors summ)
-           used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups))
-           total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups))
-           free-slots (- total-slots used-slots)
-           topologies (.get_topologies_size summ)
-           total-tasks (->> (.get_topologies summ)
-                            (map #(.get_num_tasks ^TopologySummary %))
-                            (reduce +))
-           total-executors (->> (.get_topologies summ)
-                                (map #(.get_num_executors ^TopologySummary %))
-                                (reduce +))]
-       {"user" user
-        "stormVersion" STORM-VERSION
-        "supervisors" (count sups)
-        "topologies" topologies
-        "slotsTotal" total-slots
-        "slotsUsed"  used-slots
-        "slotsFree" free-slots
-        "executorsTotal" total-executors
-        "tasksTotal" total-tasks })))
-
-(defn convert-to-nimbus-summary[nimbus-seed]
-  (let [[host port] (.split nimbus-seed ":")]
-    {
-      "host" host
-      "port" port
-      "nimbusLogLink" (nimbus-log-link host port)
-      "status" "Offline"
-      "version" "Not applicable"
-      "nimbusUpTime" "Not applicable"
-      "nimbusUptimeSeconds" "Not applicable"}
-    ))
-
-(defn nimbus-summary
-  ([]
-    (thrift/with-configured-nimbus-connection nimbus
-      (nimbus-summary
-        (.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus)))))
-  ([nimbuses]
-    (let [nimbus-seeds (set (map #(str %1 ":" (*STORM-CONF* NIMBUS-THRIFT-PORT)) (set (*STORM-CONF* NIMBUS-SEEDS))))
-          alive-nimbuses (set (map #(str (.get_host %1) ":" (.get_port %1)) nimbuses))
-          offline-nimbuses (clojure.set/difference nimbus-seeds alive-nimbuses)
-          offline-nimbuses-summary (map #(convert-to-nimbus-summary %1) offline-nimbuses)]
-      {"nimbuses"
-       (concat offline-nimbuses-summary
-       (for [^NimbusSummary n nimbuses
-             :let [uptime (.get_uptime_secs n)]]
-         {
-          "host" (.get_host n)
-          "port" (.get_port n)
-          "nimbusLogLink" (nimbus-log-link (.get_host n) (.get_port n))
-          "status" (if (.is_isLeader n) "Leader" "Not a Leader")
-          "version" (.get_version n)
-          "nimbusUpTime" (pretty-uptime-sec uptime)
-          "nimbusUpTimeSeconds" uptime}))})))
-
-(defn supervisor-summary
-  ([]
-   (thrift/with-configured-nimbus-connection nimbus
-                (supervisor-summary
-                  (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
-  ([summs]
-   {"supervisors"
-    (for [^SupervisorSummary s summs]
-      {"id" (.get_supervisor_id s)
-       "host" (.get_host s)
-       "uptime" (pretty-uptime-sec (.get_uptime_secs s))
-       "uptimeSeconds" (.get_uptime_secs s)
-       "slotsTotal" (.get_num_workers s)
-       "slotsUsed" (.get_num_used_workers s)
-       "totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
-       "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
-       "usedMem" (.get_used_mem s)
-       "usedCpu" (.get_used_cpu s)
-       "version" (.get_version s)})
-    "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
-
-(defn all-topologies-summary
-  ([]
-   (thrift/with-configured-nimbus-connection
-     nimbus
-     (all-topologies-summary
-       (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)))))
-  ([summs]
-   {"topologies"
-    (for [^TopologySummary t summs]
-      {
-       "id" (.get_id t)
-       "encodedId" (url-encode (.get_id t))
-       "owner" (.get_owner t)
-       "name" (.get_name t)
-       "status" (.get_status t)
-       "uptime" (pretty-uptime-sec (.get_uptime_secs t))
-       "uptimeSeconds" (.get_uptime_secs t)
-       "tasksTotal" (.get_num_tasks t)
-       "workersTotal" (.get_num_workers t)
-       "executorsTotal" (.get_num_executors t)
-       "replicationCount" (.get_replication_count t)
-       "schedulerInfo" (.get_sched_status t)
-       "requestedMemOnHeap" (.get_requested_memonheap t)
-       "requestedMemOffHeap" (.get_requested_memoffheap t)
-       "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
-       "requestedCpu" (.get_requested_cpu t)
-       "assignedMemOnHeap" (.get_assigned_memonheap t)
-       "assignedMemOffHeap" (.get_assigned_memoffheap t)
-       "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t))
-       "assignedCpu" (.get_assigned_cpu t)})
-    "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
-
-(defn topology-stats [window stats]
-  (let [times (stats-times (:emitted stats))
-        display-map (into {} (for [t times] [t pretty-uptime-sec]))
-        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-    (for [w (concat times [":all-time"])
-          :let [disp ((display-map w) w)]]
-      {"windowPretty" disp
-       "window" w
-       "emitted" (get-in stats [:emitted w])
-       "transferred" (get-in stats [:transferred w])
-       "completeLatency" (float-str (get-in stats [:complete-latencies w]))
-       "acked" (get-in stats [:acked w])
-       "failed" (get-in stats [:failed w])})))
-
-(defn build-visualization [id window include-sys?]
-  (thrift/with-configured-nimbus-connection nimbus
-    (let [window (if window window ":all-time")
-          topology-info (->> (doto
-                               (GetInfoOptions.)
-                               (.set_num_err_choice NumErrorsChoice/ONE))
-                             (.getTopologyInfoWithOpts ^Nimbus$Client nimbus
-                                                       id))
-          storm-topology (.getTopology ^Nimbus$Client nimbus id)
-          spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info))
-          bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info))
-          spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries)
-          bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries)
-          bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries)
-          id->spout-spec (.get_spouts storm-topology)
-          id->bolt (.get_bolts storm-topology)
-          visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec)
-                                                     (hashmap-to-persistent id->bolt))
-                                              spout-comp-id->executor-summaries
-                                              bolt-comp-id->executor-summaries
-                                              window
-                                              id)]
-       {"visualizationTable" (stream-boxes visualizer-data)})))
-
-(defn- get-error-json
-  [topo-id error-info secure?]
-  (let [host (get-error-host error-info)
-        port (get-error-port error-info)]
-    {"lastError" (get-error-data error-info)
-     "errorTime" (get-error-time error-info)
-     "errorHost" host
-     "errorPort" port
-     "errorLapsedSecs" (get-error-time error-info)
-     "errorWorkerLogLink" (worker-log-link host port topo-id secure?)}))
-
-(defn- common-agg-stats-json
-  "Returns a JSON representation of a common aggregated statistics."
-  [^CommonAggregateStats common-stats]
-  {"executors" (.get_num_executors common-stats)
-   "tasks" (.get_num_tasks common-stats)
-   "emitted" (.get_emitted common-stats)
-   "transferred" (.get_transferred common-stats)
-   "acked" (.get_acked common-stats)
-   "failed" (.get_failed common-stats)})
-
-(defmulti comp-agg-stats-json
-  "Returns a JSON representation of aggregated statistics."
-  (fn [_ _ [id ^ComponentAggregateStats s]] (.get_type s)))
-
-(defmethod comp-agg-stats-json ComponentType/SPOUT
-  [topo-id secure? [id ^ComponentAggregateStats s]]
-  (let [^SpoutAggregateStats ss (.. s get_specific_stats get_spout)
-        cs (.get_common_stats s)]
-    (merge
-      (common-agg-stats-json cs)
-      (get-error-json topo-id (.get_last_error s) secure?)
-      {"spoutId" id
-       "encodedSpoutId" (url-encode id)
-       "completeLatency" (float-str (.get_complete_latency_ms ss))})))
-
-(defmethod comp-agg-stats-json ComponentType/BOLT
-  [topo-id secure? [id ^ComponentAggregateStats s]]
-  (let [^BoltAggregateStats ss (.. s get_specific_stats get_bolt)
-        cs (.get_common_stats s)]
-    (merge
-      (common-agg-stats-json cs)
-      (get-error-json topo-id (.get_last_error s) secure?)
-      {"boltId" id
-       "encodedBoltId" (url-encode id)
-       "capacity" (float-str (.get_capacity ss))
-       "executeLatency" (float-str (.get_execute_latency_ms ss))
-       "executed" (.get_executed ss)
-       "processLatency" (float-str (.get_process_latency_ms ss))})))
-
-(defn- unpack-topology-page-info
-  "Unpacks the serialized object to data structures"
-  [^TopologyPageInfo topo-info window secure?]
-  (let [id (.get_id topo-info)
-        ^TopologyStats topo-stats (.get_topology_stats topo-info)
-        stat->window->number
-          {:emitted (.get_window_to_emitted topo-stats)
-           :transferred (.get_window_to_transferred topo-stats)
-           :complete-latencies (.get_window_to_complete_latencies_ms topo-stats)
-           :acked (.get_window_to_acked topo-stats)
-           :failed (.get_window_to_failed topo-stats)}
-        topo-stats (topology-stats window stat->window->number)
-        [debugEnabled
-         samplingPct] (if-let [debug-opts (.get_debug_options topo-info)]
-                        [(.is_enable debug-opts)
-                         (.get_samplingpct debug-opts)])
-        uptime (.get_uptime_secs topo-info)]
-    {"id" id
-     "encodedId" (url-encode id)
-     "owner" (.get_owner topo-info)
-     "name" (.get_name topo-info)
-     "status" (.get_status topo-info)
-     "uptime" (pretty-uptime-sec uptime)
-     "uptimeSeconds" uptime
-     "tasksTotal" (.get_num_tasks topo-info)
-     "workersTotal" (.get_num_workers topo-info)
-     "executorsTotal" (.get_num_executors topo-info)
-     "schedulerInfo" (.get_sched_status topo-info)
-     "requestedMemOnHeap" (.get_requested_memonheap topo-info)
-     "requestedMemOffHeap" (.get_requested_memoffheap topo-info)
-     "requestedCpu" (.get_requested_cpu topo-info)
-     "assignedMemOnHeap" (.get_assigned_memonheap topo-info)
-     "assignedMemOffHeap" (.get_assigned_memoffheap topo-info)
-     "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
-     "assignedCpu" (.get_assigned_cpu topo-info)
-     "topologyStats" topo-stats
-     "spouts" (map (partial comp-agg-stats-json id secure?)
-                   (.get_id_to_spout_agg_stats topo-info))
-     "bolts" (map (partial comp-agg-stats-json id secure?)
-                  (.get_id_to_bolt_agg_stats topo-info))
-     "configuration" (.get_topology_conf topo-info)
-     "debug" (or debugEnabled false)
-     "samplingPct" (or samplingPct 10)
-     "replicationCount" (.get_replication_count topo-info)}))
-
-(defn exec-host-port
-  [executors]
-  (for [^ExecutorSummary e executors]
-    {"host" (.get_host e)
-     "port" (.get_port e)}))
-
-(defn worker-host-port
-  "Get the set of all worker host/ports"
-  [id]
-  (thrift/with-configured-nimbus-connection nimbus
-    (distinct (exec-host-port (.get_executors (get-topology-info nimbus id))))))
-
-(defn topology-page [id window include-sys? user secure?]
-  (thrift/with-configured-nimbus-connection nimbus
-    (let [window (if window window ":all-time")
-          window-hint (window-hint window)
-          topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus
-                                               id
-                                               window
-                                               include-sys?)
-          topology-conf (from-json (.get_topology_conf topo-page-info))
-          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
-      (merge
-       (unpack-topology-page-info topo-page-info window secure?)
-       {"user" user
-        "window" window
-        "windowHint" window-hint
-        "msgTimeout" msg-timeout
-        "configuration" topology-conf
-        "visualizationTable" []
-        "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))))
-
-(defn component-errors
-  [errors-list topology-id secure?]
-  (let [errors (->> errors-list
-                    (sort-by #(.get_error_time_secs ^ErrorInfo %))
-                    reverse)]
-    {"componentErrors"
-     (for [^ErrorInfo e errors]
-       {"time" (* 1000 (long (.get_error_time_secs e)))
-        "errorHost" (.get_host e)
-        "errorPort"  (.get_port e)
-        "errorWorkerLogLink"  (worker-log-link (.get_host e)
-                                               (.get_port e)
-                                               topology-id
-                                               secure?)
-        "errorLapsedSecs" (get-error-time e)
-        "error" (.get_error e)})}))
-
-(defmulti unpack-comp-agg-stat
-  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
-
-(defmethod unpack-comp-agg-stat ComponentType/BOLT
-  [[window ^ComponentAggregateStats s]]
-  (let [^CommonAggregateStats comm-s (.get_common_stats s)
-        ^SpecificAggregateStats spec-s (.get_specific_stats s)
-        ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
-    {"window" window
-     "windowPretty" (window-hint window)
-     "emitted" (.get_emitted comm-s)
-     "transferred" (.get_transferred comm-s)
-     "acked" (.get_acked comm-s)
-     "failed" (.get_failed comm-s)
-     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
-     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
-     "executed" (.get_executed bolt-s)
-     "capacity" (float-str (.get_capacity bolt-s))}))
-
-(defmethod unpack-comp-agg-stat ComponentType/SPOUT
-  [[window ^ComponentAggregateStats s]]
-  (let [^CommonAggregateStats comm-s (.get_common_stats s)
-        ^SpecificAggregateStats spec-s (.get_specific_stats s)
-        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
-    {"window" window
-     "windowPretty" (window-hint window)
-     "emitted" (.get_emitted comm-s)
-     "transferred" (.get_transferred comm-s)
-     "acked" (.get_acked comm-s)
-     "failed" (.get_failed comm-s)
-     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
-
-(defn- unpack-bolt-input-stat
-  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
-  (let [^SpecificAggregateStats sas (.get_specific_stats stats)
-        ^BoltAggregateStats bas (.get_bolt sas)
-        ^CommonAggregateStats cas (.get_common_stats stats)
-        comp-id (.get_componentId s)]
-    {"component" comp-id
-     "encodedComponentId" (url-encode comp-id)
-     "stream" (.get_streamId s)
-     "executeLatency" (float-str (.get_execute_latency_ms bas))
-     "processLatency" (float-str (.get_process_latency_ms bas))
-     "executed" (nil-to-zero (.get_executed bas))
-     "acked" (nil-to-zero (.get_acked cas))
-     "failed" (nil-to-zero (.get_failed cas))}))
-
-(defmulti unpack-comp-output-stat
-  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
-
-(defmethod unpack-comp-output-stat ComponentType/BOLT
-  [[stream-id ^ComponentAggregateStats stats]]
-  (let [^CommonAggregateStats cas (.get_common_stats stats)]
-    {"stream" stream-id
-     "emitted" (nil-to-zero (.get_emitted cas))
-     "transferred" (nil-to-zero (.get_transferred cas))}))
-
-(defmethod unpack-comp-output-stat ComponentType/SPOUT
-  [[stream-id ^ComponentAggregateStats stats]]
-  (let [^CommonAggregateStats cas (.get_common_stats stats)
-        ^SpecificAggregateStats spec-s (.get_specific_stats stats)
-        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
-    {"stream" stream-id
-     "emitted" (nil-to-zero (.get_emitted cas))
-     "transferred" (nil-to-zero (.get_transferred cas))
-     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
-     "acked" (nil-to-zero (.get_acked cas))
-     "failed" (nil-to-zero (.get_failed cas))}))
-
-(defmulti unpack-comp-exec-stat
-  (fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
-
-(defmethod unpack-comp-exec-stat ComponentType/BOLT
-  [topology-id secure? ^ExecutorAggregateStats eas]
-  (let [^ExecutorSummary summ (.get_exec_summary eas)
-        ^ExecutorInfo info (.get_executor_info summ)
-        ^ComponentAggregateStats stats (.get_stats eas)
-        ^SpecificAggregateStats ss (.get_specific_stats stats)
-        ^BoltAggregateStats bas (.get_bolt ss)
-        ^CommonAggregateStats cas (.get_common_stats stats)
-        host (.get_host summ)
-        port (.get_port summ)
-        exec-id (pretty-executor-info info)
-        uptime (.get_uptime_secs summ)]
-    {"id" exec-id
-     "encodedId" (url-encode exec-id)
-     "uptime" (pretty-uptime-sec uptime)
-     "uptimeSeconds" uptime
-     "host" host
-     "port" port
-     "emitted" (nil-to-zero (.get_emitted cas))
-     "transferred" (nil-to-zero (.get_transferred cas))
-     "capacity" (float-str (nil-to-zero (.get_capacity bas)))
-     "executeLatency" (float-str (.get_execute_latency_ms bas))
-     "executed" (nil-to-zero (.get_executed bas))
-     "processLatency" (float-str (.get_process_latency_ms bas))
-     "acked" (nil-to-zero (.get_acked cas))
-     "failed" (nil-to-zero (.get_failed cas))
-     "workerLogLink" (worker-log-link host port topology-id secure?)}))
-
-(defmethod unpack-comp-exec-stat ComponentType/SPOUT
-  [topology-id secure? ^ExecutorAggregateStats eas]
-  (let [^ExecutorSummary summ (.get_exec_summary eas)
-        ^ExecutorInfo info (.get_executor_info summ)
-        ^ComponentAggregateStats stats (.get_stats eas)
-        ^SpecificAggregateStats ss (.get_specific_stats stats)
-        ^SpoutAggregateStats sas (.get_spout ss)
-        ^CommonAggregateStats cas (.get_common_stats stats)
-        host (.get_host summ)
-        port (.get_port summ)
-        exec-id (pretty-executor-info info)
-        uptime (.get_uptime_secs summ)]
-    {"id" exec-id
-     "encodedId" (url-encode exec-id)
-     "uptime" (pretty-uptime-sec uptime)
-     "uptimeSeconds" uptime
-     "host" host
-     "port" port
-     "emitted" (nil-to-zero (.get_emitted cas))
-     "transferred" (nil-to-zero (.get_transferred cas))
-     "completeLatency" (float-str (.get_complete_latency_ms sas))
-     "acked" (nil-to-zero (.get_acked cas))
-     "failed" (nil-to-zero (.get_failed cas))
-     "workerLogLink" (worker-log-link host port topology-id secure?)}))
-
-(defmulti unpack-component-page-info
-  "Unpacks component-specific info to clojure data structures"
-  (fn [^ComponentPageInfo info & _]
-    (.get_component_type info)))
-
-(defmethod unpack-component-page-info ComponentType/BOLT
-  [^ComponentPageInfo info topology-id window include-sys? secure?]
-  (merge
-    {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
-     "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
-     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
-     "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
-                          (.get_exec_stats info))}
-    (-> info .get_errors (component-errors topology-id secure?))))
-
-(defmethod unpack-component-page-info ComponentType/SPOUT
-  [^ComponentPageInfo info topology-id window include-sys? secure?]
-  (merge
-    {"spoutSummary" (map unpack-comp-agg-stat (.get_window_to_stats info))
-     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
-     "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
-                          (.get_exec_stats info))}
-    (-> info .get_errors (component-errors topology-id secure?))))
-
-(defn get-active-profile-actions
-  [nimbus topology-id component]
-  (let [profile-actions  (.getComponentPendingProfileActions nimbus
-                                               topology-id
-                                               component
-                                 ProfileAction/JPROFILE_STOP)
-        latest-profile-actions (map clojurify-profile-request profile-actions)
-        active-actions (map (fn [profile-action]
-                              {"host" (:host profile-action)
-                               "port" (str (:port profile-action))
-                               "dumplink" (worker-dump-link (:host profile-action) (str (:port profile-action)) topology-id)
-                               "timestamp" (str (- (:timestamp profile-action) (System/currentTimeMillis)))})
-                            latest-profile-actions)]
-    (log-message "Latest-active actions are: " (pr active-actions))
-    active-actions))
-
-(defn component-page
-  [topology-id component window include-sys? user secure?]
-  (thrift/with-configured-nimbus-connection nimbus
-    (let [window (or window ":all-time")
-          window-hint (window-hint window)
-          comp-page-info (.getComponentPageInfo ^Nimbus$Client nimbus
-                                                topology-id
-                                                component
-                                                window
-                                                include-sys?)
-          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus
-                                                     topology-id))
-          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
-          [debugEnabled
-           samplingPct] (if-let [debug-opts (.get_debug_options comp-page-info)]
-                          [(.is_enable debug-opts)
-                           (.get_samplingpct debug-opts)])]
-      (assoc
-       (unpack-component-page-info comp-page-info
-                                   topology-id
-                                   window
-                                   include-sys?
-                                   secure?)
-       "user" user
-       "id" component
-       "encodedId" (url-encode component)
-       "name" (.get_topology_name comp-page-info)
-       "executors" (.get_num_executors comp-page-info)
-       "tasks" (.get_num_tasks comp-page-info)
-       "topologyId" topology-id
-       "topologyStatus" (.get_topology_status comp-page-info)
-       "encodedTopologyId" (url-encode topology-id)
-       "window" window
-       "componentType" (-> comp-page-info .get_component_type str lower-case)
-       "windowHint" window-hint
-       "debug" (or debugEnabled false)
-       "samplingPct" (or samplingPct 10)
-       "eventLogLink" (event-log-link topology-id
-                                      component
-                                      (.get_eventlog_host comp-page-info)
-                                      (.get_eventlog_port comp-page-info)
-                                      secure?)
-       "profileActionEnabled" (*STORM-CONF* WORKER-PROFILER-ENABLED)
-       "profilerActive" (if (*STORM-CONF* WORKER-PROFILER-ENABLED)
-                          (get-active-profile-actions nimbus topology-id component)
-                          [])))))
-    
-(defn- level-to-dict [level]
-  (if level
-    (let [timeout (.get_reset_log_level_timeout_secs level)
-          timeout-epoch (.get_reset_log_level_timeout_epoch level)
-          target-level (.get_target_log_level level)
-          reset-level (.get_reset_log_level level)]
-          {"target_level" (.toString (Level/toLevel target-level))
-           "reset_level" (.toString (Level/toLevel reset-level))
-           "timeout" timeout
-           "timeout_epoch" timeout-epoch})))
-
-(defn log-config [topology-id]
-  (thrift/with-configured-nimbus-connection
-    nimbus
-    (let [log-config (.getLogConfig ^Nimbus$Client nimbus topology-id)
-          named-logger-levels (into {}
-                                (for [[key val] (.get_named_logger_level log-config)]
-                                  [(str key) (level-to-dict val)]))]
-      {"namedLoggerLevels" named-logger-levels})))
-
-(defn topology-config [topology-id]
-  (thrift/with-configured-nimbus-connection nimbus
-    (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
-
-(defn topology-op-response [topology-id op]
-  {"topologyOperation" op,
-   "topologyId" topology-id,
-   "status" "success"
-   })
-
-(defn component-op-response [topology-id component-id op]
-  {"topologyOperation" op,
-   "topologyId" topology-id,
-   "componentId" component-id,
-   "status" "success"
-   })
-
-(defn check-include-sys?
-  [sys?]
-  (if (or (nil? sys?) (= "false" sys?)) false true))
-
-(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
-
-(defn populate-context!
-  "Populate the Storm RequestContext from an servlet-request. This should be called in each handler"
-  [servlet-request]
-    (when http-creds-handler
-      (.populateContext http-creds-handler (ReqContext/context) servlet-request)))
-
-(defn get-user-name
-  [servlet-request]
-  (.getUserName http-creds-handler servlet-request))
-
-(defroutes main-routes
-  (GET "/api/v1/cluster/configuration" [& m]
-    (mark! ui:num-cluster-configuration-http-requests)
-    (json-response (cluster-configuration)
-                   (:callback m) :serialize-fn identity))
-  (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
-    (mark! ui:num-cluster-summary-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getClusterInfo")
-    (let [user (get-user-name servlet-request)]
-      (json-response (assoc (cluster-summary user)
-                          "bugtracker-url" (*STORM-CONF* UI-PROJECT-BUGTRACKER-URL)
-                          "central-log-url" (*STORM-CONF* UI-CENTRAL-LOGGING-URL)) (:callback m))))
-  (GET "/api/v1/nimbus/summary" [:as {:keys [cookies servlet-request]} & m]
-    (mark! ui:num-nimbus-summary-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getClusterInfo")
-    (json-response (nimbus-summary) (:callback m)))
-  (GET "/api/v1/history/summary" [:as {:keys [cookies servlet-request]} & m]
-    (let [user (.getUserName http-creds-handler servlet-request)]
-      (json-response (topology-history-info user) (:callback m))))
-  (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & m]
-    (mark! ui:num-supervisor-summary-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getClusterInfo")
-    (json-response (assoc (supervisor-summary)
-                     "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m)))
-  (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
-    (mark! ui:num-all-topologies-summary-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getClusterInfo")
-    (json-response (all-topologies-summary) (:callback m)))
-  (GET  "/api/v1/topology-workers/:id" [:as {:keys [cookies servlet-request]} id & m]
-    (let [id (url-decode id)]
-      (json-response {"hostPortList" (worker-host-port id)
-                      "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)} (:callback m))))
-  (GET "/api/v1/topology/:id" [:as {:keys [cookies servlet-request scheme]} id & m]
-    (mark! ui:num-topology-page-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getTopology" (topology-config id))
-    (let [user (get-user-name servlet-request)]
-      (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m))))
-  (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
-    (mark! ui:num-build-visualization-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getTopology" (topology-config id))
-    (json-response (build-visualization id (:window m) (check-include-sys? (:sys m))) (:callback m)))
-  (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
-    (mark! ui:num-mk-visualization-data-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getTopology" (topology-config id))
-    (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))
-  (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]} id component & m]
-    (mark! ui:num-component-page-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getTopology" (topology-config id))
-    (let [user (get-user-name servlet-request)]
-      (json-response
-          (component-page id component (:window m) (check-include-sys? (:sys m)) user (= scheme :https))
-          (:callback m))))
-  (GET "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id & m]
-    (mark! ui:num-log-config-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "getTopology" (topology-config id))
-       (json-response (log-config id) (:callback m)))
-  (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
-    (mark! ui:num-activate-topology-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "activate" (topology-config id))
-    (thrift/with-configured-nimbus-connection nimbus
-       (let [tplg (->> (doto
-                        (GetInfoOptions.)
-                        (.set_num_err_choice NumErrorsChoice/NONE))
-                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-            name (.get_name tplg)]
-        (.activate nimbus name)
-        (log-message "Activating topology '" name "'")))
-    (json-response (topology-op-response id "activate") (m "callback")))
-  (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id & m]
-    (mark! ui:num-deactivate-topology-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "deactivate" (topology-config id))
-    (thrift/with-configured-nimbus-connection nimbus
-        (let [tplg (->> (doto
-                        (GetInfoOptions.)
-                        (.set_num_err_choice NumErrorsChoice/NONE))
-                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-            name (.get_name tplg)]
-        (.deactivate nimbus name)
-        (log-message "Deactivating topology '" name "'")))
-    (json-response (topology-op-response id "deactivate") (m "callback")))
-  (POST "/api/v1/topology/:id/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id action spct & m]
-    (mark! ui:num-debug-topology-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "debug" (topology-config id))
-    (thrift/with-configured-nimbus-connection nimbus
-        (let [tplg (->> (doto
-                        (GetInfoOptions.)
-                        (.set_num_err_choice NumErrorsChoice/NONE))
-                   (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-            name (.get_name tplg)
-            enable? (= "enable" action)]
-        (.debug nimbus name "" enable? (Integer/parseInt spct))
-        (log-message "Debug topology [" name "] action [" action "] sampling pct [" spct "]")))
-     (json-response (topology-op-response id (str "debug/" action)) (m "callback")))
-  (POST "/api/v1/topology/:id/component/:component/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id component action spct & m]
-    (mark! ui:num-component-op-response-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "debug" (topology-config id))
-    (thrift/with-configured-nimbus-connection nimbus
-      (let [tplg (->> (doto
-                        (GetInfoOptions.)
-                        (.set_num_err_choice NumErrorsChoice/NONE))
-                   (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-            name (.get_name tplg)
-            enable? (= "enable" action)]
-        (.debug nimbus name component enable? (Integer/parseInt spct))
-        (log-message "Debug topology [" name "] component [" component "] action [" action "] sampling pct [" spct "]")))
-    (json-response (component-op-response id component (str "/debug/" action)) (m "callback")))
-  (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
-    (mark! ui:num-topology-op-response-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "rebalance" (topology-config id))
-    (thrift/with-configured-nimbus-connection nimbus
-      (let [tplg (->> (doto
-                        (GetInfoOptions.)
-                        (.set_num_err_choice NumErrorsChoice/NONE))
-                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-            name (.get_name tplg)
-            rebalance-options (m "rebalanceOptions")
-            options (RebalanceOptions.)]
-        (.set_wait_secs options (Integer/parseInt wait-time))
-        (if (and (not-nil? rebalance-options) (contains? rebalance-options "numWorkers"))
-          (.set_num_workers options (Integer/parseInt (.toString (rebalance-options "numWorkers")))))
-        (if (and (not-nil? rebalance-options) (contains? rebalance-options "executors"))
-          (doseq [keyval (rebalance-options "executors")]
-            (.put_to_num_executors options (key keyval) (Integer/parseInt (.toString (val keyval))))))
-        (.rebalance nimbus name options)
-        (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
-    (json-response (topology-op-response id "rebalance") (m "callback")))
-  (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
-    (mark! ui:num-topology-op-response-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "killTopology" (topology-config id))
-    (thrift/with-configured-nimbus-connection nimbus
-      (let [tplg (->> (doto
-                        (GetInfoOptions.)
-                        (.set_num_err_choice NumErrorsChoice/NONE))
-                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
-            name (.get_name tplg)
-            options (KillOptions.)]
-        (.set_wait_secs options (Integer/parseInt wait-time))
-        (.killTopologyWithOpts nimbus name options)
-        (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
-    (json-response (topology-op-response id "kill") (m "callback")))
-  (POST "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id namedLoggerLevels & m]
-    (mark! ui:num-topology-op-response-http-requests)
-    (populate-context! servlet-request)
-    (assert-authorized-user "setLogConfig" (topology-config id))
-    (thrift/with-configured-nimbus-connection
-      nimbus
-      (let [new-log-config (LogConfig.)]
-        (doseq [[key level] namedLoggerLevels]
-            (let [logger-name (str key)
-                  target-level (.get level "target_level")
-                  timeout (or (.get level "timeout") 0)
-                  named-logger-level (LogLevel.)]
-              ;; if target-level is nil, do not set it, user wants to clear
-              (log-message "The target level for " logger-name " is " target-level)
-              (if (nil? target-level)
-                (do
-                  (.set_action named-logger-level LogLevelAction/REMOVE)
-                  (.unset_target_log_level named-logger-level))
-                (do
-                  (.set_action named-logger-level LogLevelAction/UPDATE)
-                  ;; the toLevel here ensures the string we get is valid
-                  (.set_target_log_level named-logger-level (.name (Level/toLevel target-level)))
-                  (.set_reset_log_level_timeout_secs named-logger-level timeout)))
-              (log-message "Adding this " logger-name " " named-logger-level " to " new-log-config)
-              (.put_to_named_logger_level new-log-config logger-name named-logger-level)))
-        (log-message "Setting topology " id " log config " new-log-config)
-        (.setLogConfig nimbus id new-log-config)
-        (json-response (log-config id) (m "callback")))))
-
-  (GET "/api/v1/topology/:id/profiling/start/:host-port/:timeout"
-       [:as {:keys [servlet-request]} id host-port timeout & m]
-       (thrift/with-configured-nimbus-connection nimbus
-         (assert-authorized-user "setWorkerProfiler" (topology-config id))
-         (assert-authorized-profiler-action "start")
-         (let [[host, port] (split host-port #":")
-               nodeinfo (NodeInfo. host (set [(Long. port)]))
-               timestamp (+ (System/currentTimeMillis) (* 60000 (Long. timeout)))
-               request (ProfileRequest. nodeinfo
-                                        ProfileAction/JPROFILE_STOP)]
-           (.set_time_stamp request timestamp)
-           (.setWorkerProfiler nimbus id request)
-           (json-response {"status" "ok"
-                           "id" host-port
-                           "timeout" timeout
-                           "dumplink" (worker-dump-link
-                                       host
-                                       port
-                                       id)}
-                          (m "callback")))))
-
-  (GET "/api/v1/topology/:id/profiling/stop/:host-port"
-       [:as {:keys [servlet-request]} id host-port & m]
-       (thrift/with-configured-nimbus-connection nimbus
-         (assert-authorized-user "setWorkerProfiler" (topology-config id))
-         (assert-authorized-profiler-action "stop")
-         (let [[host, port] (split host-port #":")
-               nodeinfo (NodeInfo. host (set [(Long. port)]))
-               timestamp 0
-               request (ProfileRequest. nodeinfo
-                                        ProfileAction/JPROFILE_STOP)]
-           (.set_time_stamp request timestamp)
-           (.setWorkerProfiler nimbus id request)
-           (json-response {"status" "ok"
-                           "id" host-port}
-                          (m "callback")))))
-  
-  (GET "/api/v1/topology/:id/profiling/dumpprofile/:host-port"
-       [:as {:keys [servlet-request]} id host-port & m]
-       (thrift/with-configured-nimbus-connection nimbus
-         (assert-authorized-user "setWorkerProfiler" (topology-config id))
-         (assert-authorized-profiler-action "dumpprofile")
-         (let [[host, port] (split host-port #":")
-               nodeinfo (NodeInfo. host (set [(Long. port)]))
-               timestamp (System/currentTimeMillis)
-               request (ProfileRequest. nodeinfo
-                                        ProfileAction/JPROFILE_DUMP)]
-           (.set_time_stamp request timestamp)
-           (.setWorkerProfiler nimbus id request)
-           (json-response {"status" "ok"
-                           "id" host-port}
-                          (m "callback")))))
-
-  (GET "/api/v1/topology/:id/profiling/dumpjstack/:host-port"
-       [:as {:keys [servlet-request]} id host-port & m]
-       (thrift/with-configured-nimbus-connection nimbus
-         (assert-authorized-user "setWorkerProfiler" (topology-config id))
-         (assert-authorized-profiler-action "dumpjstack")
-         (let [[host, port] (split host-port #":")
-               nodeinfo (NodeInfo. host (set [(Long. port)]))
-               timestamp (System/currentTimeMillis)
-               request (ProfileRequest. nodeinfo
-                                        ProfileAction/JSTACK_DUMP)]
-           (.set_time_stamp request timestamp)
-           (.setWorkerProfiler nimbus id request)
-           (json-response {"status" "ok"
-                           "id" host-port}
-                          (m "callback")))))
-
-  (GET "/api/v1/topology/:id/profiling/restartworker/:host-port"
-       [:as {:keys [servlet-request]} id host-port & m]
-       (thrift/with-configured-nimbus-connection nimbus
-         (assert-authorized-user "setWorkerProfiler" (topology-config id))
-         (assert-authorized-profiler-action "restartworker")
-         (let [[host, port] (split host-port #":")
-               nodeinfo (NodeInfo. host (set [(Long. port)]))
-               timestamp (System/currentTimeMillis)
-               request (ProfileRequest. nodeinfo
-                                        ProfileAction/JVM_RESTART)]
-           (.set_time_stamp request timestamp)
-           (.setWorkerProfiler nimbus id request)
-           (json-response {"status" "ok"
-                           "id" host-port}
-                          (m "callback")))))
-       
-  (GET "/api/v1/topology/:id/profiling/dumpheap/:host-port"
-       [:as {:keys [servlet-request]} id host-port & m]
-       (thrift/with-configured-nimbus-connection nimbus
-         (assert-authorized-user "setWorkerProfiler" (topology-config id))
-         (assert-authorized-profiler-action "dumpheap")
-         (let [[host, port] (split host-port #":")
-               nodeinfo (NodeInfo. host (set [(Long. port)]))
-               timestamp (System/currentTimeMillis)
-               request (ProfileRequest. nodeinfo
-                                        ProfileAction/JMAP_DUMP)]
-           (.set_time_stamp request timestamp)
-           (.setWorkerProfiler nimbus id request)
-           (json-response {"status" "ok"
-                           "id" host-port}
-                          (m "callback")))))
-  
-  (GET "/" [:as {cookies :cookies}]
-    (mark! ui:num-main-page-http-requests)
-    (resp/redirect "/index.html"))
-  (route/resources "/")
-  (route/not-found "Page not found"))
-
-(defn catch-errors
-  [handler]
-  (fn [request]
-    (try
-      (handler request)
-      (catch Exception ex
-        (json-response (exception->json ex) ((:query-params request) "callback") :status 500)))))
-
-(def app
-  (handler/site (-> main-routes
-                    (wrap-json-params)
-                    (wrap-multipart-params)
-                    (wrap-reload '[backtype.storm.ui.core])
-                    requests-middleware
-                    catch-errors)))
-
-(defn start-server!
-  []
-  (try
-    (let [conf *STORM-CONF*
-          header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
-          filters-confs [{:filter-class (conf UI-FILTER)
-                          :filter-params (conf UI-FILTER-PARAMS)}]
-          https-port (if (not-nil? (conf UI-HTTPS-PORT)) (conf UI-HTTPS-PORT) 0)
-          https-ks-path (conf UI-HTTPS-KEYSTORE-PATH)
-          https-ks-password (conf UI-HTTPS-KEYSTORE-PASSWORD)
-          https-ks-type (conf UI-HTTPS-KEYSTORE-TYPE)
-          https-key-password (conf UI-HTTPS-KEY-PASSWORD)
-          https-ts-path (conf UI-HTTPS-TRUSTSTORE-PATH)
-          https-ts-password (conf UI-HTTPS-TRUSTSTORE-PASSWORD)
-          https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
-          https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
-          https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
-      (start-metrics-reporters)
-      (storm-run-jetty {:port (conf UI-PORT)
-                        :host (conf UI-HOST)
-                        :https-port https-port
-                        :configurator (fn [server]
-                                        (config-ssl server
-                                                    https-port
-                                                    https-ks-path
-                                                    https-ks-password
-                                                    https-ks-type
-                                                    https-key-password
-                                                    https-ts-path
-                                                    https-ts-password
-                                                    https-ts-type
-                                                    https-need-client-auth
-                                                    https-want-client-auth)
-                                        (doseq [connector (.getConnectors server)]
-                                          (.setRequestHeaderSize connector header-buffer-size))
-                                        (config-filter server app filters-confs))}))
-   (catch Exception ex
-     (log-error ex))))
-
-(defn -main
-  []
-  (log-message "Starting ui server for storm version '" STORM-VERSION "'")
-  (start-server!))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
deleted file mode 100644
index e0db5c8..0000000
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ /dev/null
@@ -1,240 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.ui.helpers
-  (:use compojure.core)
-  (:use [hiccup core page-helpers])
-  (:use [clojure
-         [string :only [blank? join]]
-         [walk :only [keywordize-keys]]])
-  (:use [backtype.storm config log])
-  (:use [backtype.storm.util :only [clojurify-structure uuid defnk to-json url-encode not-nil?]])
-  (:use [clj-time coerce format])
-  (:import [backtype.storm.generated ExecutorInfo ExecutorSummary])
-  (:import [backtype.storm.logging.filters AccessLoggingFilter])
-  (:import [java.util EnumSet])
-  (:import [org.eclipse.jetty.server Server]
-           [org.eclipse.jetty.server.nio SelectChannelConnector]
-           [org.eclipse.jetty.server.ssl SslSocketConnector]
-           [org.eclipse.jetty.servlet ServletHolder FilterMapping]
-	   [org.eclipse.jetty.util.ssl SslContextFactory]
-           [org.eclipse.jetty.server DispatcherType]
-           [org.eclipse.jetty.servlets CrossOriginFilter])
-  (:require [ring.util servlet])
-  (:require [compojure.route :as route]
-            [compojure.handler :as handler])
-  (:require [metrics.meters :refer [defmeter mark!]]))
-
-(defmeter num-web-requests)
-(defn requests-middleware
-  "Coda Hale metric for counting the number of web requests."
-  [handler]
-  (fn [req]
-    (mark! num-web-requests)
-    (handler req)))
-
-(defn split-divide [val divider]
-  [(Integer. (int (/ val divider))) (mod val divider)]
-  )
-
-(def PRETTY-SEC-DIVIDERS
-     [["s" 60]
-      ["m" 60]
-      ["h" 24]
-      ["d" nil]])
-
-(def PRETTY-MS-DIVIDERS
-     (cons ["ms" 1000]
-           PRETTY-SEC-DIVIDERS))
-
-(defn pretty-uptime-str* [val dividers]
-  (let [val (if (string? val) (Integer/parseInt val) val)
-        vals (reduce (fn [[state val] [_ divider]]
-                       (if (pos? val)
-                         (let [[divided mod] (if divider
-                                               (split-divide val divider)
-                                               [nil val])]
-                           [(concat state [mod])
-                            divided]
-                           )
-                         [state val]
-                         ))
-                     [[] val]
-                     dividers)
-        strs (->>
-              (first vals)
-              (map
-               (fn [[suffix _] val]
-                 (str val suffix))
-               dividers
-               ))]
-    (join " " (reverse strs))
-    ))
-
-(defn pretty-uptime-sec [secs]
-  (pretty-uptime-str* secs PRETTY-SEC-DIVIDERS))
-
-(defn pretty-uptime-ms [ms]
-  (pretty-uptime-str* ms PRETTY-MS-DIVIDERS))
-
-
-(defelem table [headers-map data]
-  [:table
-   [:thead
-    [:tr
-     (for [h headers-map]
-       [:th (if (:text h) [:span (:attr h) (:text h)] h)])
-     ]]
-   [:tbody
-    (for [row data]
-      [:tr
-       (for [col row]
-         [:td col]
-         )]
-      )]
-   ])
-
-(defn url-format [fmt & args]
-  (String/format fmt
-    (to-array (map #(url-encode (str %)) args))))
-
-(defn pretty-executor-info [^ExecutorInfo e]
-  (str "[" (.get_task_start e) "-" (.get_task_end e) "]"))
-
-(defn unauthorized-user-json
-  [user]
-  {"error" "No Authorization"
-   "errorMessage" (str "User " user " is not authorized.")})
-
-(defn unauthorized-user-html [user]
-  [[:h2 "User '" (escape-html user) "' is not authorized."]])
-
-(defn- mk-ssl-connector [port ks-path ks-password ks-type key-password
-                         ts-path ts-password ts-type need-client-auth want-client-auth]
-  (let [sslContextFactory (doto (SslContextFactory.)
-                            (.setExcludeCipherSuites (into-array String ["SSL_RSA_WITH_RC4_128_MD5" "SSL_RSA_WITH_RC4_128_SHA"]))
-                            (.setExcludeProtocols (into-array String ["SSLv3"]))
-                            (.setAllowRenegotiate false)
-                            (.setKeyStorePath ks-path)
-                            (.setKeyStoreType ks-type)
-                            (.setKeyStorePassword ks-password)
-                            (.setKeyManagerPassword key-password))]
-    (if (and (not-nil? ts-path) (not-nil? ts-password) (not-nil? ts-type))
-      (do
-        (.setTrustStore sslContextFactory ts-path)
-        (.setTrustStoreType sslContextFactory ts-type)
-        (.setTrustStorePassword sslContextFactory ts-password)))
-    (cond
-      need-client-auth (.setNeedClientAuth sslContextFactory true)
-      want-client-auth (.setWantClientAuth sslContextFactory true))
-    (doto (SslSocketConnector. sslContextFactory)
-      (.setPort port))))
-
-
-(defn config-ssl [server port ks-path ks-password ks-type key-password
-                  ts-path ts-password ts-type need-client-auth want-client-auth]
-  (when (> port 0)
-    (.addConnector server (mk-ssl-connector port ks-path ks-password ks-type key-password
-                                            ts-path ts-password ts-type need-client-auth want-client-auth))))
-
-(defn cors-filter-handler
-  []
-  (doto (org.eclipse.jetty.servlet.FilterHolder. (CrossOriginFilter.))
-    (.setInitParameter CrossOriginFilter/ALLOWED_ORIGINS_PARAM "*")
-    (.setInitParameter CrossOriginFilter/ALLOWED_METHODS_PARAM "GET, POST, PUT")
-    (.setInitParameter CrossOriginFilter/ALLOWED_HEADERS_PARAM "X-Requested-With, X-Requested-By, Access-Control-Allow-Origin, Content-Type, Content-Length, Accept, Origin")
-    (.setInitParameter CrossOriginFilter/ACCESS_CONTROL_ALLOW_ORIGIN_HEADER "*")
-    ))
-
-(defn mk-access-logging-filter-handler []
-  (org.eclipse.jetty.servlet.FilterHolder. (AccessLoggingFilter.)))
-
-(defn config-filter [server handler filters-confs]
-  (if filters-confs
-    (let [servlet-holder (ServletHolder.
-                           (ring.util.servlet/servlet handler))
-          context (doto (org.eclipse.jetty.servlet.ServletContextHandler. server "/")
-                    (.addServlet servlet-holder "/"))]
-      (.addFilter context (cors-filter-handler) "/*" (EnumSet/allOf DispatcherType))
-      (doseq [{:keys [filter-name filter-class filter-params]} filters-confs]
-        (if filter-class
-          (let [filter-holder (doto (org.eclipse.jetty.servlet.FilterHolder.)
-                                (.setClassName filter-class)
-                                (.setName (or filter-name filter-class))
-                                (.setInitParameters (or filter-params {})))]
-            (.addFilter context filter-holder "/*" FilterMapping/ALL))))
-      (.addFilter context (mk-access-logging-filter-handler) "/*" (EnumSet/allOf DispatcherType))
-      (.setHandler server context))))
-
-(defn ring-response-from-exception [ex]
-  {:headers {}
-   :status 400
-   :body (.getMessage ex)})
-
-(defn- remove-non-ssl-connectors [server]
-  (doseq [c (.getConnectors server)]
-    (when-not (or (nil? c) (instance? SslSocketConnector c))
-      (.removeConnector server c)
-      ))
-  server)
-
-;; Modified from ring.adapter.jetty 1.3.0
-(defn- jetty-create-server
-  "Construct a Jetty Server instance."
-  [options]
-  (let [connector (doto (SelectChannelConnector.)
-                    (.setPort (options :port 80))
-                    (.setHost (options :host))
-                    (.setMaxIdleTime (options :max-idle-time 200000)))
-        server    (doto (Server.)
-                    (.addConnector connector)
-                    (.setSendDateHeader true))
-        https-port (options :https-port)]
-    (if (and (not-nil? https-port) (> https-port 0)) (remove-non-ssl-connectors server))
-    server))
-
-(defn storm-run-jetty
-  "Modified version of run-jetty
-  Assumes configurator sets handler."
-  [config]
-  {:pre [(:configurator config)]}
-  (let [#^Server s (jetty-create-server (dissoc config :configurator))
-        configurator (:configurator config)]
-    (configurator s)
-    (.start s)))
-
-(defn wrap-json-in-callback [callback response]
-  (str callback "(" response ");"))
-
-(defnk json-response
-  [data callback :serialize-fn to-json :status 200 :headers {}]
-  {:status status
-   :headers (merge {"Cache-Control" "no-cache, no-store"
-                    "Access-Control-Allow-Origin" "*"
-                    "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, X-Csrf-Token, Authorization, X-Requested-With"}
-              (if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"}
-                {"Content-Type" "application/json;charset=utf-8"})
-              headers)
-   :body (if (not-nil? callback)
-           (wrap-json-in-callback callback (serialize-fn data))
-           (serialize-fn data))})
-
-(defn exception->json
-  [ex]
-  {"error" "Internal Server Error"
-   "errorMessage"
-   (let [sw (java.io.StringWriter.)]
-     (.printStackTrace ex (java.io.PrintWriter. sw))
-     (.toString sw))})