You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:09 UTC
[13/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
new file mode 100644
index 0000000..b309b2c
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -0,0 +1,1273 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.ui.core
+ (:use compojure.core)
+ (:use [clojure.java.shell :only [sh]])
+ (:use ring.middleware.reload
+ ring.middleware.multipart-params)
+ (:use [ring.middleware.json :only [wrap-json-params]])
+ (:use [hiccup core page-helpers])
+ (:use [org.apache.storm config util log stats zookeeper converter])
+ (:use [org.apache.storm.ui helpers])
+ (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
+ ACKER-FAIL-STREAM-ID mk-authorization-handler
+ start-metrics-reporters]]])
+ (:import [org.apache.storm.utils Utils]
+ [org.apache.storm.generated NimbusSummary])
+ (:use [clojure.string :only [blank? lower-case trim split]])
+ (:import [org.apache.storm.generated ExecutorSpecificStats
+ ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
+ ErrorInfo ClusterSummary SupervisorSummary TopologySummary
+ Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
+ KillOptions GetInfoOptions NumErrorsChoice DebugOptions TopologyPageInfo
+ TopologyStats CommonAggregateStats ComponentAggregateStats
+ ComponentType BoltAggregateStats SpoutAggregateStats
+ ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
+ LogConfig LogLevel LogLevelAction])
+ (:import [org.apache.storm.security.auth AuthUtils ReqContext])
+ (:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
+ (:import [org.apache.storm.security.auth AuthUtils])
+ (:import [org.apache.storm.utils VersionInfo])
+ (:import [org.apache.storm Config])
+ (:import [java.io File])
+ (:require [compojure.route :as route]
+ [compojure.handler :as handler]
+ [ring.util.response :as resp]
+ [org.apache.storm [thrift :as thrift]])
+ (:require [metrics.meters :refer [defmeter mark!]])
+ (:import [org.apache.commons.lang StringEscapeUtils])
+ (:import [org.apache.logging.log4j Level])
+ (:gen-class))
+
+;; Storm configuration, obtained from read-storm-config when this namespace is loaded.
+(def ^:dynamic *STORM-CONF* (read-storm-config))
+;; Authorization handler built from the NIMBUS-AUTHORIZER setting.
+;; assert-authorized-user skips the ACL check when this value is nil.
+(def ^:dynamic *UI-ACL-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*))
+;; Authorization handler built from the NIMBUS-IMPERSONATION-AUTHORIZER setting.
+;; assert-authorized-user consults it only for impersonating request contexts.
+(def ^:dynamic *UI-IMPERSONATION-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*))
+;; HTTP credentials plugin obtained from AuthUtils for the loaded configuration.
+(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
+;; Version string from VersionInfo/getVersion; reported as "stormVersion" by cluster-summary.
+(def STORM-VERSION (VersionInfo/getVersion))
+
+;; One meter per UI HTTP entry point. Each meter is defined exactly once:
+;; ui:num-topology-op-response-http-requests used to be declared three times in a row,
+;; which only redefined the same var.
+(defmeter ui:num-cluster-configuration-http-requests)
+(defmeter ui:num-cluster-summary-http-requests)
+(defmeter ui:num-nimbus-summary-http-requests)
+(defmeter ui:num-supervisor-summary-http-requests)
+(defmeter ui:num-all-topologies-summary-http-requests)
+(defmeter ui:num-topology-page-http-requests)
+(defmeter ui:num-build-visualization-http-requests)
+(defmeter ui:num-mk-visualization-data-http-requests)
+(defmeter ui:num-component-page-http-requests)
+(defmeter ui:num-log-config-http-requests)
+(defmeter ui:num-activate-topology-http-requests)
+(defmeter ui:num-deactivate-topology-http-requests)
+(defmeter ui:num-debug-topology-http-requests)
+(defmeter ui:num-component-op-response-http-requests)
+(defmeter ui:num-topology-op-response-http-requests)
+(defmeter ui:num-main-page-http-requests)
+
+(defn assert-authorized-user
+ "Throws AuthorizationException when the current request context (ReqContext/context)
+ is not permitted to perform op, optionally checked against topology-conf.
+ For an impersonating context the impersonation handler is consulted first; when no
+ such handler is configured only a warning is logged. Afterwards the ACL handler is
+ consulted, if one is configured. The one-argument arity passes nil as topology-conf."
+ ([op]
+ (assert-authorized-user op nil))
+ ([op topology-conf]
+ (let [context (ReqContext/context)]
+ (if (.isImpersonating context)
+ (if *UI-IMPERSONATION-HANDLER*
+ (if-not (.permit *UI-IMPERSONATION-HANDLER* context op topology-conf)
+ (let [principal (.principal context)
+ real-principal (.realPrincipal context)
+ user (if principal (.getName principal) "unknown")
+ real-user (if real-principal (.getName real-principal) "unknown")
+ remote-address (.remoteAddress context)]
+ (throw (AuthorizationException.
+ (str "user '" real-user "' is not authorized to impersonate user '" user "' from host '" remote-address "'. Please
+ see SECURITY.MD to learn how to configure impersonation ACL.")))))
+ ;; No impersonation authorizer configured: the request is allowed through with a warning.
+ (log-warn " principal " (.realPrincipal context) " is trying to impersonate " (.principal context) " but "
+ NIMBUS-IMPERSONATION-AUTHORIZER " has no authorizer configured. This is a potential security hole.
+ Please see SECURITY.MD to learn how to configure an impersonation authorizer.")))
+
+ ;; The ACL check runs for every request, impersonating or not, when a handler exists.
+ (if *UI-ACL-HANDLER*
+ (if-not (.permit *UI-ACL-HANDLER* context op topology-conf)
+ (let [principal (.principal context)
+ user (if principal (.getName principal) "unknown")]
+ (throw (AuthorizationException.
+ (str "UI request '" op "' for '" user "' user is not authorized")))))))))
+
+
+(defn assert-authorized-profiler-action
+  "Throws AuthorizationException naming op when the WORKER-PROFILER-ENABLED
+  setting of *STORM-CONF* is falsy; otherwise returns nil."
+  [op]
+  (let [profiler-enabled? (*STORM-CONF* WORKER-PROFILER-ENABLED)]
+    (when-not profiler-enabled?
+      (throw (AuthorizationException.
+               (str "UI request for profiler action '" op "' is disabled."))))))
+
+
+(defn executor-summary-type
+  "Returns the result of component-type for the component id that the executor
+  summary s belongs to within topology."
+  [topology ^ExecutorSummary s]
+  (let [comp-id (.get_component_id s)]
+    (component-type topology comp-id)))
+
+(defn is-ack-stream
+  "Returns true when stream is NOT one of the acker streams (init, ack, fail),
+  false when it is. Note the name reads the opposite way; stream-boxes uses the
+  result as the :checked flag of a stream."
+  [stream]
+  (not-any? #(= % stream)
+            [ACKER-INIT-STREAM-ID
+             ACKER-ACK-STREAM-ID
+             ACKER-FAIL-STREAM-ID]))
+
+(defn spout-summary?
+  "True when the executor summary s belongs to a spout of topology."
+  [topology s]
+  (let [kind (executor-summary-type topology s)]
+    (= :spout kind)))
+
+(defn bolt-summary?
+  "True when the executor summary s belongs to a bolt of topology."
+  [topology s]
+  (let [kind (executor-summary-type topology s)]
+    (= :bolt kind)))
+
+(defn group-by-comp
+  "Groups executor summaries by component id and returns the groups as a
+  sorted map keyed by component id."
+  [summs]
+  (->> summs
+       (group-by (fn [^ExecutorSummary summ] (.get_component_id summ)))
+       (into (sorted-map))))
+
+(defn logviewer-link
+  "Builds the logviewer URL for log file fname on host. An https URL on the
+  LOGVIEWER-HTTPS-PORT is produced when secure? is truthy and that port is
+  configured; otherwise an http URL on LOGVIEWER-PORT."
+  [host fname secure?]
+  (let [https-port (*STORM-CONF* LOGVIEWER-HTTPS-PORT)]
+    (if (and secure? https-port)
+      (url-format "https://%s:%s/log?file=%s" host https-port fname)
+      (url-format "http://%s:%s/log?file=%s" host (*STORM-CONF* LOGVIEWER-PORT) fname))))
+
+(defn event-log-link
+  "Builds the logviewer URL of the event log file for topology-id on the worker
+  at host/port. component-id is accepted but not used."
+  [topology-id component-id host port secure?]
+  (let [fname (event-logs-filename topology-id port)]
+    (logviewer-link host fname secure?)))
+
+(defn worker-log-link
+  "Builds the logviewer URL of the worker log file for topology-id on the
+  worker at host/port."
+  [host port topology-id secure?]
+  (logviewer-link host (logs-filename topology-id port) secure?))
+
+(defn nimbus-log-link
+  "Builds the http URL of nimbus.log served from host on the configured
+  LOGVIEWER-PORT. port is kept in the signature for callers but is not part of
+  the link: the format string has only two placeholders, so the third argument
+  that used to be passed was never rendered."
+  [host port]
+  (url-format "http://%s:%s/daemonlog?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT)))
+
+;; NOTE(review): this definition is replaced by a second (defn get-error-time ...)
+;; further down in this file, so once the namespace has loaded this body is never
+;; the one callers get. It computes (time-delta error-time-secs) and returns nil
+;; when error is nil; the later definition returns the raw error time instead.
+(defn get-error-time
+ [error]
+ (if error
+ (time-delta (.get_error_time_secs ^ErrorInfo error))))
+
+(defn get-error-data
+  "Returns error-subset of the error text of error, or an empty string when
+  error is nil."
+  [error]
+  (if-not error
+    ""
+    (error-subset (.get_error ^ErrorInfo error))))
+
+(defn get-error-port
+  "Returns the port recorded in error, or an empty string when error is nil."
+  [error]
+  (if-let [^ErrorInfo info error]
+    (.get_port info)
+    ""))
+
+(defn get-error-host
+  "Returns the host recorded in error, or an empty string when error is nil."
+  [error]
+  (if-let [^ErrorInfo info error]
+    (.get_host info)
+    ""))
+
+;; Returns the error time of error as reported by get_error_time_secs, or an empty
+;; string when error is nil.
+;; NOTE(review): this is the second definition of get-error-time in this file and
+;; is the one in effect after load; the earlier one returned a time-delta instead.
+(defn get-error-time
+ [error]
+ (if error
+ (.get_error_time_secs ^ErrorInfo error)
+ ""))
+
+(defn worker-dump-link
+  "Builds the http URL, on the configured LOGVIEWER-PORT of host, of the dumps
+  directory for the worker host:port of topology-id."
+  [host port topology-id]
+  (let [encoded-host (url-encode host)
+        worker-id (str encoded-host ":" (url-encode port))]
+    (url-format "http://%s:%s/dumps/%s/%s"
+                encoded-host
+                (*STORM-CONF* LOGVIEWER-PORT)
+                (url-encode topology-id)
+                worker-id)))
+
+(defn stats-times
+  "Returns the window keys of stats-map, excluding \":all-time\", sorted by their
+  integer value. Keys are parsed with Integer/parseInt."
+  [stats-map]
+  (let [windows (keys (dissoc (clojurify-structure stats-map) ":all-time"))]
+    (sort-by #(Integer/parseInt %) windows)))
+
+(defn window-hint
+  "Returns the display label of a stats window: \"All time\" for \":all-time\",
+  otherwise pretty-uptime-sec of the window."
+  [window]
+  (if-not (= window ":all-time")
+    (pretty-uptime-sec window)
+    "All time"))
+
+(defn sanitize-stream-name
+  "Returns a sanitized form of the stream name: a leading \\s is prepended unless
+  the name starts with an ASCII letter, every character outside letters, _ - : .
+  becomes an underscore, and (hash name) of the original name is appended."
+  [name]
+  (let [illegal-char #"(?![A-Za-z_\-:\.])."
+        base (if (re-find #"^[A-Za-z]" name)
+               name
+               (str \s name))]
+    (str (clojure.string/replace base illegal-char "_")
+         (hash name))))
+
+(defn sanitize-transferred
+  "Takes a map of time window to {stream name -> count} and returns the same
+  structure as Clojure maps with every stream name passed through
+  sanitize-stream-name."
+  [transferred]
+  (let [sanitize-streams (fn [stream-map]
+                           (into {}
+                                 (map (fn [[stream trans]]
+                                        [(sanitize-stream-name stream) trans])
+                                      stream-map)))]
+    (into {}
+          (map (fn [[time stream-map]]
+                 [time (sanitize-streams stream-map)])
+               transferred))))
+
+(defn visualization-data
+ "Builds a map from component id to the data used for the topology visualization.
+ spout-bolt is a collection of [id spec] pairs; spout-comp-summs and bolt-comp-summs
+ map a component id to its executor summaries. A component is reported as a bolt when
+ bolt-comp-summs has an entry for its id and as a spout otherwise. Each value holds
+ :type, :capacity, :latency, :transferred, :stats, :link and :inputs, with the
+ latency and transferred figures taken for the given window."
+ [spout-bolt spout-comp-summs bolt-comp-summs window storm-id]
+ (let [components (for [[id spec] spout-bolt]
+ [id
+ (let [inputs (.get_inputs (.get_common spec))
+ bolt-summs (get bolt-comp-summs id)
+ spout-summs (get spout-comp-summs id)
+ ;; capacity is only computed for bolts; everything else reports 0
+ bolt-cap (if bolt-summs
+ (compute-bolt-capacity bolt-summs)
+ 0)]
+ {:type (if bolt-summs "bolt" "spout")
+ :capacity bolt-cap
+ ;; bolts report process latency, spouts report complete latency
+ :latency (if bolt-summs
+ (get-in
+ (bolt-streams-stats bolt-summs true)
+ [:process-latencies window])
+ (get-in
+ (spout-streams-stats spout-summs true)
+ [:complete-latencies window]))
+ ;; the spout figure wins when present, otherwise the bolt figure is used
+ :transferred (or
+ (get-in
+ (spout-streams-stats spout-summs true)
+ [:transferred window])
+ (get-in
+ (bolt-streams-stats bolt-summs true)
+ [:transferred window]))
+ ;; one entry per executor; :transferred is nil when the executor has no stats
+ :stats (let [mapfn (fn [dat]
+ (map (fn [^ExecutorSummary summ]
+ {:host (.get_host summ)
+ :port (.get_port summ)
+ :uptime_secs (.get_uptime_secs summ)
+ :transferred (if-let [stats (.get_stats summ)]
+ (sanitize-transferred (.get_transferred stats)))})
+ dat))]
+ (if bolt-summs
+ (mapfn bolt-summs)
+ (mapfn spout-summs)))
+ :link (url-format "/component.html?id=%s&topology_id=%s" id storm-id)
+ :inputs (for [[global-stream-id group] inputs]
+ {:component (.get_componentId global-stream-id)
+ :stream (.get_streamId global-stream-id)
+ :sani-stream (sanitize-stream-name (.get_streamId global-stream-id))
+ :grouping (clojure.core/name (thrift/grouping-type group))})})])]
+ (into {} (doall components))))
+
+(defn stream-boxes
+  "Collects the distinct input streams found under :inputs of every value of
+  datmap and returns them as rows of at most four, each row wrapped as
+  {:row [...]}. Every stream entry carries :stream, :sani-stream and :checked,
+  where :checked is the result of is-ack-stream for the stream name."
+  [datmap]
+  ;; A single for with two bindings replaces the nested for + apply concat; the
+  ;; previously bound but never used include-sys filter function is gone.
+  (let [streams (vec (distinct
+                       (for [[_ v] datmap
+                             m (get v :inputs)]
+                         {:stream (get m :stream)
+                          :sani-stream (get m :sani-stream)
+                          :checked (is-ack-stream (get m :stream))})))]
+    (map (fn [row] {:row row})
+         (partition 4 4 nil streams))))
+
+(defn- get-topology-info
+ "Fetches the topology info for id from nimbus. The two-argument arity calls
+ getTopologyInfo; the three-argument arity calls getTopologyInfoWithOpts with options."
+ ([^Nimbus$Client nimbus id]
+ (.getTopologyInfo nimbus id))
+ ([^Nimbus$Client nimbus id options]
+ (.getTopologyInfoWithOpts nimbus id options)))
+
+(defn mk-visualization-data
+  "Fetches topology id and its executor summaries (without error information)
+  from nimbus and returns visualization-data for all its spouts and bolts.
+  window defaults to \":all-time\"; include-sys? controls which bolt components
+  are kept via mk-include-sys-fn."
+  [id window include-sys?]
+  (thrift/with-configured-nimbus-connection
+    nimbus
+    (let [window (or window ":all-time")
+          topology (.getTopology ^Nimbus$Client nimbus id)
+          info-opts (doto (GetInfoOptions.)
+                      (.set_num_err_choice NumErrorsChoice/NONE))
+          summ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id info-opts)
+          execs (.get_executors summ)
+          summs-by-comp (fn [pred]
+                          (group-by-comp (filter (partial pred topology) execs)))
+          spout-comp-summs (summs-by-comp spout-summary?)
+          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
+                                      (summs-by-comp bolt-summary?))
+          id->spec (merge (hashmap-to-persistent (.get_spouts topology))
+                          (hashmap-to-persistent (.get_bolts topology)))]
+      (visualization-data id->spec spout-comp-summs bolt-comp-summs window id))))
+
+(defn validate-tplg-submit-params
+  "Validates topology submission params. Returns {:valid true} when a
+  :topologyJar is present and the JSON in :topologyConfig contains a
+  topologyMainClass entry; otherwise {:valid false :error message}."
+  [params]
+  (let [jar-file (params :topologyJar)
+        raw-config (params :topologyConfig)
+        config (if (not-nil? raw-config) (from-json raw-config))
+        invalid (fn [message] {:valid false :error message})]
+    (cond
+      (nil? jar-file) (invalid "missing topology jar file")
+      (nil? config) (invalid "missing topology config")
+      (nil? (config "topologyMainClass")) (invalid "topologyMainClass missing in topologyConfig")
+      :else {:valid true})))
+
+(defn run-tplg-submit-cmd
+  "Runs <storm.home>/bin/storm jar tplg-jar-file <main class> <args...> and maps the
+  outcome to a response map. The main class and its arguments come from the
+  topologyMainClass and topologyMainClassArgs entries of tplg-config. When user is
+  not \"unknown\" the command also gets -c storm.doAsUser=<user>.
+  Returns {\"status\" \"success\"} on exit code 0, otherwise {\"status\" \"failed\"
+  \"error\" message}, with a fixed message when stderr reports that the topology
+  already exists on the cluster."
+  [tplg-jar-file tplg-config user]
+  (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass")))
+        tplg-main-class-args (if (not-nil? tplg-config) (tplg-config "topologyMainClassArgs"))
+        storm-home (System/getProperty "storm.home")
+        storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm")
+        ;; sh hands every element to the process as its own argument without any
+        ;; shell word splitting, so the -c flag and its value must be two elements.
+        ;; NOTE(review): assumes the storm launcher expects -c and key=value as
+        ;; separate tokens - confirm against bin/storm.
+        do-as-user-args (if (not= user "unknown")
+                          ["-c" (str "storm.doAsUser=" user)]
+                          [])
+        tplg-cmd-response (apply sh
+                                 (flatten
+                                   [storm-cmd "jar" tplg-jar-file tplg-main-class
+                                    (if (not-nil? tplg-main-class-args) tplg-main-class-args [])
+                                    do-as-user-args]))
+        exit-code (tplg-cmd-response :exit)
+        err-output (tplg-cmd-response :err)]
+    (log-message "tplg-cmd-response " tplg-cmd-response)
+    ;; The exit code is either zero or not, so three branches cover every case.
+    (cond
+      (= exit-code 0) {"status" "success"}
+      (not-nil? (re-find #"already exists on cluster" err-output)) {"status" "failed" "error" "Topology with the same name exists in cluster"}
+      :else {"status" "failed" "error" (clojure.string/trim-newline err-output)})))
+
+;; Returns whatever getNimbusConf yields on a configured nimbus connection.
+(defn cluster-configuration []
+ (thrift/with-configured-nimbus-connection nimbus
+ (.getNimbusConf ^Nimbus$Client nimbus)))
+
+(defn topology-history-info
+  "Returns {\"topo-history\" [topology ids]}. With only user, the history is
+  fetched from nimbus for that user; with history and user, the ids are read
+  from the given history object and user is not consulted."
+  ([user]
+   (thrift/with-configured-nimbus-connection nimbus
+     (let [history (.getTopologyHistory ^Nimbus$Client nimbus user)]
+       (topology-history-info history user))))
+  ([history user]
+   (let [topo-ids (.get_topo_ids history)]
+     {"topo-history" (into [] topo-ids)})))
+
+(defn cluster-summary
+  "Returns the cluster overview map: user, Storm version, supervisor and
+  topology counts, total/used/free slots, and executor and task totals summed
+  over all topologies. The one-argument arity fetches the ClusterSummary from
+  nimbus first."
+  ([user]
+   (thrift/with-configured-nimbus-connection nimbus
+     (let [summ (.getClusterInfo ^Nimbus$Client nimbus)]
+       (cluster-summary summ user))))
+  ([^ClusterSummary summ user]
+   (let [sups (.get_supervisors summ)
+         topos (.get_topologies summ)
+         sum-of (fn [getter coll] (reduce + (map getter coll)))
+         used-slots (sum-of #(.get_num_used_workers ^SupervisorSummary %) sups)
+         total-slots (sum-of #(.get_num_workers ^SupervisorSummary %) sups)
+         total-tasks (sum-of #(.get_num_tasks ^TopologySummary %) topos)
+         total-executors (sum-of #(.get_num_executors ^TopologySummary %) topos)]
+     {"user" user
+      "stormVersion" STORM-VERSION
+      "supervisors" (count sups)
+      "topologies" (.get_topologies_size summ)
+      "slotsTotal" total-slots
+      "slotsUsed" used-slots
+      "slotsFree" (- total-slots used-slots)
+      "executorsTotal" total-executors
+      "tasksTotal" total-tasks})))
+
+(defn convert-to-nimbus-summary
+  "Builds the summary row for an offline nimbus from a \"host:port\" seed string.
+  All status fields are placeholders. The uptime-seconds value is emitted under
+  both \"nimbusUptimeSeconds\" (the key this function has always used) and
+  \"nimbusUpTimeSeconds\" (the key nimbus-summary uses for live nimbuses), so
+  consumers can read one key for every row."
+  [nimbus-seed]
+  (let [[host port] (.split ^String nimbus-seed ":")]
+    {"host" host
+     "port" port
+     "nimbusLogLink" (nimbus-log-link host port)
+     "status" "Offline"
+     "version" "Not applicable"
+     "nimbusUpTime" "Not applicable"
+     "nimbusUptimeSeconds" "Not applicable"
+     "nimbusUpTimeSeconds" "Not applicable"}))
+
+(defn nimbus-summary
+  "Returns {\"nimbuses\" rows}: first one placeholder row for every configured
+  nimbus seed (paired with NIMBUS-THRIFT-PORT) that is not among the given
+  nimbuses, then one row per given NimbusSummary. The zero-argument arity
+  fetches the nimbuses from the cluster info first."
+  ([]
+   (thrift/with-configured-nimbus-connection nimbus
+     (let [cluster-info (.getClusterInfo ^Nimbus$Client nimbus)]
+       (nimbus-summary (.get_nimbuses cluster-info)))))
+  ([nimbuses]
+   (let [seed-addrs (set (map #(str %1 ":" (*STORM-CONF* NIMBUS-THRIFT-PORT))
+                              (set (*STORM-CONF* NIMBUS-SEEDS))))
+         live-addrs (set (map #(str (.get_host %1) ":" (.get_port %1)) nimbuses))
+         offline-addrs (clojure.set/difference seed-addrs live-addrs)
+         offline-rows (map convert-to-nimbus-summary offline-addrs)
+         live-rows (for [^NimbusSummary n nimbuses
+                         :let [uptime (.get_uptime_secs n)
+                               host (.get_host n)
+                               port (.get_port n)]]
+                     {"host" host
+                      "port" port
+                      "nimbusLogLink" (nimbus-log-link host port)
+                      "status" (if (.is_isLeader n) "Leader" "Not a Leader")
+                      "version" (.get_version n)
+                      "nimbusUpTime" (pretty-uptime-sec uptime)
+                      "nimbusUpTimeSeconds" uptime})]
+     {"nimbuses" (concat offline-rows live-rows)})))
+
+(defn supervisor-summary
+  "Returns {\"supervisors\" rows, \"schedulerDisplayResource\" setting} with one
+  row per SupervisorSummary. The zero-argument arity fetches the supervisors
+  from the cluster info first."
+  ([]
+   (thrift/with-configured-nimbus-connection nimbus
+     (let [cluster-info (.getClusterInfo ^Nimbus$Client nimbus)]
+       (supervisor-summary (.get_supervisors cluster-info)))))
+  ([summs]
+   (let [rows (for [^SupervisorSummary s summs
+                    :let [uptime (.get_uptime_secs s)
+                          total-resources (.get_total_resources s)]]
+                {"id" (.get_supervisor_id s)
+                 "host" (.get_host s)
+                 "uptime" (pretty-uptime-sec uptime)
+                 "uptimeSeconds" uptime
+                 "slotsTotal" (.get_num_workers s)
+                 "slotsUsed" (.get_num_used_workers s)
+                 "totalMem" (get total-resources Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+                 "totalCpu" (get total-resources Config/SUPERVISOR_CPU_CAPACITY)
+                 "usedMem" (.get_used_mem s)
+                 "usedCpu" (.get_used_cpu s)
+                 "version" (.get_version s)})]
+     {"supervisors" rows
+      "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})))
+
+(defn all-topologies-summary
+ "Returns a map with one row per TopologySummary under \"topologies\" plus the
+ \"schedulerDisplayResource\" setting. The zero-argument arity fetches the
+ topologies from the cluster info first."
+ ([]
+ (thrift/with-configured-nimbus-connection
+ nimbus
+ (all-topologies-summary
+ (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)))))
+ ([summs]
+ {"topologies"
+ (for [^TopologySummary t summs]
+ {
+ "id" (.get_id t)
+ "encodedId" (url-encode (.get_id t))
+ "owner" (.get_owner t)
+ "name" (.get_name t)
+ "status" (.get_status t)
+ "uptime" (pretty-uptime-sec (.get_uptime_secs t))
+ "uptimeSeconds" (.get_uptime_secs t)
+ "tasksTotal" (.get_num_tasks t)
+ "workersTotal" (.get_num_workers t)
+ "executorsTotal" (.get_num_executors t)
+ "replicationCount" (.get_replication_count t)
+ "schedulerInfo" (.get_sched_status t)
+ "requestedMemOnHeap" (.get_requested_memonheap t)
+ "requestedMemOffHeap" (.get_requested_memoffheap t)
+ ;; total memory figures are the sum of the on-heap and off-heap values
+ "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
+ "requestedCpu" (.get_requested_cpu t)
+ "assignedMemOnHeap" (.get_assigned_memonheap t)
+ "assignedMemOffHeap" (.get_assigned_memoffheap t)
+ "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t))
+ "assignedCpu" (.get_assigned_cpu t)})
+ "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
+
+(defn topology-stats
+  "Returns one stats row per time window found in (:emitted stats), ordered by
+  stats-times and followed by the \":all-time\" row. stats maps :emitted,
+  :transferred, :complete-latencies, :acked and :failed to window -> value maps.
+  The window argument is accepted but not used."
+  [window stats]
+  (let [times (stats-times (:emitted stats))
+        pretty-window (fn [w]
+                        (if (= w ":all-time")
+                          "All time"
+                          (pretty-uptime-sec w)))]
+    (map (fn [w]
+           {"windowPretty" (pretty-window w)
+            "window" w
+            "emitted" (get-in stats [:emitted w])
+            "transferred" (get-in stats [:transferred w])
+            "completeLatency" (float-str (get-in stats [:complete-latencies w]))
+            "acked" (get-in stats [:acked w])
+            "failed" (get-in stats [:failed w])})
+         (concat times [":all-time"]))))
+
+;; Fetches topology id and its executor summaries from nimbus (requesting
+;; NumErrorsChoice/ONE), computes visualization-data for all spouts and bolts, and
+;; returns {"visualizationTable" rows} where rows come from stream-boxes.
+;; window defaults to ":all-time"; include-sys? is applied to the bolt components
+;; through mk-include-sys-fn.
+(defn build-visualization [id window include-sys?]
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [window (if window window ":all-time")
+ topology-info (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/ONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus
+ id))
+ storm-topology (.getTopology ^Nimbus$Client nimbus id)
+ spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info))
+ bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info))
+ spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries)
+ bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries)
+ bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries)
+ id->spout-spec (.get_spouts storm-topology)
+ id->bolt (.get_bolts storm-topology)
+ visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec)
+ (hashmap-to-persistent id->bolt))
+ spout-comp-id->executor-summaries
+ bolt-comp-id->executor-summaries
+ window
+ id)]
+ {"visualizationTable" (stream-boxes visualizer-data)})))
+
+(defn- get-error-json
+  "Returns the last-error fields of a component row: error text, error time,
+  host, port, seconds elapsed since the error and a link to the worker log.
+  error-info may be nil, in which case the text, time, host, port and elapsed
+  fields are empty strings."
+  [topo-id error-info secure?]
+  (let [host (get-error-host error-info)
+        port (get-error-port error-info)]
+    {"lastError" (get-error-data error-info)
+     "errorTime" (get-error-time error-info)
+     "errorHost" host
+     "errorPort" port
+     ;; Computed here rather than through get-error-time, which returns the raw
+     ;; error time and therefore made this field identical to errorTime.
+     ;; NOTE(review): time-delta is assumed to return the seconds elapsed since
+     ;; the given epoch-seconds value - confirm in org.apache.storm.util.
+     "errorLapsedSecs" (if error-info
+                         (time-delta (.get_error_time_secs ^ErrorInfo error-info))
+                         "")
+     "errorWorkerLogLink" (worker-log-link host port topo-id secure?)}))
+
+(defn- common-agg-stats-json
+ "Returns a map with the executors, tasks, emitted, transferred, acked and failed
+ values read from the given CommonAggregateStats."
+ [^CommonAggregateStats common-stats]
+ {"executors" (.get_num_executors common-stats)
+ "tasks" (.get_num_tasks common-stats)
+ "emitted" (.get_emitted common-stats)
+ "transferred" (.get_transferred common-stats)
+ "acked" (.get_acked common-stats)
+ "failed" (.get_failed common-stats)})
+
+(defmulti comp-agg-stats-json
+ "Returns a JSON representation of aggregated statistics. Takes topo-id, secure?
+ and an [id stats] pair, and dispatches on the ComponentType of the stats."
+ (fn [_ _ [id ^ComponentAggregateStats s]] (.get_type s)))
+
+;; Spout row: the common stats and last-error fields merged with the spout id, its
+;; url-encoded form and the complete latency.
+(defmethod comp-agg-stats-json ComponentType/SPOUT
+ [topo-id secure? [id ^ComponentAggregateStats s]]
+ (let [^SpoutAggregateStats ss (.. s get_specific_stats get_spout)
+ cs (.get_common_stats s)]
+ (merge
+ (common-agg-stats-json cs)
+ (get-error-json topo-id (.get_last_error s) secure?)
+ {"spoutId" id
+ "encodedSpoutId" (url-encode id)
+ "completeLatency" (float-str (.get_complete_latency_ms ss))})))
+
+;; Bolt row: the common stats and last-error fields merged with the bolt id, its
+;; url-encoded form, capacity, execute/process latencies and executed count.
+(defmethod comp-agg-stats-json ComponentType/BOLT
+ [topo-id secure? [id ^ComponentAggregateStats s]]
+ (let [^BoltAggregateStats ss (.. s get_specific_stats get_bolt)
+ cs (.get_common_stats s)]
+ (merge
+ (common-agg-stats-json cs)
+ (get-error-json topo-id (.get_last_error s) secure?)
+ {"boltId" id
+ "encodedBoltId" (url-encode id)
+ "capacity" (float-str (.get_capacity ss))
+ "executeLatency" (float-str (.get_execute_latency_ms ss))
+ "executed" (.get_executed ss)
+ "processLatency" (float-str (.get_process_latency_ms ss))})))
+
+(defn- unpack-topology-page-info
+ "Unpacks the serialized TopologyPageInfo into a map of plain values: identity and
+ status fields, resource figures, per-window topology stats, spout and bolt rows,
+ the topology configuration as returned by get_topology_conf, and the debug settings."
+ [^TopologyPageInfo topo-info window secure?]
+ (let [id (.get_id topo-info)
+ ^TopologyStats topo-stats (.get_topology_stats topo-info)
+ stat->window->number
+ {:emitted (.get_window_to_emitted topo-stats)
+ :transferred (.get_window_to_transferred topo-stats)
+ :complete-latencies (.get_window_to_complete_latencies_ms topo-stats)
+ :acked (.get_window_to_acked topo-stats)
+ :failed (.get_window_to_failed topo-stats)}
+ ;; from here on topo-stats is the sequence of per-window rows, not the thrift object
+ topo-stats (topology-stats window stat->window->number)
+ ;; both names stay nil when the topology has no debug options
+ [debugEnabled
+ samplingPct] (if-let [debug-opts (.get_debug_options topo-info)]
+ [(.is_enable debug-opts)
+ (.get_samplingpct debug-opts)])
+ uptime (.get_uptime_secs topo-info)]
+ {"id" id
+ "encodedId" (url-encode id)
+ "owner" (.get_owner topo-info)
+ "name" (.get_name topo-info)
+ "status" (.get_status topo-info)
+ "uptime" (pretty-uptime-sec uptime)
+ "uptimeSeconds" uptime
+ "tasksTotal" (.get_num_tasks topo-info)
+ "workersTotal" (.get_num_workers topo-info)
+ "executorsTotal" (.get_num_executors topo-info)
+ "schedulerInfo" (.get_sched_status topo-info)
+ "requestedMemOnHeap" (.get_requested_memonheap topo-info)
+ "requestedMemOffHeap" (.get_requested_memoffheap topo-info)
+ "requestedCpu" (.get_requested_cpu topo-info)
+ "assignedMemOnHeap" (.get_assigned_memonheap topo-info)
+ "assignedMemOffHeap" (.get_assigned_memoffheap topo-info)
+ "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
+ "assignedCpu" (.get_assigned_cpu topo-info)
+ "topologyStats" topo-stats
+ "spouts" (map (partial comp-agg-stats-json id secure?)
+ (.get_id_to_spout_agg_stats topo-info))
+ "bolts" (map (partial comp-agg-stats-json id secure?)
+ (.get_id_to_bolt_agg_stats topo-info))
+ "configuration" (.get_topology_conf topo-info)
+ ;; defaults used when no debug options are set: disabled, sampling 10
+ "debug" (or debugEnabled false)
+ "samplingPct" (or samplingPct 10)
+ "replicationCount" (.get_replication_count topo-info)}))
+
+(defn exec-host-port
+  "Returns one {\"host\" host \"port\" port} map per executor summary."
+  [executors]
+  (map (fn [^ExecutorSummary exec]
+         {"host" (.get_host exec)
+          "port" (.get_port exec)})
+       executors))
+
+(defn worker-host-port
+  "Get the distinct host/port pairs of all executors of topology id"
+  [id]
+  (thrift/with-configured-nimbus-connection nimbus
+    (let [topology-info (get-topology-info nimbus id)
+          host-ports (exec-host-port (.get_executors topology-info))]
+      (distinct host-ports))))
+
+(defn topology-page
+  "Fetches the TopologyPageInfo of topology id for the given window (default
+  \":all-time\") and returns the unpacked page map merged with the user, window,
+  window hint, message timeout, parsed topology configuration, an empty
+  visualization table and the scheduler display setting."
+  [id window include-sys? user secure?]
+  (thrift/with-configured-nimbus-connection nimbus
+    (let [window (or window ":all-time")
+          hint (window-hint window)
+          topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus id window include-sys?)
+          topology-conf (from-json (.get_topology_conf topo-page-info))
+          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+          unpacked (unpack-topology-page-info topo-page-info window secure?)]
+      (merge
+        unpacked
+        {"user" user
+         "window" window
+         "windowHint" hint
+         "msgTimeout" msg-timeout
+         "configuration" topology-conf
+         "visualizationTable" []
+         "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))))
+
+(defn component-errors
+  "Returns {\"componentErrors\" rows} with one row per ErrorInfo in errors-list,
+  sorted by descending error time. Each row has the error time in milliseconds,
+  host, port, a link to the worker log, the seconds elapsed since the error and
+  the error text."
+  [errors-list topology-id secure?]
+  (let [errors (->> errors-list
+                    (sort-by #(.get_error_time_secs ^ErrorInfo %))
+                    reverse)]
+    {"componentErrors"
+     (for [^ErrorInfo e errors
+           :let [host (.get_host e)
+                 port (.get_port e)
+                 error-time-secs (.get_error_time_secs e)]]
+       {"time" (* 1000 (long error-time-secs))
+        "errorHost" host
+        "errorPort" port
+        "errorWorkerLogLink" (worker-log-link host port topology-id secure?)
+        ;; Computed here rather than through get-error-time, which returns the
+        ;; raw error time instead of an elapsed duration.
+        ;; NOTE(review): time-delta is assumed to return the seconds elapsed since
+        ;; the given epoch-seconds value - confirm in org.apache.storm.util.
+        "errorLapsedSecs" (time-delta error-time-secs)
+        "error" (.get_error e)})}))
+
+(defmulti unpack-comp-agg-stat
+ "Turns a [window stats] pair into a per-window stats row. Dispatches on the
+ ComponentType of the stats."
+ (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
+
+;; Bolt row for one window: common counters plus execute/process latency, executed
+;; count and capacity.
+(defmethod unpack-comp-agg-stat ComponentType/BOLT
+ [[window ^ComponentAggregateStats s]]
+ (let [^CommonAggregateStats comm-s (.get_common_stats s)
+ ^SpecificAggregateStats spec-s (.get_specific_stats s)
+ ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
+ {"window" window
+ "windowPretty" (window-hint window)
+ "emitted" (.get_emitted comm-s)
+ "transferred" (.get_transferred comm-s)
+ "acked" (.get_acked comm-s)
+ "failed" (.get_failed comm-s)
+ "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
+ "processLatency" (float-str (.get_process_latency_ms bolt-s))
+ "executed" (.get_executed bolt-s)
+ "capacity" (float-str (.get_capacity bolt-s))}))
+
+;; Spout row for one window: common counters plus the complete latency.
+(defmethod unpack-comp-agg-stat ComponentType/SPOUT
+ [[window ^ComponentAggregateStats s]]
+ (let [^CommonAggregateStats comm-s (.get_common_stats s)
+ ^SpecificAggregateStats spec-s (.get_specific_stats s)
+ ^SpoutAggregateStats spout-s (.get_spout spec-s)]
+ {"window" window
+ "windowPretty" (window-hint window)
+ "emitted" (.get_emitted comm-s)
+ "transferred" (.get_transferred comm-s)
+ "acked" (.get_acked comm-s)
+ "failed" (.get_failed comm-s)
+ "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
+
+(defn- unpack-bolt-input-stat
+ "Turns a [GlobalStreamId stats] pair into the input-stream row of a bolt: source
+ component (plain and url-encoded), stream id, latencies and the executed, acked
+ and failed counts passed through nil-to-zero."
+ [[^GlobalStreamId s ^ComponentAggregateStats stats]]
+ (let [^SpecificAggregateStats sas (.get_specific_stats stats)
+ ^BoltAggregateStats bas (.get_bolt sas)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ comp-id (.get_componentId s)]
+ {"component" comp-id
+ "encodedComponentId" (url-encode comp-id)
+ "stream" (.get_streamId s)
+ "executeLatency" (float-str (.get_execute_latency_ms bas))
+ "processLatency" (float-str (.get_process_latency_ms bas))
+ "executed" (nil-to-zero (.get_executed bas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))}))
+
+(defmulti unpack-comp-output-stat
+ "Turns a [stream-id stats] pair into an output-stream row. Dispatches on the
+ ComponentType of the stats."
+ (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
+
+;; Bolt output row: stream id with emitted and transferred counts (nil becomes 0).
+(defmethod unpack-comp-output-stat ComponentType/BOLT
+ [[stream-id ^ComponentAggregateStats stats]]
+ (let [^CommonAggregateStats cas (.get_common_stats stats)]
+ {"stream" stream-id
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))}))
+
+;; Spout output row: stream id with emitted, transferred, acked and failed counts
+;; (nil becomes 0) plus the complete latency.
+(defmethod unpack-comp-output-stat ComponentType/SPOUT
+ [[stream-id ^ComponentAggregateStats stats]]
+ (let [^CommonAggregateStats cas (.get_common_stats stats)
+ ^SpecificAggregateStats spec-s (.get_specific_stats stats)
+ ^SpoutAggregateStats spout-s (.get_spout spec-s)]
+ {"stream" stream-id
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "completeLatency" (float-str (.get_complete_latency_ms spout-s))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))}))
+
+(defmulti unpack-comp-exec-stat
+  "Turns an ExecutorAggregateStats into an executor row. Takes topology-id,
+  secure? and the executor stats, and dispatches on the ComponentType of the
+  stats nested inside the executor stats."
+  ;; The third argument is an ExecutorAggregateStats; it used to carry a
+  ;; ComponentAggregateStats hint that had to be overridden at the call site.
+  (fn [_ _ ^ExecutorAggregateStats eas] (.get_type (.get_stats eas))))
+
+;; Bolt executor row: executor id (plain and url-encoded), uptime, host, port,
+;; counters passed through nil-to-zero, capacity, latencies and a worker log link.
+(defmethod unpack-comp-exec-stat ComponentType/BOLT
+ [topology-id secure? ^ExecutorAggregateStats eas]
+ (let [^ExecutorSummary summ (.get_exec_summary eas)
+ ^ExecutorInfo info (.get_executor_info summ)
+ ^ComponentAggregateStats stats (.get_stats eas)
+ ^SpecificAggregateStats ss (.get_specific_stats stats)
+ ^BoltAggregateStats bas (.get_bolt ss)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ host (.get_host summ)
+ port (.get_port summ)
+ exec-id (pretty-executor-info info)
+ uptime (.get_uptime_secs summ)]
+ {"id" exec-id
+ "encodedId" (url-encode exec-id)
+ "uptime" (pretty-uptime-sec uptime)
+ "uptimeSeconds" uptime
+ "host" host
+ "port" port
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "capacity" (float-str (nil-to-zero (.get_capacity bas)))
+ "executeLatency" (float-str (.get_execute_latency_ms bas))
+ "executed" (nil-to-zero (.get_executed bas))
+ "processLatency" (float-str (.get_process_latency_ms bas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))
+ "workerLogLink" (worker-log-link host port topology-id secure?)}))
+
+;; Spout executor row: executor id (plain and url-encoded), uptime, host, port,
+;; counters passed through nil-to-zero, complete latency and a worker log link.
+(defmethod unpack-comp-exec-stat ComponentType/SPOUT
+ [topology-id secure? ^ExecutorAggregateStats eas]
+ (let [^ExecutorSummary summ (.get_exec_summary eas)
+ ^ExecutorInfo info (.get_executor_info summ)
+ ^ComponentAggregateStats stats (.get_stats eas)
+ ^SpecificAggregateStats ss (.get_specific_stats stats)
+ ^SpoutAggregateStats sas (.get_spout ss)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ host (.get_host summ)
+ port (.get_port summ)
+ exec-id (pretty-executor-info info)
+ uptime (.get_uptime_secs summ)]
+ {"id" exec-id
+ "encodedId" (url-encode exec-id)
+ "uptime" (pretty-uptime-sec uptime)
+ "uptimeSeconds" uptime
+ "host" host
+ "port" port
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "completeLatency" (float-str (.get_complete_latency_ms sas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))
+ "workerLogLink" (worker-log-link host port topology-id secure?)}))
+
+(defmulti unpack-component-page-info
+ "Unpacks component-specific info to clojure data structures. Dispatches on the
+ component type of the ComponentPageInfo given as first argument; any further
+ arguments are ignored for dispatch."
+ (fn [^ComponentPageInfo info & _]
+ (.get_component_type info)))
+
+;; Bolt page: per-window stats, input stats, output stats and executor stats, merged
+;; with the component errors. window and include-sys? are accepted but not used here.
+(defmethod unpack-component-page-info ComponentType/BOLT
+ [^ComponentPageInfo info topology-id window include-sys? secure?]
+ (merge
+ {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
+ "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
+ "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
+ "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
+ (.get_exec_stats info))}
+ (-> info .get_errors (component-errors topology-id secure?))))
+
+;; Spout page: per-window stats, output stats and executor stats, merged with the
+;; component errors. window and include-sys? are accepted but not used here.
+(defmethod unpack-component-page-info ComponentType/SPOUT
+ [^ComponentPageInfo info topology-id window include-sys? secure?]
+ (merge
+ {"spoutSummary" (map unpack-comp-agg-stat (.get_window_to_stats info))
+ "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
+ "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
+ (.get_exec_stats info))}
+ (-> info .get_errors (component-errors topology-id secure?))))
+
+(defn get-active-profile-actions
+  "Asks nimbus for the pending ProfileAction/JPROFILE_STOP actions of component
+  in topology-id and returns one map per action with its host, port (as a
+  string), a link to the worker dumps and the action timestamp minus the
+  current time in milliseconds (as a string). The result is also logged."
+  [nimbus topology-id component]
+  (let [profile-actions (.getComponentPendingProfileActions nimbus
+                                                            topology-id
+                                                            component
+                                                            ProfileAction/JPROFILE_STOP)
+        latest-profile-actions (map clojurify-profile-request profile-actions)
+        active-actions (map (fn [profile-action]
+                              (let [host (:host profile-action)
+                                    port (str (:port profile-action))]
+                                {"host" host
+                                 "port" port
+                                 "dumplink" (worker-dump-link host port topology-id)
+                                 "timestamp" (str (- (:timestamp profile-action) (System/currentTimeMillis)))}))
+                            latest-profile-actions)]
+    ;; pr-str returns the printed form; pr wrote it to *out* and returned nil, so
+    ;; the log message never contained the actions.
+    (log-message "Latest-active actions are: " (pr-str active-actions))
+    active-actions))
+
+(defn component-page
+  "Builds the data map for one component's page: the type-specific stats from
+  unpack-component-page-info plus identifying fields, debug settings, the
+  event log link and profiler information. `window` defaults to \":all-time\"
+  when nil."
+  [topology-id component window include-sys? user secure?]
+  (thrift/with-configured-nimbus-connection nimbus
+    (let [window (or window ":all-time")
+          hint (window-hint window)
+          page-info (.getComponentPageInfo ^Nimbus$Client nimbus
+                                           topology-id
+                                           component
+                                           window
+                                           include-sys?)
+          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus
+                                                     topology-id))
+          ;; looked up but not used further down; kept so the same calls happen
+          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+          debug-opts (.get_debug_options page-info)
+          debug-enabled (when debug-opts (.is_enable debug-opts))
+          sampling-pct (when debug-opts (.get_samplingpct debug-opts))
+          base (unpack-component-page-info page-info
+                                           topology-id
+                                           window
+                                           include-sys?
+                                           secure?)
+          profiler-enabled (*STORM-CONF* WORKER-PROFILER-ENABLED)]
+      (assoc base
+        "user" user
+        "id" component
+        "encodedId" (url-encode component)
+        "name" (.get_topology_name page-info)
+        "executors" (.get_num_executors page-info)
+        "tasks" (.get_num_tasks page-info)
+        "topologyId" topology-id
+        "topologyStatus" (.get_topology_status page-info)
+        "encodedTopologyId" (url-encode topology-id)
+        "window" window
+        "componentType" (lower-case (str (.get_component_type page-info)))
+        "windowHint" hint
+        ;; fall back to disabled / 10 when no debug options are reported
+        "debug" (or debug-enabled false)
+        "samplingPct" (or sampling-pct 10)
+        "eventLogLink" (event-log-link topology-id
+                                       component
+                                       (.get_eventlog_host page-info)
+                                       (.get_eventlog_port page-info)
+                                       secure?)
+        "profileActionEnabled" profiler-enabled
+        "profilerActive" (if profiler-enabled
+                           (get-active-profile-actions nimbus topology-id component)
+                           [])))))
+
+(defn- level-to-dict
+  "Converts a log level object into a map with the string keys
+  \"target_level\", \"reset_level\", \"timeout\" and \"timeout_epoch\".
+  Returns nil when `level` is nil."
+  [level]
+  (when level
+    (let [level-name (fn [raw] (.toString (Level/toLevel raw)))
+          timeout-secs (.get_reset_log_level_timeout_secs level)
+          timeout-at (.get_reset_log_level_timeout_epoch level)
+          target (.get_target_log_level level)
+          reset (.get_reset_log_level level)]
+      {"target_level" (level-name target)
+       "reset_level" (level-name reset)
+       "timeout" timeout-secs
+       "timeout_epoch" timeout-at})))
+
+(defn log-config
+  "Fetches the log config of `topology-id` from nimbus and returns
+  {\"namedLoggerLevels\" {logger-name level-map}}, where each level-map is
+  produced by level-to-dict."
+  [topology-id]
+  (thrift/with-configured-nimbus-connection
+    nimbus
+    (let [cfg (.getLogConfig ^Nimbus$Client nimbus topology-id)
+          entry->pair (fn [[logger level]]
+                        [(str logger) (level-to-dict level)])
+          levels-by-logger (into {} (map entry->pair (.get_named_logger_level cfg)))]
+      {"namedLoggerLevels" levels-by-logger})))
+
+;; Opens a nimbus connection, asks for the configuration of `topology-id`
+;; and returns the result of parsing that JSON text with from-json.
+(defn topology-config [topology-id]
+ (thrift/with-configured-nimbus-connection nimbus
+ (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
+
+(defn topology-op-response
+  "Success payload for a topology-level operation `op` on `topology-id`."
+  [topology-id op]
+  (array-map "topologyOperation" op
+             "topologyId" topology-id
+             "status" "success"))
+
+(defn component-op-response
+  "Success payload for operation `op` on component `component-id` of
+  `topology-id`."
+  [topology-id component-id op]
+  (array-map "topologyOperation" op
+             "topologyId" topology-id
+             "componentId" component-id
+             "status" "success"))
+
+(defn check-include-sys?
+  "Returns false when `sys?` is nil or the string \"false\", true for any
+  other value."
+  [sys?]
+  (not (or (nil? sys?)
+           (= "false" sys?))))
+
+;; Credentials plugin obtained once, at namespace load time, from *STORM-CONF*.
+;; populate-context! treats a nil value as "no plugin configured".
+(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
+
+(defn populate-context!
+ "Populate the Storm RequestContext from a servlet-request. This should be called in each handler.
+ Does nothing and returns nil when no http-creds-handler is configured."
+ [servlet-request]
+ (when http-creds-handler
+ (.populateContext http-creds-handler (ReqContext/context) servlet-request)))
+
+(defn get-user-name
+  "Returns the user name the credentials plugin extracts from
+  `servlet-request`, or nil when no http-creds-handler is configured."
+  [servlet-request]
+  ;; Same nil guard as populate-context!: without it a missing plugin turns
+  ;; every request that asks for the user name into a NullPointerException.
+  (when http-creds-handler
+    (.getUserName http-creds-handler servlet-request)))
+
+;; REST API and static-resource routes of the UI.
+;; Handlers that authorize the caller first call populate-context! so that
+;; assert-authorized-user is evaluated against the credentials of the current
+;; request, and then build their JSON answer with json-response. The optional
+;; callback parameter is passed to json-response as its callback argument.
+(defroutes main-routes
+ (GET "/api/v1/cluster/configuration" [& m]
+ (mark! ui:num-cluster-configuration-http-requests)
+ (json-response (cluster-configuration)
+ (:callback m) :serialize-fn identity))
+ (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
+ (mark! ui:num-cluster-summary-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getClusterInfo")
+ (let [user (get-user-name servlet-request)]
+ (json-response (assoc (cluster-summary user)
+ "bugtracker-url" (*STORM-CONF* UI-PROJECT-BUGTRACKER-URL)
+ "central-log-url" (*STORM-CONF* UI-CENTRAL-LOGGING-URL)) (:callback m))))
+ (GET "/api/v1/nimbus/summary" [:as {:keys [cookies servlet-request]} & m]
+ (mark! ui:num-nimbus-summary-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getClusterInfo")
+ (json-response (nimbus-summary) (:callback m)))
+ (GET "/api/v1/history/summary" [:as {:keys [cookies servlet-request]} & m]
+ ;; go through get-user-name like the other routes that need the user
+ (let [user (get-user-name servlet-request)]
+ (json-response (topology-history-info user) (:callback m))))
+ (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & m]
+ (mark! ui:num-supervisor-summary-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getClusterInfo")
+ (json-response (assoc (supervisor-summary)
+ "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m)))
+ (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
+ (mark! ui:num-all-topologies-summary-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getClusterInfo")
+ (json-response (all-topologies-summary) (:callback m)))
+ (GET "/api/v1/topology-workers/:id" [:as {:keys [cookies servlet-request]} id & m]
+ (let [id (url-decode id)]
+ (json-response {"hostPortList" (worker-host-port id)
+ "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)} (:callback m))))
+ (GET "/api/v1/topology/:id" [:as {:keys [cookies servlet-request scheme]} id & m]
+ (mark! ui:num-topology-page-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getTopology" (topology-config id))
+ (let [user (get-user-name servlet-request)]
+ (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m))))
+ (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
+ (mark! ui:num-build-visualization-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getTopology" (topology-config id))
+ (json-response (build-visualization id (:window m) (check-include-sys? (:sys m))) (:callback m)))
+ (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
+ (mark! ui:num-mk-visualization-data-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getTopology" (topology-config id))
+ (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))
+ (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]} id component & m]
+ (mark! ui:num-component-page-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getTopology" (topology-config id))
+ (let [user (get-user-name servlet-request)]
+ (json-response
+ (component-page id component (:window m) (check-include-sys? (:sys m)) user (= scheme :https))
+ (:callback m))))
+ (GET "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id & m]
+ (mark! ui:num-log-config-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "getTopology" (topology-config id))
+ (json-response (log-config id) (:callback m)))
+ (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
+ (mark! ui:num-activate-topology-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "activate" (topology-config id))
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+ name (.get_name tplg)]
+ (.activate nimbus name)
+ (log-message "Activating topology '" name "'")))
+ (json-response (topology-op-response id "activate") (m "callback")))
+ (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id & m]
+ (mark! ui:num-deactivate-topology-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "deactivate" (topology-config id))
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+ name (.get_name tplg)]
+ (.deactivate nimbus name)
+ (log-message "Deactivating topology '" name "'")))
+ (json-response (topology-op-response id "deactivate") (m "callback")))
+ (POST "/api/v1/topology/:id/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id action spct & m]
+ (mark! ui:num-debug-topology-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "debug" (topology-config id))
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+ name (.get_name tplg)
+ enable? (= "enable" action)]
+ (.debug nimbus name "" enable? (Integer/parseInt spct))
+ (log-message "Debug topology [" name "] action [" action "] sampling pct [" spct "]")))
+ (json-response (topology-op-response id (str "debug/" action)) (m "callback")))
+ (POST "/api/v1/topology/:id/component/:component/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id component action spct & m]
+ (mark! ui:num-component-op-response-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "debug" (topology-config id))
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+ name (.get_name tplg)
+ enable? (= "enable" action)]
+ (.debug nimbus name component enable? (Integer/parseInt spct))
+ (log-message "Debug topology [" name "] component [" component "] action [" action "] sampling pct [" spct "]")))
+ (json-response (component-op-response id component (str "/debug/" action)) (m "callback")))
+ (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
+ (mark! ui:num-topology-op-response-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "rebalance" (topology-config id))
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+ name (.get_name tplg)
+ rebalance-options (m "rebalanceOptions")
+ options (RebalanceOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (if (and (not-nil? rebalance-options) (contains? rebalance-options "numWorkers"))
+ (.set_num_workers options (Integer/parseInt (.toString (rebalance-options "numWorkers")))))
+ (if (and (not-nil? rebalance-options) (contains? rebalance-options "executors"))
+ (doseq [keyval (rebalance-options "executors")]
+ (.put_to_num_executors options (key keyval) (Integer/parseInt (.toString (val keyval))))))
+ (.rebalance nimbus name options)
+ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+ (json-response (topology-op-response id "rebalance") (m "callback")))
+ (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
+ (mark! ui:num-topology-op-response-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "killTopology" (topology-config id))
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [tplg (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/NONE))
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+ name (.get_name tplg)
+ options (KillOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.killTopologyWithOpts nimbus name options)
+ (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+ (json-response (topology-op-response id "kill") (m "callback")))
+ (POST "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id namedLoggerLevels & m]
+ (mark! ui:num-topology-op-response-http-requests)
+ (populate-context! servlet-request)
+ (assert-authorized-user "setLogConfig" (topology-config id))
+ (thrift/with-configured-nimbus-connection
+ nimbus
+ (let [new-log-config (LogConfig.)]
+ (doseq [[key level] namedLoggerLevels]
+ (let [logger-name (str key)
+ target-level (.get level "target_level")
+ timeout (or (.get level "timeout") 0)
+ named-logger-level (LogLevel.)]
+ ;; if target-level is nil, do not set it, user wants to clear
+ (log-message "The target level for " logger-name " is " target-level)
+ (if (nil? target-level)
+ (do
+ (.set_action named-logger-level LogLevelAction/REMOVE)
+ (.unset_target_log_level named-logger-level))
+ (do
+ (.set_action named-logger-level LogLevelAction/UPDATE)
+ ;; the toLevel here ensures the string we get is valid
+ (.set_target_log_level named-logger-level (.name (Level/toLevel target-level)))
+ (.set_reset_log_level_timeout_secs named-logger-level timeout)))
+ (log-message "Adding this " logger-name " " named-logger-level " to " new-log-config)
+ (.put_to_named_logger_level new-log-config logger-name named-logger-level)))
+ (log-message "Setting topology " id " log config " new-log-config)
+ (.setLogConfig nimbus id new-log-config)
+ (json-response (log-config id) (m "callback")))))
+
+ ;; Profiling routes. Each one populates the request context before the
+ ;; authorization checks, like every other authorized route above.
+ ;; "start" and "stop" both submit a JPROFILE_STOP request; they differ only
+ ;; in the timestamp they set (now + timeout minutes for start, 0 for stop).
+ (GET "/api/v1/topology/:id/profiling/start/:host-port/:timeout"
+ [:as {:keys [servlet-request]} id host-port timeout & m]
+ (populate-context! servlet-request)
+ (thrift/with-configured-nimbus-connection nimbus
+ (assert-authorized-user "setWorkerProfiler" (topology-config id))
+ (assert-authorized-profiler-action "start")
+ (let [[host, port] (split host-port #":")
+ nodeinfo (NodeInfo. host (set [(Long. port)]))
+ timestamp (+ (System/currentTimeMillis) (* 60000 (Long. timeout)))
+ request (ProfileRequest. nodeinfo
+ ProfileAction/JPROFILE_STOP)]
+ (.set_time_stamp request timestamp)
+ (.setWorkerProfiler nimbus id request)
+ (json-response {"status" "ok"
+ "id" host-port
+ "timeout" timeout
+ "dumplink" (worker-dump-link
+ host
+ port
+ id)}
+ (m "callback")))))
+
+ (GET "/api/v1/topology/:id/profiling/stop/:host-port"
+ [:as {:keys [servlet-request]} id host-port & m]
+ (populate-context! servlet-request)
+ (thrift/with-configured-nimbus-connection nimbus
+ (assert-authorized-user "setWorkerProfiler" (topology-config id))
+ (assert-authorized-profiler-action "stop")
+ (let [[host, port] (split host-port #":")
+ nodeinfo (NodeInfo. host (set [(Long. port)]))
+ timestamp 0
+ request (ProfileRequest. nodeinfo
+ ProfileAction/JPROFILE_STOP)]
+ (.set_time_stamp request timestamp)
+ (.setWorkerProfiler nimbus id request)
+ (json-response {"status" "ok"
+ "id" host-port}
+ (m "callback")))))
+
+ (GET "/api/v1/topology/:id/profiling/dumpprofile/:host-port"
+ [:as {:keys [servlet-request]} id host-port & m]
+ (populate-context! servlet-request)
+ (thrift/with-configured-nimbus-connection nimbus
+ (assert-authorized-user "setWorkerProfiler" (topology-config id))
+ (assert-authorized-profiler-action "dumpprofile")
+ (let [[host, port] (split host-port #":")
+ nodeinfo (NodeInfo. host (set [(Long. port)]))
+ timestamp (System/currentTimeMillis)
+ request (ProfileRequest. nodeinfo
+ ProfileAction/JPROFILE_DUMP)]
+ (.set_time_stamp request timestamp)
+ (.setWorkerProfiler nimbus id request)
+ (json-response {"status" "ok"
+ "id" host-port}
+ (m "callback")))))
+
+ (GET "/api/v1/topology/:id/profiling/dumpjstack/:host-port"
+ [:as {:keys [servlet-request]} id host-port & m]
+ (populate-context! servlet-request)
+ (thrift/with-configured-nimbus-connection nimbus
+ (assert-authorized-user "setWorkerProfiler" (topology-config id))
+ (assert-authorized-profiler-action "dumpjstack")
+ (let [[host, port] (split host-port #":")
+ nodeinfo (NodeInfo. host (set [(Long. port)]))
+ timestamp (System/currentTimeMillis)
+ request (ProfileRequest. nodeinfo
+ ProfileAction/JSTACK_DUMP)]
+ (.set_time_stamp request timestamp)
+ (.setWorkerProfiler nimbus id request)
+ (json-response {"status" "ok"
+ "id" host-port}
+ (m "callback")))))
+
+ (GET "/api/v1/topology/:id/profiling/restartworker/:host-port"
+ [:as {:keys [servlet-request]} id host-port & m]
+ (populate-context! servlet-request)
+ (thrift/with-configured-nimbus-connection nimbus
+ (assert-authorized-user "setWorkerProfiler" (topology-config id))
+ (assert-authorized-profiler-action "restartworker")
+ (let [[host, port] (split host-port #":")
+ nodeinfo (NodeInfo. host (set [(Long. port)]))
+ timestamp (System/currentTimeMillis)
+ request (ProfileRequest. nodeinfo
+ ProfileAction/JVM_RESTART)]
+ (.set_time_stamp request timestamp)
+ (.setWorkerProfiler nimbus id request)
+ (json-response {"status" "ok"
+ "id" host-port}
+ (m "callback")))))
+
+ (GET "/api/v1/topology/:id/profiling/dumpheap/:host-port"
+ [:as {:keys [servlet-request]} id host-port & m]
+ (populate-context! servlet-request)
+ (thrift/with-configured-nimbus-connection nimbus
+ (assert-authorized-user "setWorkerProfiler" (topology-config id))
+ (assert-authorized-profiler-action "dumpheap")
+ (let [[host, port] (split host-port #":")
+ nodeinfo (NodeInfo. host (set [(Long. port)]))
+ timestamp (System/currentTimeMillis)
+ request (ProfileRequest. nodeinfo
+ ProfileAction/JMAP_DUMP)]
+ (.set_time_stamp request timestamp)
+ (.setWorkerProfiler nimbus id request)
+ (json-response {"status" "ok"
+ "id" host-port}
+ (m "callback")))))
+
+ (GET "/" [:as {cookies :cookies}]
+ (mark! ui:num-main-page-http-requests)
+ (resp/redirect "/index.html"))
+ (route/resources "/")
+ (route/not-found "Page not found"))
+
+(defn catch-errors
+  "Ring middleware: runs `handler` and turns any Exception it throws into a
+  JSON response with status 500 whose body is built by exception->json. The
+  \"callback\" query parameter, when present, is passed to json-response."
+  [handler]
+  (fn [request]
+    (try
+      (handler request)
+      (catch Exception ex
+        ;; get-in tolerates a request without :query-params; calling the
+        ;; missing map as a function would throw a NullPointerException here
+        ;; and hide the original error.
+        (json-response (exception->json ex)
+                       (get-in request [:query-params "callback"])
+                       :status 500)))))
+
+;; The complete ring application. The threading form applies the middleware
+;; top to bottom, so wrap-json-params wraps main-routes directly and
+;; catch-errors is the outermost layer of the chain handed to handler/site.
+(def app
+ (handler/site (-> main-routes
+ (wrap-json-params)
+ (wrap-multipart-params)
+ (wrap-reload '[org.apache.storm.ui.core])
+ requests-middleware
+ catch-errors)))
+
+;; Reads the UI settings from *STORM-CONF*, starts the metrics reporters and
+;; launches Jetty through storm-run-jetty. Any Exception raised on the way is
+;; logged with log-error and not rethrown.
+(defn start-server!
+ []
+ (try
+ (let [conf *STORM-CONF*
+ header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
+ filters-confs [{:filter-class (conf UI-FILTER)
+ :filter-params (conf UI-FILTER-PARAMS)}]
+ ;; 0 stands for "no https port configured"
+ https-port (if (not-nil? (conf UI-HTTPS-PORT)) (conf UI-HTTPS-PORT) 0)
+ https-ks-path (conf UI-HTTPS-KEYSTORE-PATH)
+ https-ks-password (conf UI-HTTPS-KEYSTORE-PASSWORD)
+ https-ks-type (conf UI-HTTPS-KEYSTORE-TYPE)
+ https-key-password (conf UI-HTTPS-KEY-PASSWORD)
+ https-ts-path (conf UI-HTTPS-TRUSTSTORE-PATH)
+ https-ts-password (conf UI-HTTPS-TRUSTSTORE-PASSWORD)
+ https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
+ https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
+ https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
+ (start-metrics-reporters)
+ (storm-run-jetty {:port (conf UI-PORT)
+ :host (conf UI-HOST)
+ :https-port https-port
+ ;; the configurator receives the created server and sets it up in three
+ ;; steps: SSL connector, request header size, servlet/filter context
+ :configurator (fn [server]
+ ;; arguments are positional: need-client-auth comes before want-client-auth
+ (config-ssl server
+ https-port
+ https-ks-path
+ https-ks-password
+ https-ks-type
+ https-key-password
+ https-ts-path
+ https-ts-password
+ https-ts-type
+ https-need-client-auth
+ https-want-client-auth)
+ (doseq [connector (.getConnectors server)]
+ (.setRequestHeaderSize connector header-buffer-size))
+ (config-filter server app filters-confs))}))
+ (catch Exception ex
+ (log-error ex))))
+
+;; Entry point: logs the Storm version and starts the UI server.
+(defn -main
+ []
+ (log-message "Starting ui server for storm version '" STORM-VERSION "'")
+ (start-server!))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/helpers.clj b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
new file mode 100644
index 0000000..7ded154
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
@@ -0,0 +1,240 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.ui.helpers
+ (:use compojure.core)
+ (:use [hiccup core page-helpers])
+ (:use [clojure
+ [string :only [blank? join]]
+ [walk :only [keywordize-keys]]])
+ (:use [org.apache.storm config log])
+ (:use [org.apache.storm.util :only [clojurify-structure uuid defnk to-json url-encode not-nil?]])
+ (:use [clj-time coerce format])
+ (:import [org.apache.storm.generated ExecutorInfo ExecutorSummary])
+ (:import [org.apache.storm.logging.filters AccessLoggingFilter])
+ (:import [java.util EnumSet])
+ (:import [org.eclipse.jetty.server Server]
+ [org.eclipse.jetty.server.nio SelectChannelConnector]
+ [org.eclipse.jetty.server.ssl SslSocketConnector]
+ [org.eclipse.jetty.servlet ServletHolder FilterMapping]
+ [org.eclipse.jetty.util.ssl SslContextFactory]
+ [org.eclipse.jetty.server DispatcherType]
+ [org.eclipse.jetty.servlets CrossOriginFilter])
+ (:require [ring.util servlet])
+ (:require [compojure.route :as route]
+ [compojure.handler :as handler])
+ (:require [metrics.meters :refer [defmeter mark!]]))
+
+;; Meter marked once for every request that passes through requests-middleware.
+(defmeter num-web-requests)
+(defn requests-middleware
+ "Coda Hale metric for counting the number of web requests.
+ Wraps `handler`: marks num-web-requests, then delegates to `handler`."
+ [handler]
+ (fn [req]
+ (mark! num-web-requests)
+ (handler req)))
+
+(defn split-divide
+  "Returns [quotient remainder] of `val` divided by `divider`; the quotient is
+  truncated and boxed as an Integer, the remainder is (mod val divider)."
+  [val divider]
+  (let [quotient (Integer. (int (/ val divider)))
+        remainder (mod val divider)]
+    [quotient remainder]))
+
+;; [suffix divider] pairs, smallest unit first, consumed by pretty-uptime-str*
+;; for values given in seconds. The divider is how many of this unit make up
+;; the next one; nil marks the last unit, which takes whatever remains.
+(def PRETTY-SEC-DIVIDERS
+ [["s" 60]
+ ["m" 60]
+ ["h" 24]
+ ["d" nil]])
+
+;; Same as PRETTY-SEC-DIVIDERS with a leading ["ms" 1000] pair, for values
+;; given in milliseconds.
+(def PRETTY-MS-DIVIDERS
+ (cons ["ms" 1000]
+ PRETTY-SEC-DIVIDERS))
+
+(defn pretty-uptime-str*
+  "Formats `val` (a number, or a string parsed with Integer/parseInt) as a
+  space-separated list of unit parts, largest unit first, e.g. \"1h 1m 1s\".
+  `dividers` is a sequence of [suffix divider] pairs ordered from the smallest
+  unit up; a nil divider ends the breakdown. A non-positive value yields \"\"."
+  [val dividers]
+  (let [total (if (string? val) (Integer/parseInt val) val)]
+    (loop [remaining total
+           units (seq dividers)
+           parts ()]
+      (if (and units (pos? remaining))
+        (let [[suffix divider] (first units)]
+          (if divider
+            ;; peel off this unit's share and carry the rest to the next unit
+            (recur (Integer. (int (/ remaining divider)))
+                   (next units)
+                   (conj parts (str (mod remaining divider) suffix)))
+            ;; last unit: it takes everything that is left
+            (recur nil
+                   (next units)
+                   (conj parts (str remaining suffix)))))
+        ;; conj on a list prepends, so parts is already largest unit first
+        (join " " parts)))))
+
+;; Formats `secs` with pretty-uptime-str* using the seconds-based dividers.
+(defn pretty-uptime-sec [secs]
+ (pretty-uptime-str* secs PRETTY-SEC-DIVIDERS))
+
+;; Formats `ms` with pretty-uptime-str* using the millisecond-based dividers.
+(defn pretty-uptime-ms [ms]
+ (pretty-uptime-str* ms PRETTY-MS-DIVIDERS))
+
+
+;; Hiccup element for an HTML table.
+;; headers-map: sequence of header cells; an entry with a :text value is
+;;   rendered as a span carrying (:attr h) and the text, any other entry is
+;;   placed in the header cell as it is.
+;; data: sequence of rows, each a sequence of cell contents.
+(defelem table [headers-map data]
+ [:table
+ [:thead
+ [:tr
+ (for [h headers-map]
+ [:th (if (:text h) [:span (:attr h) (:text h)] h)])
+ ]]
+ [:tbody
+ (for [row data]
+ [:tr
+ (for [col row]
+ [:td col]
+ )]
+ )]
+ ])
+
+(defn url-format
+  "Like format, but every argument is first turned into a string and passed
+  through url-encode before being substituted into `fmt`."
+  [fmt & args]
+  (let [encoded (map (fn [arg] (url-encode (str arg))) args)]
+    (String/format fmt (to-array encoded))))
+
+(defn pretty-executor-info
+  "Renders an executor's task range as \"[start-end]\"."
+  [^ExecutorInfo e]
+  (let [first-task (.get_task_start e)
+        last-task (.get_task_end e)]
+    (str "[" first-task "-" last-task "]")))
+
+(defn unauthorized-user-json
+  "Error payload stating that `user` is not authorized."
+  [user]
+  (let [message (str "User " user " is not authorized.")]
+    (array-map "error" "No Authorization"
+               "errorMessage" message)))
+
+;; Hiccup markup for the "not authorized" message; the user name goes through
+;; escape-html before it is embedded.
+(defn unauthorized-user-html [user]
+ [[:h2 "User '" (escape-html user) "' is not authorized."]])
+
+(defn- mk-ssl-connector
+  "Creates an SslSocketConnector listening on `port`, backed by an
+  SslContextFactory configured with the given key store and, when path,
+  password and type are all non-nil, the given trust store. Need-client-auth
+  takes precedence over want-client-auth."
+  [port ks-path ks-password ks-type key-password
+   ts-path ts-password ts-type need-client-auth want-client-auth]
+  (let [ssl-context (SslContextFactory.)
+        truststore-given? (and (not-nil? ts-path)
+                               (not-nil? ts-password)
+                               (not-nil? ts-type))]
+    (doto ssl-context
+      (.setExcludeCipherSuites (into-array String ["SSL_RSA_WITH_RC4_128_MD5" "SSL_RSA_WITH_RC4_128_SHA"]))
+      (.setExcludeProtocols (into-array String ["SSLv3"]))
+      (.setAllowRenegotiate false)
+      (.setKeyStorePath ks-path)
+      (.setKeyStoreType ks-type)
+      (.setKeyStorePassword ks-password)
+      (.setKeyManagerPassword key-password))
+    (when truststore-given?
+      (.setTrustStore ssl-context ts-path)
+      (.setTrustStoreType ssl-context ts-type)
+      (.setTrustStorePassword ssl-context ts-password))
+    (if need-client-auth
+      (.setNeedClientAuth ssl-context true)
+      (when want-client-auth
+        (.setWantClientAuth ssl-context true)))
+    (let [connector (SslSocketConnector. ssl-context)]
+      (.setPort connector port)
+      connector)))
+
+
+(defn config-ssl
+  "Adds an SSL connector built by mk-ssl-connector to `server` when `port` is
+  greater than zero; otherwise does nothing."
+  [server port ks-path ks-password ks-type key-password
+   ts-path ts-password ts-type need-client-auth want-client-auth]
+  (when (pos? port)
+    (let [ssl-connector (mk-ssl-connector port ks-path ks-password ks-type key-password
+                                          ts-path ts-password ts-type
+                                          need-client-auth want-client-auth)]
+      (.addConnector server ssl-connector))))
+
+;; Returns a FilterHolder wrapping a CrossOriginFilter whose init parameters
+;; allow every origin, the methods GET, POST and PUT, and the request headers
+;; listed below.
+(defn cors-filter-handler
+ []
+ (doto (org.eclipse.jetty.servlet.FilterHolder. (CrossOriginFilter.))
+ (.setInitParameter CrossOriginFilter/ALLOWED_ORIGINS_PARAM "*")
+ (.setInitParameter CrossOriginFilter/ALLOWED_METHODS_PARAM "GET, POST, PUT")
+ (.setInitParameter CrossOriginFilter/ALLOWED_HEADERS_PARAM "X-Requested-With, X-Requested-By, Access-Control-Allow-Origin, Content-Type, Content-Length, Accept, Origin")
+ (.setInitParameter CrossOriginFilter/ACCESS_CONTROL_ALLOW_ORIGIN_HEADER "*")
+ ))
+
+;; Returns a FilterHolder wrapping a new AccessLoggingFilter.
+(defn mk-access-logging-filter-handler []
+ (org.eclipse.jetty.servlet.FilterHolder. (AccessLoggingFilter.)))
+
+(defn config-filter
+  "When `filters-confs` is non-nil, mounts `handler` as a servlet at \"/\" on
+  `server` and registers, in this order, the CORS filter, every configured
+  filter that has a :filter-class, and the access logging filter, all on
+  \"/*\". Finally installs the resulting context as the server's handler."
+  [server handler filters-confs]
+  (when filters-confs
+    (let [ring-servlet (ServletHolder. (ring.util.servlet/servlet handler))
+          context (org.eclipse.jetty.servlet.ServletContextHandler. server "/")]
+      (.addServlet context ring-servlet "/")
+      (.addFilter context (cors-filter-handler) "/*" (EnumSet/allOf DispatcherType))
+      (doseq [{:keys [filter-name filter-class filter-params]} filters-confs]
+        (when filter-class
+          (let [holder (org.eclipse.jetty.servlet.FilterHolder.)]
+            (.setClassName holder filter-class)
+            ;; an unnamed filter is registered under its class name
+            (.setName holder (or filter-name filter-class))
+            (.setInitParameters holder (or filter-params {}))
+            (.addFilter context holder "/*" FilterMapping/ALL))))
+      (.addFilter context (mk-access-logging-filter-handler) "/*" (EnumSet/allOf DispatcherType))
+      (.setHandler server context))))
+
+(defn ring-response-from-exception
+  "Ring response with status 400, no headers and the exception's message as
+  body."
+  [ex]
+  (let [message (.getMessage ex)]
+    {:headers {}
+     :status 400
+     :body message}))
+
+(defn- remove-non-ssl-connectors
+  "Removes every non-nil connector of `server` that is not an
+  SslSocketConnector and returns `server`."
+  [server]
+  (doseq [connector (.getConnectors server)
+          :when (not (nil? connector))
+          :when (not (instance? SslSocketConnector connector))]
+    (.removeConnector server connector))
+  server)
+
+;; Modified from ring.adapter.jetty 1.3.0
+(defn- jetty-create-server
+  "Construct a Jetty Server instance with one plain connector configured from
+  `options` (:port, default 80; :host; :max-idle-time, default 200000). When
+  :https-port is a positive number the non-SSL connectors are removed again."
+  [options]
+  (let [plain-connector (SelectChannelConnector.)
+        server (Server.)
+        https-port (options :https-port)]
+    (doto plain-connector
+      (.setPort (options :port 80))
+      (.setHost (options :host))
+      (.setMaxIdleTime (options :max-idle-time 200000)))
+    (doto server
+      (.addConnector plain-connector)
+      (.setSendDateHeader true))
+    (when (and (not-nil? https-port) (> https-port 0))
+      (remove-non-ssl-connectors server))
+    server))
+
+(defn storm-run-jetty
+  "Modified version of run-jetty.
+  Creates the server from `config` (without :configurator), passes it to the
+  required :configurator function, which is expected to set the handler, and
+  starts it."
+  [config]
+  {:pre [(:configurator config)]}
+  (let [server-options (dissoc config :configurator)
+        #^Server server (jetty-create-server server-options)
+        configure! (:configurator config)]
+    (configure! server)
+    (.start server)))
+
+(defn wrap-json-in-callback
+  "Wraps `response` in a call to `callback`: \"callback(response);\"."
+  [callback response]
+  (apply str [callback "(" response ");"]))
+
+;; Builds a ring response map for `data`.
+;;   data           value to serialise
+;;   callback       when non-nil, the serialised data is wrapped in a call to
+;;                  this callback and served as application/javascript
+;;   :serialize-fn  function turning `data` into the body (default to-json)
+;;   :status        HTTP status (default 200)
+;;   :headers       extra headers; merged last, so they override the defaults
+(defnk json-response
+  [data callback :serialize-fn to-json :status 200 :headers {}]
+  (let [jsonp? (not-nil? callback)
+        ;; serialise once and reuse the result for either body shape
+        payload (serialize-fn data)]
+    {:status status
+     :headers (merge {"Cache-Control" "no-cache, no-store"
+                      "Access-Control-Allow-Origin" "*"
+                      ;; header name spelled "Access-Control-Allow-Origin", matching the
+                      ;; allowed-headers list configured in cors-filter-handler
+                      "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Control-Allow-Origin, X-Requested-By, X-Csrf-Token, Authorization, X-Requested-With"}
+                     (if jsonp?
+                       {"Content-Type" "application/javascript;charset=utf-8"}
+                       {"Content-Type" "application/json;charset=utf-8"})
+                     headers)
+     :body (if jsonp?
+             (wrap-json-in-callback callback payload)
+             payload)}))
+
+(defn exception->json
+  "Error payload for `ex`: a fixed \"error\" title plus the full stack trace
+  text under \"errorMessage\"."
+  [ex]
+  (let [buffer (java.io.StringWriter.)
+        printer (java.io.PrintWriter. buffer)]
+    (.printStackTrace ex printer)
+    {"error" "Internal Server Error"
+     "errorMessage" (.toString buffer)}))