You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2013/12/19 07:25:18 UTC

[06/12] Merge branch '0.9.0-windows' of github.com:davidlao2k/storm into storm-windows

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/697849dc/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index a575b1f,0000000..91989bc
mode 100644,000000..100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -1,828 -1,0 +1,830 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns backtype.storm.ui.core
 +  (:use compojure.core)
 +  (:use ring.middleware.reload)
 +  (:use [hiccup core page-helpers])
 +  (:use [backtype.storm config util log])
 +  (:use [backtype.storm.ui helpers])
 +  (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]])
 +  (:use [ring.adapter.jetty :only [run-jetty]])
 +  (:use [clojure.string :only [trim]])
 +  (:import [backtype.storm.generated ExecutorSpecificStats
 +            ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
 +            ErrorInfo ClusterSummary SupervisorSummary TopologySummary
 +            Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
 +            KillOptions])
 +  (:import [java.io File])
 +  (:require [compojure.route :as route]
 +            [compojure.handler :as handler]
 +            [ring.util.response :as resp]
 +            [backtype.storm [thrift :as thrift]])
 +  (:import [org.apache.commons.lang StringEscapeUtils])
 +  (:gen-class))
 +
 +(def ^:dynamic *STORM-CONF* (read-storm-config))
 +
 +(defmacro with-nimbus [nimbus-sym & body]
 +  `(thrift/with-nimbus-connection [~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF* NIMBUS-THRIFT-PORT)]
 +     ~@body
 +     ))
 +
 +(defn get-filled-stats [summs]
 +  (->> summs
 +       (map #(.get_stats ^ExecutorSummary %))
 +       (filter not-nil?)))
 +
 +(defn mk-system-toggle-button [include-sys?]
 +  [:p {:class "js-only"}
 +    [:input {:type "button"
 +             :value (str (if include-sys? "Hide" "Show") " System Stats")
 +             :onclick "toggleSys()"}]])
 +
 +(defn ui-template [body]
 +  (html4
 +   [:head
 +    [:title "Storm UI"]
 +    (include-css "/css/bootstrap-1.1.0.css")
 +    (include-css "/css/style.css")
 +    (include-js "/js/jquery-1.6.2.min.js")
 +    (include-js "/js/jquery.tablesorter.min.js")
 +    (include-js "/js/jquery.cookies.2.2.0.min.js")
 +    (include-js "/js/script.js")
 +    ]
 +   [:body
 +    [:h1 (link-to "/" "Storm UI")]
 +    (seq body)
 +    ]))
 +
 +(defn read-storm-version []
 +  (let [storm-home (System/getProperty "storm.home")
 +        release-path (format "%s/RELEASE" storm-home)
 +        release-file (File. release-path)]
 +    (if (and (.exists release-file) (.isFile release-file))
 +      (trim (slurp release-path))
 +      "Unknown")))
 +
 +(defn cluster-summary-table [^ClusterSummary summ]
 +  (let [sups (.get_supervisors summ)
 +        used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups))
 +        total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups))
 +        free-slots (- total-slots used-slots)
 +        total-tasks (->> (.get_topologies summ)
 +                         (map #(.get_num_tasks ^TopologySummary %))
 +                         (reduce +))
 +        total-executors (->> (.get_topologies summ)
 +                             (map #(.get_num_executors ^TopologySummary %))
 +                             (reduce +))]
 +    (table ["Version" "Nimbus uptime" "Supervisors" "Used slots" "Free slots" "Total slots" "Executors" "Tasks"]
 +           [[(read-storm-version)
 +             (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
 +             (count sups)
 +             used-slots
 +             free-slots
 +             total-slots
 +             total-executors
 +             total-tasks]])
 +    ))
 +
 +(defn topology-link
 +  ([id] (topology-link id id))
 +  ([id content]
 +     (link-to (url-format "/topology/%s" id) (escape-html content))))
 +
 +(defn main-topology-summary-table [summs]
 +  ;; make the id clickable
 +  ;; make the table sortable
 +  (sorted-table
 +   ["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"]
 +   (for [^TopologySummary t summs]
 +     [(topology-link (.get_id t) (.get_name t))
 +      (escape-html (.get_id t))
 +      (.get_status t)
 +      (pretty-uptime-sec (.get_uptime_secs t))
 +      (.get_num_workers t)
 +      (.get_num_executors t)
 +      (.get_num_tasks t)
 +      ])
 +   :time-cols [3]
 +   :sort-list "[[0,0]]"
 +   ))
 +
 +(defn supervisor-summary-table [summs]
 +  (sorted-table
 +   ["Id" "Host" "Uptime" "Slots" "Used slots"]
 +   (for [^SupervisorSummary s summs]
 +     [(.get_supervisor_id s)
 +      (.get_host s)
 +      (pretty-uptime-sec (.get_uptime_secs s))
 +      (.get_num_workers s)
 +      (.get_num_used_workers s)])
 +   :time-cols [2]))
 +
 +(defn configuration-table [conf]
 +  (sorted-table ["Key" "Value"]
 +    (map #(vector (key %) (str (val %))) conf)))
 +
 +(defn main-page []
 +  (with-nimbus nimbus
 +    (let [summ (.getClusterInfo ^Nimbus$Client nimbus)]
 +      (concat
 +       [[:h2 "Cluster Summary"]]
 +       [(cluster-summary-table summ)]
 +       [[:h2 "Topology summary"]]
 +       (main-topology-summary-table (.get_topologies summ))
 +       [[:h2 "Supervisor summary"]]
 +       (supervisor-summary-table (.get_supervisors summ))
 +       [[:h2 "Nimbus Configuration"]]
 +       (configuration-table (from-json (.getNimbusConf ^Nimbus$Client nimbus)))
 +       ))))
 +
 +(defn component-type [^StormTopology topology id]
 +  (let [bolts (.get_bolts topology)
 +        spouts (.get_spouts topology)]
 +    (cond
 +     (.containsKey bolts id) :bolt
 +     (.containsKey spouts id) :spout
 +     )))
 +
 +(defn executor-summary-type [topology ^ExecutorSummary s]
 +  (component-type topology (.get_component_id s)))
 +
 +(defn add-pairs
 +  ([] [0 0])
 +  ([[a1 a2] [b1 b2]]
 +      [(+ a1 b1) (+ a2 b2)]))
 +
 +(defn expand-averages [avg counts]
 +  (let [avg (clojurify-structure avg)
 +        counts (clojurify-structure counts)]
 +    (into {}
 +          (for [[slice streams] counts]
 +            [slice
 +             (into {}
 +                   (for [[stream c] streams]
 +                     [stream
 +                      [(* c (get-in avg [slice stream]))
 +                       c]]
 +                     ))]
 +            ))))
 +
 +
 +(defn expand-averages-seq [average-seq counts-seq]
 +  (->> (map vector average-seq counts-seq)
 +       (map #(apply expand-averages %))
 +       (apply merge-with
 +              (fn [s1 s2]
 +                (merge-with
 +                 add-pairs
 +                 s1
 +                 s2)))
 +       ))
 +
 +(defn- val-avg [[t c]]
 +  (if (= t 0) 0
 +      (double (/ t c))))
 +
 +(defn aggregate-averages [average-seq counts-seq]
 +  (->> (expand-averages-seq average-seq counts-seq)
 +       (map-val
 +        (fn [s]
 +          (map-val val-avg s)
 +          ))
 +       ))
 +
 +(defn aggregate-counts [counts-seq]
 +  (->> counts-seq
 +       (map clojurify-structure)
 +       (apply merge-with
 +              (fn [s1 s2]
 +                (merge-with + s1 s2))
 +              )))
 +
 +(defn aggregate-avg-streams [avg counts]
 +  (let [expanded (expand-averages avg counts)]
 +    (->> expanded
 +         (map-val #(reduce add-pairs (vals %)))
 +         (map-val val-avg)
 +         )))
 +
 +(defn aggregate-count-streams [stats]
 +  (->> stats
 +       (map-val #(reduce + (vals %)))))
 +
 +(defn aggregate-common-stats [stats-seq]
 +  {:emitted (aggregate-counts (map #(.get_emitted ^ExecutorStats %) stats-seq))
 +   :transferred (aggregate-counts (map #(.get_transferred ^ExecutorStats %) stats-seq))}
 +  )
 +
 +(defn mk-include-sys-fn [include-sys?]
 +  (if include-sys?
 +    (fn [_] true)
 +    (fn [stream] (and (string? stream) (not (system-id? stream))))))
 +
 +(defn pre-process [stream-summary include-sys?]
 +  (let [filter-fn (mk-include-sys-fn include-sys?)
 +        emitted (:emitted stream-summary)
 +        emitted (into {} (for [[window stat] emitted]
 +                           {window (filter-key filter-fn stat)}))
 +        transferred (:transferred stream-summary)
 +        transferred (into {} (for [[window stat] transferred]
 +                               {window (filter-key filter-fn stat)}))
 +        stream-summary (-> stream-summary (dissoc :emitted) (assoc :emitted emitted))
 +        stream-summary (-> stream-summary (dissoc :transferred) (assoc :transferred transferred))]
 +    stream-summary))
 +
 +(defn aggregate-bolt-stats [stats-seq include-sys?]
 +  (let [stats-seq (collectify stats-seq)]
 +    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
 +           {:acked
 +            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
 +                                   stats-seq))
 +            :failed
 +            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_failed)
 +                                   stats-seq))
 +            :executed
 +            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
 +                                   stats-seq))
 +            :process-latencies
 +            (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_process_ms_avg)
 +                                     stats-seq)
 +                                (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
 +                                     stats-seq))
 +            :execute-latencies
 +            (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg)
 +                                     stats-seq)
 +                                (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
 +                                     stats-seq))
 +            })))
 +
 +(defn aggregate-spout-stats [stats-seq include-sys?]
 +  (let [stats-seq (collectify stats-seq)]
 +    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
 +           {:acked
 +            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
 +                                   stats-seq))
 +            :failed
 +            (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_spout get_failed)
 +                                   stats-seq))
 +            :complete-latencies
 +            (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_spout get_complete_ms_avg)
 +                                     stats-seq)
 +                                (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
 +                                     stats-seq))
 +            }
 +           )))
 +
 +(defn aggregate-bolt-streams [stats]
 +  {:acked (aggregate-count-streams (:acked stats))
 +   :failed (aggregate-count-streams (:failed stats))
 +   :emitted (aggregate-count-streams (:emitted stats))
 +   :transferred (aggregate-count-streams (:transferred stats))
 +   :process-latencies (aggregate-avg-streams (:process-latencies stats)
 +                                             (:acked stats))
 +   :executed (aggregate-count-streams (:executed stats))
 +   :execute-latencies (aggregate-avg-streams (:execute-latencies stats)
 +                                             (:executed stats))
 +   })
 +
 +(defn aggregate-spout-streams [stats]
 +  {:acked (aggregate-count-streams (:acked stats))
 +   :failed (aggregate-count-streams (:failed stats))
 +   :emitted (aggregate-count-streams (:emitted stats))
 +   :transferred (aggregate-count-streams (:transferred stats))
 +   :complete-latencies (aggregate-avg-streams (:complete-latencies stats)
 +                                              (:acked stats))
 +   })
 +
 +(defn spout-summary? [topology s]
 +  (= :spout (executor-summary-type topology s)))
 +
 +(defn bolt-summary? [topology s]
 +  (= :bolt (executor-summary-type topology s)))
 +
 +(defn topology-summary-table [^TopologyInfo summ]
 +  (let [executors (.get_executors summ)
 +        workers (set (for [^ExecutorSummary e executors] [(.get_host e) (.get_port e)]))]
 +    (table ["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"]
 +           [[(escape-html (.get_name summ))
 +             (escape-html (.get_id summ))
 +             (.get_status summ)
 +             (pretty-uptime-sec (.get_uptime_secs summ))
 +             (count workers)
 +             (count executors)
 +             (sum-tasks executors)
 +             ]]
 +           )))
 +
 +(defn total-aggregate-stats [spout-summs bolt-summs include-sys?]
 +  (let [spout-stats (get-filled-stats spout-summs)
 +        bolt-stats (get-filled-stats bolt-summs)
 +        agg-spout-stats (-> spout-stats
 +                            (aggregate-spout-stats include-sys?)
 +                            aggregate-spout-streams)
 +        agg-bolt-stats (-> bolt-stats
 +                           (aggregate-bolt-stats include-sys?)
 +                           aggregate-bolt-streams)]
 +    (merge-with
 +     (fn [s1 s2]
 +       (merge-with + s1 s2))
 +     (select-keys agg-bolt-stats [:emitted :transferred])
 +     agg-spout-stats
 +     )))
 +
 +(defn stats-times [stats-map]
 +  (sort-by #(Integer/parseInt %)
 +           (-> stats-map
 +               clojurify-structure
 +               (dissoc ":all-time")
 +               keys)))
 +
 +(defn topology-stats-table [id window stats]
 +  (let [times (stats-times (:emitted stats))
 +        display-map (into {} (for [t times] [t pretty-uptime-sec]))
 +        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
 +    (sorted-table
 +     ["Window" "Emitted" "Transferred" "Complete latency (ms)" "Acked" "Failed"]
 +     (for [k (concat times [":all-time"])
 +           :let [disp ((display-map k) k)]]
 +       [(link-to (if (= k window) {:class "red"} {})
 +                 (url-format "/topology/%s?window=%s" id k)
 +                 disp)
 +        (get-in stats [:emitted k])
 +        (get-in stats [:transferred k])
 +        (float-str (get-in stats [:complete-latencies k]))
 +        (get-in stats [:acked k])
 +        (get-in stats [:failed k])
 +        ]
 +       )
 +     :time-cols [0]
 +     )))
 +
 +(defn group-by-comp [summs]
 +  (let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
 +    (into (sorted-map) ret )))
 +
 +(defn error-subset [error-str]
 +  (apply str (take 200 error-str)))
 +
 +(defn most-recent-error [errors-list]
 +  (let [error (->> errors-list
 +                   (sort-by #(.get_error_time_secs ^ErrorInfo %))
 +                   reverse
 +                   first)]
 +    (if error
 +      [:span (if (< (time-delta (.get_error_time_secs ^ErrorInfo error))
 +                    (* 60 30))
 +               {:class "red"}
 +               {})
 +       (error-subset (.get_error ^ErrorInfo error))]
 +      )))
 +
 +(defn component-link [storm-id id]
 +  (link-to (url-format "/topology/%s/component/%s" storm-id id) (escape-html id)))
 +
 +(defn worker-log-link [host port]
 +  (link-to (url-format "http://%s:%s/log?file=worker-%s.log"
 +              host (*STORM-CONF* LOGVIEWER-PORT) port) (str port)))
 +
 +(defn render-capacity [capacity]
 +  (let [capacity (nil-to-zero capacity)]
 +    [:span (if (> capacity 0.9)
 +                 {:class "red"}
 +                 {})
 +           (float-str capacity)]))
 +
 +(defn compute-executor-capacity [^ExecutorSummary e]
 +  (let [stats (.get_stats e)
 +        stats (if stats
 +                (-> stats
 +                    (aggregate-bolt-stats true)
 +                    (aggregate-bolt-streams)
 +                    swap-map-order
 +                    (get "600")))
 +        uptime (nil-to-zero (.get_uptime_secs e))
 +        window (if (< uptime 600) uptime 600)
 +        executed (-> stats :executed nil-to-zero)
 +        latency (-> stats :execute-latencies nil-to-zero)
 +        ]
 +   (if (> window 0)
 +     (div (* executed latency) (* 1000 window))
 +     )))
 +
 +(defn compute-bolt-capacity [executors]
 +  (->> executors
 +       (map compute-executor-capacity)
 +       (map nil-to-zero)
 +       (apply max)))
 +
 +(defn spout-comp-table [top-id summ-map errors window include-sys?]
 +  (sorted-table
 +   ["Id" "Executors" "Tasks" "Emitted" "Transferred" "Complete latency (ms)"
 +    "Acked" "Failed" "Last error"]
 +   (for [[id summs] summ-map
 +         :let [stats-seq (get-filled-stats summs)
 +               stats (aggregate-spout-streams
 +                      (aggregate-spout-stats
 +                       stats-seq include-sys?))]]
 +     [(component-link top-id id)
 +      (count summs)
 +      (sum-tasks summs)
 +      (get-in stats [:emitted window])
 +      (get-in stats [:transferred window])
 +      (float-str (get-in stats [:complete-latencies window]))
 +      (get-in stats [:acked window])
 +      (get-in stats [:failed window])
 +      (most-recent-error (get errors id))
 +      ]
 +     )))
 +
 +(defn bolt-comp-table [top-id summ-map errors window include-sys?]
 +  (sorted-table
 +   ["Id" "Executors" "Tasks" "Emitted" "Transferred" "Capacity (last 10m)" "Execute latency (ms)" "Executed" "Process latency (ms)"
 +    "Acked" "Failed" "Last error"]
 +   (for [[id summs] summ-map
 +         :let [stats-seq (get-filled-stats summs)
 +               stats (aggregate-bolt-streams
 +                      (aggregate-bolt-stats
 +                       stats-seq include-sys?))
 +               ]]
 +     [(component-link top-id id)
 +      (count summs)
 +      (sum-tasks summs)
 +      (get-in stats [:emitted window])
 +      (get-in stats [:transferred window])
 +      (render-capacity (compute-bolt-capacity summs))
 +      (float-str (get-in stats [:execute-latencies window]))
 +      (get-in stats [:executed window])
 +      (float-str (get-in stats [:process-latencies window]))
 +      (get-in stats [:acked window])
 +      (get-in stats [:failed window])
 +      (most-recent-error (get errors id))
 +      ]
 +     )))
 +
 +(defn window-hint [window]
 +  (if (= window ":all-time")
 +    "All time"
 +    (pretty-uptime-sec window)))
 +
 +(defn topology-action-button [id name action command is-wait default-wait enabled]
 +  [:input {:type "button"
 +           :value action
 +           (if enabled :enabled :disabled) ""
 +           :onclick (str "confirmAction('" 
 +                         (StringEscapeUtils/escapeJavaScript id) "', '" 
 +                         (StringEscapeUtils/escapeJavaScript name) "', '"
 +                         command "', " is-wait ", " default-wait ")")}])
 +
 +(defn topology-page [id window include-sys?]
 +  (with-nimbus nimbus
 +    (let [window (if window window ":all-time")
 +          window-hint (window-hint window)
 +          summ (.getTopologyInfo ^Nimbus$Client nimbus id)
 +          topology (.getTopology ^Nimbus$Client nimbus id)
 +          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
 +          spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
 +          bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
 +          spout-comp-summs (group-by-comp spout-summs)
 +          bolt-comp-summs (group-by-comp bolt-summs)
 +          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
 +          name (.get_name summ)
 +          status (.get_status summ)
 +          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
 +          ]
 +      (concat
 +       [[:h2 "Topology summary"]]
 +       [(topology-summary-table summ)]
 +       [[:h2 {:class "js-only"} "Topology actions"]]
 +       [[:p {:class "js-only"} (concat
 +         [(topology-action-button id name "Activate" "activate" false 0 (= "INACTIVE" status))]
 +         [(topology-action-button id name "Deactivate" "deactivate" false 0 (= "ACTIVE" status))]
 +         [(topology-action-button id name "Rebalance" "rebalance" true msg-timeout (or (= "ACTIVE" status) (= "INACTIVE" status)))]
 +         [(topology-action-button id name "Kill" "kill" true msg-timeout (not= "KILLED" status))]
 +       )]]
 +       [[:h2 "Topology stats"]]
 +       (topology-stats-table id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
 +       [[:h2 "Spouts (" window-hint ")"]]
 +       (spout-comp-table id spout-comp-summs (.get_errors summ) window include-sys?)
 +       [[:h2 "Bolts (" window-hint ")"]]
 +       (bolt-comp-table id bolt-comp-summs (.get_errors summ) window include-sys?)
 +       [[:h2 "Topology Configuration"]]
 +       (configuration-table topology-conf)
 +       ))))
 +
 +(defn component-task-summs [^TopologyInfo summ topology id]
 +  (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
 +        bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
 +        spout-comp-summs (group-by-comp spout-summs)
 +        bolt-comp-summs (group-by-comp bolt-summs)
 +        ret (if (contains? spout-comp-summs id)
 +              (spout-comp-summs id)
 +              (bolt-comp-summs id))]
 +    (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)
 +    ))
 +
 +(defn spout-summary-table [topology-id id stats window]
 +  (let [times (stats-times (:emitted stats))
 +        display-map (into {} (for [t times] [t pretty-uptime-sec]))
 +        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
 +    (sorted-table
 +     ["Window" "Emitted" "Transferred" "Complete latency (ms)" "Acked" "Failed"]
 +     (for [k (concat times [":all-time"])
 +           :let [disp ((display-map k) k)]]
 +       [(link-to (if (= k window) {:class "red"} {})
 +                 (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
 +                 disp)
 +        (get-in stats [:emitted k])
 +        (get-in stats [:transferred k])
 +        (float-str (get-in stats [:complete-latencies k]))
 +        (get-in stats [:acked k])
 +        (get-in stats [:failed k])
 +        ])
 +     :time-cols [0])))
 +
 +(defn spout-output-summary-table [stream-summary window]
 +  (let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
 +    (sorted-table
 +     ["Stream" "Emitted" "Transferred" "Complete latency (ms)" "Acked" "Failed"]
 +     (for [[s stats] (stream-summary window)]
 +       [s
 +        (nil-to-zero (:emitted stats))
 +        (nil-to-zero (:transferred stats))
 +        (float-str (:complete-latencies stats))
 +        (nil-to-zero (:acked stats))
 +        (nil-to-zero (:failed stats))])
 +     )))
 +
 +(defn spout-executor-table [topology-id executors window include-sys?]
 +  (sorted-table
 +   ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred"
 +    "Complete latency (ms)" "Acked" "Failed"]
 +   (for [^ExecutorSummary e executors
 +         :let [stats (.get_stats e)
 +               stats (if stats
 +                       (-> stats
 +                           (aggregate-spout-stats include-sys?)
 +                           aggregate-spout-streams
 +                           swap-map-order
 +                           (get window)))]]
 +     [(pretty-executor-info (.get_executor_info e))
 +      (pretty-uptime-sec (.get_uptime_secs e))
 +      (.get_host e)
 +      (worker-log-link (.get_host e) (.get_port e))
 +      (nil-to-zero (:emitted stats))
 +      (nil-to-zero (:transferred stats))
 +      (float-str (:complete-latencies stats))
 +      (nil-to-zero (:acked stats))
 +      (nil-to-zero (:failed stats))
 +      ]
 +     )
 +   :time-cols [1]
 +   ))
 +
 +(defn spout-page [window ^TopologyInfo topology-info component executors include-sys?]
 +  (let [window-hint (str " (" (window-hint window) ")")
 +        stats (get-filled-stats executors)
 +        stream-summary (-> stats (aggregate-spout-stats include-sys?))
 +        summary (-> stream-summary aggregate-spout-streams)]
 +    (concat
 +     [[:h2 "Spout stats"]]
 +     (spout-summary-table (.get_id topology-info) component summary window)
 +     [[:h2 "Output stats" window-hint]]
 +     (spout-output-summary-table stream-summary window)
 +     [[:h2 "Executors" window-hint]]
 +     (spout-executor-table (.get_id topology-info) executors window include-sys?)
 +     ;; task id, task uptime, stream aggregated stats, last error
 +     )))
 +
 +(defn bolt-output-summary-table [stream-summary window]
 +  (let [stream-summary (-> stream-summary
 +                           swap-map-order
 +                           (get window)
 +                           (select-keys [:emitted :transferred])
 +                           swap-map-order)]
 +    (sorted-table
 +     ["Stream" "Emitted" "Transferred"]
 +     (for [[s stats] stream-summary]
 +       [s
 +        (nil-to-zero (:emitted stats))
 +        (nil-to-zero (:transferred stats))
 +        ])
 +     )))
 +
 +(defn bolt-input-summary-table [stream-summary window]
 +  (let [stream-summary (-> stream-summary
 +                           swap-map-order
 +                           (get window)
 +                           (select-keys [:acked :failed :process-latencies :executed :execute-latencies])
 +                           swap-map-order)]
 +    (sorted-table
 +     ["Component" "Stream" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
 +     (for [[^GlobalStreamId s stats] stream-summary]
 +       [(escape-html (.get_componentId s))
 +        (.get_streamId s)
 +        (float-str (:execute-latencies stats))
 +        (nil-to-zero (:executed stats))
 +        (float-str (:process-latencies stats))
 +        (nil-to-zero (:acked stats))
 +        (nil-to-zero (:failed stats))
 +        ])
 +     )))
 +
 +(defn bolt-executor-table [topology-id executors window include-sys?]
 +  (sorted-table
 +   ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred" "Capacity (last 10m)"
 +    "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
 +   (for [^ExecutorSummary e executors
 +         :let [stats (.get_stats e)
 +               stats (if stats
 +                       (-> stats
 +                           (aggregate-bolt-stats include-sys?)
 +                           (aggregate-bolt-streams)
 +                           swap-map-order
 +                           (get window)))]]
 +     [(pretty-executor-info (.get_executor_info e))
 +      (pretty-uptime-sec (.get_uptime_secs e))
 +      (.get_host e)
 +      (worker-log-link (.get_host e) (.get_port e))
 +      (nil-to-zero (:emitted stats))
 +      (nil-to-zero (:transferred stats))
 +      (render-capacity (compute-executor-capacity e))
 +      (float-str (:execute-latencies stats))
 +      (nil-to-zero (:executed stats))
 +      (float-str (:process-latencies stats))
 +      (nil-to-zero (:acked stats))
 +      (nil-to-zero (:failed stats))
 +      ]
 +     )
 +   :time-cols [1]
 +   ))
 +
 +(defn bolt-summary-table [topology-id id stats window]
 +  (let [times (stats-times (:emitted stats))
 +        display-map (into {} (for [t times] [t pretty-uptime-sec]))
 +        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
 +    (sorted-table
 +     ["Window" "Emitted" "Transferred" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
 +     (for [k (concat times [":all-time"])
 +           :let [disp ((display-map k) k)]]
 +       [(link-to (if (= k window) {:class "red"} {})
 +                 (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
 +                 disp)
 +        (get-in stats [:emitted k])
 +        (get-in stats [:transferred k])
 +        (float-str (get-in stats [:execute-latencies k]))
 +        (get-in stats [:executed k])
 +        (float-str (get-in stats [:process-latencies k]))
 +        (get-in stats [:acked k])
 +        (get-in stats [:failed k])
 +        ])
 +     :time-cols [0])))
 +
 +(defn bolt-page [window ^TopologyInfo topology-info component executors include-sys?]
 +  (let [window-hint (str " (" (window-hint window) ")")
 +        stats (get-filled-stats executors)
 +        stream-summary (-> stats (aggregate-bolt-stats include-sys?))
 +        summary (-> stream-summary aggregate-bolt-streams)]
 +    (concat
 +     [[:h2 "Bolt stats"]]
 +     (bolt-summary-table (.get_id topology-info) component summary window)
 +
 +     [[:h2 "Input stats" window-hint]]
 +     (bolt-input-summary-table stream-summary window)
 +
 +     [[:h2 "Output stats" window-hint]]
 +     (bolt-output-summary-table stream-summary window)
 +
 +     [[:h2 "Executors"]]
 +     (bolt-executor-table (.get_id topology-info) executors window include-sys?)
 +     )))
 +
 +(defn errors-table [errors-list]
 +  (let [errors (->> errors-list
 +                    (sort-by #(.get_error_time_secs ^ErrorInfo %))
 +                    reverse)]
 +    (sorted-table
 +     ["Time" "Error"]
 +     (for [^ErrorInfo e errors]
 +       [(date-str (.get_error_time_secs e))
 +        [:pre (.get_error e)]])
 +     :sort-list "[[0,1]]"
 +     )))
 +
 +(defn component-page [topology-id component window include-sys?]
 +  (with-nimbus nimbus
 +    (let [window (if window window ":all-time")
 +          summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
 +          topology (.getTopology ^Nimbus$Client nimbus topology-id)
 +          type (component-type topology component)
 +          summs (component-task-summs summ topology component)
 +          spec (cond (= type :spout) (spout-page window summ component summs include-sys?)
 +                     (= type :bolt) (bolt-page window summ component summs include-sys?))]
 +      (concat
 +       [[:h2 "Component summary"]
 +        (table ["Id" "Topology" "Executors" "Tasks"]
 +               [[(escape-html component)
 +                 (topology-link (.get_id summ) (.get_name summ))
 +                 (count summs)
 +                 (sum-tasks summs)
 +                 ]])]
 +       spec
 +       [[:h2 "Errors"]
 +        (errors-table (get (.get_errors summ) component))]
 +       ))))
 +
 +(defn get-include-sys? [cookies]
 +  (let [sys? (get cookies "sys")
 +        sys? (if (or (nil? sys?) (= "false" (:value sys?))) false true)]
 +    sys?))
 +
 +(defroutes main-routes
 +  (GET "/" [:as {cookies :cookies}]
 +       (-> (main-page)
 +           ui-template))
 +  (GET "/topology/:id" [:as {cookies :cookies} id & m]
 +       (let [include-sys? (get-include-sys? cookies)]
-          (-> (topology-page id (:window m) include-sys?)
++         (try
++           (-> (topology-page id (:window m) include-sys?)
 +             (concat [(mk-system-toggle-button include-sys?)])
-              ui-template)))
++             ui-template)
++           (catch Exception e (resp/redirect "/")))))
 +  (GET "/topology/:id/component/:component" [:as {cookies :cookies} id component & m]
 +       (let [include-sys? (get-include-sys? cookies)]
 +         (-> (component-page id component (:window m) include-sys?)
 +             (concat [(mk-system-toggle-button include-sys?)])
 +             ui-template)))
 +  (POST "/topology/:id/activate" [id]
 +    (with-nimbus nimbus
 +      (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
 +            name (.get_name tplg)]
 +        (.activate nimbus name)
 +        (log-message "Activating topology '" name "'")))
 +    (resp/redirect (str "/topology/" id)))
 +  (POST "/topology/:id/deactivate" [id]
 +    (with-nimbus nimbus
 +      (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
 +            name (.get_name tplg)]
 +        (.deactivate nimbus name)
 +        (log-message "Deactivating topology '" name "'")))
 +    (resp/redirect (str "/topology/" id)))
 +  (POST "/topology/:id/rebalance/:wait-time" [id wait-time]
 +    (with-nimbus nimbus
 +      (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
 +            name (.get_name tplg)
 +            options (RebalanceOptions.)]
 +        (.set_wait_secs options (Integer/parseInt wait-time))
 +        (.rebalance nimbus name options)
 +        (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
 +    (resp/redirect (str "/topology/" id)))
 +  (POST "/topology/:id/kill/:wait-time" [id wait-time]
 +    (with-nimbus nimbus
 +      (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
 +            name (.get_name tplg)
 +            options (KillOptions.)]
 +        (.set_wait_secs options (Integer/parseInt wait-time))
 +        (.killTopologyWithOpts nimbus name options)
 +        (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
 +    (resp/redirect (str "/topology/" id)))
 +  (route/resources "/")
 +  (route/not-found "Page not found"))
 +
 +(defn exception->html [ex]
 +  (concat
 +    [[:h2 "Internal Server Error"]]
 +    [[:pre (let [sw (java.io.StringWriter.)]
 +      (.printStackTrace ex (java.io.PrintWriter. sw))
 +      (.toString sw))]]))
 +
 +(defn catch-errors [handler]
 +  (fn [request]
 +    (try
 +      (handler request)
 +      (catch Exception ex
 +        (-> (resp/response (ui-template (exception->html ex)))
 +          (resp/status 500)
 +          (resp/content-type "text/html"))
 +        ))))
 +
 +(def app
 +  (handler/site (-> main-routes
 +                    (wrap-reload '[backtype.storm.ui.core])
 +                    catch-errors)))
 +
 +(defn start-server! [] (run-jetty app {:port (Integer. (*STORM-CONF* UI-PORT))
 +                                       :join? false}))
 +
 +(defn -main [] (start-server!))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/697849dc/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/util.clj
index 5e488e0,0000000..d83a194
mode 100644,000000..100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@@ -1,870 -1,0 +1,880 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns backtype.storm.util
 +  (:import [java.net InetAddress])
 +  (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap])
 +  (:import [java.io FileReader FileNotFoundException])
 +  (:import [backtype.storm Config])
 +  (:import [backtype.storm.utils Time Container ClojureTimerTask Utils
 +            MutableObject MutableInt])
 +  (:import [java.util UUID Random ArrayList List Collections])
 +  (:import [java.util.zip ZipFile])
 +  (:import [java.util.concurrent.locks ReentrantReadWriteLock])
 +  (:import [java.util.concurrent Semaphore])
 +  (:import [java.io File RandomAccessFile StringWriter PrintWriter])
 +  (:import [java.lang.management ManagementFactory])
 +  (:import [org.apache.commons.exec DefaultExecutor CommandLine])
 +  (:import [org.apache.commons.io FileUtils])
 +  (:import [org.apache.commons.exec ExecuteException])
 +  (:import [org.json.simple JSONValue])
 +  (:require [clojure [string :as str]])
 +  (:import [clojure.lang RT])
 +  (:require [clojure [set :as set]])
 +  (:use [clojure walk])
 +  (:use [backtype.storm log])
 +  )
 +
 +(defn wrap-in-runtime
 +  "Returns e unchanged when it is already a RuntimeException; otherwise
 +  returns a new RuntimeException that has e as its cause."
 +  [^Exception e]
 +  (if-not (instance? RuntimeException e)
 +    (RuntimeException. e)
 +    e))
 +
++;; True when the OS environment variable equals "Windows_NT".
++;; Evaluated once, when this namespace is loaded.
++(def on-windows?
++  (let [os-env (System/getenv "OS")]
++    (= "Windows_NT" os-env)))
++
++;; Value of the JVM "file.separator" system property, read once at load time.
++(def file-path-separator
++  (System/getProperty "file.separator"))
++
++;; Value of the JVM "path.separator" system property, read once at load time.
++;; Used by add-to-classpath to join classpath entries.
++(def class-path-separator
++  (System/getProperty "path.separator"))
++
 +(defmacro defalias
 +  "Defines an alias for a var: a new var with the same root binding (if
 +  any) and similar metadata. The metadata of the alias is its initial
 +  metadata (as provided by def) merged into the metadata of the original."
 +  ([name orig]
 +     `(do
 +        (alter-meta!
 +         ;; Define the alias with orig's raw root value when orig is bound,
 +         ;; otherwise define it unbound; def returns the new var.
 +         (if (.hasRoot (var ~orig))
 +           (def ~name (.getRawRoot (var ~orig)))
 +           (def ~name))
 +         ;; When copying metadata, disregard {:macro false}.
 +         ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
 +         #(conj (dissoc % :macro)
 +                (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
 +        (var ~name)))
 +  ;; Three-argument form: attaches doc as the alias's :doc metadata, then
 +  ;; expands to the two-argument form.
 +  ([name orig doc]
 +     (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
 +
 +;; name-with-attributes by Konrad Hinsen:
 +(defn name-with-attributes
 +  "Helper for writing def-like macros.
 +   Peels an optional leading docstring and then an optional leading
 +   attribute map off macro-args. The docstring (as :doc) and the attribute
 +   map entries are merged into name's metadata, on top of any metadata name
 +   already carries. Returns a vector of [name-with-metadata remaining-args]."
 +  [name macro-args]
 +  (let [has-doc?   (string? (first macro-args))
 +        docstring  (when has-doc? (first macro-args))
 +        after-doc  (if has-doc? (next macro-args) macro-args)
 +        has-attr?  (map? (first after-doc))
 +        base-attr  (if has-attr? (first after-doc) {})
 +        remaining  (if has-attr? (next after-doc) after-doc)
 +        with-doc   (if docstring
 +                     (assoc base-attr :doc docstring)
 +                     base-attr)
 +        merged     (if-let [existing (meta name)]
 +                     (conj existing with-doc)
 +                     with-doc)]
 +    [(with-meta name merged) remaining]))
 +
 +(defmacro defnk
 + "Define a function accepting keyword arguments. Symbols up to the first
 + keyword in the parameter list are taken as positional arguments.  Then
 + an alternating sequence of keywords and defaults values is expected. The
 + values of the keyword arguments are available in the function body by
 + virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
 + defnk accepts an optional docstring as well as an optional metadata map."
 + [fn-name & fn-tail]
 + ;; pos      - leading positional parameter symbols
 + ;; kw-vals  - the remaining alternating keyword/default forms
 + ;; syms     - keywords turned into plain symbols for :keys destructuring
 + ;; sym-vals - symbol -> default form, used as the :or map
 + (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
 +       [pos kw-vals]           (split-with symbol? args)
 +       syms                    (map #(-> % name symbol) (take-nth 2 kw-vals))
 +       values                  (take-nth 2 (rest kw-vals))
 +       sym-vals                (apply hash-map (interleave syms values))
 +       de-map                  {:keys (vec syms)
 +                                :or   sym-vals}]
 +   ;; The generated fn collects trailing args as a flat key/value list and
 +   ;; destructures the resulting map.
 +   `(defn ~fn-name
 +      [~@pos & options#]
 +      (let [~de-map (apply hash-map options#)]
 +        ~@body))))
 +
 +(defn find-first
 +  "Returns the first element of coll for which (pred element) is logically
 +  true, or nil when no element matches. Only as much of coll is consumed as
 +  is needed to find the match; a miss consumes all of it."
 +  [pred coll]
 +  (->> coll
 +       (filter pred)
 +       first))
 +
 +(defn dissoc-in
 +  "Removes the entry found by following the key sequence through nested
 +  associative structures and returns the updated structure. Any nested map
 +  that becomes empty as a result is removed from its parent as well. If an
 +  intermediate key is missing, m is returned unchanged."
 +  [m [k & ks :as keys]]
 +  (if-not ks
 +    (dissoc m k)
 +    (if-let [child (get m k)]
 +      (let [pruned (dissoc-in child ks)]
 +        (if (empty? pruned)
 +          (dissoc m k)
 +          (assoc m k pruned)))
 +      m)))
 +
 +(defn indexed
 +  "Lazily pairs every item of 's' with its zero-based position, yielding
 +  [index item] vectors.
 +
 +  (indexed '(a b c d))  =>  ([0 a] [1 b] [2 c] [3 d])"
 +  [s]
 +  (map-indexed vector s))
 +
 +(defn positions
 +  "Lazily yields the zero-based indexes of the items in coll for which
 +   pred returns logical true."
 +  [pred coll]
 +  (keep-indexed (fn [idx elt]
 +                  (when (pred elt) idx))
 +                coll))
 +
 +(defn exception-cause?
 +  "Returns true when t, or any Throwable reachable from it by following
 +  getCause, is an instance of klass; false otherwise (including nil t)."
 +  [klass ^Throwable t]
 +  (loop [^Throwable current t]
 +    (cond
 +      (nil? current) false
 +      (instance? klass current) true
 +      :else (recur (.getCause current)))))
 +
 +;; Evaluates body. Yields false when body completes without throwing;
 +;; otherwise yields whether the thrown Throwable, or any Throwable in its
 +;; cause chain, is an instance of klass (see exception-cause?).
 +(defmacro thrown-cause? [klass & body]
 +  `(try
 +    ~@body
 +    false
 +    (catch Throwable t#
 +      (exception-cause? ~klass t#))))
 +
 +(defmacro thrown-cause-with-msg?
 +  "Evaluates body. Yields false when body completes without throwing.
 +  When body throws, yields a logically true value only if the message of
 +  the thrown Throwable fully matches the regex re AND the Throwable or one
 +  of its causes is an instance of klass. A Throwable without a message is
 +  treated as a non-match."
 +  [klass re & body]
 +  `(try
 +    ~@body
 +    false
 +    (catch Throwable t#
 +      ;; getMessage may be nil (e.g. a bare NullPointerException); calling
 +      ;; re-matches on nil would itself throw, so guard against it.
 +      (let [msg# (.getMessage t#)]
 +        (and msg#
 +             (re-matches ~re msg#)
 +             (exception-cause? ~klass t#))))))
 +
 +;; (forcat [binding coll] body...) expands to a mapcat over coll: body is
 +;; evaluated with binding bound to each element and the resulting
 +;; collections are concatenated.
 +(defmacro forcat [[args aseq] & body]
 +  `(mapcat (fn [~args]
 +             ~@body)
 +           ~aseq))
 +
 +;; Like try/catch, except each (catch Klass local ...) clause matches when
 +;; the thrown Throwable OR anything in its cause chain is an instance of
 +;; Klass (tested with exception-cause?). Inside a clause, local is bound to
 +;; the Throwable that was actually thrown, not to the matching cause.
 +;; Clauses are tried in order; if none matches, the Throwable is rethrown.
 +(defmacro try-cause [& body]
 +  (let [checker (fn [form]
 +                  (or (not (sequential? form))
 +                      (not= 'catch (first form))))
 +        ;; Everything before the first catch form is the protected code.
 +        [code guards] (split-with checker body)
 +        error-local (gensym "t")
 +        ;; Turn each catch clause into a (test expr) pair for cond.
 +        guards (forcat [[_ klass local & guard-body] guards]
 +                 `((exception-cause? ~klass ~error-local)
 +                   (let [~local ~error-local]
 +                     ~@guard-body
 +                     )))
 +        ]
 +    `(try ~@code
 +          (catch Throwable ~error-local
 +            (cond ~@guards
 +                  true (throw ~error-local)
 +                  )))))
 +
 +(defn local-hostname
 +  "Returns the canonical host name of the local host as reported by
 +  InetAddress/getLocalHost."
 +  []
 +  (-> (InetAddress/getLocalHost)
 +      .getCanonicalHostName))
 +
 +(defn available-port
 +  "Finds a TCP port by briefly binding a ServerSocket and returning the
 +  port it was bound to (the socket is closed again before returning).
 +  With no argument the socket is bound to port 0. With a preferred port,
 +  that port is tried first and, on IOException, the no-argument behaviour
 +  is used instead."
 +  ([]
 +    (with-open [socket (java.net.ServerSocket. 0)]
 +      (.getLocalPort socket)))
 +  ([preferred]
 +    (try
 +      (with-open [socket (java.net.ServerSocket. preferred)]
 +        (.getLocalPort socket))
 +      (catch java.io.IOException e
 +        (available-port)))))
 +
 +(defn uuid
 +  "Returns the string form of a freshly generated random UUID."
 +  []
 +  (.toString (UUID/randomUUID)))
 +
 +;; Current time in seconds, as reported by backtype.storm.utils.Time.
 +(defn current-time-secs []
 +  (Time/currentTimeSecs))
 +
 +;; Current time in milliseconds, as reported by backtype.storm.utils.Time.
 +(defn current-time-millis []
 +  (Time/currentTimeMillis))
 +
 +(defn clojurify-structure
 +  "Walks s top-down (prewalk), replacing every java.util.Map with a Clojure
 +  map and every java.util.List with a Clojure vector. Everything else is
 +  left as is."
 +  [s]
 +  (letfn [(convert [x]
 +            (cond
 +              (instance? Map x)  (into {} x)
 +              (instance? List x) (vec x)
 +              :else              x))]
 +    (prewalk convert s)))
 +
 +(defmacro with-file-lock
 +  "Runs body while holding an exclusive FileChannel lock on the file at
 +  path (the file is created first if it does not exist) and returns body's
 +  value. The lock is released and the file handle closed afterwards, also
 +  when body throws."
 +  [path & body]
 +  `(let [f# (File. ~path)
 +         _# (.createNewFile f#)
 +         rf# (RandomAccessFile. f# "rw")]
 +      ;; The outer try guarantees the file handle is closed even when
 +      ;; acquiring or releasing the lock throws.
 +      (try
 +        (let [lock# (.. rf# (getChannel) (lock))]
 +          (try
 +            ~@body
 +            (finally
 +              (.release lock#))))
 +        (finally
 +          (.close rf#)))))
 +
 +(defn tokenize-path
 +  "Splits path on \"/\" and returns a vector of the non-empty segments."
 +  [^String path]
 +  (->> (.split path "/")
 +       (remove empty?)
 +       vec))
 +
 +(defn assoc-conj
 +  "Appends v to the sequence stored under k in m. When m has no entry for
 +  k the new value is the vector [v]; otherwise it is the existing value
 +  concatenated with [v]."
 +  [m k v]
 +  (if (contains? m k)
 +    (assoc m k (concat (get m k) [v]))
 +    (assoc m k [v])))
 +
 +(defn set-delta
 +  "Compares two collections as sets. Returns a two-element vector:
 +  [elements of old that are not in curr, elements of curr that are not in old]."
 +  [old curr]
 +  (let [before (set old)
 +        after  (set curr)
 +        removed (set/difference before after)
 +        added   (set/difference after before)]
 +    [removed added]))
 +
 +(defn parent-path
 +  "Returns the \"/\"-rooted path formed by every segment of path except the
 +  last one (\"/\" when path has at most one segment)."
 +  [path]
 +  (->> (tokenize-path path)
 +       butlast
 +       (str/join "/")
 +       (str "/")))
 +
 +(defn toks->path
 +  "Joins the segments in toks with \"/\" and prefixes the result with \"/\"."
 +  [toks]
 +  (apply str "/" (interpose "/" toks)))
 +
 +(defn normalize-path
 +  "Rebuilds path from its non-empty \"/\"-separated segments, producing a
 +  path with a single leading \"/\" and no empty segments."
 +  [^String path]
 +  (-> path
 +      tokenize-path
 +      toks->path))
 +
 +(defn map-val
 +  "Returns a map with the same keys as amap, each value replaced by
 +  (afn value)."
 +  [afn amap]
 +  (into {}
 +    (map (fn [[k v]] [k (afn v)])
 +         amap)))
 +
 +(defn filter-val
 +  "Returns a map holding only the entries of amap whose value satisfies afn."
 +  [afn amap]
 +  (into {}
 +    (for [[_ v :as entry] amap
 +          :when (afn v)]
 +      entry)))
 +
 +(defn filter-key
 +  "Returns a map holding only the entries of amap whose key satisfies afn."
 +  [afn amap]
 +  (into {}
 +    (for [[k _ :as entry] amap
 +          :when (afn k)]
 +      entry)))
 +
 +(defn map-key
 +  "Returns a map with the same values as amap, each key replaced by
 +  (afn key)."
 +  [afn amap]
 +  (into {}
 +    (map (fn [[k v]] [(afn k) v])
 +         amap)))
 +
 +(defn separate
 +  "Returns [items satisfying pred, items not satisfying pred], both lazy
 +  and each in the order the items appear in aseq."
 +  [pred aseq]
 +  ((juxt filter remove) pred aseq))
 +
 +(defn full-path
 +  "Appends name as the final segment of the normalized parent path and
 +  returns the resulting \"/\"-rooted path."
 +  [parent name]
 +  (-> (tokenize-path parent)
 +      (conj name)
 +      toks->path))
 +
 +;; Predicate: logical complement of nil? (true for every value except nil).
 +(def not-nil? (complement nil?))
 +
 +(defn barr
 +  "Builds a byte array from the given values, converting each with byte."
 +  [& vals]
 +  (->> vals
 +       (map byte)
 +       byte-array))
 +
 +;; Logs "Halting process: " followed by the msg arguments, then halts the
 +;; JVM immediately via Runtime.halt with exit status val. Does not return.
 +(defn halt-process! [val & msg]
 +  (log-message "Halting process: " msg)
 +  (.halt (Runtime/getRuntime) val)
 +  )
 +
 +(defn sum
 +  "Adds up all numbers in vals; 0 for an empty collection."
 +  [vals]
 +  (apply + vals))
 +
 +(defn repeat-seq
 +  "Concatenates repetitions of aseq into one lazy sequence: endlessly with
 +  one argument, or amt times with two."
 +  ([aseq]
 +    (->> (repeat aseq)
 +         (apply concat)))
 +  ([amt aseq]
 +    (->> (repeat amt aseq)
 +         (apply concat))))
 +
 +(defn div
 +  "Perform floating point division on the arguments."
 +  ;; f is coerced to double, then divided by each remaining argument in turn.
 +  ;; NOTE(review): with a single argument this is (/ (double f)), i.e. the
 +  ;; reciprocal of f rather than f itself.
 +  [f & rest] (apply / (double f) rest))
 +
 +(defn defaulted
 +  "Returns val when it is logically true, otherwise default. Note that both
 +  nil and false fall back to default."
 +  [val default]
 +  (or val default))
 +
 +(defn mk-counter
 +  "Returns a zero-argument function backed by an atom. The first call
 +  returns start-val (1 when omitted) and every later call returns the
 +  previous result plus one."
 +  ([] (mk-counter 1))
 +  ([start-val]
 +     (let [state (atom (dec start-val))]
 +       #(swap! state inc))))
 +
 +;; Expands to a `for` over (range times) that evaluates body once per
 +;; iteration. The result is the lazy sequence produced by `for`, so body
 +;; only runs as that sequence is realized.
 +(defmacro for-times [times & body]
 +  `(for [i# (range ~times)]
 +     ~@body
 +     ))
 +
 +;; Same syntax as `for`, but the resulting sequence is fully realized
 +;; immediately with doall.
 +(defmacro dofor [& body]
 +  `(doall (for ~@body)))
 +
 +(defn reverse-map
 +  "Inverts amap: returns a map from each value of amap to a vector of all
 +  keys that mapped to it. Every value maps to a vector, even when only one
 +  key had that value.
 +
 +  {:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}"
 +  [amap]
 +  (reduce (fn [m [k v]]
 +            ;; start from [] the first time a value is seen
 +            (update-in m [v] (fnil conj []) k))
 +          {}
 +          amap))
 +
 +;; Debugging aid: for each given form, prints the form's source text
 +;; followed by its value, one println per form.
 +(defmacro print-vars [& vars]
 +  (let [prints (for [v vars] `(println ~(str v) ~v))]
 +    `(do ~@prints)))
 +
 +(defn process-pid
 +  "Returns the pid of this JVM as a string, taken from the part of the
 +  runtime MXBean name before the \"@\". Throws a RuntimeException when that
 +  name does not consist of exactly two \"@\"-separated parts. Hacky because
 +  Java doesn't provide a real way to do this."
 +  []
 +  (let [runtime-name (.getName (ManagementFactory/getRuntimeMXBean))
 +        parts (.split runtime-name "@")]
 +    (if (= 2 (count parts))
 +      (first parts)
 +      (throw (RuntimeException. (str "Got unexpected process name: " runtime-name))))))
 +
 +;; Runs an external command synchronously through commons-exec and returns
 +;; whatever DefaultExecutor.execute returns.
 +;; The command string is split on single spaces: the first token is the
 +;; executable and the rest are passed as arguments, so arguments that
 +;; themselves contain spaces cannot be expressed.
 +(defn exec-command! [command]
 +  (let [[comm-str & args] (seq (.split command " "))
 +        command (CommandLine. comm-str)]
 +    (doseq [a args]
 +      (.addArgument command a))
 +    (.execute (DefaultExecutor.) command)
 +    ))
 +
 +;; Extracts the entries under dir from the jar at jarpath into destdir by
 +;; shelling out to the external `unzip` program. An ExecuteException
 +;; (anywhere in the cause chain) is logged and otherwise ignored.
 +(defn extract-dir-from-jar [jarpath dir destdir]
 +  (try-cause
 +    (exec-command! (str "unzip -qq " jarpath " " dir "/** -d " destdir))
 +  (catch ExecuteException e
 +    (log-message "Could not extract " dir " from " jarpath))
 +  ))
 +
 +;; Forcibly terminates the process with the given pid by shelling out:
 +;; "taskkill /f /pid <pid>" when on-windows? is true, "kill -9 <pid>"
 +;; otherwise. An ExecuteException (anywhere in the cause chain) is logged
 +;; and swallowed.
 +(defn ensure-process-killed! [pid]
 +  ;; TODO: should probably do a ps ax of some sort to make sure it was killed
 +  (try-cause
-     (exec-command! (str "kill -9 " pid))
++    (exec-command! (str (if on-windows? "taskkill /f /pid " "kill -9 ") pid))
 +  (catch ExecuteException e
 +    (log-message "Error when trying to kill " pid ". Process is probably already dead."))
 +    ))
 +
 +;; Starts an external process and returns the java.lang.Process without
 +;; waiting for it. command is split on spaces (empty tokens dropped) to
 +;; form the argument list. The optional :environment map (default {}) is
 +;; added on top of the ProcessBuilder's environment.
 +(defnk launch-process [command :environment {}]
 +  (let [command (->> (seq (.split command " "))
 +                     (filter (complement empty?)))
 +        builder (ProcessBuilder. command)
 +        process-env (.environment builder)]
 +    (doseq [[k v] environment]
 +      (.put process-env k v))
 +    (.start builder)
 +    ))
 +
 +;; Sleeps via Time/sleep for secs seconds; does nothing unless secs is
 +;; positive. secs is truncated to a long before being converted to
 +;; milliseconds, so fractional seconds are dropped.
 +(defn sleep-secs [secs]
 +  (when (pos? secs)
 +    (Time/sleep (* (long secs) 1000))))
 +
 +;; Calls Time/sleepUntil with target-secs (truncated to a long) converted
 +;; to milliseconds.
 +(defn sleep-until-secs [target-secs]
 +  (Time/sleepUntil (* (long target-secs) 1000)))
 +
 +;; Handle protocol for a managed thread; async-loop returns an object
 +;; implementing it on top of a java.lang.Thread.
 +(defprotocol SmartThread
 +  (start [this])
 +  (join [this])
 +  (interrupt [this])
 +  (sleeping? [this]))
 +
 +;; Runs afn repeatedly on a new thread and returns a SmartThread handle.
 +;; Each call of afn returns the number of seconds to sleep before the next
 +;; call; returning nil ends the loop.
 +;; Options:
 +;;   :daemon      - daemon flag for the thread (default false)
 +;;   :kill-fn     - called with the Throwable when the loop dies; the
 +;;                  default halts the process with status 1
 +;;   :priority    - thread priority (default Thread/NORM_PRIORITY)
 +;;   :factory?    - when true, afn is called once on the new thread and its
 +;;                  return value is used as the loop function
 +;;   :start       - start the thread immediately (default true)
 +;;   :thread-name - suffix appended to the thread's default name
 +(defnk async-loop [afn
 +                   :daemon false
 +                   :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
 +                   :priority Thread/NORM_PRIORITY
 +                   :factory? false
 +                   :start true
 +                   :thread-name nil]
 +  (let [thread (Thread.
 +                (fn []
 +                  (try-cause
 +                    (let [afn (if factory? (afn) afn)]
 +                      (loop []
 +                        (let [sleep-time (afn)]
 +                          (when-not (nil? sleep-time)
 +                            (sleep-secs sleep-time)
 +                            (recur))
 +                          )))
 +                    ;; An interrupt (also as a nested cause) ends the loop
 +                    ;; quietly without invoking kill-fn.
 +                    (catch InterruptedException e
 +                      (log-message "Async loop interrupted!")
 +                      )
 +                    (catch Throwable t
 +                      (log-error t "Async loop died!")
 +                      (kill-fn t)
 +                      ))
 +                  ))]
 +    (.setDaemon thread daemon)
 +    (.setPriority thread priority)
 +    (when thread-name
 +      (.setName thread (str (.getName thread) "-" thread-name)))
 +    (when start
 +      (.start thread))
 +    ;; should return object that supports stop, interrupt, join, and waiting?
 +    (reify SmartThread
 +      (start [this]
 +        (.start thread))
 +      (join [this]
 +        (.join thread))
 +      (interrupt [this]
 +        (.interrupt thread))
 +      (sleeping? [this]
 +        (Time/isThreadWaiting thread)
 +        ))
 +      ))
 +
 +(defn exists-file?
 +  "True when a file or directory exists at path."
 +  [path]
 +  (let [target (File. path)]
 +    (.exists target)))
 +
 +;; Deletes the file or directory at path using commons-io
 +;; FileUtils/forceDelete. Does nothing when the path does not exist; a
 +;; FileNotFoundException (path vanished between the check and the delete)
 +;; is deliberately ignored.
 +(defn rmr [path]
 +  (log-debug "Rmr path " path)
 +  (when (exists-file? path)
 +    (try
 +      (FileUtils/forceDelete (File. path))
 +    (catch FileNotFoundException e))))
 +
 +(defn rmpath
 +  "Deletes the file or directory at path, non-recursively. A path that does
 +  not exist is ignored. Throws a RuntimeException when the delete fails."
 +  [path]
 +  (log-debug "Removing path " path)
 +  (when (and (exists-file? path)
 +             (not (.delete (File. path))))
 +    (throw (RuntimeException. (str "Failed to delete " path)))))
 +
 +;; Creates the directory at path via commons-io FileUtils/forceMkdir.
 +(defn local-mkdirs
 +  [path]
 +  (log-debug "Making dirs at " path)
 +  (FileUtils/forceMkdir (File. path)))
 +
 +;; Creates a new empty file at path. When on-windows? is true the parent
 +;; directories are created first. Throws a RuntimeException when
 +;; File.createNewFile returns false.
 +(defn touch [path]
 +  (log-debug "Touching file at " path)
-   (let [success? (.createNewFile (File. path))]
++  (let [success? (do (if on-windows? (.mkdirs (.getParentFile (File. path))))
++                     (.createNewFile (File. path)))]
 +    (when-not success?
 +      (throw (RuntimeException. (str "Failed to touch " path))))
 +    ))
 +
 +(defn read-dir-contents
 +  "Returns the names (not full paths) of the entries directly inside dir,
 +  or an empty collection when dir does not exist."
 +  [dir]
 +  (if-not (exists-file? dir)
 +    []
 +    (for [^File entry (.listFiles (File. dir))]
 +      (.getName entry))))
 +
 +(defn compact
 +  "Lazily drops every nil from aseq (false is kept)."
 +  [aseq]
 +  (remove nil? aseq))
 +
 +;; The JVM's classpath string (system property "java.class.path").
 +(defn current-classpath []
 +  (System/getProperty "java.class.path"))
 +
 +;; Returns classpath followed by each entry of paths, all joined with
 +;; class-path-separator.
 +(defn add-to-classpath [classpath paths]
-   (str/join ":" (cons classpath paths)))
++  (str/join class-path-separator (cons classpath paths)))
 +
 +;; Creates a new ReentrantReadWriteLock for use with read-locked and
 +;; write-locked.
 +(defn ^ReentrantReadWriteLock mk-rw-lock []
 +  (ReentrantReadWriteLock.))
 +
 +(defmacro read-locked
 +  "Evaluates body while holding the read lock of rw-lock (a symbol naming a
 +  ReentrantReadWriteLock) and returns body's value. The lock is always
 +  released afterwards, also when body throws."
 +  [rw-lock & body]
 +  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
 +    `(let [rlock# (.readLock ~lock)]
 +       ;; Acquire BEFORE entering try: if acquisition itself fails, the
 +       ;; finally clause must not try to unlock a lock that was never held.
 +       (.lock rlock#)
 +       (try
 +         ~@body
 +         (finally (.unlock rlock#))))))
 +
 +(defmacro write-locked
 +  "Evaluates body while holding the write lock of rw-lock (a symbol naming
 +  a ReentrantReadWriteLock) and returns body's value. The lock is always
 +  released afterwards, also when body throws."
 +  [rw-lock & body]
 +  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
 +    `(let [wlock# (.writeLock ~lock)]
 +       ;; Acquire BEFORE entering try: if acquisition itself fails, the
 +       ;; finally clause must not try to unlock a lock that was never held.
 +       (.lock wlock#)
 +       (try
 +         ~@body
 +         (finally (.unlock wlock#))))))
 +
 +;; Blocks until (apredicate) returns logical true, polling with a
 +;; Time/sleep of 100 between checks. There is no timeout.
 +(defn wait-for-condition [apredicate]
 +  (while (not (apredicate))
 +    (Time/sleep 100)
 +    ))
 +
 +(defn some?
 +  "Boolean variant of some: true when (some pred aseq) returns anything
 +  other than nil, false otherwise."
 +  [pred aseq]
 +  (not (nil? (some pred aseq))))
 +
 +;; Seconds elapsed since time-secs, measured against current-time-secs.
 +(defn time-delta [time-secs]
 +  (- (current-time-secs) time-secs))
 +
 +;; Milliseconds elapsed since time-ms.
 +;; NOTE(review): reads System/currentTimeMillis directly, whereas
 +;; time-delta goes through backtype.storm.utils.Time — confirm this
 +;; difference is intended.
 +(defn time-delta-ms [time-ms]
 +  (- (System/currentTimeMillis) (long time-ms)))
 +
 +;; Converts str to an Integer with Integer/valueOf.
 +(defn parse-int [str]
 +  (Integer/valueOf str))
 +
 +;; Calls Utils/integerDivided with sum and num-pieces and converts the Java
 +;; result into Clojure data with clojurify-structure.
 +;; NOTE(review): partition-fixed treats the result as a map of
 +;; chunk-size -> number-of-chunks; confirm against Utils.
 +(defn integer-divided [sum num-pieces]
 +  (clojurify-structure (Utils/integerDivided sum num-pieces)))
 +
 +(defn collectify
 +  "Returns obj unchanged when it is sequential or a java.util.Collection;
 +  otherwise wraps it in a one-element vector."
 +  [obj]
 +  (if-not (or (sequential? obj)
 +              (instance? Collection obj))
 +    [obj]
 +    obj))
 +
 +;; Serializes obj to a JSON string with json-simple's JSONValue/toJSONString.
 +(defn to-json [obj]
 +  (JSONValue/toJSONString obj))
 +
 +(defn from-json
 +  "Parses the JSON text in str with JSONValue/parse and converts the result
 +  to Clojure data via clojurify-structure. Returns nil when str is nil."
 +  [^String str]
 +  (when str
 +    (clojurify-structure (JSONValue/parse str))))
 +
 +;; Imperative-style let. Every body form except the last becomes a let
 +;; binding: a list form (bind sym expr) binds sym to expr, any other form
 +;; is evaluated for side effects and bound to _. The last body form is the
 +;; body of the let and supplies the result.
 +(defmacro letlocals [& body]
 +   (let [[tobind lexpr] (split-at (dec (count body)) body)
 +         binded (vec (mapcat (fn [e]
 +                  (if (and (list? e) (= 'bind (first e)))
 +                     [(second e) (last e)]
 +                     ['_ e]
 +                     ))
 +                  tobind ))]
 +     `(let ~binded
 +         ~(first lexpr)
 +      )))
 +
 +(defn remove-first
 +  "Returns aseq without the first element that satisfies pred. Throws an
 +  IllegalArgumentException when no element satisfies pred."
 +  [pred aseq]
 +  (let [[kept tail] (split-with (complement pred) aseq)]
 +    (if (seq tail)
 +      (concat kept (rest tail))
 +      (throw (IllegalArgumentException. "Nothing to remove")))))
 +
 +(defn assoc-non-nil
 +  "Associates k with v in m when v is logically true; otherwise returns m
 +  unchanged. Note that a false v is skipped just like nil."
 +  [m k v]
 +  (if-not v
 +    m
 +    (assoc m k v)))
 +
 +(defn multi-set
 +  "Returns a map from each distinct element of aseq to the number of times
 +  it occurs. Returns nil for an empty aseq."
 +  [aseq]
 +  (->> aseq
 +       (map #(hash-map % 1))
 +       (apply merge-with +)))
 +
 +(defn set-var-root*
 +  "Replaces the root binding of the var avar with val; returns val."
 +  [avar val]
 +  (alter-var-root avar (constantly val)))
 +
 +;; Macro form of set-var-root*: takes the var's symbol instead of the var.
 +(defmacro set-var-root [var-sym val]
 +  `(set-var-root* (var ~var-sym) ~val))
 +
 +;; (with-var-roots [var-a val-a var-b val-b ...] body...)
 +;; Temporarily replaces the ROOT bindings of the listed vars, evaluates
 +;; body, and restores the previous root values in a finally clause. Since
 +;; root bindings are changed, the temporary values are visible to every
 +;; thread while body runs.
 +(defmacro with-var-roots [bindings & body]
 +  (let [settings (partition 2 bindings)
 +        tmpvars (repeatedly (count settings) (partial gensym "old"))
 +        vars (map first settings)
 +        ;; let-bindings that capture each var's current value
 +        savevals (vec (mapcat (fn [t v] [t v]) tmpvars vars))
 +        setters (for [[v s] settings] `(set-var-root ~v ~s))
 +        restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)
 +        ]
 +    `(let ~savevals
 +      ~@setters
 +      (try
 +        ~@body
 +      (finally
 +        ~@restorers))
 +      )))
 +
 +(defn map-diff
 +  "Returns the entries of m2 whose value differs from the value m1 has for
 +  the same key (a key missing from m1 counts as nil there)."
 +  [m1 m2]
 +  (into {}
 +    (for [[k v :as entry] m2
 +          :when (not= v (m1 k))]
 +      entry)))
 +
 +
 +(defn select-keys-pred
 +  "Returns a map holding only the entries of amap whose key satisfies pred."
 +  [pred amap]
 +  (into {}
 +        (for [[k _ :as entry] amap
 +              :when (pred k)]
 +          entry)))
 +
 +
 +(defn rotating-random-range
 +  "Builds the state triple [cursor choices rand] used by
 +  acquire-random-range-id: a MutableInt cursor starting at -1, a shuffled
 +  ArrayList copy of choices, and the Random used for shuffling."
 +  [choices]
 +  (let [rng (Random.)
 +        shuffled (doto (ArrayList. choices)
 +                   (Collections/shuffle rng))]
 +    [(MutableInt. -1) shuffled rng]))
 +
 +;; Takes a [cursor choices rand] triple (the shape built by
 +;; rotating-random-range), advances the cursor and returns the element of
 +;; choices at the new cursor position. When the incremented cursor reaches
 +;; the size of choices, the cursor is reset to 0 and choices is reshuffled
 +;; in place before the element is read.
 +;; NOTE(review): relies on MutableInt.increment returning the incremented
 +;; value — confirm against MutableInt.
 +(defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
 +  (when (>= (.increment curr) (.size state))
 +    (.set curr 0)
 +    (Collections/shuffle state rand))
 +  (.get state (.get curr)))
 +
 +(defn interleave-all
 +  "Interleaves the given collections round-robin: the first elements of
 +  every non-empty collection, then the second elements, and so on.
 +  Collections that run out simply stop contributing, so no element is
 +  dropped. Returns an empty vector when nothing remains to interleave.
 +  Implemented with loop/recur so that long collections cannot overflow
 +  the stack."
 +  [& colls]
 +  (loop [acc []
 +         remaining (remove empty? colls)]
 +    (if (empty? remaining)
 +      (or (seq acc) [])
 +      (recur (into acc (map first remaining))
 +             (remove empty? (map rest remaining))))))
 +
 +(defn update
 +  "Returns m with the value under k replaced by (afn current-value), where
 +  current-value is nil when k is absent."
 +  [m k afn]
 +  (update-in m [k] afn))
 +
 +(defn any-intersection
 +  "Returns the elements that occur more than once across the concatenation
 +  of all the given collections (nil when there are none)."
 +  [& sets]
 +  (let [elem->count (multi-set (apply concat sets))
 +        repeated (filter-val #(> % 1) elem->count)]
 +    (keys repeated)))
 +
 +(defn between?
 +  "True when lower <= val <= upper (both bounds inclusive)."
 +  [val lower upper]
 +  (<= lower val upper))
 +
 +;; Evaluates body 1,000,000 times inside `time`, which prints the elapsed
 +;; time. The range is realized up front so that building it is not part of
 +;; the measurement.
 +(defmacro benchmark [& body]
 +  `(let [l# (doall (range 1000000))]
 +     (time
 +       (doseq [i# l#]
 +         ~@body))))
 +
 +(defn rand-sampler
 +  "Returns a zero-argument function that returns true when a fresh random
 +  int drawn from [0, freq) is 0, and false otherwise. Each returned
 +  function owns its own java.util.Random."
 +  [freq]
 +  (let [rng (java.util.Random.)]
 +    #(zero? (.nextInt rng freq))))
 +
 +(defn even-sampler
 +  "Returns a zero-argument sampling function carrying {:rate freq} as
 +  metadata. Each call advances an internal counter; when the advanced
 +  counter reaches freq it is reset to 0 and a new random target in
 +  [0, freq) is drawn. The call returns true when the counter equals the
 +  current target."
 +  [freq]
 +  (let [freq (int freq)
 +        start (int 0)
 +        r (java.util.Random.)
 +        curr (MutableInt. -1)
 +        target (MutableInt. (.nextInt r freq))
 +        sample (fn []
 +                 (when (>= (.increment curr) freq)
 +                   (.set curr start)
 +                   (.set target (.nextInt r freq)))
 +                 (= (.get curr) (.get target)))]
 +    (with-meta sample {:rate freq})))
 +
 +(defn sampler-rate
 +  "Returns the :rate entry from the metadata of sampler (nil when absent)."
 +  [sampler]
 +  (-> sampler meta :rate))
 +
 +;; Returns the class of the first argument, ignoring any further arguments.
 +(defn class-selector [obj & args] (class obj))
 +
 +(defn uptime-computer
 +  "Captures the current time in seconds and returns a zero-argument
 +  function that reports how many seconds have elapsed since then."
 +  []
 +  (let [started-at (current-time-secs)]
 +    #(time-delta started-at)))
 +
 +(defn stringify-error
 +  "Returns the full stack trace of error, as printed by printStackTrace,
 +  as a string."
 +  [error]
 +  (let [buffer (StringWriter.)]
 +    (.printStackTrace error (PrintWriter. buffer))
 +    (str buffer)))
 +
 +(defn nil-to-zero
 +  "Returns v when it is logically true, otherwise 0 (so both nil and false
 +  become 0)."
 +  [v]
 +  (if v v 0))
 +
 +(defn bit-xor-vals
 +  "XORs all values in vals together, starting from 0 (so an empty
 +  collection yields 0)."
 +  [vals]
 +  (reduce (fn [acc v] (bit-xor acc v))
 +          0
 +          vals))
 +
 +;; Evaluates body; if it throws any Throwable, calls afn with that
 +;; Throwable and yields afn's result instead of propagating.
 +(defmacro with-error-reaction [afn & body]
 +  `(try ~@body
 +     (catch Throwable t# (~afn t#))))
 +
 +;; Creates a new backtype.storm.utils.Container (a mutable holder accessed
 +;; through container-set! and container-get).
 +(defn container []
 +  (Container.))
 +
 +;; Stores obj in the container's `object` field and returns the container.
 +(defn container-set! [^Container container obj]
 +  (set! (. container object) obj)
 +  container)
 +
 +;; Returns the current value of the container's `object` field.
 +(defn container-get [^Container container]
 +  (. container object))
 +
 +(defn to-millis
 +  "Converts secs to milliseconds. secs is truncated to a long first, so any
 +  fractional part is dropped."
 +  [secs]
 +  (-> secs
 +      long
 +      (* 1000)))
 +
 +(defn throw-runtime
 +  "Throws a RuntimeException whose message is the concatenation (via str)
 +  of all arguments."
 +  [& strs]
 +  (let [message (apply str strs)]
 +    (throw (RuntimeException. message))))
 +
 +;; Routes standard output/error into the logging system by calling
 +;; log-capture! with the logger name "STDIO". The commented-out code below
 +;; is an earlier attempt that rebinds *out*/*err* directly; it is kept for
 +;; reference only.
 +(defn redirect-stdio-to-slf4j! []
 +  ;; set-var-root doesn't work with *out* and *err*, so digging much deeper here
 +  ;; Unfortunately, this code seems to work at the REPL but not when spawned as worker processes
 +  ;; it might have something to do with being a child process
 +  ;; (set! (. (.getThreadBinding RT/OUT) val)
 +  ;;       (java.io.OutputStreamWriter.
 +  ;;         (log-stream :info "STDIO")))
 +  ;; (set! (. (.getThreadBinding RT/ERR) val)
 +  ;;       (PrintWriter.
 +  ;;         (java.io.OutputStreamWriter.
 +  ;;           (log-stream :error "STDIO"))
 +  ;;         true))
 +  (log-capture! "STDIO"))
 +
 +;; Debugging aid: logs "<prefix>: <val>" with log-message and returns val
 +;; unchanged, so it can be wrapped around any expression.
 +(defn spy [prefix val]
 +  (log-message prefix ": " val)
 +  val)
 +
 +(defn zip-contains-dir?
 +  "Returns true when the zip (or jar) file at zipfile contains at least one
 +  entry whose name starts with target followed by \"/\", false otherwise.
 +  The archive is closed again before returning."
 +  [zipfile target]
 +  ;; Entry names inside a zip archive always use '/' as the separator, on
 +  ;; every platform, so the OS file separator must NOT be used here (on
 +  ;; Windows it is '\\' and would never match).
 +  ;; with-open closes the ZipFile; some? finishes scanning the lazy entry
 +  ;; seq before the archive is closed.
 +  (with-open [zip (ZipFile. zipfile)]
 +    (let [entries (->> zip .entries enumeration-seq (map (memfn getName)))]
-     (some? #(.startsWith % (str target "/")) entries)
++      (some? #(.startsWith % (str target "/")) entries))))
 +
 +(defn url-encode
 +  "Encodes s in application/x-www-form-urlencoded form, always using UTF-8
 +  so the result does not depend on the platform's default charset (the
 +  one-argument URLEncoder/encode is deprecated for that reason)."
 +  [^String s]
 +  (java.net.URLEncoder/encode s "UTF-8"))
 +
 +(defn join-maps
 +  "Combines maps key by key: returns a map from every key appearing in any
 +  of the maps to a sequence holding each map's value for that key, in
 +  argument order (nil where a map lacks the key)."
 +  [& maps]
 +  (let [all-keys (apply set/union (map (comp set keys) maps))]
 +    (into {}
 +      (map (fn [k]
 +             [k (map #(% k) maps)])
 +           all-keys))))
 +
 +;; Splits aseq into consecutive chunks, using the chunk sizes computed by
 +;; integer-divided for (count aseq) and max-num-chunks. Zero-sized chunks
 +;; are dropped and larger chunks come first. Returns a vector of chunks,
 +;; or [] when max-num-chunks is zero.
 +;; NOTE(review): assumes integer-divided returns a map of
 +;; chunk-size -> number-of-chunks — confirm against Utils/integerDivided.
 +(defn partition-fixed [max-num-chunks aseq]
 +  (if (zero? max-num-chunks)
 +    []
 +    (let [chunks (->> (integer-divided (count aseq) max-num-chunks)
 +                      (#(dissoc % 0))
 +                      (sort-by (comp - first))
 +                      (mapcat (fn [[size amt]] (repeat amt size)))
 +                      )]
 +      ;; Walk the list of sizes, peeling that many items off data each time.
 +      (loop [result []
 +             [chunk & rest-chunks] chunks
 +             data aseq]
 +        (if (nil? chunk)
 +          result
 +          (let [[c rest-data] (split-at chunk data)]
 +            (recur (conj result c)
 +                   rest-chunks
 +                   rest-data)))))))
 +
 +
 +;; Associates key in curr with the result of calling afn on curr itself.
 +;; Building block of the recursive-map macro.
 +(defn assoc-apply-self [curr key afn]
 +  (assoc curr key (afn curr)))
 +
 +;; Builds a map from alternating key/form pairs, starting from {}. Each
 +;; form is evaluated with the symbol <> bound to the map built so far, so a
 +;; later entry can refer to the entries before it.
 +(defmacro recursive-map [& forms]
 +  (->> (partition 2 forms)
 +       (map (fn [[key form]] `(assoc-apply-self ~key (fn [~'<>] ~form))))
 +       (concat `(-> {}))))
 +
 +(defn current-stack-trace
 +  "Returns the calling thread's current stack trace as a string with one
 +  stack frame per line."
 +  []
 +  (let [frames (.getStackTrace (Thread/currentThread))]
 +    (str/join "\n" (map str frames))))
 +
 +(defn get-iterator
 +  "Returns a fresh iterator over alist, or nil when alist is nil."
 +  [^Iterable alist]
 +  (when alist
 +    (.iterator alist)))
 +
 +(defn iter-has-next?
 +  "Nil-safe hasNext: false for a nil iterator, otherwise the result of
 +  calling hasNext on it."
 +  [^Iterator iter]
 +  (if-not iter
 +    false
 +    (.hasNext iter)))
 +
 +;; Returns the next element of iter. Unlike iter-has-next? this is not
 +;; nil-safe.
 +(defn iter-next [^Iterator iter]
 +  (.next iter))
 +
 +;; (fast-list-iter [elem-a list-a elem-b list-b ...] body...)
 +;; Iterates over one or more Iterables in lock-step using raw Java
 +;; iterators (via get-iterator / iter-has-next? / iter-next), evaluating
 +;; body for side effects with each elem bound to the next item of its list.
 +;; Stops as soon as any of the iterators is exhausted; a nil list behaves
 +;; like an empty one. Yields nil.
 +(defmacro fast-list-iter [pairs & body]
 +  (let [pairs (partition 2 pairs)
 +        lists (map second pairs)
 +        elems (map first pairs)
 +        iters (map (fn [_] (gensym)) lists)
 +        bindings (->> (map (fn [i l] [i `(get-iterator ~l)]) iters lists) (apply concat))
 +        tests (map (fn [i] `(iter-has-next? ~i)) iters)
 +        assignments (->> (map (fn [e i] [e `(iter-next ~i)]) elems iters) (apply concat))]
 +    `(let [~@bindings]
 +       (while (and ~@tests)
 +         (let [~@assignments]
 +           ~@body
 +           )))))
 +
 +;; Eager map over an Iterable: applies afn to every element of alist (via
 +;; fast-list-iter) and returns the results in a new java.util.ArrayList.
 +(defn fast-list-map [afn alist]
 +  (let [ret (ArrayList.)]
 +    (fast-list-iter [e alist]
 +      (.add ret (afn e)))
 +    ret ))
 +
 +;; for-like syntax on top of fast-list-map: evaluates body with e bound to
 +;; each element of alist and collects the results in an ArrayList.
 +(defmacro fast-list-for [[e alist] & body]
 +  `(fast-list-map (fn [~e] ~@body) ~alist))
 +
 +(defn map-iter
 +  "Returns an iterator over the entry set of amap, or nil when amap is nil."
 +  [^Map amap]
 +  (when amap
 +    (.iterator (.entrySet amap))))
 +
 +;; Converts a java.util.Map$Entry into a [key value] vector.
 +(defn convert-entry [^Map$Entry entry]
 +  [(.getKey entry) (.getValue entry)])
 +
 +;; (fast-map-iter [binding amap] body...)
 +;; Iterates over the entries of a java.util.Map with a raw Java iterator,
 +;; evaluating body for side effects with binding destructured against the
 +;; [key value] vector of each entry. A nil map behaves like an empty one.
 +(defmacro fast-map-iter [[bind amap] & body]
 +  `(let [iter# (map-iter ~amap)]
 +    (while (iter-has-next? iter#)
 +      (let [entry# (iter-next iter#)
 +            ~bind (convert-entry entry#)]
 +        ~@body
 +        ))))
 +
 +;; Returns element 0 of a java.util.List via List.get (no seq allocation).
 +(defn fast-first [^List alist]
 +  (.get alist 0))
 +
 +(defmacro get-with-default
 +  "Looks up key in the mutable java.util.Map amap. When the stored value is
 +  logically true it is returned. Otherwise default-val is evaluated, stored
 +  under key and returned. default-val is only evaluated when needed, and
 +  the amap and key forms are each evaluated exactly once."
 +  [amap key default-val]
 +  ;; Bind the map and key forms to locals so forms with side effects (or
 +  ;; expensive forms) are not evaluated once for .get and again for .put.
 +  `(let [m# ~amap
 +         k# ~key
 +         curr# (.get m# k#)]
 +     (if curr#
 +       curr#
 +       (let [new# ~default-val]
 +         (.put m# k# new#)
 +         new#))))
 +
 +;; Groups the elements of alist by (afn element). Returns a
 +;; java.util.HashMap from each key to a java.util.ArrayList of the elements
 +;; that produced it, in iteration order.
 +(defn fast-group-by [afn alist]
 +  (let [ret (HashMap.)]
 +    (fast-list-iter [e alist]
 +      (let [key (afn e)
 +            ^List curr (get-with-default ret key (ArrayList.))]
 +        (.add curr e)))
 +    ret ))
 +
 +(defn new-instance
 +  "Instantiates klass through its no-argument constructor. klass may be a
 +  Class object or a string holding a fully qualified class name."
 +  [klass]
 +  (.newInstance (if (string? klass)
 +                  (Class/forName klass)
 +                  klass)))
 +
 +;; Threading macro with an explicit placeholder: threads x through the
 +;; forms, inserting it where the symbol <> appears at the top level of a
 +;; list form (the first <> is replaced; the form's metadata is kept).
 +;; A form that is not a list is called as (form x).
 +(defmacro -<>
 +  ([x] x)
 +  ([x form] (if (seq? form)
 +              (with-meta
 +                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
 +                  (concat begin [x] end))
 +                (meta form))
 +              (list form x)))
 +  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))