You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2013/12/19 07:25:18 UTC
[06/12] Merge branch '0.9.0-windows' of github.com:davidlao2k/storm
into storm-windows
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/697849dc/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index a575b1f,0000000..91989bc
mode 100644,000000..100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -1,828 -1,0 +1,830 @@@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.ui.core
+ (:use compojure.core)
+ (:use ring.middleware.reload)
+ (:use [hiccup core page-helpers])
+ (:use [backtype.storm config util log])
+ (:use [backtype.storm.ui helpers])
+ (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]])
+ (:use [ring.adapter.jetty :only [run-jetty]])
+ (:use [clojure.string :only [trim]])
+ (:import [backtype.storm.generated ExecutorSpecificStats
+ ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
+ ErrorInfo ClusterSummary SupervisorSummary TopologySummary
+ Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
+ KillOptions])
+ (:import [java.io File])
+ (:require [compojure.route :as route]
+ [compojure.handler :as handler]
+ [ring.util.response :as resp]
+ [backtype.storm [thrift :as thrift]])
+ (:import [org.apache.commons.lang StringEscapeUtils])
+ (:gen-class))
+
+;; Storm configuration, produced by calling read-storm-config when this form is
+;; evaluated. Elsewhere in this file it is invoked with a config key to look up
+;; a setting, e.g. (*STORM-CONF* NIMBUS-HOST). Declared ^:dynamic.
+(def ^:dynamic *STORM-CONF* (read-storm-config))
+
+(defmacro with-nimbus
+  "Expands to a `thrift/with-nimbus-connection` form whose binding vector is
+  [nimbus-sym host port], with host and port looked up in *STORM-CONF* under
+  NIMBUS-HOST and NIMBUS-THRIFT-PORT, wrapped around `body`."
+  [nimbus-sym & body]
+  `(thrift/with-nimbus-connection
+     [~nimbus-sym
+      (*STORM-CONF* NIMBUS-HOST)
+      (*STORM-CONF* NIMBUS-THRIFT-PORT)]
+     ~@body))
+
+(defn get-filled-stats
+  "Calls .get_stats on every ExecutorSummary in `summs` and keeps only the
+  results accepted by not-nil?."
+  [summs]
+  (filter not-nil?
+          (map (fn [^ExecutorSummary summ] (.get_stats summ))
+               summs)))
+
+(defn mk-system-toggle-button
+  "Hiccup markup for the button that calls the page's toggleSys() script.
+  The label reads \"Hide System Stats\" when `include-sys?` is truthy and
+  \"Show System Stats\" otherwise."
+  [include-sys?]
+  (let [verb (if include-sys? "Hide" "Show")
+        label (str verb " System Stats")]
+    [:p {:class "js-only"}
+     [:input {:type "button"
+              :value label
+              :onclick "toggleSys()"}]]))
+
+;; Wraps `body` in the shared page layout via html4: a <head> with the
+;; "Storm UI" title plus the stylesheets and scripts listed below, and a <body>
+;; that starts with an h1 link to "/" followed by the elements of `body`.
+;; `body` is passed through `seq`, so callers hand in a collection of hiccup
+;; forms (every caller in this file passes the result of `concat`).
+(defn ui-template [body]
+ (html4
+ [:head
+ [:title "Storm UI"]
+ (include-css "/css/bootstrap-1.1.0.css")
+ (include-css "/css/style.css")
+ (include-js "/js/jquery-1.6.2.min.js")
+ (include-js "/js/jquery.tablesorter.min.js")
+ (include-js "/js/jquery.cookies.2.2.0.min.js")
+ (include-js "/js/script.js")
+ ]
+ [:body
+ [:h1 (link-to "/" "Storm UI")]
+ (seq body)
+ ]))
+
+(defn read-storm-version
+  "Returns the trimmed contents of the RELEASE file under the directory named
+  by the \"storm.home\" system property, or \"Unknown\" when that path is not
+  an existing regular file."
+  []
+  (let [path (format "%s/RELEASE" (System/getProperty "storm.home"))
+        f (File. path)]
+    (if-not (and (.exists f) (.isFile f))
+      "Unknown"
+      (trim (slurp path)))))
+
+(defn cluster-summary-table
+  "Builds the one-row cluster overview table from a ClusterSummary: version,
+  Nimbus uptime, supervisor count, used/free/total slots, and the executor and
+  task totals summed over all topologies."
+  [^ClusterSummary summ]
+  (let [sups (.get_supervisors summ)
+        topologies (.get_topologies summ)
+        sum-of (fn [getter coll] (reduce + (map getter coll)))
+        used-slots (sum-of #(.get_num_used_workers ^SupervisorSummary %) sups)
+        total-slots (sum-of #(.get_num_workers ^SupervisorSummary %) sups)
+        total-tasks (sum-of #(.get_num_tasks ^TopologySummary %) topologies)
+        total-executors (sum-of #(.get_num_executors ^TopologySummary %) topologies)
+        row [(read-storm-version)
+             (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
+             (count sups)
+             used-slots
+             (- total-slots used-slots)
+             total-slots
+             total-executors
+             total-tasks]]
+    (table ["Version" "Nimbus uptime" "Supervisors" "Used slots" "Free slots" "Total slots" "Executors" "Tasks"]
+           [row])))
+
+(defn topology-link
+  "Link to the page of topology `id`. The link text is `content` passed
+  through escape-html; with one argument the id itself is used as the text."
+  ([id]
+   (topology-link id id))
+  ([id content]
+   (let [href (url-format "/topology/%s" id)]
+     (link-to href (escape-html content)))))
+
+(defn main-topology-summary-table
+  "Sortable table with one row per TopologySummary in `summs`. The name cell
+  is a link to the topology page; the id cell is HTML-escaped."
+  [summs]
+  (let [row (fn [^TopologySummary t]
+              [(topology-link (.get_id t) (.get_name t))
+               (escape-html (.get_id t))
+               (.get_status t)
+               (pretty-uptime-sec (.get_uptime_secs t))
+               (.get_num_workers t)
+               (.get_num_executors t)
+               (.get_num_tasks t)])]
+    (sorted-table
+      ["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"]
+      (map row summs)
+      :time-cols [3]
+      :sort-list "[[0,0]]")))
+
+(defn supervisor-summary-table
+  "Sortable table with one row per SupervisorSummary in `summs`: id, host,
+  uptime, total slots and used slots."
+  [summs]
+  (let [row (fn [^SupervisorSummary s]
+              [(.get_supervisor_id s)
+               (.get_host s)
+               (pretty-uptime-sec (.get_uptime_secs s))
+               (.get_num_workers s)
+               (.get_num_used_workers s)])]
+    (sorted-table
+      ["Id" "Host" "Uptime" "Slots" "Used slots"]
+      (map row summs)
+      :time-cols [2])))
+
+(defn configuration-table
+  "Sortable two-column table listing every entry of `conf`; each value is
+  rendered with `str`."
+  [conf]
+  (let [rows (for [entry conf]
+               [(key entry) (str (val entry))])]
+    (sorted-table ["Key" "Value"] rows)))
+
+(defn main-page
+  "Content of the UI front page, as a seq of hiccup forms: cluster summary,
+  topology summary, supervisor summary and the Nimbus configuration. Cluster
+  info and the Nimbus configuration are both fetched inside one with-nimbus
+  block."
+  []
+  (with-nimbus nimbus
+    (let [^Nimbus$Client client nimbus
+          summ (.getClusterInfo client)]
+      (concat
+        [[:h2 "Cluster Summary"]
+         (cluster-summary-table summ)
+         [:h2 "Topology summary"]]
+        (main-topology-summary-table (.get_topologies summ))
+        [[:h2 "Supervisor summary"]]
+        (supervisor-summary-table (.get_supervisors summ))
+        [[:h2 "Nimbus Configuration"]]
+        (configuration-table (from-json (.getNimbusConf client)))))))
+
+(defn component-type
+  "Returns :bolt when `id` is a key of the topology's bolts, :spout when it is
+  a key of its spouts (bolts are checked first), and nil otherwise."
+  [^StormTopology topology id]
+  (let [bolts (.get_bolts topology)
+        spouts (.get_spouts topology)]
+    (if (.containsKey bolts id)
+      :bolt
+      (when (.containsKey spouts id)
+        :spout))))
+
+(defn executor-summary-type
+  "component-type of the component that executor summary `s` belongs to."
+  [topology ^ExecutorSummary s]
+  (->> (.get_component_id s)
+       (component-type topology)))
+
+(defn add-pairs
+  "Element-wise sum of two 2-element pairs. With no arguments returns the
+  identity pair [0 0]."
+  ([] [0 0])
+  ([left right]
+   (let [[l-first l-second] left
+         [r-first r-second] right]
+     [(+ l-first r-first)
+      (+ l-second r-second)])))
+
+(defn expand-averages
+  "For every slice/stream entry of `counts`, pairs the count with
+  (count * average), where the average is read from the same [slice stream]
+  path in `avg`. Returns {slice {stream [count*average count]}}. Both
+  arguments are first passed through clojurify-structure."
+  [avg counts]
+  (let [averages (clojurify-structure avg)
+        expand-streams (fn [slice streams]
+                         (into {}
+                               (map (fn [[stream n]]
+                                      [stream
+                                       [(* n (get-in averages [slice stream]))
+                                        n]])
+                                    streams)))]
+    (into {}
+          (map (fn [[slice streams]]
+                 [slice (expand-streams slice streams)])
+               (clojurify-structure counts)))))
+
+
+(defn expand-averages-seq
+  "Runs expand-averages on each (average, counts) pair taken position by
+  position from the two seqs, then merges the results: maps for the same
+  slice are merged, and pairs for the same stream are summed with add-pairs."
+  [average-seq counts-seq]
+  (let [expanded (map expand-averages average-seq counts-seq)
+        merge-streams (fn [left right] (merge-with add-pairs left right))]
+    (apply merge-with merge-streams expanded)))
+
+(defn- val-avg
+  "Turns a [total count] pair into the average total/count as a double.
+  Returns 0 when the total equals 0 or when the count is zero."
+  [[t c]]
+  ;; `(= t 0)` alone does not catch a floating-point zero total: (= 0.0 0) is
+  ;; false in Clojure. A [0.0 0] pair (zero count multiplied by a double
+  ;; average) therefore used to reach the division and throw
+  ;; ArithmeticException "Divide by zero". The count is now checked as well;
+  ;; the original test stays first so earlier results are unchanged.
+  (if (or (= t 0) (zero? c))
+    0
+    (double (/ t c))))
+
+(defn aggregate-averages
+  "Combines per-executor averages with their matching counts via
+  expand-averages-seq, then applies val-avg to every stream entry of every
+  slice using map-val."
+  [average-seq counts-seq]
+  (let [expanded (expand-averages-seq average-seq counts-seq)
+        average-streams (fn [streams] (map-val val-avg streams))]
+    (map-val average-streams expanded)))
+
+(defn aggregate-counts
+  "Merges a seq of nested count maps (each passed through
+  clojurify-structure first): inner maps under the same outer key are merged,
+  and values under the same inner key are added."
+  [counts-seq]
+  (let [add-streams (fn [left right] (merge-with + left right))]
+    (apply merge-with
+           add-streams
+           (map clojurify-structure counts-seq))))
+
+(defn aggregate-avg-streams
+  "Collapses the streams of each slice into one average: expands `avg` and
+  `counts` with expand-averages, sums the pairs of every stream in a slice
+  with add-pairs, then converts each summed pair with val-avg."
+  [avg counts]
+  (let [sum-streams (fn [streams] (reduce add-pairs (vals streams)))
+        summed (map-val sum-streams (expand-averages avg counts))]
+    (map-val val-avg summed)))
+
+(defn aggregate-count-streams
+  "Applies, via map-val, a sum over all per-stream values to each value of
+  `stats`."
+  [stats]
+  (map-val (fn [streams] (reduce + (vals streams)))
+           stats))
+
+(defn aggregate-common-stats
+  "Aggregates the emitted and transferred counts of every ExecutorStats in
+  `stats-seq` with aggregate-counts. Returns a map with :emitted and
+  :transferred."
+  [stats-seq]
+  (let [collect (fn [getter] (aggregate-counts (map getter stats-seq)))]
+    {:emitted (collect #(.get_emitted ^ExecutorStats %))
+     :transferred (collect #(.get_transferred ^ExecutorStats %))}))
+
+(defn mk-include-sys-fn
+  "Returns a one-argument predicate. When `include-sys?` is truthy it accepts
+  everything; otherwise it accepts only strings for which system-id? is
+  falsey."
+  [include-sys?]
+  (if-not include-sys?
+    (fn [stream]
+      (and (string? stream)
+           (not (system-id? stream))))
+    (fn [_] true)))
+
+(defn pre-process
+  "Rebuilds the :emitted and :transferred entries of `stream-summary`,
+  running filter-key over each window's stream map with the predicate from
+  mk-include-sys-fn. Other keys are left as they are."
+  [stream-summary include-sys?]
+  (let [keep-stream? (mk-include-sys-fn include-sys?)
+        filter-windows (fn [by-window]
+                         (into {}
+                               (map (fn [[window stat]]
+                                      [window (filter-key keep-stream? stat)])
+                                    by-window)))
+        emitted (filter-windows (:emitted stream-summary))
+        transferred (filter-windows (:transferred stream-summary))]
+    (-> stream-summary
+        (dissoc :emitted)
+        (assoc :emitted emitted)
+        (dissoc :transferred)
+        (assoc :transferred transferred))))
+
+(defn aggregate-bolt-stats
+  "Aggregates bolt ExecutorStats (`stats-seq` is passed through collectify
+  first). The result merges the pre-processed common stats with :acked,
+  :failed and :executed counts and with :process-latencies (averaged using
+  the acked counts) and :execute-latencies (averaged using the executed
+  counts)."
+  [stats-seq include-sys?]
+  (let [stats-seq (collectify stats-seq)
+        acked (map #(.. ^ExecutorStats % get_specific get_bolt get_acked) stats-seq)
+        failed (map #(.. ^ExecutorStats % get_specific get_bolt get_failed) stats-seq)
+        executed (map #(.. ^ExecutorStats % get_specific get_bolt get_executed) stats-seq)
+        process-ms (map #(.. ^ExecutorStats % get_specific get_bolt get_process_ms_avg) stats-seq)
+        execute-ms (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg) stats-seq)
+        common (pre-process (aggregate-common-stats stats-seq) include-sys?)]
+    (merge common
+           {:acked (aggregate-counts acked)
+            :failed (aggregate-counts failed)
+            :executed (aggregate-counts executed)
+            :process-latencies (aggregate-averages process-ms acked)
+            :execute-latencies (aggregate-averages execute-ms executed)})))
+
+(defn aggregate-spout-stats
+  "Aggregates spout ExecutorStats (`stats-seq` is passed through collectify
+  first). The result merges the pre-processed common stats with :acked and
+  :failed counts and with :complete-latencies, averaged using the acked
+  counts."
+  [stats-seq include-sys?]
+  (let [stats-seq (collectify stats-seq)
+        acked (map #(.. ^ExecutorStats % get_specific get_spout get_acked) stats-seq)
+        failed (map #(.. ^ExecutorStats % get_specific get_spout get_failed) stats-seq)
+        complete-ms (map #(.. ^ExecutorStats % get_specific get_spout get_complete_ms_avg) stats-seq)
+        common (pre-process (aggregate-common-stats stats-seq) include-sys?)]
+    (merge common
+           {:acked (aggregate-counts acked)
+            :failed (aggregate-counts failed)
+            :complete-latencies (aggregate-averages complete-ms acked)})))
+
+(defn aggregate-bolt-streams
+  "Collapses the per-stream dimension of aggregated bolt stats: counts go
+  through aggregate-count-streams, latencies through aggregate-avg-streams
+  (process latency paired with acked counts, execute latency with executed
+  counts)."
+  [stats]
+  (let [{:keys [acked failed emitted transferred executed
+                process-latencies execute-latencies]} stats]
+    {:acked (aggregate-count-streams acked)
+     :failed (aggregate-count-streams failed)
+     :emitted (aggregate-count-streams emitted)
+     :transferred (aggregate-count-streams transferred)
+     :process-latencies (aggregate-avg-streams process-latencies acked)
+     :executed (aggregate-count-streams executed)
+     :execute-latencies (aggregate-avg-streams execute-latencies executed)}))
+
+(defn aggregate-spout-streams
+  "Collapses the per-stream dimension of aggregated spout stats: counts go
+  through aggregate-count-streams, and complete latency through
+  aggregate-avg-streams paired with the acked counts."
+  [stats]
+  (let [{:keys [acked failed emitted transferred complete-latencies]} stats]
+    {:acked (aggregate-count-streams acked)
+     :failed (aggregate-count-streams failed)
+     :emitted (aggregate-count-streams emitted)
+     :transferred (aggregate-count-streams transferred)
+     :complete-latencies (aggregate-avg-streams complete-latencies acked)}))
+
+(defn spout-summary?
+  "True when executor summary `s` belongs to a spout of `topology`."
+  [topology s]
+  (let [kind (executor-summary-type topology s)]
+    (= kind :spout)))
+
+(defn bolt-summary?
+  "True when executor summary `s` belongs to a bolt of `topology`."
+  [topology s]
+  (let [kind (executor-summary-type topology s)]
+    (= kind :bolt)))
+
+(defn topology-summary-table
+  "One-row summary table for a TopologyInfo. The worker count is the number
+  of distinct [host port] pairs among its executors; name and id are
+  HTML-escaped."
+  [^TopologyInfo summ]
+  (let [executors (.get_executors summ)
+        workers (into #{}
+                      (map (fn [^ExecutorSummary e] [(.get_host e) (.get_port e)])
+                           executors))
+        row [(escape-html (.get_name summ))
+             (escape-html (.get_id summ))
+             (.get_status summ)
+             (pretty-uptime-sec (.get_uptime_secs summ))
+             (count workers)
+             (count executors)
+             (sum-tasks executors)]]
+    (table ["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"]
+           [row])))
+
+(defn total-aggregate-stats
+  "Topology-wide stats: the aggregated spout stats, with the :emitted and
+  :transferred counts of the aggregated bolt stats added in per window."
+  [spout-summs bolt-summs include-sys?]
+  (let [spout-agg (-> (get-filled-stats spout-summs)
+                      (aggregate-spout-stats include-sys?)
+                      aggregate-spout-streams)
+        bolt-agg (-> (get-filled-stats bolt-summs)
+                     (aggregate-bolt-stats include-sys?)
+                     aggregate-bolt-streams)
+        add-windows (fn [left right] (merge-with + left right))]
+    (merge-with add-windows
+                (select-keys bolt-agg [:emitted :transferred])
+                spout-agg)))
+
+(defn stats-times
+  "Keys of `stats-map` (after clojurify-structure) other than \":all-time\",
+  sorted by their value parsed with Integer/parseInt."
+  [stats-map]
+  (let [windows (-> stats-map
+                    clojurify-structure
+                    (dissoc ":all-time")
+                    keys)]
+    (sort-by (fn [w] (Integer/parseInt w)) windows)))
+
+(defn topology-stats-table
+  "Sortable table of topology stats with one row per time window found in
+  (:emitted stats), followed by an \"All time\" row. Each window cell links
+  to the topology page for that window; the currently selected `window` gets
+  the \"red\" class."
+  [id window stats]
+  (let [windows (concat (stats-times (:emitted stats)) [":all-time"])
+        label (fn [k]
+                (if (= k ":all-time")
+                  "All time"
+                  (pretty-uptime-sec k)))
+        row (fn [k]
+              [(link-to (if (= k window) {:class "red"} {})
+                        (url-format "/topology/%s?window=%s" id k)
+                        (label k))
+               (get-in stats [:emitted k])
+               (get-in stats [:transferred k])
+               (float-str (get-in stats [:complete-latencies k]))
+               (get-in stats [:acked k])
+               (get-in stats [:failed k])])]
+    (sorted-table
+      ["Window" "Emitted" "Transferred" "Complete latency (ms)" "Acked" "Failed"]
+      (map row windows)
+      :time-cols [0])))
+
+(defn group-by-comp
+  "Groups executor summaries by component id and returns the groups as a
+  sorted map keyed by that id."
+  [summs]
+  (into (sorted-map)
+        (group-by (fn [^ExecutorSummary s] (.get_component_id s))
+                  summs)))
+
+(defn error-subset
+  "Returns at most the first 200 characters of `error-str` as a string."
+  [error-str]
+  (->> error-str
+       (take 200)
+       (apply str)))
+
+(defn most-recent-error
+  "Returns a [:span ...] hiccup element for the ErrorInfo in `errors-list`
+  with the greatest error time, or nil when the list is empty. The span gets
+  the \"red\" class when time-delta of the error time is under 30 minutes
+  (1800 seconds). The error text is truncated with error-subset and then
+  HTML-escaped."
+  [errors-list]
+  (let [^ErrorInfo error (->> errors-list
+                              (sort-by #(.get_error_time_secs ^ErrorInfo %))
+                              last)]
+    (when error
+      [:span (if (< (time-delta (.get_error_time_secs error))
+                    (* 60 30))
+               {:class "red"}
+               {})
+       ;; Error text comes from topology code and commonly holds stack-trace
+       ;; frames such as <init>. Escape it like the names and ids elsewhere in
+       ;; this file so it is shown literally instead of parsed as markup.
+       ;; Truncate first so an entity is never cut in half.
+       (escape-html (error-subset (.get_error error)))])))
+
+(defn component-link
+  "Link to the page of component `id` in topology `storm-id`; the link text
+  is the HTML-escaped component id."
+  [storm-id id]
+  (let [href (url-format "/topology/%s/component/%s" storm-id id)]
+    (link-to href (escape-html id))))
+
+(defn worker-log-link
+  "Link, labelled with the worker `port`, to worker-<port>.log on `host`,
+  using the port configured under LOGVIEWER-PORT in *STORM-CONF*."
+  [host port]
+  (let [logviewer-port (*STORM-CONF* LOGVIEWER-PORT)
+        href (url-format "http://%s:%s/log?file=worker-%s.log"
+                         host logviewer-port port)]
+    (link-to href (str port))))
+
+(defn render-capacity
+  "Renders `capacity` (passed through nil-to-zero) with float-str inside a
+  span; the span gets the \"red\" class when the value is above 0.9."
+  [capacity]
+  (let [value (nil-to-zero capacity)
+        attrs (if (> value 0.9)
+                {:class "red"}
+                {})]
+    [:span attrs (float-str value)]))
+
+(defn compute-executor-capacity
+  "Capacity of one bolt executor over the \"600\" stats window:
+  (executed * execute-latency) / (1000 * window), where window is the
+  executor uptime capped at 600. Missing stats count as zero. Returns nil
+  when the window is not positive."
+  [^ExecutorSummary e]
+  (let [raw-stats (.get_stats e)
+        ten-min (when raw-stats
+                  (-> raw-stats
+                      (aggregate-bolt-stats true)
+                      (aggregate-bolt-streams)
+                      swap-map-order
+                      (get "600")))
+        uptime (nil-to-zero (.get_uptime_secs e))
+        window (if (< uptime 600) uptime 600)
+        executed (nil-to-zero (:executed ten-min))
+        latency (nil-to-zero (:execute-latencies ten-min))]
+    (when (> window 0)
+      (div (* executed latency) (* 1000 window)))))
+
+(defn compute-bolt-capacity
+  "Largest compute-executor-capacity value among `executors`, treating nil
+  capacities as zero. Returns 0 when `executors` is empty."
+  [executors]
+  (let [capacities (->> executors
+                        (map compute-executor-capacity)
+                        (map nil-to-zero))]
+    ;; (apply max) with no arguments throws an arity error, so an empty
+    ;; executor list is answered explicitly.
+    (if (seq capacities)
+      (apply max capacities)
+      0)))
+
+(defn spout-comp-table
+  "Sortable table with one row per spout component in `summ-map` (component
+  id -> executor summaries), showing stats for `window` and the most recent
+  error recorded for that component in `errors`."
+  [top-id summ-map errors window include-sys?]
+  (let [row (fn [[id summs]]
+              (let [stats (-> (get-filled-stats summs)
+                              (aggregate-spout-stats include-sys?)
+                              aggregate-spout-streams)]
+                [(component-link top-id id)
+                 (count summs)
+                 (sum-tasks summs)
+                 (get-in stats [:emitted window])
+                 (get-in stats [:transferred window])
+                 (float-str (get-in stats [:complete-latencies window]))
+                 (get-in stats [:acked window])
+                 (get-in stats [:failed window])
+                 (most-recent-error (get errors id))]))]
+    (sorted-table
+      ["Id" "Executors" "Tasks" "Emitted" "Transferred" "Complete latency (ms)"
+       "Acked" "Failed" "Last error"]
+      (map row summ-map))))
+
+(defn bolt-comp-table
+  "Sortable table with one row per bolt component in `summ-map` (component
+  id -> executor summaries), showing stats for `window`, the component's
+  capacity from compute-bolt-capacity, and the most recent error recorded
+  for it in `errors`."
+  [top-id summ-map errors window include-sys?]
+  (let [row (fn [[id summs]]
+              (let [stats (-> (get-filled-stats summs)
+                              (aggregate-bolt-stats include-sys?)
+                              aggregate-bolt-streams)]
+                [(component-link top-id id)
+                 (count summs)
+                 (sum-tasks summs)
+                 (get-in stats [:emitted window])
+                 (get-in stats [:transferred window])
+                 (render-capacity (compute-bolt-capacity summs))
+                 (float-str (get-in stats [:execute-latencies window]))
+                 (get-in stats [:executed window])
+                 (float-str (get-in stats [:process-latencies window]))
+                 (get-in stats [:acked window])
+                 (get-in stats [:failed window])
+                 (most-recent-error (get errors id))]))]
+    (sorted-table
+      ["Id" "Executors" "Tasks" "Emitted" "Transferred" "Capacity (last 10m)" "Execute latency (ms)" "Executed" "Process latency (ms)"
+       "Acked" "Failed" "Last error"]
+      (map row summ-map))))
+
+(defn window-hint
+  "Display text for a stats window: \"All time\" for \":all-time\", otherwise
+  the result of pretty-uptime-sec on `window`."
+  [window]
+  (if (not= window ":all-time")
+    (pretty-uptime-sec window)
+    "All time"))
+
+(defn topology-action-button
+  "Hiccup button labelled `action` whose onclick calls the page's
+  confirmAction script with the JavaScript-escaped topology id and name,
+  followed by `command`, `is-wait` and `default-wait`. The element carries an
+  empty :enabled attribute when `enabled` is truthy and an empty :disabled
+  attribute otherwise."
+  [id name action command is-wait default-wait enabled]
+  (let [js-id (StringEscapeUtils/escapeJavaScript id)
+        js-name (StringEscapeUtils/escapeJavaScript name)
+        state-attr (if enabled :enabled :disabled)
+        on-click (str "confirmAction('"
+                      js-id "', '"
+                      js-name "', '"
+                      command "', " is-wait ", " default-wait ")")]
+    [:input {:type "button"
+             :value action
+             state-attr ""
+             :onclick on-click}]))
+
+;; Builds the content of the page for topology `id` as a seq of hiccup forms:
+;; summary table, action buttons, topology stats, spout table, bolt table and
+;; the topology configuration. All Nimbus calls (getTopologyInfo, getTopology,
+;; getTopologyConf) are made inside a single with-nimbus block.
+;;   id           - topology id passed to the Nimbus calls
+;;   window       - stats window key; falls back to ":all-time" when falsey
+;;   include-sys? - passed on to the stats aggregation and used to build the
+;;                  predicate applied to the bolt component ids
+(defn topology-page [id window include-sys?]
+ (with-nimbus nimbus
+ (let [window (if window window ":all-time")
+ window-hint (window-hint window)
+ summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+ topology (.getTopology ^Nimbus$Client nimbus id)
+ topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
+ spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
+ bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
+ spout-comp-summs (group-by-comp spout-summs)
+ bolt-comp-summs (group-by-comp bolt-summs)
+ ;; only the bolt groups are filtered by component id here; the spout groups
+ ;; are used as they are
+ bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
+ name (.get_name summ)
+ status (.get_status summ)
+ msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+ ]
+ (concat
+ [[:h2 "Topology summary"]]
+ [(topology-summary-table summ)]
+ [[:h2 {:class "js-only"} "Topology actions"]]
+ ;; each button is enabled only for the statuses listed in its last argument;
+ ;; rebalance and kill use the message timeout as their default wait
+ [[:p {:class "js-only"} (concat
+ [(topology-action-button id name "Activate" "activate" false 0 (= "INACTIVE" status))]
+ [(topology-action-button id name "Deactivate" "deactivate" false 0 (= "ACTIVE" status))]
+ [(topology-action-button id name "Rebalance" "rebalance" true msg-timeout (or (= "ACTIVE" status) (= "INACTIVE" status)))]
+ [(topology-action-button id name "Kill" "kill" true msg-timeout (not= "KILLED" status))]
+ )]]
+ [[:h2 "Topology stats"]]
+ (topology-stats-table id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
+ [[:h2 "Spouts (" window-hint ")"]]
+ (spout-comp-table id spout-comp-summs (.get_errors summ) window include-sys?)
+ [[:h2 "Bolts (" window-hint ")"]]
+ (bolt-comp-table id bolt-comp-summs (.get_errors summ) window include-sys?)
+ [[:h2 "Topology Configuration"]]
+ (configuration-table topology-conf)
+ ))))
+
+(defn component-task-summs
+  "Executor summaries of component `id` in `summ`, sorted by the task start
+  of their executor info. The spout groups are consulted first; if `id` is
+  not a spout component, the bolt groups are used."
+  [^TopologyInfo summ topology id]
+  (let [executors (.get_executors summ)
+        spouts-by-comp (group-by-comp
+                         (filter (partial spout-summary? topology) executors))
+        bolts-by-comp (group-by-comp
+                        (filter (partial bolt-summary? topology) executors))
+        chosen (if (contains? spouts-by-comp id)
+                 (spouts-by-comp id)
+                 (bolts-by-comp id))]
+    (sort-by (fn [^ExecutorSummary e]
+               (-> e .get_executor_info .get_task_start))
+             chosen)))
+
+(defn spout-summary-table
+  "Sortable table of a spout component's stats with one row per time window
+  found in (:emitted stats), followed by an \"All time\" row. Each window
+  cell links to the component page for that window; the currently selected
+  `window` gets the \"red\" class."
+  [topology-id id stats window]
+  (let [windows (concat (stats-times (:emitted stats)) [":all-time"])
+        label (fn [k]
+                (if (= k ":all-time")
+                  "All time"
+                  (pretty-uptime-sec k)))
+        row (fn [k]
+              [(link-to (if (= k window) {:class "red"} {})
+                        (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
+                        (label k))
+               (get-in stats [:emitted k])
+               (get-in stats [:transferred k])
+               (float-str (get-in stats [:complete-latencies k]))
+               (get-in stats [:acked k])
+               (get-in stats [:failed k])])]
+    (sorted-table
+      ["Window" "Emitted" "Transferred" "Complete latency (ms)" "Acked" "Failed"]
+      (map row windows)
+      :time-cols [0])))
+
+(defn spout-output-summary-table
+  "Sortable per-stream table of a spout's output stats for `window`. The
+  summary is reshaped with swap-map-order (outer map, then each value) so it
+  can be looked up by window and iterated by stream."
+  [stream-summary window]
+  (let [by-window (map-val swap-map-order (swap-map-order stream-summary))
+        row (fn [[s stats]]
+              [s
+               (nil-to-zero (:emitted stats))
+               (nil-to-zero (:transferred stats))
+               (float-str (:complete-latencies stats))
+               (nil-to-zero (:acked stats))
+               (nil-to-zero (:failed stats))])]
+    (sorted-table
+      ["Stream" "Emitted" "Transferred" "Complete latency (ms)" "Acked" "Failed"]
+      (map row (by-window window)))))
+
+(defn spout-executor-table
+  "Sortable table with one row per spout executor, showing its stats for
+  `window`. Executors without stats show zeros. The port cell links to the
+  worker log. `topology-id` is accepted but not used in the body."
+  [topology-id executors window include-sys?]
+  (let [row (fn [^ExecutorSummary e]
+              (let [stats (when-let [raw (.get_stats e)]
+                            (-> raw
+                                (aggregate-spout-stats include-sys?)
+                                aggregate-spout-streams
+                                swap-map-order
+                                (get window)))]
+                [(pretty-executor-info (.get_executor_info e))
+                 (pretty-uptime-sec (.get_uptime_secs e))
+                 (.get_host e)
+                 (worker-log-link (.get_host e) (.get_port e))
+                 (nil-to-zero (:emitted stats))
+                 (nil-to-zero (:transferred stats))
+                 (float-str (:complete-latencies stats))
+                 (nil-to-zero (:acked stats))
+                 (nil-to-zero (:failed stats))]))]
+    (sorted-table
+      ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred"
+       "Complete latency (ms)" "Acked" "Failed"]
+      (map row executors)
+      :time-cols [1])))
+
+(defn spout-page
+  "Spout-specific sections of the component page, as a seq of hiccup forms:
+  per-window stats, per-stream output stats and the executor table."
+  [window ^TopologyInfo topology-info component executors include-sys?]
+  (let [hint (str " (" (window-hint window) ")")
+        stream-summary (aggregate-spout-stats (get-filled-stats executors)
+                                              include-sys?)
+        summary (aggregate-spout-streams stream-summary)]
+    (concat
+      [[:h2 "Spout stats"]]
+      (spout-summary-table (.get_id topology-info) component summary window)
+      [[:h2 "Output stats" hint]]
+      (spout-output-summary-table stream-summary window)
+      [[:h2 "Executors" hint]]
+      (spout-executor-table (.get_id topology-info) executors window include-sys?))))
+
+(defn bolt-output-summary-table
+  "Sortable per-stream table of a bolt's :emitted and :transferred counts
+  for `window`."
+  [stream-summary window]
+  (let [by-stream (-> stream-summary
+                      swap-map-order
+                      (get window)
+                      (select-keys [:emitted :transferred])
+                      swap-map-order)
+        row (fn [[s stats]]
+              [s
+               (nil-to-zero (:emitted stats))
+               (nil-to-zero (:transferred stats))])]
+    (sorted-table
+      ["Stream" "Emitted" "Transferred"]
+      (map row by-stream))))
+
+(defn bolt-input-summary-table
+  "Sortable table of a bolt's input stats for `window`, with one row per
+  GlobalStreamId key: source component (HTML-escaped), stream id, latencies
+  and counts."
+  [stream-summary window]
+  (let [by-stream (-> stream-summary
+                      swap-map-order
+                      (get window)
+                      (select-keys [:acked :failed :process-latencies :executed :execute-latencies])
+                      swap-map-order)
+        row (fn [[^GlobalStreamId s stats]]
+              [(escape-html (.get_componentId s))
+               (.get_streamId s)
+               (float-str (:execute-latencies stats))
+               (nil-to-zero (:executed stats))
+               (float-str (:process-latencies stats))
+               (nil-to-zero (:acked stats))
+               (nil-to-zero (:failed stats))])]
+    (sorted-table
+      ["Component" "Stream" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
+      (map row by-stream))))
+
+(defn bolt-executor-table
+  "Sortable table with one row per bolt executor, showing its stats for
+  `window` and its capacity from compute-executor-capacity. Executors
+  without stats show zeros. The port cell links to the worker log.
+  `topology-id` is accepted but not used in the body."
+  [topology-id executors window include-sys?]
+  (let [row (fn [^ExecutorSummary e]
+              (let [stats (when-let [raw (.get_stats e)]
+                            (-> raw
+                                (aggregate-bolt-stats include-sys?)
+                                (aggregate-bolt-streams)
+                                swap-map-order
+                                (get window)))]
+                [(pretty-executor-info (.get_executor_info e))
+                 (pretty-uptime-sec (.get_uptime_secs e))
+                 (.get_host e)
+                 (worker-log-link (.get_host e) (.get_port e))
+                 (nil-to-zero (:emitted stats))
+                 (nil-to-zero (:transferred stats))
+                 (render-capacity (compute-executor-capacity e))
+                 (float-str (:execute-latencies stats))
+                 (nil-to-zero (:executed stats))
+                 (float-str (:process-latencies stats))
+                 (nil-to-zero (:acked stats))
+                 (nil-to-zero (:failed stats))]))]
+    (sorted-table
+      ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred" "Capacity (last 10m)"
+       "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
+      (map row executors)
+      :time-cols [1])))
+
+(defn bolt-summary-table
+  "Sortable table of a bolt component's stats with one row per time window
+  found in (:emitted stats), followed by an \"All time\" row. Each window
+  cell links to the component page for that window; the currently selected
+  `window` gets the \"red\" class."
+  [topology-id id stats window]
+  (let [windows (concat (stats-times (:emitted stats)) [":all-time"])
+        label (fn [k]
+                (if (= k ":all-time")
+                  "All time"
+                  (pretty-uptime-sec k)))
+        row (fn [k]
+              [(link-to (if (= k window) {:class "red"} {})
+                        (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
+                        (label k))
+               (get-in stats [:emitted k])
+               (get-in stats [:transferred k])
+               (float-str (get-in stats [:execute-latencies k]))
+               (get-in stats [:executed k])
+               (float-str (get-in stats [:process-latencies k]))
+               (get-in stats [:acked k])
+               (get-in stats [:failed k])])]
+    (sorted-table
+      ["Window" "Emitted" "Transferred" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
+      (map row windows)
+      :time-cols [0])))
+
+(defn bolt-page
+  "Bolt-specific sections of the component page, as a seq of hiccup forms:
+  per-window stats, input stats, output stats and the executor table."
+  [window ^TopologyInfo topology-info component executors include-sys?]
+  (let [hint (str " (" (window-hint window) ")")
+        stream-summary (aggregate-bolt-stats (get-filled-stats executors)
+                                             include-sys?)
+        summary (aggregate-bolt-streams stream-summary)]
+    (concat
+      [[:h2 "Bolt stats"]]
+      (bolt-summary-table (.get_id topology-info) component summary window)
+      [[:h2 "Input stats" hint]]
+      (bolt-input-summary-table stream-summary window)
+      [[:h2 "Output stats" hint]]
+      (bolt-output-summary-table stream-summary window)
+      [[:h2 "Executors"]]
+      (bolt-executor-table (.get_id topology-info) executors window include-sys?))))
+
+(defn errors-table
+  "Sortable table of the ErrorInfo entries in `errors-list`, latest error
+  time first, with the error time rendered by date-str and the HTML-escaped
+  error text in a <pre> element."
+  [errors-list]
+  (let [errors (->> errors-list
+                    (sort-by #(.get_error_time_secs ^ErrorInfo %))
+                    reverse)
+        row (fn [^ErrorInfo e]
+              [(date-str (.get_error_time_secs e))
+               ;; Stack traces contain frames such as <init> and <clinit>.
+               ;; Unescaped, a browser reads them as tags and drops them, and
+               ;; any markup inside an error message would be injected into
+               ;; the page.
+               [:pre (escape-html (.get_error e))]])]
+    (sorted-table
+      ["Time" "Error"]
+      (map row errors)
+      :sort-list "[[0,1]]")))
+
+(defn component-page
+  "Content of the page for `component` of topology `topology-id`, as a seq
+  of hiccup forms: a component summary table, the spout- or bolt-specific
+  sections (none when the component is neither), and the errors table.
+  `window` falls back to \":all-time\" when falsey."
+  [topology-id component window include-sys?]
+  (with-nimbus nimbus
+    (let [window (or window ":all-time")
+          ^Nimbus$Client client nimbus
+          summ (.getTopologyInfo client topology-id)
+          topology (.getTopology client topology-id)
+          kind (component-type topology component)
+          summs (component-task-summs summ topology component)
+          spec (case kind
+                 :spout (spout-page window summ component summs include-sys?)
+                 :bolt (bolt-page window summ component summs include-sys?)
+                 nil)
+          summary-row [(escape-html component)
+                       (topology-link (.get_id summ) (.get_name summ))
+                       (count summs)
+                       (sum-tasks summs)]]
+      (concat
+        [[:h2 "Component summary"]
+         (table ["Id" "Topology" "Executors" "Tasks"]
+                [summary-row])]
+        spec
+        [[:h2 "Errors"]
+         (errors-table (get (.get_errors summ) component))]))))
+
+(defn get-include-sys?
+  "Reads the \"sys\" cookie from `cookies`. Returns false when the cookie is
+  missing or its :value is the string \"false\", and true otherwise."
+  [cookies]
+  (let [cookie (get cookies "sys")
+        off? (or (nil? cookie)
+                 (= "false" (:value cookie)))]
+    (if off? false true)))
+
+;; Routes of the UI web application:
+;;   GET  /                                    cluster overview (main-page)
+;;   GET  /topology/:id                        topology page; in the merged
+;;                                             version any Exception redirects
+;;                                             to "/"
+;;   GET  /topology/:id/component/:component   component page
+;;   POST /topology/:id/activate               \
+;;   POST /topology/:id/deactivate              | look up the topology name
+;;   POST /topology/:id/rebalance/:wait-time    | from its id, call Nimbus, log,
+;;   POST /topology/:id/kill/:wait-time        /  redirect to the topology page
+;;   static resources under "/", then a "Page not found" fallback.
+;; Both page routes read the "sys" cookie via get-include-sys? and append the
+;; system-stats toggle button to the page content. The optional `window` query
+;; parameter is taken from the remaining params `m`.
+;; NOTE(review): lines prefixed "- " and "++" below are combined-diff markers
+;; of this merge commit (removed from / added relative to the parents).
+(defroutes main-routes
+ (GET "/" [:as {cookies :cookies}]
+ (-> (main-page)
+ ui-template))
+ (GET "/topology/:id" [:as {cookies :cookies} id & m]
+ (let [include-sys? (get-include-sys? cookies)]
- (-> (topology-page id (:window m) include-sys?)
++ (try
++ (-> (topology-page id (:window m) include-sys?)
+ (concat [(mk-system-toggle-button include-sys?)])
- ui-template)))
++ ui-template)
++ (catch Exception e (resp/redirect "/")))))
+ (GET "/topology/:id/component/:component" [:as {cookies :cookies} id component & m]
+ (let [include-sys? (get-include-sys? cookies)]
+ (-> (component-page id component (:window m) include-sys?)
+ (concat [(mk-system-toggle-button include-sys?)])
+ ui-template)))
+ (POST "/topology/:id/activate" [id]
+ (with-nimbus nimbus
+ (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.activate nimbus name)
+ (log-message "Activating topology '" name "'")))
+ (resp/redirect (str "/topology/" id)))
+ (POST "/topology/:id/deactivate" [id]
+ (with-nimbus nimbus
+ (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.deactivate nimbus name)
+ (log-message "Deactivating topology '" name "'")))
+ (resp/redirect (str "/topology/" id)))
+ (POST "/topology/:id/rebalance/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (RebalanceOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.rebalance nimbus name options)
+ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/topology/" id)))
+ (POST "/topology/:id/kill/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (KillOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.killTopologyWithOpts nimbus name options)
+ (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/topology/" id)))
+ (route/resources "/")
+ (route/not-found "Page not found"))
+
+(defn exception->html
+  "Returns the hiccup forms of the error page body for `ex`: an \"Internal
+  Server Error\" heading followed by the HTML-escaped stack trace in a <pre>
+  element."
+  [ex]
+  (let [sw (java.io.StringWriter.)]
+    (.printStackTrace ex (java.io.PrintWriter. sw))
+    (concat
+      [[:h2 "Internal Server Error"]]
+      ;; Stack traces contain frames such as <init> and <clinit>, and
+      ;; exception messages may echo request data. Escape the text so it is
+      ;; displayed literally instead of being parsed as markup.
+      [[:pre (escape-html (.toString sw))]])))
+
+(defn catch-errors
+  "Ring middleware: calls `handler` and, if it throws an Exception, answers
+  with a status 500 text/html response whose body is the exception page
+  rendered through ui-template."
+  [handler]
+  (fn [request]
+    (try
+      (handler request)
+      (catch Exception ex
+        (let [page (ui-template (exception->html ex))]
+          (-> page
+              resp/response
+              (resp/status 500)
+              (resp/content-type "text/html")))))))
+
+;; The Ring application: main-routes wrapped with wrap-reload for this
+;; namespace, then with catch-errors, and finally passed to handler/site.
+(def app
+ (handler/site (-> main-routes
+ (wrap-reload '[backtype.storm.ui.core])
+ catch-errors)))
+
+(defn start-server!
+  "Starts Jetty serving `app` on the port configured under UI-PORT in
+  *STORM-CONF*, with :join? false."
+  []
+  (let [port (Integer. (*STORM-CONF* UI-PORT))]
+    (run-jetty app {:port port
+                    :join? false})))
+
+(defn -main
+  "Program entry point: starts the UI server."
+  []
+  (start-server!))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/697849dc/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/util.clj
index 5e488e0,0000000..d83a194
mode 100644,000000..100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@@ -1,870 -1,0 +1,880 @@@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.util
+ (:import [java.net InetAddress])
+ (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap])
+ (:import [java.io FileReader FileNotFoundException])
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.utils Time Container ClojureTimerTask Utils
+ MutableObject MutableInt])
+ (:import [java.util UUID Random ArrayList List Collections])
+ (:import [java.util.zip ZipFile])
+ (:import [java.util.concurrent.locks ReentrantReadWriteLock])
+ (:import [java.util.concurrent Semaphore])
+ (:import [java.io File RandomAccessFile StringWriter PrintWriter])
+ (:import [java.lang.management ManagementFactory])
+ (:import [org.apache.commons.exec DefaultExecutor CommandLine])
+ (:import [org.apache.commons.io FileUtils])
+ (:import [org.apache.commons.exec ExecuteException])
+ (:import [org.json.simple JSONValue])
+ (:require [clojure [string :as str]])
+ (:import [clojure.lang RT])
+ (:require [clojure [set :as set]])
+ (:use [clojure walk])
+ (:use [backtype.storm log])
+ )
+
+(defn wrap-in-runtime
+  "Returns e itself when it is already a RuntimeException; otherwise returns
+  a new RuntimeException constructed from e."
+  [^Exception e]
+  (if-not (instance? RuntimeException e)
+    (RuntimeException. e)
+    e))
+
++(def on-windows?
++  "True when the OS environment variable equals \"Windows_NT\"."
++  (= (System/getenv "OS") "Windows_NT"))
++
++(def file-path-separator
++  "Value of the JVM system property file.separator, read once at load time."
++  (System/getProperty "file.separator"))
++
++(def class-path-separator
++  "Value of the JVM system property path.separator, read once at load time."
++  (System/getProperty "path.separator"))
++
+(defmacro defalias
+ "Defines an alias for a var: a new var with the same root binding (if
+ any) and similar metadata. The metadata of the alias is its initial
+ metadata (as provided by def) merged into the metadata of the original."
+ ;; Two-argument arity: expands to a def of name followed by an alter-meta!
+ ;; on the resulting var; the whole form evaluates to the alias var.
+ ([name orig]
+ `(do
+ (alter-meta!
+ ;; def with orig's raw root when orig is bound, otherwise an unbound def.
+ (if (.hasRoot (var ~orig))
+ (def ~name (.getRawRoot (var ~orig)))
+ (def ~name))
+ ;; When copying metadata, disregard {:macro false}.
+ ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
+ #(conj (dissoc % :macro)
+ (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
+ (var ~name)))
+ ;; Three-argument arity: attaches doc as :doc metadata on name, then
+ ;; delegates to the two-argument arity.
+ ([name orig doc]
+ (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
+
+;; name-with-attributes by Konrad Hinsen:
+(defn name-with-attributes
+  "Helper for macro definitions that accept an optional docstring and an
+  optional attribute map after the name.
+  If the first element of macro-args is a string it becomes the :doc entry;
+  if the next element is a map its entries are added as metadata. Existing
+  metadata on name is kept, with the new entries conj'ed on top.
+  Returns [name-with-metadata remaining-macro-args]."
+  [name macro-args]
+  (let [head (first macro-args)
+        has-doc? (string? head)
+        docstring (when has-doc? head)
+        after-doc (if has-doc? (next macro-args) macro-args)
+        attr-head (first after-doc)
+        has-attr? (map? attr-head)
+        explicit-attr (if has-attr? attr-head {})
+        remaining (if has-attr? (next after-doc) after-doc)
+        with-doc (if docstring
+                   (assoc explicit-attr :doc docstring)
+                   explicit-attr)
+        merged (if-let [existing (meta name)]
+                 (conj existing with-doc)
+                 with-doc)]
+    [(with-meta name merged) remaining]))
+
+(defmacro defnk
+ "Define a function accepting keyword arguments. Symbols up to the first
+ keyword in the parameter list are taken as positional arguments. Then
+ an alternating sequence of keywords and defaults values is expected. The
+ values of the keyword arguments are available in the function body by
+ virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
+ defnk accepts an optional docstring as well as an optional metadata map."
+ [fn-name & fn-tail]
+ ;; name-with-attributes strips the optional docstring/attr map, leaving
+ ;; the argument vector followed by the body forms.
+ (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
+ ;; Leading symbols are positional; the rest is keyword/default pairs.
+ [pos kw-vals] (split-with symbol? args)
+ syms (map #(-> % name symbol) (take-nth 2 kw-vals))
+ values (take-nth 2 (rest kw-vals))
+ sym-vals (apply hash-map (interleave syms values))
+ ;; Destructuring map: bind each keyword's symbol, defaulting via :or.
+ de-map {:keys (vec syms)
+ :or sym-vals}]
+ `(defn ~fn-name
+ [~@pos & options#]
+ (let [~de-map (apply hash-map options#)]
+ ~@body))))
+
+(defn find-first
+  "Returns the first element of coll for which (pred element) is logically
+  true, or nil when there is none. Walks coll only as far as the first match;
+  without a match the whole sequence is consumed."
+  [pred coll]
+  (->> coll
+       (filter pred)
+       first))
+
+(defn dissoc-in
+  "Removes the entry addressed by the key sequence keys from the nested
+  associative structure m and returns the new structure. Maps left empty by
+  the removal are dropped from their parent as well."
+  [m [k & ks :as keys]]
+  (if (nil? ks)
+    ;; Last key of the path: plain dissoc.
+    (dissoc m k)
+    (let [child (get m k)]
+      (if-not child
+        m
+        (let [pruned (dissoc-in child ks)]
+          (if (empty? pruned)
+            (dissoc m k)
+            (assoc m k pruned)))))))
+
+(defn indexed
+  "Returns a lazy sequence of [index item] pairs for the items of s, with
+  indexes counting up from zero.
+
+  (indexed '(a b c d)) => ([0 a] [1 b] [2 c] [3 d])"
+  [s]
+  (map-indexed vector s))
+
+(defn positions
+  "Returns a lazy sequence of the zero-based positions in coll whose items
+  satisfy pred."
+  [pred coll]
+  (keep-indexed (fn [idx elt]
+                  (when (pred elt) idx))
+                coll))
+
+(defn exception-cause?
+  "Returns true when t, or any throwable reachable from it by following
+  .getCause, is an instance of klass; false otherwise."
+  [klass ^Throwable t]
+  (loop [current t]
+    (cond
+      (not current) false
+      (instance? klass current) true
+      :else (recur (.getCause ^Throwable current)))))
+
+;; Evaluates body; yields false when it completes normally. If it throws,
+;; yields the result of exception-cause? for klass on the caught Throwable.
+(defmacro thrown-cause? [klass & body]
+ `(try
+ ~@body
+ false
+ (catch Throwable t#
+ (exception-cause? ~klass t#))))
+
+(defmacro thrown-cause-with-msg?
+  "Evaluates body and yields false when it completes normally. If body
+  throws, yields a logically true value only when the thrown Throwable's
+  message fully matches the regex re AND klass appears in its cause chain
+  (per exception-cause?). A Throwable without a message never matches."
+  [klass re & body]
+  `(try
+     ~@body
+     false
+     (catch Throwable t#
+       (let [msg# (.getMessage t#)]
+         ;; getMessage may be nil; re-matches on nil would throw an NPE
+         ;; from inside this handler instead of reporting a non-match.
+         (and (not (nil? msg#))
+              (re-matches ~re msg#)
+              (exception-cause? ~klass t#))))))
+
+(defmacro forcat
+  "Expands to a mapcat over aseq whose mapping function binds each element
+  to args (destructuring allowed) and evaluates body."
+  [[args aseq] & body]
+  (list `mapcat
+        (list* `fn [args] body)
+        aseq))
+
+;; Like try, but each (catch klass local body...) clause matches when klass
+;; appears anywhere in the thrown Throwable's cause chain (exception-cause?).
+;; The local is bound to the caught top-level Throwable. A Throwable matching
+;; no clause is rethrown.
+(defmacro try-cause [& body]
+ ;; checker is true for every form that is NOT a (catch ...) clause.
+ (let [checker (fn [form]
+ (or (not (sequential? form))
+ (not= 'catch (first form))))
+ ;; Forms before the first catch clause are the protected code.
+ [code guards] (split-with checker body)
+ error-local (gensym "t")
+ ;; Each catch clause becomes a test/expression pair for cond.
+ guards (forcat [[_ klass local & guard-body] guards]
+ `((exception-cause? ~klass ~error-local)
+ (let [~local ~error-local]
+ ~@guard-body
+ )))
+ ]
+ `(try ~@code
+ (catch Throwable ~error-local
+ (cond ~@guards
+ true (throw ~error-local)
+ )))))
+
+(defn local-hostname
+  "Returns the canonical host name of InetAddress/getLocalHost."
+  []
+  (-> (InetAddress/getLocalHost)
+      .getCanonicalHostName))
+
+(letfn [(bind-and-release [port]
+          ;; Opens a ServerSocket on port (0 lets the system choose), reads
+          ;; the bound port, and closes the socket again.
+          (with-open [server (java.net.ServerSocket. port)]
+            (.getLocalPort server)))]
+  (defn available-port
+    "With no argument, returns a port number on which a ServerSocket could
+    just be opened. With a preferred port, returns it if a ServerSocket can
+    be opened there; on IOException falls back to the no-argument behaviour."
+    ([] (bind-and-release 0))
+    ([preferred]
+      (try
+        (bind-and-release preferred)
+        (catch java.io.IOException _
+          (available-port))))))
+
+(defn uuid
+  "Returns the string form of a freshly generated random UUID."
+  []
+  (.toString (UUID/randomUUID)))
+
+(defn current-time-secs
+  "Returns the result of Time/currentTimeSecs."
+  []
+  (Time/currentTimeSecs))
+
+(defn current-time-millis
+  "Returns the result of Time/currentTimeMillis."
+  []
+  (Time/currentTimeMillis))
+
+(defn clojurify-structure
+  "Walks s top-down (prewalk), replacing every java.util.Map with a Clojure
+  map and every java.util.List with a vector; other values are left as is."
+  [s]
+  (letfn [(convert [node]
+            (if (instance? Map node)
+              (into {} node)
+              (if (instance? List node)
+                (vec node)
+                node)))]
+    (prewalk convert s)))
+
+(defmacro with-file-lock
+  "Evaluates body while holding an exclusive lock on the file at path,
+  creating the file first if it does not exist. Yields the value of body.
+  The lock is released and the file handle closed afterwards, including when
+  body throws; the handle is also closed when acquiring the lock fails."
+  [path & body]
+  `(let [f# (File. ~path)
+         _# (.createNewFile f#)
+         rf# (RandomAccessFile. f# "rw")]
+     (try
+       ;; Acquire inside the outer try so a failing lock call cannot leak rf#.
+       (let [lock# (.. rf# (getChannel) (lock))]
+         (try
+           ~@body
+           (finally
+             (.release lock#))))
+       (finally
+         (.close rf#)))))
+
+(defn tokenize-path
+  "Splits path on \"/\" and returns a vector of the non-empty segments."
+  [^String path]
+  (->> (.split path "/")
+       (remove empty?)
+       vec))
+
+(defn assoc-conj
+  "Appends v to the sequence stored under k in m. When k is absent the entry
+  becomes [v]; when present the entry becomes (concat existing [v])."
+  [m k v]
+  (if (contains? m k)
+    (assoc m k (concat (get m k) [v]))
+    (assoc m k [v])))
+
+;; returns [ones in first set not in second, ones in second set not in first]
+(defn set-delta
+  "Treats old and curr as sets and returns a two-element vector:
+  [elements only in old, elements only in curr]."
+  [old curr]
+  (let [before (set old)
+        after (set curr)
+        only-before (set/difference before after)
+        only-after (set/difference after before)]
+    [only-before only-after]))
+
+(defn parent-path
+  "Returns the \"/\"-rooted path made of every segment of path except the
+  last one."
+  [path]
+  (let [parent-toks (butlast (tokenize-path path))]
+    (str "/" (str/join "/" parent-toks))))
+
+(defn toks->path
+  "Joins the segments in toks with \"/\" and prefixes the result with \"/\"."
+  [toks]
+  (->> toks
+       (str/join "/")
+       (str "/")))
+
+(defn normalize-path
+  "Returns path rebuilt from its tokens: tokenize-path followed by
+  toks->path."
+  [^String path]
+  (-> path
+      tokenize-path
+      toks->path))
+
+(defn map-val
+  "Returns a map with the keys of amap and each value replaced by
+  (afn value)."
+  [afn amap]
+  (into {}
+        (map (fn [[k v]] [k (afn v)])
+             amap)))
+
+(defn filter-val
+  "Returns a map of the entries of amap whose value satisfies afn."
+  [afn amap]
+  (into {}
+        (for [[_ v :as entry] amap
+              :when (afn v)]
+          entry)))
+
+(defn filter-key
+  "Returns a map of the entries of amap whose key satisfies afn."
+  [afn amap]
+  (into {}
+        (for [[k _ :as entry] amap
+              :when (afn k)]
+          entry)))
+
+(defn map-key
+  "Returns a map with the values of amap and each key replaced by (afn key)."
+  [afn amap]
+  (into {}
+        (map (fn [[k v]] [(afn k) v])
+             amap)))
+
+(defn separate
+  "Returns [matching non-matching]: the items of aseq that satisfy pred and
+  those that do not, each as a lazy sequence."
+  [pred aseq]
+  ((juxt #(filter pred %)
+         #(filter (complement pred) %))
+   aseq))
+
+(defn full-path
+  "Returns the \"/\"-rooted path formed by appending name to the tokens of
+  parent."
+  [parent name]
+  (-> parent
+      tokenize-path
+      (conj name)
+      toks->path))
+
+(def not-nil?
+  "Complement of nil?: true for every value except nil."
+  (complement nil?))
+
+(defn barr
+  "Returns a byte array holding vals, each coerced with byte."
+  [& vals]
+  (->> vals
+       (map byte)
+       byte-array))
+
+(defn halt-process!
+  "Logs \"Halting process: \" followed by msg, then calls Runtime.halt with
+  val as the status code."
+  [val & msg]
+  (log-message "Halting process: " msg)
+  (let [runtime (Runtime/getRuntime)]
+    (.halt runtime val)))
+
+(defn sum
+  "Returns the result of reducing vals with +."
+  [vals]
+  (->> vals
+       (reduce +)))
+
+(defn repeat-seq
+  "Concatenates repetitions of aseq: endlessly with one argument, or amt
+  times when amt is given."
+  ([aseq]
+    (->> (repeat aseq)
+         (apply concat)))
+  ([amt aseq]
+    (->> (repeat amt aseq)
+         (apply concat))))
+
+(defn div
+  "Perform floating point division on the arguments."
+  [f & rest]
+  ;; Coercing the first operand to double makes / produce a double.
+  (let [dividend (double f)]
+    (apply / dividend rest)))
+
+(defn defaulted
+  "Returns val when it is logically true, otherwise default."
+  [val default]
+  (or val default))
+
+(defn mk-counter
+  "Returns a zero-argument function that yields start-val on its first call
+  and one more on each later call. start-val defaults to 1."
+  ([] (mk-counter 1))
+  ([start-val]
+    (let [state (atom (dec start-val))]
+      #(swap! state inc))))
+
+;; Expands to a `for` over (range times) that evaluates body once per
+;; iteration; the result is the lazy sequence `for` produces.
+(defmacro for-times [times & body]
+ `(for [i# (range ~times)]
+ ~@body
+ ))
+
+;; Same syntax as `for`, but the resulting sequence is fully realized with
+;; doall before being returned.
+(defmacro dofor [& body]
+ `(doall (for ~@body)))
+
+(defn reverse-map
+  "Inverts amap: each value becomes a key mapped to the vector of keys that
+  had that value.
+  {:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}"
+  [amap]
+  (reduce (fn [acc [k v]]
+            (update-in acc [v] (fnil conj []) k))
+          {}
+          amap))
+
+;; For each given form, expands to a println of the form's source text
+;; followed by its value.
+(defmacro print-vars [& vars]
+ (let [prints (for [v vars] `(println ~(str v) ~v))]
+ `(do ~@prints)))
+
+(defn process-pid
+  "Gets the pid of this JVM. Hacky because Java doesn't provide a real way to do this.
+  Takes the part before \"@\" in the runtime MXBean name and throws a
+  RuntimeException when the name does not split into exactly two parts."
+  []
+  (let [jvm-name (.getName (ManagementFactory/getRuntimeMXBean))
+        parts (.split jvm-name "@")]
+    (if (= 2 (count parts))
+      (first parts)
+      (throw (RuntimeException. (str "Got unexpected process name: " jvm-name))))))
+
+(defn exec-command!
+  "Splits command on single spaces, treats the first token as the executable
+  and the rest as arguments, and runs it with a DefaultExecutor. Returns
+  whatever .execute returns."
+  [command]
+  (let [[executable & arguments] (seq (.split command " "))
+        cmd-line (CommandLine. executable)]
+    (doseq [arg arguments]
+      (.addArgument cmd-line arg))
+    (.execute (DefaultExecutor.) cmd-line)))
+
+(defn extract-dir-from-jar
+  "Runs `unzip` to extract the entries under dir from the jar at jarpath into
+  destdir. An ExecuteException in the cause chain is logged, not rethrown."
+  [jarpath dir destdir]
+  (let [unzip-cmd (str "unzip -qq " jarpath " " dir "/** -d " destdir)]
+    (try-cause
+      (exec-command! unzip-cmd)
+      (catch ExecuteException e
+        (log-message "Could not extract " dir " from " jarpath)))))
+
+;; Force-kills the process with the given pid: `taskkill /f /pid` when
+;; on-windows? is true, `kill -9` otherwise. An ExecuteException in the
+;; cause chain is logged and swallowed.
+(defn ensure-process-killed! [pid]
+ ;; TODO: should probably do a ps ax of some sort to make sure it was killed
+ (try-cause
- (exec-command! (str "kill -9 " pid))
++ (exec-command! (str (if on-windows? "taskkill /f /pid " "kill -9 ") pid))
+ (catch ExecuteException e
+ (log-message "Error when trying to kill " pid ". Process is probably already dead."))
+ ))
+
+;; Starts a child process. command is split on single spaces (empty tokens
+;; dropped); the :environment map entries are put into the child's
+;; environment. Returns the result of ProcessBuilder.start.
+(defnk launch-process [command :environment {}]
+  (let [argv (->> (seq (.split command " "))
+                  (remove empty?))
+        builder (ProcessBuilder. argv)
+        child-env (.environment builder)]
+    (doseq [[env-name env-value] environment]
+      (.put child-env env-name env-value))
+    (.start builder)))
+
+(defn sleep-secs
+  "Calls Time/sleep with secs converted to milliseconds (secs is truncated
+  to a long first). Does nothing when secs is not positive."
+  [secs]
+  (when (pos? secs)
+    (let [millis (* (long secs) 1000)]
+      (Time/sleep millis))))
+
+(defn sleep-until-secs
+  "Calls Time/sleepUntil with target-secs converted to milliseconds
+  (target-secs is truncated to a long first)."
+  [target-secs]
+  (let [target-millis (* (long target-secs) 1000)]
+    (Time/sleepUntil target-millis)))
+
+;; Thread-control operations; async-loop returns an object reifying this
+;; protocol by delegating to the underlying java.lang.Thread.
+(defprotocol SmartThread
+ (start [this])
+ (join [this])
+ (interrupt [this])
+ (sleeping? [this]))
+
+;; afn returns amount of time to sleep
+;; Runs afn repeatedly on a new thread and returns a SmartThread handle.
+;; Options: :daemon and :priority are set on the thread; :kill-fn is called
+;; with the Throwable when the loop dies; :factory? true means afn is called
+;; once to produce the function that is looped; :start false leaves the
+;; thread unstarted; :thread-name is appended to the thread's default name.
+(defnk async-loop [afn
+ :daemon false
+ :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
+ :priority Thread/NORM_PRIORITY
+ :factory? false
+ :start true
+ :thread-name nil]
+ (let [thread (Thread.
+ (fn []
+ (try-cause
+ (let [afn (if factory? (afn) afn)]
+ (loop []
+ ;; The return value is the number of seconds to sleep before the
+ ;; next iteration; a nil return ends the loop.
+ (let [sleep-time (afn)]
+ (when-not (nil? sleep-time)
+ (sleep-secs sleep-time)
+ (recur))
+ )))
+ ;; An interrupt anywhere in the cause chain is logged, not escalated.
+ (catch InterruptedException e
+ (log-message "Async loop interrupted!")
+ )
+ (catch Throwable t
+ (log-error t "Async loop died!")
+ (kill-fn t)
+ ))
+ ))]
+ (.setDaemon thread daemon)
+ (.setPriority thread priority)
+ (when thread-name
+ (.setName thread (str (.getName thread) "-" thread-name)))
+ (when start
+ (.start thread))
+ ;; should return object that supports stop, interrupt, join, and waiting?
+ (reify SmartThread
+ (start [this]
+ (.start thread))
+ (join [this]
+ (.join thread))
+ (interrupt [this]
+ (.interrupt thread))
+ (sleeping? [this]
+ (Time/isThreadWaiting thread)
+ ))
+ ))
+
+(defn exists-file?
+  "Returns true when a file or directory exists at path."
+  [path]
+  (let [target (File. path)]
+    (.exists target)))
+
+(defn rmr
+  "Deletes the file or directory at path with FileUtils/forceDelete when it
+  exists. A FileNotFoundException raised by the delete is ignored."
+  [path]
+  (log-debug "Rmr path " path)
+  (if (exists-file? path)
+    (try
+      (FileUtils/forceDelete (File. path))
+      (catch FileNotFoundException _
+        nil))))
+
+(defn rmpath
+  "Removes file or directory at the path. Not recursive. Throws exception on failure"
+  [path]
+  (log-debug "Removing path " path)
+  (when (exists-file? path)
+    (when-not (.delete (File. path))
+      (throw (RuntimeException. (str "Failed to delete " path))))))
+
+(defn local-mkdirs
+  "Creates the directory at path via FileUtils/forceMkdir."
+  [path]
+  (log-debug "Making dirs at " path)
+  (-> (File. path)
+      (FileUtils/forceMkdir)))
+
+(defn touch
+  "Creates a new empty file at path. Throws a RuntimeException when the file
+  could not be created (createNewFile returned false). When on-windows? is
+  true, missing parent directories are created first."
+  [path]
+  (log-debug "Touching file at " path)
- (let [success? (.createNewFile (File. path))]
++  (let [file (File. path)
++        success? (do (when on-windows?
++                       ;; getParentFile is nil for a path without a parent
++                       ;; component; calling .mkdirs on it would NPE.
++                       (when-let [parent (.getParentFile file)]
++                         (.mkdirs parent)))
++                     (.createNewFile file))]
+    (when-not success?
+      (throw (RuntimeException. (str "Failed to touch " path))))))
+
+(defn read-dir-contents
+  "Returns the names of the entries listed for dir, or an empty vector when
+  dir does not exist."
+  [dir]
+  (if-not (exists-file? dir)
+    []
+    (map (fn [^File entry] (.getName entry))
+         (.listFiles (File. dir)))))
+
+(defn compact
+  "Returns a lazy sequence of the non-nil items of aseq."
+  [aseq]
+  (remove nil? aseq))
+
+(defn current-classpath
+  "Returns the value of the java.class.path system property."
+  []
+  (let [prop "java.class.path"]
+    (System/getProperty prop)))
+
+;; Returns classpath with every element of paths appended, joined with
+;; class-path-separator (the JVM's path.separator) instead of a fixed ":".
+(defn add-to-classpath [classpath paths]
- (str/join ":" (cons classpath paths)))
++ (str/join class-path-separator (cons classpath paths)))
+
+(defn ^ReentrantReadWriteLock mk-rw-lock
+  "Returns a new ReentrantReadWriteLock."
+  []
+  (new ReentrantReadWriteLock))
+
+(defmacro read-locked
+  "Evaluates body while holding the read lock of rw-lock (a symbol bound to
+  a ReentrantReadWriteLock) and yields body's value. The lock is released
+  afterwards, including when body throws."
+  [rw-lock & body]
+  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
+    `(let [rlock# (.readLock ~lock)]
+       ;; Acquire before entering try: if lock itself fails, the finally
+       ;; clause must not try to unlock a lock that was never taken.
+       (.lock rlock#)
+       (try
+         ~@body
+         (finally (.unlock rlock#))))))
+
+(defmacro write-locked
+  "Evaluates body while holding the write lock of rw-lock (a symbol bound to
+  a ReentrantReadWriteLock) and yields body's value. The lock is released
+  afterwards, including when body throws."
+  [rw-lock & body]
+  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
+    `(let [wlock# (.writeLock ~lock)]
+       ;; Acquire before entering try: if lock itself fails, the finally
+       ;; clause must not try to unlock a lock that was never taken.
+       (.lock wlock#)
+       (try
+         ~@body
+         (finally (.unlock wlock#))))))
+
+(defn wait-for-condition
+  "Calls apredicate repeatedly, invoking (Time/sleep 100) between attempts,
+  and returns nil once it yields a logically true value."
+  [apredicate]
+  (loop []
+    (when-not (apredicate)
+      (Time/sleep 100)
+      (recur))))
+
+(defn some?
+  "Returns true when (some pred aseq) yields a non-nil value, else false."
+  [pred aseq]
+  (not (nil? (some pred aseq))))
+
+(defn time-delta
+  "Returns (current-time-secs) minus time-secs."
+  [time-secs]
+  (let [now (current-time-secs)]
+    (- now time-secs)))
+
+(defn time-delta-ms
+  "Returns System/currentTimeMillis minus time-ms (coerced to long)."
+  [time-ms]
+  (let [now (System/currentTimeMillis)]
+    (- now (long time-ms))))
+
+(defn parse-int
+  "Returns the result of Integer/valueOf applied to str."
+  [str]
+  (let [parsed (Integer/valueOf str)]
+    parsed))
+
+(defn integer-divided
+  "Returns the result of Utils/integerDivided for sum and num-pieces,
+  converted with clojurify-structure."
+  [sum num-pieces]
+  (-> (Utils/integerDivided sum num-pieces)
+      clojurify-structure))
+
+(defn collectify
+  "Returns obj unchanged when it is sequential or a java.util.Collection;
+  otherwise wraps it in a one-element vector."
+  [obj]
+  (cond
+    (sequential? obj) obj
+    (instance? Collection obj) obj
+    :else [obj]))
+
+(defn to-json
+  "Returns the result of JSONValue/toJSONString for obj."
+  [obj]
+  (let [encoded (JSONValue/toJSONString obj)]
+    encoded))
+
+(defn from-json
+  "Parses str with JSONValue/parse and converts the result with
+  clojurify-structure. Returns nil when str is nil."
+  [^String str]
+  (when str
+    (-> (JSONValue/parse str)
+        clojurify-structure)))
+
+;; Expands body into a single let: every form except the last becomes a
+;; binding, and the last form is the let's body. A form written as
+;; (bind sym ... expr) binds sym to its last element; any other form is
+;; bound to _ so it is evaluated only for effect.
+(defmacro letlocals [& body]
+ (let [[tobind lexpr] (split-at (dec (count body)) body)
+ binded (vec (mapcat (fn [e]
+ (if (and (list? e) (= 'bind (first e)))
+ [(second e) (last e)]
+ ['_ e]
+ ))
+ tobind ))]
+ `(let ~binded
+ ~(first lexpr)
+ )))
+
+(defn remove-first
+  "Returns aseq without its first item that satisfies pred. Throws an
+  IllegalArgumentException when no item satisfies pred."
+  [pred aseq]
+  (let [[head tail] (split-with (complement pred) aseq)]
+    (if (seq tail)
+      (concat head (rest tail))
+      (throw (IllegalArgumentException. "Nothing to remove")))))
+
+(defn assoc-non-nil
+  "Associates k with v in m when v is logically true; otherwise returns m
+  unchanged."
+  [m k v]
+  (if-not v
+    m
+    (assoc m k v)))
+
+(defn multi-set
+  "Returns a map of elem to count"
+  [aseq]
+  (->> aseq
+       (map (fn [elem] (hash-map elem 1)))
+       (apply merge-with +)))
+
+(defn set-var-root*
+  "Replaces the root binding of the var avar with val, ignoring the previous
+  root value."
+  [avar val]
+  (alter-var-root avar (constantly val)))
+
+(defmacro set-var-root
+  "Expands to a set-var-root* call on the var named by var-sym with val."
+  [var-sym val]
+  (list `set-var-root* (list 'var var-sym) val))
+
+;; Temporarily replaces var root bindings. bindings is a flat sequence of
+;; var-symbol/value pairs; the current values are saved, the new roots are
+;; installed, body runs, and the saved values are restored in a finally.
+(defmacro with-var-roots [bindings & body]
+ (let [settings (partition 2 bindings)
+ ;; One fresh symbol per binding to hold the previous value.
+ tmpvars (repeatedly (count settings) (partial gensym "old"))
+ vars (map first settings)
+ savevals (vec (mapcat (fn [t v] [t v]) tmpvars vars))
+ setters (for [[v s] settings] `(set-var-root ~v ~s))
+ restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)
+ ]
+ `(let ~savevals
+ ~@setters
+ (try
+ ~@body
+ (finally
+ ~@restorers))
+ )))
+
+(defn map-diff
+  "Returns mappings in m2 that aren't in m1"
+  [m1 m2]
+  (into {}
+        (for [[k v :as entry] m2
+              :when (not= v (m1 k))]
+          entry)))
+
+
+(defn select-keys-pred
+  "Returns a map of the entries of amap whose key satisfies pred."
+  [pred amap]
+  (->> amap
+       (filter (fn [[k _]] (pred k)))
+       (into {})))
+
+
+(defn rotating-random-range
+  "Builds the state vector consumed by acquire-random-range-id:
+  [a MutableInt starting at -1, a shuffled ArrayList copy of choices, the
+  Random used for shuffling]."
+  [choices]
+  (let [rng (Random.)
+        shuffled (ArrayList. choices)]
+    (Collections/shuffle shuffled rng)
+    [(MutableInt. -1) shuffled rng]))
+
+(defn acquire-random-range-id
+  "Advances the cursor of a rotating-random-range state and returns the
+  element of the list at the cursor. When the incremented cursor reaches the
+  list size, the cursor is reset to 0 and the list is reshuffled first."
+  [[^MutableInt curr ^List state ^Random rand]]
+  (let [next-idx (.increment curr)]
+    (when-not (< next-idx (.size state))
+      (.set curr 0)
+      (Collections/shuffle state rand)))
+  (.get state (.get curr)))
+
+(defn interleave-all
+  "Interleaves colls round by round: the first items of every collection,
+  then the second items, and so on. Unlike clojure.core/interleave,
+  exhausted collections are skipped and the remaining ones keep going.
+  Returns an empty vector when called without arguments, otherwise a lazy
+  sequence."
+  [& colls]
+  (if (empty? colls)
+    []
+    ;; Each round is produced lazily, so stack depth does not grow with the
+    ;; length of the longest collection.
+    (letfn [(rounds [cs]
+              (lazy-seq
+                (when-let [live (seq (filter (complement empty?) cs))]
+                  (concat (map first live)
+                          (rounds (map rest live))))))]
+      (rounds colls))))
+
+(defn update
+  "Returns m with the value under k replaced by (afn current-value), where
+  the current value is nil when k is absent."
+  [m k afn]
+  (update-in m [k] afn))
+
+(defn any-intersection
+  "Returns the elements that occur more than once across the concatenation
+  of sets (nil when there are none)."
+  [& sets]
+  (let [elem->count (multi-set (apply concat sets))
+        repeated (filter-val #(> % 1) elem->count)]
+    (keys repeated)))
+
+(defn between?
+  "val >= lower and val <= upper"
+  [val lower upper]
+  (<= lower val upper))
+
+;; Evaluates body 1,000,000 times inside `time`. The range is realized with
+;; doall before timing starts, so building it is not part of the measurement.
+(defmacro benchmark [& body]
+ `(let [l# (doall (range 1000000))]
+ (time
+ (doseq [i# l#]
+ ~@body))))
+
+(defn rand-sampler
+  "Returns a zero-argument function that is true whenever a fresh
+  (.nextInt rng freq) draw equals 0."
+  [freq]
+  (let [rng (java.util.Random.)]
+    #(zero? (.nextInt rng freq))))
+
+;; Returns a zero-argument sampling function carrying {:rate freq} as
+;; metadata. Each call advances a counter and reports whether it equals the
+;; current target; when the counter reaches freq it is reset to 0 and a new
+;; target is drawn with (.nextInt r freq).
+;; NOTE(review): presumably MutableInt.increment returns the incremented
+;; value - confirm against backtype.storm.utils.MutableInt.
+(defn even-sampler [freq]
+ (let [freq (int freq)
+ start (int 0)
+ r (java.util.Random.)
+ curr (MutableInt. -1)
+ target (MutableInt. (.nextInt r freq))]
+ (with-meta
+ (fn []
+ (let [i (.increment curr)]
+ (when (>= i freq)
+ (.set curr start)
+ (.set target (.nextInt r freq))))
+ (= (.get curr) (.get target)))
+ {:rate freq})))
+
+(defn sampler-rate
+  "Returns the :rate entry of sampler's metadata."
+  [sampler]
+  (-> sampler meta :rate))
+
+(defn class-selector
+  "Returns the class of obj; any further arguments are ignored."
+  [obj & args]
+  (class obj))
+
+(defn uptime-computer
+  "Captures (current-time-secs) now and returns a zero-argument function
+  yielding the time-delta since that moment."
+  []
+  (let [started-at (current-time-secs)]
+    #(time-delta started-at)))
+
+(defn stringify-error
+  "Returns the stack trace of error, as written by printStackTrace, as a
+  string."
+  [error]
+  (let [buffer (StringWriter.)]
+    (.printStackTrace error (PrintWriter. buffer))
+    (.toString buffer)))
+
+(defn nil-to-zero
+  "Returns v when it is logically true, otherwise 0."
+  [v]
+  (if v v 0))
+
+(defn bit-xor-vals
+  "Returns the bitwise XOR of all vals, starting from 0."
+  [vals]
+  (reduce (fn [acc v] (bit-xor acc v))
+          0
+          vals))
+
+;; Evaluates body; if it throws any Throwable, calls afn with the Throwable
+;; and yields afn's result instead.
+(defmacro with-error-reaction [afn & body]
+ `(try ~@body
+ (catch Throwable t# (~afn t#))))
+
+(defn container
+  "Returns a new Container."
+  []
+  (new Container))
+
+(defn container-set!
+  "Stores obj in the object field of container and returns container."
+  [^Container container obj]
+  (set! (.object container) obj)
+  container)
+
+(defn container-get
+  "Returns the value of the object field of container."
+  [^Container container]
+  (.object container))
+
+(defn to-millis
+  "Converts secs (truncated to a long) to milliseconds."
+  [secs]
+  (let [whole-secs (long secs)]
+    (* 1000 whole-secs)))
+
+(defn throw-runtime
+  "Throws a RuntimeException whose message is the concatenation (via str)
+  of strs."
+  [& strs]
+  (let [message (apply str strs)]
+    (throw (RuntimeException. message))))
+
+;; Delegates to (log-capture! "STDIO"). The commented-out code below is an
+;; earlier approach that was left in place for reference.
+(defn redirect-stdio-to-slf4j! []
+ ;; set-var-root doesn't work with *out* and *err*, so digging much deeper here
+ ;; Unfortunately, this code seems to work at the REPL but not when spawned as worker processes
+ ;; it might have something to do with being a child process
+ ;; (set! (. (.getThreadBinding RT/OUT) val)
+ ;; (java.io.OutputStreamWriter.
+ ;; (log-stream :info "STDIO")))
+ ;; (set! (. (.getThreadBinding RT/ERR) val)
+ ;; (PrintWriter.
+ ;; (java.io.OutputStreamWriter.
+ ;; (log-stream :error "STDIO"))
+ ;; true))
+ (log-capture! "STDIO"))
+
+(defn spy
+  "Logs prefix, \": \" and val with log-message, then returns val."
+  [prefix val]
+  (doto val
+    (->> (log-message prefix ": "))))
+
+(defn zip-contains-dir?
+  "Returns true when the zip (or jar) file at zipfile has at least one entry
+  whose name starts with target followed by \"/\"; false otherwise.
+  The archive is closed before returning."
+  [zipfile target]
+  ;; ZIP entry names always use forward slashes, whatever the host platform,
+  ;; so the OS file separator must not be used here.
+  (let [prefix (str target "/")]
+    (with-open [zip (ZipFile. zipfile)]
+      (boolean
+        (some (fn [^String entry-name] (.startsWith entry-name prefix))
+              (map (memfn getName) (enumeration-seq (.entries zip))))))))
+
+(defn url-encode
+  "Returns s encoded as application/x-www-form-urlencoded using UTF-8.
+  The charset is given explicitly because the one-argument
+  URLEncoder/encode is deprecated and encodes with the platform default
+  charset, so its output varies between machines."
+  [^String s]
+  (java.net.URLEncoder/encode s "UTF-8"))
+
+(defn join-maps
+  "Returns a map from every key found in any of maps to the sequence of
+  that key's values in each map, in argument order (nil where absent)."
+  [& maps]
+  (let [key-sets (map (fn [m] (set (keys m))) maps)
+        all-keys (apply set/union key-sets)]
+    (into {}
+          (map (fn [k] [k (for [m maps] (m k))])
+               all-keys))))
+
+;; Splits aseq into consecutive chunks whose sizes come from
+;; integer-divided; returns [] when max-num-chunks is zero.
+;; NOTE(review): presumably integer-divided yields a map of chunk size to
+;; number of chunks of that size - confirm against Utils/integerDivided.
+(defn partition-fixed [max-num-chunks aseq]
+ (if (zero? max-num-chunks)
+ []
+ (let [chunks (->> (integer-divided (count aseq) max-num-chunks)
+ ;; Drop the entry for size 0, then put the larger sizes first.
+ (#(dissoc % 0))
+ (sort-by (comp - first))
+ (mapcat (fn [[size amt]] (repeat amt size)))
+ )]
+ ;; Walk the size list, splitting that many items off data each time.
+ (loop [result []
+ [chunk & rest-chunks] chunks
+ data aseq]
+ (if (nil? chunk)
+ result
+ (let [[c rest-data] (split-at chunk data)]
+ (recur (conj result c)
+ rest-chunks
+ rest-data)))))))
+
+
+(defn assoc-apply-self
+  "Associates key in curr with the result of calling afn on curr itself."
+  [curr key afn]
+  (let [computed (afn curr)]
+    (assoc curr key computed)))
+
+;; Builds a map from key/form pairs in order, starting from {}. Inside each
+;; form the symbol <> is bound to the map built so far, so a later entry
+;; can refer to earlier ones.
+(defmacro recursive-map [& forms]
+ (->> (partition 2 forms)
+ (map (fn [[key form]] `(assoc-apply-self ~key (fn [~'<>] ~form))))
+ (concat `(-> {}))))
+
+(defn current-stack-trace
+  "Returns the current thread's stack trace as a string, one frame per
+  line."
+  []
+  (let [frames (.getStackTrace (Thread/currentThread))]
+    (str/join "\n" (map str frames))))
+
+(defn get-iterator
+  "Returns alist's iterator, or nil when alist is nil."
+  [^Iterable alist]
+  (when alist
+    (.iterator alist)))
+
+(defn iter-has-next?
+  "Returns whether iter has another element; false when iter is nil."
+  [^Iterator iter]
+  (boolean (and iter (.hasNext iter))))
+
+(defn iter-next
+  "Returns the next element of iter, advancing it."
+  [^Iterator iter]
+  (let [item (.next iter)]
+    item))
+
+;; Iterates one or more Iterables in lock-step using java.util.Iterator.
+;; pairs is a flat vector of element-binding/list pairs; body runs while
+;; every iterator still has a next element (a nil list ends it at once).
+(defmacro fast-list-iter [pairs & body]
+ (let [pairs (partition 2 pairs)
+ lists (map second pairs)
+ elems (map first pairs)
+ ;; One generated iterator symbol per list.
+ iters (map (fn [_] (gensym)) lists)
+ bindings (->> (map (fn [i l] [i `(get-iterator ~l)]) iters lists) (apply concat))
+ tests (map (fn [i] `(iter-has-next? ~i)) iters)
+ assignments (->> (map (fn [e i] [e `(iter-next ~i)]) elems iters) (apply concat))]
+ `(let [~@bindings]
+ (while (and ~@tests)
+ (let [~@assignments]
+ ~@body
+ )))))
+
+(defn fast-list-map
+  "Applies afn to each element of alist (via fast-list-iter) and returns the
+  results in a new java.util.ArrayList."
+  [afn alist]
+  (let [results (ArrayList.)]
+    (fast-list-iter [item alist]
+      (.add results (afn item)))
+    results))
+
+;; Comprehension-style wrapper around fast-list-map: binds e to each
+;; element of alist, evaluates body, and collects the results.
+(defmacro fast-list-for [[e alist] & body]
+ `(fast-list-map (fn [~e] ~@body) ~alist))
+
+(defn map-iter
+  "Returns an iterator over amap's entry set, or nil when amap is nil."
+  [^Map amap]
+  (when amap
+    (.iterator (.entrySet amap))))
+
+(defn convert-entry
+  "Returns [key value] for the given java.util.Map$Entry."
+  [^Map$Entry entry]
+  (vector (.getKey entry)
+          (.getValue entry)))
+
+;; Iterates the entries of a java.util.Map with its entry-set iterator.
+;; bind is bound to the [key value] vector of each entry (so it may
+;; destructure); a nil map results in no iterations.
+(defmacro fast-map-iter [[bind amap] & body]
+ `(let [iter# (map-iter ~amap)]
+ (while (iter-has-next? iter#)
+ (let [entry# (iter-next iter#)
+ ~bind (convert-entry entry#)]
+ ~@body
+ ))))
+
+(defn fast-first
+  "Returns the element at index 0 of alist via List.get."
+  [^List alist]
+  (let [head-index 0]
+    (.get alist head-index)))
+
+(defmacro get-with-default
+  "Looks key up in the mutable java.util.Map amap. Yields the stored value
+  when it is logically true; otherwise evaluates default-val, puts it into
+  amap under key and yields it.
+  The amap and key forms are each evaluated exactly once; default-val is
+  evaluated only when needed."
+  [amap key default-val]
+  `(let [m# ~amap
+         k# ~key
+         curr# (.get m# k#)]
+     (if curr#
+       curr#
+       (let [new# ~default-val]
+         (.put m# k# new#)
+         new#))))
+
+(defn fast-group-by
+  "Groups the elements of alist by (afn element). Returns a
+  java.util.HashMap from each key to a java.util.ArrayList of the elements
+  that produced it, in iteration order."
+  [afn alist]
+  (let [groups (HashMap.)]
+    (fast-list-iter [item alist]
+      (let [group-key (afn item)
+            ^List bucket (get-with-default groups group-key (ArrayList.))]
+        (.add bucket item)))
+    groups))
+
+(defn new-instance
+  "Instantiates klass with .newInstance. klass may be a Class or a string
+  class name, which is resolved with Class/forName first."
+  [klass]
+  (let [resolved (if-not (string? klass)
+                   klass
+                   (Class/forName klass))]
+    (.newInstance resolved)))
+
+;; Threading macro with an explicit placeholder: x is spliced into each
+;; form at the position of the symbol <>. A form that is not a seq is
+;; simply called with x as its only argument.
+(defmacro -<>
+ ([x] x)
+ ([x form] (if (seq? form)
+ (with-meta
+ ;; Split the form at <> and put x in its place, keeping the form's
+ ;; metadata.
+ (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+ (concat begin [x] end))
+ (meta form))
+ (list form x)))
+ ;; More than one form: thread through the first, then the rest.
+ ([x form & more] `(-<> (-<> ~x ~form) ~@more)))