You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:26 UTC
[30/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
deleted file mode 100644
index f17a63d..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ /dev/null
@@ -1,1199 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.logviewer
- (:use compojure.core)
- (:use [clojure.set :only [difference intersection]])
- (:use [clojure.string :only [blank? split]])
- (:use [hiccup core page-helpers form-helpers])
- (:use [backtype.storm config util log timer])
- (:use [backtype.storm.ui helpers])
- (:import [backtype.storm.utils Utils VersionInfo])
- (:import [org.slf4j LoggerFactory])
- (:import [java.util Arrays ArrayList HashSet])
- (:import [java.util.zip GZIPInputStream])
- (:import [org.apache.logging.log4j LogManager])
- (:import [org.apache.logging.log4j.core Appender LoggerContext])
- (:import [org.apache.logging.log4j.core.appender RollingFileAppender])
- (:import [java.io BufferedInputStream File FileFilter FileInputStream
- InputStream InputStreamReader])
- (:import [java.nio.file Files Path Paths DirectoryStream])
- (:import [java.nio ByteBuffer])
- (:import [backtype.storm.utils Utils])
- (:import [backtype.storm.daemon DirectoryCleaner])
- (:import [org.yaml.snakeyaml Yaml]
- [org.yaml.snakeyaml.constructor SafeConstructor])
- (:import [backtype.storm.ui InvalidRequestException]
- [backtype.storm.security.auth AuthUtils])
- (:require [backtype.storm.daemon common [supervisor :as supervisor]])
- (:require [compojure.route :as route]
- [compojure.handler :as handler]
- [ring.middleware.keyword-params]
- [ring.util.codec :as codec]
- [ring.util.response :as resp]
- [clojure.string :as string])
- (:require [metrics.meters :refer [defmeter mark!]])
- (:use [backtype.storm.daemon.common :only [start-metrics-reporters]])
- (:gen-class))
-
-(def ^:dynamic *STORM-CONF* (read-storm-config))
-(def STORM-VERSION (VersionInfo/getVersion))
-
-(defmeter logviewer:num-log-page-http-requests)
-(defmeter logviewer:num-daemonlog-page-http-requests)
-(defmeter logviewer:num-download-log-file-http-requests)
-(defmeter logviewer:num-download-log-daemon-file-http-requests)
-(defmeter logviewer:num-list-logs-http-requests)
-
-(defn cleanup-cutoff-age-millis [conf now-millis]
- (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
-
-(defn get-stream-for-dir
- [^File f]
- (try (Files/newDirectoryStream (.toPath f))
- (catch Exception ex (log-error ex) nil)))
-
-(defn- last-modifiedtime-worker-logdir
- "Return the last modified time for all log files in a worker's log dir.
- Using stream rather than File.listFiles is to avoid large mem usage
- when a directory has too many files"
- [^File log-dir]
- (let [^DirectoryStream stream (get-stream-for-dir log-dir)
- dir-modified (.lastModified log-dir)
- last-modified (try (reduce
- (fn [maximum path]
- (let [curr (.lastModified (.toFile path))]
- (if (> curr maximum)
- curr
- maximum)))
- dir-modified
- stream)
- (catch Exception ex
- (log-error ex) dir-modified)
- (finally
- (if (instance? DirectoryStream stream)
- (.close stream))))]
- last-modified))
-
-(defn get-size-for-logdir
- "Return the sum of lengths for all log files in a worker's log dir.
- Using stream rather than File.listFiles is to avoid large mem usage
- when a directory has too many files"
- [log-dir]
- (let [^DirectoryStream stream (get-stream-for-dir log-dir)]
- (reduce
- (fn [sum path]
- (let [size (.length (.toFile path))]
- (+ sum size)))
- 0
- stream)))
-
-(defn mk-FileFilter-for-log-cleanup [conf now-millis]
- (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
- (reify FileFilter (^boolean accept [this ^File file]
- (boolean (and
- (not (.isFile file))
- (<= (last-modifiedtime-worker-logdir file) cutoff-age-millis)))))))
-
-(defn select-dirs-for-cleanup [conf now-millis root-dir]
- (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
- (reduce clojure.set/union
- (sorted-set)
- (for [^File topo-dir (.listFiles (File. root-dir))]
- (into [] (.listFiles topo-dir file-filter))))))
-
-(defn get-topo-port-workerlog
- "Return the path of the worker log with the format of topoId/port/worker.log.*"
- [^File file]
- (clojure.string/join file-path-separator
- (take-last 3
- (split (.getCanonicalPath file) (re-pattern file-path-separator)))))
-
-(defn get-metadata-file-for-log-root-name [root-name root-dir]
- (let [metaFile (clojure.java.io/file root-dir "metadata"
- (str root-name ".yaml"))]
- (if (.exists metaFile)
- metaFile
- (do
- (log-warn "Could not find " (.getCanonicalPath metaFile)
- " to clean up for " root-name)
- nil))))
-
-(defn get-metadata-file-for-wroker-logdir [logdir]
- (let [metaFile (clojure.java.io/file logdir "worker.yaml")]
- (if (.exists metaFile)
- metaFile
- (do
- (log-warn "Could not find " (.getCanonicalPath metaFile)
- " to clean up for " logdir)
- nil))))
-
-(defn get-worker-id-from-metadata-file [metaFile]
- (get (clojure-from-yaml-file metaFile) "worker-id"))
-
-(defn get-topo-owner-from-metadata-file [metaFile]
- (get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
-
-(defn identify-worker-log-dirs [log-dirs]
- "return the workerid to worker-log-dir map"
- (into {} (for [logdir log-dirs
- :let [metaFile (get-metadata-file-for-wroker-logdir logdir)]
- :when metaFile]
- {(get-worker-id-from-metadata-file metaFile) logdir})))
-
-(defn get-alive-ids
- [conf now-secs]
- (->>
- (supervisor/read-worker-heartbeats conf)
- (remove
- #(or (not (val %))
- (supervisor/is-worker-hb-timed-out? now-secs
- (val %)
- conf)))
- keys
- set))
-
-(defn get-dead-worker-dirs
- "Return a sorted set of java.io.Files that were written by workers that are
- now dead"
- [conf now-secs log-dirs]
- (if (empty? log-dirs)
- (sorted-set)
- (let [alive-ids (get-alive-ids conf now-secs)
- id->dir (identify-worker-log-dirs log-dirs)]
- (apply sorted-set
- (for [[id dir] id->dir
- :when (not (contains? alive-ids id))]
- dir)))))
-
-(defn get-all-worker-dirs [^File root-dir]
- (reduce clojure.set/union
- (sorted-set)
- (for [^File topo-dir (.listFiles root-dir)]
- (into [] (.listFiles topo-dir)))))
-
-(defn get-alive-worker-dirs
- "Return a sorted set of java.io.Files that were written by workers that are
- now active"
- [conf root-dir]
- (let [alive-ids (get-alive-ids conf (current-time-secs))
- log-dirs (get-all-worker-dirs root-dir)
- id->dir (identify-worker-log-dirs log-dirs)]
- (apply sorted-set
- (for [[id dir] id->dir
- :when (contains? alive-ids id)]
- (.getCanonicalPath dir)))))
-
-(defn get-all-logs-for-rootdir [^File log-dir]
- (reduce concat
- (for [port-dir (get-all-worker-dirs log-dir)]
- (into [] (DirectoryCleaner/getFilesForDir port-dir)))))
-
-(defn is-active-log [^File file]
- (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file)))
-
-(defn sum-file-size
- "Given a sequence of Files, sum their sizes."
- [files]
- (reduce #(+ %1 (.length %2)) 0 files))
-
-(defn per-workerdir-cleanup!
- "Delete the oldest files in each overloaded worker log dir"
- [^File root-dir size ^DirectoryCleaner cleaner]
- (dofor [worker-dir (get-all-worker-dirs root-dir)]
- (.deleteOldestWhileTooLarge cleaner (ArrayList. [worker-dir]) size true nil)))
-
-(defn global-log-cleanup!
- "Delete the oldest files in overloaded worker-artifacts globally"
- [^File root-dir size ^DirectoryCleaner cleaner]
- (let [worker-dirs (ArrayList. (get-all-worker-dirs root-dir))
- alive-worker-dirs (HashSet. (get-alive-worker-dirs *STORM-CONF* root-dir))]
- (.deleteOldestWhileTooLarge cleaner worker-dirs size false alive-worker-dirs)))
-
-(defn cleanup-empty-topodir!
- "Delete the topo dir if it contains zero port dirs"
- [^File dir]
- (let [topodir (.getParentFile dir)]
- (if (empty? (.listFiles topodir))
- (rmr (.getCanonicalPath topodir)))))
-
-(defn cleanup-fn!
- "Delete old log dirs for which the workers are no longer alive"
- [log-root-dir]
- (let [now-secs (current-time-secs)
- old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
- (* now-secs 1000)
- log-root-dir)
- total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB)
- per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB)
- per-dir-size (min per-dir-size (* total-size 0.5))
- cleaner (DirectoryCleaner.)
- dead-worker-dirs (get-dead-worker-dirs *STORM-CONF*
- now-secs
- old-log-dirs)]
- (log-debug "log cleanup: now=" now-secs
- " old log dirs " (pr-str (map #(.getName %) old-log-dirs))
- " dead worker dirs " (pr-str
- (map #(.getName %) dead-worker-dirs)))
- (dofor [dir dead-worker-dirs]
- (let [path (.getCanonicalPath dir)]
- (log-message "Cleaning up: Removing " path)
- (try (rmr path)
- (cleanup-empty-topodir! dir)
- (catch Exception ex (log-error ex)))))
- (per-workerdir-cleanup! (File. log-root-dir) (* per-dir-size (* 1024 1024)) cleaner)
- (let [size (* total-size (* 1024 1024))]
- (global-log-cleanup! (File. log-root-dir) size cleaner))))
-
-(defn start-log-cleaner! [conf log-root-dir]
- (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
- (when interval-secs
- (log-debug "starting log cleanup thread at interval: " interval-secs)
- (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
- :kill-fn (fn [t]
- (log-error t "Error when doing logs cleanup")
- (exit-process! 20 "Error when doing log cleanup")))
- 0 ;; Start immediately.
- interval-secs
- (fn [] (cleanup-fn! log-root-dir))))))
-
-(defn- skip-bytes
- "FileInputStream#skip may not work the first time, so ensure it successfully
- skips the given number of bytes."
- [^InputStream stream n]
- (loop [skipped 0]
- (let [skipped (+ skipped (.skip stream (- n skipped)))]
- (if (< skipped n) (recur skipped)))))
-
-(defn logfile-matches-filter?
- [log-file-name]
- (let [regex-string (str "worker.log.*")
- regex-pattern (re-pattern regex-string)]
- (not= (re-seq regex-pattern (.toString log-file-name)) nil)))
-
-(defn page-file
- ([path tail]
- (let [zip-file? (.endsWith path ".gz")
- flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
- skip (- flen tail)]
- (page-file path skip tail)))
- ([path start length]
- (let [zip-file? (.endsWith path ".gz")
- flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))]
- (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. path)) (FileInputStream. path))
- output (java.io.ByteArrayOutputStream.)]
- (if (>= start flen)
- (throw
- (InvalidRequestException. "Cannot start past the end of the file")))
- (if (> start 0) (skip-bytes input start))
- (let [buffer (make-array Byte/TYPE 1024)]
- (loop []
- (when (< (.size output) length)
- (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
- (when (pos? size)
- (.write output buffer 0 size)
- (recur)))))
- (.toString output))))))
-
-(defn get-log-user-group-whitelist [fname]
- (let [wl-file (get-log-metadata-file fname)
- m (clojure-from-yaml-file wl-file)]
- (if (not-nil? m)
- (do
- (let [user-wl (.get m LOGS-USERS)
- user-wl (if user-wl user-wl [])
- group-wl (.get m LOGS-GROUPS)
- group-wl (if group-wl group-wl [])]
- [user-wl group-wl]))
- nil)))
-
-(def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin *STORM-CONF*))
-(defn user-groups
- [user]
- (if (blank? user) [] (.getGroups igroup-mapper user)))
-
-(defn authorized-log-user? [user fname conf]
- (if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist fname)))
- nil
- (let [groups (user-groups user)
- [user-wl group-wl] (get-log-user-group-whitelist fname)
- logs-users (concat (conf LOGS-USERS)
- (conf NIMBUS-ADMINS)
- user-wl)
- logs-groups (concat (conf LOGS-GROUPS)
- group-wl)]
- (or (some #(= % user) logs-users)
- (< 0 (.size (intersection (set groups) (set logs-groups))))))))
-
-(defn log-root-dir
- "Given an appender name, as configured, get the parent directory of the appender's log file.
- Note that if anything goes wrong, this will throw an Error and exit."
- [appender-name]
- (let [appender (.getAppender (.getConfiguration (LogManager/getContext)) appender-name)]
- (if (and appender-name appender (instance? RollingFileAppender appender))
- (.getParent (File. (.getFileName appender)))
- (throw
- (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j agree.")))))
-
-(defnk to-btn-link
- "Create a link that is formatted like a button"
- [url text :enabled true]
- [:a {:href (java.net.URI. url)
- :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
-
-(defn search-file-form [fname]
- [[:form {:action "logviewer_search.html" :id "search-box"}
- "Search this file:"
- [:input {:type "text" :name "search"}]
- [:input {:type "hidden" :name "file" :value fname}]
- [:input {:type "submit" :value "Search"}]]])
-
-(defn log-file-selection-form [log-files type]
- [[:form {:action type :id "list-of-files"}
- (drop-down "file" log-files)
- [:input {:type "submit" :value "Switch file"}]]])
-
-(defn pager-links [fname start length file-size]
- (let [prev-start (max 0 (- start length))
- next-start (if (> file-size 0)
- (min (max 0 (- file-size length)) (+ start length))
- (+ start length))]
- [[:div
- (concat
- [(to-btn-link (url "/log"
- {:file fname
- :start (max 0 (- start length))
- :length length})
- "Prev" :enabled (< prev-start start))]
- [(to-btn-link (url "/log"
- {:file fname
- :start 0
- :length length}) "First")]
- [(to-btn-link (url "/log"
- {:file fname
- :length length})
- "Last")]
- [(to-btn-link (url "/log"
- {:file fname
- :start (min (max 0 (- file-size length))
- (+ start length))
- :length length})
- "Next" :enabled (> next-start start))])]]))
-
-(defn- download-link [fname]
- [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
-
-(defn- daemon-download-link [fname]
- [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full File")]])
-
-(defn- is-txt-file [fname]
- (re-find #"\.(log.*|txt|yaml|pid)$" fname))
-
-(def default-bytes-per-page 51200)
-
-(defn log-page [fname start length grep user root-dir]
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user fname *STORM-CONF*))
- (let [file (.getCanonicalFile (File. root-dir fname))
- path (.getCanonicalPath file)
- zip-file? (.endsWith path ".gz")
- topo-dir (.getParentFile (.getParentFile file))]
- (if (and (.exists file)
- (= (.getCanonicalFile (File. root-dir))
- (.getParentFile topo-dir)))
- (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
- log-files (reduce clojure.set/union
- (sorted-set)
- (for [^File port-dir (.listFiles topo-dir)]
- (into [] (filter #(.isFile %) (DirectoryCleaner/getFilesForDir port-dir))))) ;all types of files included
- files-str (for [file log-files]
- (get-topo-port-workerlog file))
- reordered-files-str (conj (filter #(not= fname %) files-str) fname)
- length (if length
- (min 10485760 length)
- default-bytes-per-page)
- log-string (escape-html
- (if (is-txt-file fname)
- (if start
- (page-file path start length)
- (page-file path length))
- "This is a binary file and cannot display! You may download the full file."))
- start (or start (- file-length length))]
- (if grep
- (html [:pre#logContent
- (if grep
- (->> (.split log-string "\n")
- (filter #(.contains % grep))
- (string/join "\n"))
- log-string)])
- (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
- (html (concat (search-file-form fname)
- (log-file-selection-form reordered-files-str "log") ; list all files for this topology
- pager-data
- (download-link fname)
- [[:pre#logContent log-string]]
- pager-data)))))
- (-> (resp/response "Page not found")
- (resp/status 404))))
- (if (nil? (get-log-user-group-whitelist fname))
- (-> (resp/response "Page not found")
- (resp/status 404))
- (unauthorized-user-html user))))
-
-(defn daemonlog-page [fname start length grep user root-dir]
- (let [file (.getCanonicalFile (File. root-dir fname))
- file-length (.length file)
- path (.getCanonicalPath file)
- zip-file? (.endsWith path ".gz")]
- (if (and (= (.getCanonicalFile (File. root-dir))
- (.getParentFile file))
- (.exists file))
- (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
- length (if length
- (min 10485760 length)
- default-bytes-per-page)
- log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included
- files-str (for [file log-files]
- (.getName file))
- reordered-files-str (conj (filter #(not= fname %) files-str) fname)
- log-string (escape-html
- (if (is-txt-file fname)
- (if start
- (page-file path start length)
- (page-file path length))
- "This is a binary file and cannot display! You may download the full file."))
- start (or start (- file-length length))]
- (if grep
- (html [:pre#logContent
- (if grep
- (->> (.split log-string "\n")
- (filter #(.contains % grep))
- (string/join "\n"))
- log-string)])
- (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
- (html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
- pager-data
- (daemon-download-link fname)
- [[:pre#logContent log-string]]
- pager-data)))))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
-
-(defn download-log-file [fname req resp user ^String root-dir]
- (let [file (.getCanonicalFile (File. root-dir fname))]
- (if (.exists file)
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user fname *STORM-CONF*))
- (-> (resp/response file)
- (resp/content-type "application/octet-stream"))
- (unauthorized-user-html user))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
-
-(def grep-max-search-size 1024)
-(def grep-buf-size 2048)
-(def grep-context-size 128)
-
-(defn logviewer-port
- []
- (int (*STORM-CONF* LOGVIEWER-PORT)))
-
-(defn url-to-match-centered-in-log-page
- [needle fname offset port]
- (let [host (local-hostname)
- port (logviewer-port)
- fname (clojure.string/join file-path-separator (take-last 3 (split fname (re-pattern file-path-separator))))]
- (url (str "http://" host ":" port "/log")
- {:file fname
- :start (max 0
- (- offset
- (int (/ default-bytes-per-page 2))
- (int (/ (alength needle) -2)))) ;; Addition
- :length default-bytes-per-page})))
-
-(defnk mk-match-data
- [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname
- :before-bytes nil :after-bytes nil]
- (let [url (url-to-match-centered-in-log-page needle
- fname
- file-offset
- (*STORM-CONF* LOGVIEWER-PORT))
- haystack-bytes (.array haystack)
- before-string (if (>= haystack-offset grep-context-size)
- (String. haystack-bytes
- (- haystack-offset grep-context-size)
- grep-context-size
- "UTF-8")
- (let [num-desired (max 0 (- grep-context-size
- haystack-offset))
- before-size (if before-bytes
- (alength before-bytes)
- 0)
- num-expected (min before-size num-desired)]
- (if (pos? num-expected)
- (str (String. before-bytes
- (- before-size num-expected)
- num-expected
- "UTF-8")
- (String. haystack-bytes
- 0
- haystack-offset
- "UTF-8"))
- (String. haystack-bytes
- 0
- haystack-offset
- "UTF-8"))))
- after-string (let [needle-size (alength needle)
- after-offset (+ haystack-offset needle-size)
- haystack-size (.limit haystack)]
- (if (< (+ after-offset grep-context-size) haystack-size)
- (String. haystack-bytes
- after-offset
- grep-context-size
- "UTF-8")
- (let [num-desired (- grep-context-size
- (- haystack-size after-offset))
- after-size (if after-bytes
- (alength after-bytes)
- 0)
- num-expected (min after-size num-desired)]
- (if (pos? num-expected)
- (str (String. haystack-bytes
- after-offset
- (- haystack-size after-offset)
- "UTF-8")
- (String. after-bytes 0 num-expected "UTF-8"))
- (String. haystack-bytes
- after-offset
- (- haystack-size after-offset)
- "UTF-8")))))]
- {"byteOffset" file-offset
- "beforeString" before-string
- "afterString" after-string
- "matchString" (String. needle "UTF-8")
- "logviewerURL" url}))
-
-(defn- try-read-ahead!
- "Tries once to read ahead in the stream to fill the context and resets the
- stream to its position before the call."
- [^BufferedInputStream stream haystack offset file-len bytes-read]
- (let [num-expected (min (- file-len bytes-read)
- grep-context-size)
- after-bytes (byte-array num-expected)]
- (.mark stream num-expected)
- ;; Only try reading once.
- (.read stream after-bytes 0 num-expected)
- (.reset stream)
- after-bytes))
-
-(defn offset-of-bytes
- "Searches a given byte array for a match of a sub-array of bytes. Returns
- the offset to the byte that matches, or -1 if no match was found."
- [^bytes buf ^bytes value init-offset]
- {:pre [(> (alength value) 0)
- (not (neg? init-offset))]}
- (loop [offset init-offset
- candidate-offset init-offset
- val-offset 0]
- (if-not (pos? (- (alength value) val-offset))
- ;; Found
- candidate-offset
- (if (>= offset (alength buf))
- ;; We ran out of buffer for the search.
- -1
- (if (not= (aget value val-offset) (aget buf offset))
- ;; The match at this candidate offset failed, so start over with the
- ;; next candidate byte from the buffer.
- (let [new-offset (inc candidate-offset)]
- (recur new-offset new-offset 0))
- ;; So far it matches. Keep going...
- (recur (inc offset) candidate-offset (inc val-offset)))))))
-
-(defn- buffer-substring-search!
- "As the file is read into a buffer, 1/2 the buffer's size at a time, we
- search the buffer for matches of the substring and return a list of zero or
- more matches."
- [file file-len offset-to-buf init-buf-offset stream bytes-skipped
- bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches
- ^bytes before-bytes]
- (loop [buf-offset init-buf-offset
- matches initial-matches]
- (let [offset (offset-of-bytes (.array haystack) needle buf-offset)]
- (if (and (< (count matches) num-matches) (not (neg? offset)))
- (let [file-offset (+ offset-to-buf offset)
- bytes-needed-after-match (- (.limit haystack)
- grep-context-size
- (alength needle))
- before-arg (if (< offset grep-context-size) before-bytes)
- after-arg (if (> offset bytes-needed-after-match)
- (try-read-ahead! stream
- haystack
- offset
- file-len
- bytes-read))]
- (recur (+ offset (alength needle))
- (conj matches
- (mk-match-data needle
- haystack
- offset
- file-offset
- (.getCanonicalPath file)
- :before-bytes before-arg
- :after-bytes after-arg))))
- (let [before-str-to-offset (min (.limit haystack)
- grep-max-search-size)
- before-str-from-offset (max 0 (- before-str-to-offset
- grep-context-size))
- new-before-bytes (Arrays/copyOfRange (.array haystack)
- before-str-from-offset
- before-str-to-offset)
- ;; It's OK if new-byte-offset is negative. This is normal if
- ;; we are out of bytes to read from a small file.
- new-byte-offset (if (>= (count matches) num-matches)
- (+ (get (last matches) "byteOffset")
- (alength needle))
- (+ bytes-skipped
- bytes-read
- (- grep-max-search-size)))]
- [matches new-byte-offset new-before-bytes])))))
-
-(defn- mk-grep-response
- "This response data only includes a next byte offset if there is more of the
- file to read."
- [search-bytes offset matches next-byte-offset]
- (merge {"searchString" (String. search-bytes "UTF-8")
- "startByteOffset" offset
- "matches" matches}
- (and next-byte-offset {"nextByteOffset" next-byte-offset})))
-
-(defn rotate-grep-buffer!
- [^ByteBuffer buf ^BufferedInputStream stream total-bytes-read file file-len]
- (let [buf-arr (.array buf)]
- ;; Copy the 2nd half of the buffer to the first half.
- (System/arraycopy buf-arr
- grep-max-search-size
- buf-arr
- 0
- grep-max-search-size)
- ;; Zero-out the 2nd half to prevent accidental matches.
- (Arrays/fill buf-arr
- grep-max-search-size
- (count buf-arr)
- (byte 0))
- ;; Fill the 2nd half with new bytes from the stream.
- (let [bytes-read (.read stream
- buf-arr
- grep-max-search-size
- (min file-len grep-max-search-size))]
- (.limit buf (+ grep-max-search-size bytes-read))
- (swap! total-bytes-read + bytes-read))))
-
-(defnk substring-search
- "Searches for a substring in a log file, starting at the given offset,
- returning the given number of matches, surrounded by the given number of
- context lines. Other information is included to be useful for progressively
- searching through a file for display in a UI. The search string must
- grep-max-search-size bytes or fewer when decoded with UTF-8."
- [file ^String search-string :num-matches 10 :start-byte-offset 0]
- {:pre [(not (empty? search-string))
- (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]}
- (let [zip-file? (.endsWith (.getName file) ".gz")
- f-input-steam (FileInputStream. file)
- gzipped-input-stream (if zip-file?
- (GZIPInputStream. f-input-steam)
- f-input-steam)
- stream ^BufferedInputStream (BufferedInputStream.
- gzipped-input-stream)
- file-len (if zip-file? (Utils/zipFileSize file) (.length file))
- buf ^ByteBuffer (ByteBuffer/allocate grep-buf-size)
- buf-arr ^bytes (.array buf)
- string nil
- total-bytes-read (atom 0)
- matches []
- search-bytes ^bytes (.getBytes search-string "UTF-8")
- num-matches (or num-matches 10)
- start-byte-offset (or start-byte-offset 0)]
- ;; Start at the part of the log file we are interested in.
- ;; Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files
- (if (> start-byte-offset file-len)
- (throw
- (InvalidRequestException. "Cannot search past the end of the file")))
- (when (> start-byte-offset 0)
- (skip-bytes stream start-byte-offset))
- (java.util.Arrays/fill buf-arr (byte 0))
- (let [bytes-read (.read stream buf-arr 0 (min file-len grep-buf-size))]
- (.limit buf bytes-read)
- (swap! total-bytes-read + bytes-read))
- (loop [initial-matches []
- init-buf-offset 0
- byte-offset start-byte-offset
- before-bytes nil]
- (let [[matches new-byte-offset new-before-bytes]
- (buffer-substring-search! file
- file-len
- byte-offset
- init-buf-offset
- stream
- start-byte-offset
- @total-bytes-read
- buf
- search-bytes
- initial-matches
- num-matches
- before-bytes)]
- (if (and (< (count matches) num-matches)
- (< (+ @total-bytes-read start-byte-offset) file-len))
- (let [;; The start index is positioned to find any possible
- ;; occurrence search string that did not quite fit in the
- ;; buffer on the previous read.
- new-buf-offset (- (min (.limit ^ByteBuffer buf)
- grep-max-search-size)
- (alength search-bytes))]
- (rotate-grep-buffer! buf stream total-bytes-read file file-len)
- (when (< @total-bytes-read 0)
- (throw (InvalidRequestException. "Cannot search past the end of the file")))
- (recur matches
- new-buf-offset
- new-byte-offset
- new-before-bytes))
- (mk-grep-response search-bytes
- start-byte-offset
- matches
- (if-not (and (< (count matches) num-matches)
- (>= @total-bytes-read file-len))
- (let [next-byte-offset (+ (get (last matches)
- "byteOffset")
- (alength search-bytes))]
- (if (> file-len next-byte-offset)
- next-byte-offset)))))))))
-
-(defn- try-parse-int-param
- [nam value]
- (try
- (Integer/parseInt value)
- (catch java.lang.NumberFormatException e
- (->
- (str "Could not parse " nam " to an integer")
- (InvalidRequestException. e)
- throw))))
-
-(defn search-log-file
- [fname user ^String root-dir search num-matches offset callback origin]
- (let [file (.getCanonicalFile (File. root-dir fname))]
- (if (.exists file)
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user fname *STORM-CONF*))
- (let [num-matches-int (if num-matches
- (try-parse-int-param "num-matches"
- num-matches))
- offset-int (if offset
- (try-parse-int-param "start-byte-offset" offset))]
- (try
- (if (and (not (empty? search))
- <= (count (.getBytes search "UTF-8")) grep-max-search-size)
- (json-response
- (substring-search file
- search
- :num-matches num-matches-int
- :start-byte-offset offset-int)
- callback
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"})
- (throw
- (InvalidRequestException.
- (str "Search substring must be between 1 and 1024 UTF-8 "
- "bytes in size (inclusive)"))))
- (catch Exception ex
- (json-response (exception->json ex) callback :status 500))))
- (json-response (unauthorized-user-json user) callback :status 401))
- (json-response {"error" "Not Found"
- "errorMessage" "The file was not found on this node."}
- callback
- :status 404))))
-
-(defn find-n-matches [logs n file-offset offset search]
- (let [logs (drop file-offset logs)
- wrap-matches-fn (fn [matches]
- {"fileOffset" file-offset
- "searchString" search
- "matches" matches})]
- (loop [matches []
- logs logs
- offset offset
- file-offset file-offset
- match-count 0]
- (if (empty? logs)
- (wrap-matches-fn matches)
- (let [these-matches (try
- (log-debug "Looking through " (first logs))
- (substring-search (first logs)
- search
- :num-matches (- n match-count)
- :start-byte-offset offset)
- (catch InvalidRequestException e
- (log-error e "Can't search past end of file.")
- {}))
- file-name (get-topo-port-workerlog (first logs))
- new-matches (conj matches
- (merge these-matches
- { "fileName" file-name
- "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern file-path-separator))))}))
- new-count (+ match-count (count (these-matches "matches")))]
- (if (empty? these-matches)
- (recur matches (rest logs) 0 (+ file-offset 1) match-count)
- (if (>= new-count n)
- (wrap-matches-fn new-matches)
- (recur new-matches (rest logs) 0 (+ file-offset 1) new-count))))))))
-
-(defn logs-for-port
- "Get the filtered, authorized, sorted log files for a port."
- [user port-dir]
- (let [filter-authorized-fn (fn [user logs]
- (filter #(or
- (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user (get-topo-port-workerlog %) *STORM-CONF*)) logs))]
- (sort #(compare (.lastModified %2) (.lastModified %1))
- (filter-authorized-fn
- user
- (filter #(re-find worker-log-filename-pattern (.getName %)) (DirectoryCleaner/getFilesForDir port-dir))))))
-
-(defn deep-search-logs-for-topology
- [topology-id user ^String root-dir search num-matches port file-offset offset search-archived? callback origin]
- (json-response
- (if (or (not search) (not (.exists (File. (str root-dir file-path-separator topology-id)))))
- []
- (let [file-offset (if file-offset (Integer/parseInt file-offset) 0)
- offset (if offset (Integer/parseInt offset) 0)
- num-matches (or (Integer/parseInt num-matches) 1)
- port-dirs (vec (.listFiles (File. (str root-dir file-path-separator topology-id))))
- logs-for-port-fn (partial logs-for-port user)]
- (if (or (not port) (= "*" port))
- ;; Check for all ports
- (let [filtered-logs (filter (comp not empty?) (map logs-for-port-fn port-dirs))]
- (if search-archived?
- (map #(find-n-matches % num-matches 0 0 search)
- filtered-logs)
- (map #(find-n-matches % num-matches 0 0 search)
- (map (comp vector first) filtered-logs))))
- ;; Check just the one port
- (if (not (contains? (into #{} (map str (*STORM-CONF* SUPERVISOR-SLOTS-PORTS))) port))
- []
- (let [port-dir (File. (str root-dir file-path-separator topology-id file-path-separator port))]
- (if (or (not (.exists port-dir)) (empty? (logs-for-port user port-dir)))
- []
- (let [filtered-logs (logs-for-port user port-dir)]
- (if search-archived?
- (find-n-matches filtered-logs num-matches file-offset offset search)
- (find-n-matches [(first filtered-logs)] num-matches 0 offset search)))))))))
- callback
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"}))
-
-(defn log-template
- ([body] (log-template body nil nil))
- ([body fname user]
- (html4
- [:head
- [:title (str (escape-html fname) " - Storm Log Viewer")]
- (include-css "/css/bootstrap-3.3.1.min.css")
- (include-css "/css/jquery.dataTables.1.10.4.min.css")
- (include-css "/css/style.css")
- ]
- [:body
- (concat
- (when (not (blank? user)) [[:div.ui-user [:p "User: " user]]])
- [[:div.ui-note [:p "Note: the drop-list shows at most 1024 files for each worker directory."]]]
- [[:h3 (escape-html fname)]]
- (seq body))
- ])))
-
-(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
-
-(defn- parse-long-from-map [m k]
- (try
- (Long/parseLong (k m))
- (catch NumberFormatException ex
- (throw (InvalidRequestException.
- (str "Could not make an integer out of the query parameter '"
- (name k) "'")
- ex)))))
-
-(defn list-log-files
- [user topoId port log-root callback origin]
- (let [file-results
- (if (nil? topoId)
- (if (nil? port)
- (get-all-logs-for-rootdir (File. log-root))
- (reduce concat
- (for [topo-dir (.listFiles (File. log-root))]
- (reduce concat
- (for [port-dir (.listFiles topo-dir)]
- (if (= (str port) (.getName port-dir))
- (into [] (DirectoryCleaner/getFilesForDir port-dir))))))))
- (if (nil? port)
- (let [topo-dir (File. (str log-root file-path-separator topoId))]
- (if (.exists topo-dir)
- (reduce concat
- (for [port-dir (.listFiles topo-dir)]
- (into [] (DirectoryCleaner/getFilesForDir port-dir))))
- []))
- (let [port-dir (get-worker-dir-from-root log-root topoId port)]
- (if (.exists port-dir)
- (into [] (DirectoryCleaner/getFilesForDir port-dir))
- []))))
- file-strs (sort (for [file file-results]
- (get-topo-port-workerlog file)))]
- (json-response file-strs
- callback
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"})))
-
-(defn get-profiler-dump-files
- [dir]
- (filter (comp not nil?)
- (for [f (DirectoryCleaner/getFilesForDir dir)]
- (let [name (.getName f)]
- (if (or
- (.endsWith name ".txt")
- (.endsWith name ".jfr")
- (.endsWith name ".bin"))
- (.getName f))))))
-
-(defroutes log-routes
- (GET "/log" [:as req & m]
- (try
- (mark! logviewer:num-log-page-http-requests)
- (let [servlet-request (:servlet-request req)
- log-root (:log-root req)
- user (.getUserName http-creds-handler servlet-request)
- start (if (:start m) (parse-long-from-map m :start))
- length (if (:length m) (parse-long-from-map m :length))
- file (url-decode (:file m))]
- (log-template (log-page file start length (:grep m) user log-root)
- file user))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/dumps/:topo-id/:host-port/:filename"
- [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port filename &m]
- (let [user (.getUserName http-creds-handler servlet-request)
- port (second (split host-port #":"))
- dir (File. (str log-root
- file-path-separator
- topo-id
- file-path-separator
- port))
- file (File. (str log-root
- file-path-separator
- topo-id
- file-path-separator
- port
- file-path-separator
- filename))]
- (if (and (.exists dir) (.exists file))
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user
- (str topo-id file-path-separator port file-path-separator "worker.log")
- *STORM-CONF*))
- (-> (resp/response file)
- (resp/content-type "application/octet-stream"))
- (unauthorized-user-html user))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
- (GET "/dumps/:topo-id/:host-port"
- [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port &m]
- (let [user (.getUserName http-creds-handler servlet-request)
- port (second (split host-port #":"))
- dir (File. (str log-root
- file-path-separator
- topo-id
- file-path-separator
- port))]
- (if (.exists dir)
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user
- (str topo-id file-path-separator port file-path-separator "worker.log")
- *STORM-CONF*))
- (html4
- [:head
- [:title "File Dumps - Storm Log Viewer"]
- (include-css "/css/bootstrap-3.3.1.min.css")
- (include-css "/css/jquery.dataTables.1.10.4.min.css")
- (include-css "/css/style.css")]
- [:body
- [:ul
- (for [file (get-profiler-dump-files dir)]
- [:li
- [:a {:href (str "/dumps/" topo-id "/" host-port "/" file)} file ]])]])
- (unauthorized-user-html user))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
- (GET "/daemonlog" [:as req & m]
- (try
- (mark! logviewer:num-daemonlog-page-http-requests)
- (let [servlet-request (:servlet-request req)
- daemonlog-root (:daemonlog-root req)
- user (.getUserName http-creds-handler servlet-request)
- start (if (:start m) (parse-long-from-map m :start))
- length (if (:length m) (parse-long-from-map m :length))
- file (url-decode (:file m))]
- (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
- file user))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
- (try
- (mark! logviewer:num-download-log-file-http-requests)
- (let [user (.getUserName http-creds-handler servlet-request)]
- (download-log-file file servlet-request servlet-response user log-root))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
- (try
- (mark! logviewer:num-download-log-daemon-file-http-requests)
- (let [user (.getUserName http-creds-handler servlet-request)]
- (download-log-file file servlet-request servlet-response user daemonlog-root))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/search/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
- ;; We do not use servlet-response here, but do not remove it from the
- ;; :keys list, or this rule could stop working when an authentication
- ;; filter is configured.
- (try
- (let [user (.getUserName http-creds-handler servlet-request)]
- (search-log-file (url-decode file)
- user
- log-root
- (:search-string m)
- (:num-matches m)
- (:start-byte-offset m)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (exception->json ex) (:callback m) :status 400))))
- (GET "/deepSearch/:topo-id" [:as {:keys [servlet-request servlet-response log-root]} topo-id & m]
- ;; We do not use servlet-response here, but do not remove it from the
- ;; :keys list, or this rule could stop working when an authentication
- ;; filter is configured.
- (try
- (let [user (.getUserName http-creds-handler servlet-request)]
- (deep-search-logs-for-topology topo-id
- user
- log-root
- (:search-string m)
- (:num-matches m)
- (:port m)
- (:start-file-offset m)
- (:start-byte-offset m)
- (:search-archived m)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (exception->json ex) (:callback m) :status 400))))
- (GET "/searchLogs" [:as req & m]
- (try
- (let [servlet-request (:servlet-request req)
- user (.getUserName http-creds-handler servlet-request)]
- (list-log-files user
- (:topoId m)
- (:port m)
- (:log-root req)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (exception->json ex) (:callback m) :status 400))))
- (GET "/listLogs" [:as req & m]
- (try
- (mark! logviewer:num-list-logs-http-requests)
- (let [servlet-request (:servlet-request req)
- user (.getUserName http-creds-handler servlet-request)]
- (list-log-files user
- (:topoId m)
- (:port m)
- (:log-root req)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (exception->json ex) (:callback m) :status 400))))
- (route/resources "/")
- (route/not-found "Page not found"))
-
-(defn conf-middleware
- "For passing the storm configuration with each request."
- [app log-root daemonlog-root]
- (fn [req]
- (app (assoc req :log-root log-root :daemonlog-root daemonlog-root))))
-
-(defn start-logviewer! [conf log-root-dir daemonlog-root-dir]
- (try
- (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
- filter-class (conf UI-FILTER)
- filter-params (conf UI-FILTER-PARAMS)
- logapp (handler/api (-> log-routes
- requests-middleware)) ;; query params as map
- middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
- filters-confs (if (conf UI-FILTER)
- [{:filter-class filter-class
- :filter-params (or (conf UI-FILTER-PARAMS) {})}]
- [])
- filters-confs (concat filters-confs
- [{:filter-class "org.eclipse.jetty.servlets.GzipFilter"
- :filter-name "Gzipper"
- :filter-params {}}])
- https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0))
- keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH)
- keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD)
- keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE)
- key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD)
- truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH)
- truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD)
- truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE)
- want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH)
- need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)]
- (storm-run-jetty {:port (int (conf LOGVIEWER-PORT))
- :configurator (fn [server]
- (config-ssl server
- https-port
- keystore-path
- keystore-pass
- keystore-type
- key-password
- truststore-path
- truststore-password
- truststore-type
- want-client-auth
- need-client-auth)
- (config-filter server middle filters-confs))}))
- (catch Exception ex
- (log-error ex))))
-
-(defn -main []
- (let [conf (read-storm-config)
- log-root (worker-artifacts-root conf)
- daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
- (setup-default-uncaught-exception-handler)
- (start-log-cleaner! conf log-root)
- (log-message "Starting logviewer server for storm version '"
- STORM-VERSION
- "'")
- (start-logviewer! conf log-root daemonlog-root)
- (start-metrics-reporters)))