You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text export.)
Posted to commits@storm.apache.org by bo...@apache.org on 2017/07/24 17:10:01 UTC
[08/19] storm git commit: STORM-1280 port backtype.storm.daemon.logviewer to java
STORM-1280 port backtype.storm.daemon.logviewer to java
* ported logviewer-test.clj
* TODO: some tests are failing, looking into it
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44b268ba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44b268ba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44b268ba
Branch: refs/heads/master
Commit: 44b268badb7ee1a92b1cc8abf81dbb640fa9af19
Parents: 11a7905
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jul 13 00:27:38 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 14 12:11:41 2017 +0900
----------------------------------------------------------------------
bin/storm.py | 2 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 1254 ------------------
.../dev/logviewer-search-context-tests.log.gz | Bin 72 -> 0 bytes
.../dev/logviewer-search-context-tests.log.test | 1 -
storm-core/src/dev/small-worker.log.test | 1 -
storm-core/src/dev/test-3072.log.test | 3 -
storm-core/src/dev/test-worker.log.test | 380 ------
.../apache/storm/daemon/DirectoryCleaner.java | 188 ---
.../clj/org/apache/storm/logviewer_test.clj | 824 ------------
.../daemon/logviewer/LogviewerConstant.java | 23 +
.../storm/daemon/logviewer/LogviewerServer.java | 172 +++
.../handler/LogviewerLogDownloadHandler.java | 43 +
.../handler/LogviewerLogPageHandler.java | 410 ++++++
.../handler/LogviewerLogSearchHandler.java | 707 ++++++++++
.../handler/LogviewerProfileHandler.java | 115 ++
.../logviewer/utils/DirectoryCleaner.java | 183 +++
.../daemon/logviewer/utils/LogCleaner.java | 305 +++++
.../logviewer/utils/LogFileDownloader.java | 51 +
.../utils/LogviewerResponseBuilder.java | 118 ++
.../logviewer/utils/ResourceAuthorizer.java | 130 ++
.../daemon/logviewer/utils/WorkerLogs.java | 60 +
.../logviewer/webapp/LogviewerApplication.java | 94 ++
.../logviewer/webapp/LogviewerResource.java | 221 +++
.../daemon/wip/logviewer/LogviewerConstant.java | 23 -
.../daemon/wip/logviewer/LogviewerServer.java | 174 ---
.../handler/LogviewerLogDownloadHandler.java | 43 -
.../handler/LogviewerLogPageHandler.java | 412 ------
.../handler/LogviewerLogSearchHandler.java | 686 ----------
.../handler/LogviewerProfileHandler.java | 115 --
.../daemon/wip/logviewer/utils/LogCleaner.java | 296 -----
.../wip/logviewer/utils/LogFileDownloader.java | 51 -
.../utils/LogviewerResponseBuilder.java | 118 --
.../wip/logviewer/utils/ResourceAuthorizer.java | 129 --
.../daemon/wip/logviewer/utils/WorkerLogs.java | 63 -
.../logviewer/webapp/LogviewerApplication.java | 94 --
.../wip/logviewer/webapp/LogviewerResource.java | 221 ---
.../storm/daemon/logviewer/LogviewerTest.java | 51 +
.../handler/LogviewerLogPageHandlerTest.java | 101 ++
.../handler/LogviewerLogSearchHandlerTest.java | 854 ++++++++++++
.../testsupport/ArgumentsVerifier.java | 34 +
.../testsupport/MockDirectoryBuilder.java | 66 +
.../logviewer/testsupport/MockFileBuilder.java | 66 +
.../daemon/logviewer/utils/LogCleanerTest.java | 376 ++++++
.../logviewer/utils/ResourceAuthorizerTest.java | 182 +++
.../logviewer-search-context-tests.log.gz | Bin 0 -> 72 bytes
.../logviewer-search-context-tests.log.test | 1 +
.../src/test/resources/small-worker.log.test | 1 +
.../src/test/resources/test-3072.log.test | 3 +
.../src/test/resources/test-worker.log.test | 380 ++++++
49 files changed, 4748 insertions(+), 5077 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index dade6b5..a3c6506 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -812,7 +812,7 @@ def logviewer():
allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
allextrajars.append(CLUSTER_CONF_DIR)
exec_storm_class(
- "org.apache.storm.daemon.wip.logviewer.LogviewerServer",
+ "org.apache.storm.daemon.logviewer.LogviewerServer",
jvmtype="-server",
daemonName="logviewer",
jvmopts=jvmopts,
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
deleted file mode 100644
index 27b4ba1..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ /dev/null
@@ -1,1254 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.logviewer
- (:use compojure.core)
- (:use [clojure.set :only [difference intersection]])
- (:use [clojure.string :only [blank? split]])
- (:use [hiccup core page-helpers form-helpers])
- (:use [org.apache.storm config daemon-config util log])
- (:use [org.apache.storm.ui helpers])
- (:import [org.apache.storm StormTimer]
- [org.apache.storm.daemon.supervisor ClientSupervisorUtils]
- [org.apache.storm.daemon.supervisor SupervisorUtils]
- [org.apache.storm.metric StormMetricsRegistry])
- (:import [org.apache.storm.utils Time VersionInfo ConfigUtils Utils ServerUtils ServerConfigUtils])
- (:import [java.util Arrays ArrayList HashSet])
- (:import [java.util.zip GZIPInputStream])
- (:import [org.apache.logging.log4j LogManager])
- (:import [org.apache.logging.log4j.core.appender RollingFileAppender])
- (:import [java.io BufferedInputStream File FileFilter FileInputStream
- InputStream]
- [java.net URLDecoder])
- (:import [java.nio.file Files DirectoryStream])
- (:import [java.nio ByteBuffer])
- (:import [org.apache.storm.daemon DirectoryCleaner])
- (:import [org.apache.storm.ui InvalidRequestException UIHelpers IConfigurator FilterConfiguration]
- [org.apache.storm.security.auth AuthUtils])
- (:require [compojure.route :as route]
- [compojure.handler :as handler]
- [ring.middleware.keyword-params]
- [ring.util.codec :as codec]
- [ring.util.response :as resp]
- [clojure.string :as string])
- (:gen-class))
-
-(def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
-(def STORM-VERSION (VersionInfo/getVersion))
-
-(def worker-log-filename-pattern #"^worker.log(.*)")
-
-(def logviewer:num-log-page-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-log-page-http-requests"))
-(def logviewer:num-daemonlog-page-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-daemonlog-page-http-requests"))
-(def logviewer:num-download-log-file-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-download-log-file-http-requests"))
-(def logviewer:num-download-log-daemon-file-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-download-log-daemon-file-http-requests"))
-(def logviewer:num-list-logs-http-requests (StormMetricsRegistry/registerMeter "logviewer:num-list-logs-http-requests"))
-
-(defn cleanup-cutoff-age-millis [conf now-millis]
- (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
-
-(defn get-stream-for-dir
- [^File f]
- (try (Files/newDirectoryStream (.toPath f))
- (catch Exception ex (log-error ex) nil)))
-
-(defn- last-modifiedtime-worker-logdir
- "Return the last modified time for all log files in a worker's log dir.
- Using stream rather than File.listFiles is to avoid large mem usage
- when a directory has too many files"
- [^File log-dir]
- (let [^DirectoryStream stream (get-stream-for-dir log-dir)
- dir-modified (.lastModified log-dir)
- last-modified (try (reduce
- (fn [maximum path]
- (let [curr (.lastModified (.toFile path))]
- (if (> curr maximum)
- curr
- maximum)))
- dir-modified
- stream)
- (catch Exception ex
- (log-error ex) dir-modified)
- (finally
- (if (instance? DirectoryStream stream)
- (.close stream))))]
- last-modified))
-
-(defn get-size-for-logdir
- "Return the sum of lengths for all log files in a worker's log dir.
- Using stream rather than File.listFiles is to avoid large mem usage
- when a directory has too many files"
- [log-dir]
- (let [^DirectoryStream stream (get-stream-for-dir log-dir)]
- (reduce
- (fn [sum path]
- (let [size (.length (.toFile path))]
- (+ sum size)))
- 0
- stream)))
-
-(defn mk-FileFilter-for-log-cleanup [conf now-millis]
- (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
- (reify FileFilter (^boolean accept [this ^File file]
- (boolean (and
- (not (.isFile file))
- (<= (last-modifiedtime-worker-logdir file) cutoff-age-millis)))))))
-
-(defn select-dirs-for-cleanup [conf now-millis root-dir]
- (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
- (reduce clojure.set/union
- (sorted-set)
- (for [^File topo-dir (.listFiles (File. root-dir))]
- (into [] (.listFiles topo-dir file-filter))))))
-
-(defn get-topo-port-workerlog
- "Return the path of the worker log with the format of topoId/port/worker.log.*"
- [^File file]
- (clojure.string/join ServerUtils/FILE_PATH_SEPARATOR
- (take-last 3
- (split (.getCanonicalPath file) (re-pattern ServerUtils/FILE_PATH_SEPARATOR)))))
-
-(defn get-metadata-file-for-log-root-name [root-name root-dir]
- (let [metaFile (clojure.java.io/file root-dir "metadata"
- (str root-name ".yaml"))]
- (if (.exists metaFile)
- metaFile
- (do
- (log-warn "Could not find " (.getCanonicalPath metaFile)
- " to clean up for " root-name)
- nil))))
-
-(defn get-metadata-file-for-wroker-logdir [logdir]
- (let [metaFile (clojure.java.io/file logdir "worker.yaml")]
- (if (.exists metaFile)
- metaFile
- (do
- (log-warn "Could not find " (.getCanonicalPath metaFile)
- " to clean up for " logdir)
- nil))))
-
-(defn get-worker-id-from-metadata-file [metaFile]
- (get (clojurify-structure (Utils/readYamlFile metaFile)) "worker-id"))
-
-(defn get-topo-owner-from-metadata-file [metaFile]
- (get (clojurify-structure (Utils/readYamlFile metaFile)) TOPOLOGY-SUBMITTER-USER))
-
-(defn identify-worker-log-dirs [log-dirs]
- "return the workerid to worker-log-dir map"
- (into {} (for [logdir log-dirs
- :let [metaFile (get-metadata-file-for-wroker-logdir logdir)]]
- (if metaFile
- {(get-worker-id-from-metadata-file metaFile) logdir}
- {"" logdir})))) ;; an old directory that has no yaml file will be treated as a dead dir for deleting
-
-(defn get-alive-ids
- [conf now-secs]
- (->>
- (clojurify-structure (SupervisorUtils/readWorkerHeartbeats conf))
- (remove
- #(or (not (val %))
- (SupervisorUtils/isWorkerHbTimedOut now-secs
- (val %)
- conf)))
- keys
- set))
-
-(defn get-dead-worker-dirs
- "Return a sorted set of java.io.Files that were written by workers that are
- now dead"
- [conf now-secs log-dirs]
- (if (empty? log-dirs)
- (sorted-set)
- (let [alive-ids (get-alive-ids conf now-secs)
- id->dir (identify-worker-log-dirs log-dirs)]
- (apply sorted-set
- (for [[id dir] id->dir
- :when (not (contains? alive-ids id))]
- dir)))))
-
-(defn get-all-worker-dirs [^File root-dir]
- (reduce clojure.set/union
- (sorted-set)
- (for [^File topo-dir (.listFiles root-dir)]
- (into [] (.listFiles topo-dir)))))
-
-(defn get-alive-worker-dirs
- "Return a sorted set of java.io.Files that were written by workers that are
- now active"
- [conf root-dir]
- (let [alive-ids (get-alive-ids conf (Time/currentTimeSecs))
- log-dirs (get-all-worker-dirs root-dir)
- id->dir (identify-worker-log-dirs log-dirs)]
- (apply sorted-set
- (for [[id dir] id->dir
- :when (contains? alive-ids id)]
- (.getCanonicalPath dir)))))
-
-(defn get-all-logs-for-rootdir [^File log-dir]
- (reduce concat
- (for [port-dir (get-all-worker-dirs log-dir)]
- (into [] (DirectoryCleaner/getFilesForDir port-dir)))))
-
-(defn is-active-log [^File file]
- (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file)))
-
-(defn sum-file-size
- "Given a sequence of Files, sum their sizes."
- [files]
- (reduce #(+ %1 (.length %2)) 0 files))
-
-(defn per-workerdir-cleanup!
- "Delete the oldest files in each overloaded worker log dir"
- [^File root-dir size ^DirectoryCleaner cleaner]
- (dofor [worker-dir (get-all-worker-dirs root-dir)]
- (.deleteOldestWhileTooLarge cleaner (ArrayList. [worker-dir]) size true nil)))
-
-(defn global-log-cleanup!
- "Delete the oldest files in overloaded worker-artifacts globally"
- [^File root-dir size ^DirectoryCleaner cleaner]
- (let [worker-dirs (ArrayList. (get-all-worker-dirs root-dir))
- alive-worker-dirs (HashSet. (get-alive-worker-dirs *STORM-CONF* root-dir))]
- (.deleteOldestWhileTooLarge cleaner worker-dirs size false alive-worker-dirs)))
-
-(defn cleanup-empty-topodir!
- "Delete the topo dir if it contains zero port dirs"
- [^File dir]
- (let [topodir (.getParentFile dir)]
- (if (empty? (.listFiles topodir))
- (Utils/forceDelete (.getCanonicalPath topodir)))))
-
-(defn cleanup-fn!
- "Delete old log dirs for which the workers are no longer alive"
- [log-root-dir]
- (let [now-secs (Time/currentTimeSecs)
- old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
- (* now-secs 1000)
- log-root-dir)
- total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB)
- per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB)
- per-dir-size (min per-dir-size (* total-size 0.5))
- cleaner (DirectoryCleaner.)
- dead-worker-dirs (get-dead-worker-dirs *STORM-CONF*
- now-secs
- old-log-dirs)]
- (log-debug "log cleanup: now=" now-secs
- " old log dirs " (pr-str (map #(.getName %) old-log-dirs))
- " dead worker dirs " (pr-str
- (map #(.getName %) dead-worker-dirs)))
- (dofor [dir dead-worker-dirs]
- (let [path (.getCanonicalPath dir)]
- (log-message "Cleaning up: Removing " path)
- (try (Utils/forceDelete path)
- (cleanup-empty-topodir! dir)
- (catch Exception ex (log-error ex)))))
- (per-workerdir-cleanup! (File. log-root-dir) (* per-dir-size (* 1024 1024)) cleaner)
- (let [size (* total-size (* 1024 1024))]
- (global-log-cleanup! (File. log-root-dir) size cleaner))))
-
-(defn start-log-cleaner! [conf log-root-dir]
- (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
- (when interval-secs
- (log-debug "starting log cleanup thread at interval: " interval-secs)
-
- (let [timer (StormTimer. "logviewer-cleanup"
- (reify Thread$UncaughtExceptionHandler
- (^void uncaughtException
- [this ^Thread t ^Throwable e]
- (log-error t "Error when doing logs cleanup")
- (Utils/exitProcess 20 "Error when doing log cleanup"))))]
- (.scheduleRecurring timer 0 interval-secs
- (fn [] (cleanup-fn! log-root-dir)))))))
-
-(defn- skip-bytes
- "FileInputStream#skip may not work the first time, so ensure it successfully
- skips the given number of bytes."
- [^InputStream stream n]
- (loop [skipped 0]
- (let [skipped (+ skipped (.skip stream (- n skipped)))]
- (if (< skipped n) (recur skipped)))))
-
-(defn logfile-matches-filter?
- [log-file-name]
- (let [regex-string (str "worker.log.*")
- regex-pattern (re-pattern regex-string)]
- (not= (re-seq regex-pattern (.toString log-file-name)) nil)))
-
-(defn page-file
- ([path tail]
- (let [zip-file? (.endsWith path ".gz")
- flen (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
- skip (- flen tail)]
- (page-file path skip tail)))
- ([path start length]
- (let [zip-file? (.endsWith path ".gz")
- flen (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))]
- (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. path)) (FileInputStream. path))
- output (java.io.ByteArrayOutputStream.)]
- (if (>= start flen)
- (throw
- (InvalidRequestException. "Cannot start past the end of the file")))
- (if (> start 0) (skip-bytes input start))
- (let [buffer (make-array Byte/TYPE 1024)]
- (loop []
- (when (< (.size output) length)
- (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
- (when (pos? size)
- (.write output buffer 0 size)
- (recur)))))
- (.toString output))))))
-
-(defn get-log-user-group-whitelist [fname]
- (let [wl-file (ServerConfigUtils/getLogMetaDataFile fname)
- m (clojurify-structure (Utils/readYamlFile wl-file))]
- (if (not-nil? m)
- (do
- (let [user-wl (.get m LOGS-USERS)
- user-wl (if user-wl user-wl [])
- group-wl (.get m LOGS-GROUPS)
- group-wl (if group-wl group-wl [])]
- [user-wl group-wl]))
- nil)))
-
-(def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin *STORM-CONF*))
-(defn user-groups
- [user]
- (if (blank? user) [] (.getGroups igroup-mapper user)))
-
-(defn authorized-log-user? [user fname conf]
- (if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist fname)))
- nil
- (let [groups (user-groups user)
- [user-wl group-wl] (get-log-user-group-whitelist fname)
- logs-users (concat (conf LOGS-USERS)
- (conf NIMBUS-ADMINS)
- user-wl)
- logs-groups (concat (conf LOGS-GROUPS)
- group-wl)]
- (or (some #(= % user) logs-users)
- (< 0 (.size (intersection (set groups) (set logs-groups))))))))
-
-(defn log-root-dir
- "Given an appender name, as configured, get the parent directory of the appender's log file.
- Note that if anything goes wrong, this will throw an Error and exit."
- [appender-name]
- (let [appender (.getAppender (.getConfiguration (LogManager/getContext)) appender-name)]
- (if (and appender-name appender (instance? RollingFileAppender appender))
- (.getParent (File. (.getFileName appender)))
- (throw
- (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j agree.")))))
-
-(defnk to-btn-link
- "Create a link that is formatted like a button"
- [url text :enabled true]
- [:a {:href (java.net.URI. url)
- :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
-
-(defn search-file-form [fname is-daemon]
- [[:form {:action "logviewer_search.html" :id "search-box"}
- "Search this file:"
- [:input {:type "text" :name "search"}]
- [:input {:type "hidden" :name "is-daemon" :value is-daemon}]
- [:input {:type "hidden" :name "file" :value fname}]
- [:input {:type "submit" :value "Search"}]]])
-
-(defn log-file-selection-form [log-files type]
- [[:form {:action type :id "list-of-files"}
- (drop-down "file" log-files)
- [:input {:type "submit" :value "Switch file"}]]])
-
-(defn pager-links [fname start length file-size type]
- (let [prev-start (max 0 (- start length))
- next-start (if (> file-size 0)
- (min (max 0 (- file-size length)) (+ start length))
- (+ start length))]
- [[:div
- (concat
- [(to-btn-link (url (str "/" type)
- {:file fname
- :start (max 0 (- start length))
- :length length})
- "Prev" :enabled (< prev-start start))]
- [(to-btn-link (url (str "/" type)
- {:file fname
- :start 0
- :length length}) "First")]
- [(to-btn-link (url (str "/" type)
- {:file fname
- :length length})
- "Last")]
- [(to-btn-link (url (str "/" type)
- {:file fname
- :start (min (max 0 (- file-size length))
- (+ start length))
- :length length})
- "Next" :enabled (> next-start start))])]]))
-
-(defn- download-link [fname]
- [[:p (link-to (UIHelpers/urlFormat "/download?file=%s" (to-array [fname])) "Download Full File")]])
-
-(defn- daemon-download-link [fname]
- [[:p (link-to (UIHelpers/urlFormat "/daemondownload/%s" (to-array [fname])) "Download Full File")]])
-
-(defn- is-txt-file [fname]
- (re-find #"\.(log.*|txt|yaml|pid)$" fname))
-
-(defn unauthorized-user-html [user]
- [[:h2 "User '" (escape-html user) "' is not authorized."]])
-
-(defn ring-response-from-exception [ex]
- {:headers {}
- :status 400
- :body (.getMessage ex)})
-
-(def default-bytes-per-page 51200)
-
-(defn log-page [fname start length grep user root-dir]
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user fname *STORM-CONF*))
- (let [file (.getCanonicalFile (File. root-dir fname))
- path (.getCanonicalPath file)
- zip-file? (.endsWith path ".gz")
- topo-dir (.getParentFile (.getParentFile file))]
- (if (and (.exists file)
- (= (.getCanonicalFile (File. root-dir))
- (.getParentFile topo-dir)))
- (let [file-length (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
- log-files (reduce clojure.set/union
- (sorted-set)
- (for [^File port-dir (.listFiles topo-dir)]
- (into [] (filter #(.isFile %) (DirectoryCleaner/getFilesForDir port-dir))))) ;all types of files included
- files-str (for [file log-files]
- (get-topo-port-workerlog file))
- reordered-files-str (conj (filter #(not= fname %) files-str) fname)
- length (if length
- (min 10485760 length)
- default-bytes-per-page)
- log-string (escape-html
- (if (is-txt-file fname)
- (if start
- (page-file path start length)
- (page-file path length))
- "This is a binary file and cannot display! You may download the full file."))
- start (or start (- file-length length))]
- (if grep
- (html [:pre#logContent
- (if grep
- (->> (.split log-string "\n")
- (filter #(.contains % grep))
- (string/join "\n"))
- log-string)])
- (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length "log") nil)]
- (html (concat (search-file-form fname "no")
- (log-file-selection-form reordered-files-str "log") ; list all files for this topology
- pager-data
- (download-link fname)
- [[:pre#logContent log-string]]
- pager-data)))))
- (-> (resp/response "Page not found")
- (resp/status 404))))
- (if (nil? (get-log-user-group-whitelist fname))
- (-> (resp/response "Page not found")
- (resp/status 404))
- (unauthorized-user-html user))))
-
-(defn daemonlog-page [fname start length grep user root-dir]
- (let [file (.getCanonicalFile (File. root-dir fname))
- file-length (.length file)
- path (.getCanonicalPath file)
- zip-file? (.endsWith path ".gz")]
- (if (and (= (.getCanonicalFile (File. root-dir))
- (.getParentFile file))
- (.exists file))
- (let [file-length (if zip-file? (ServerUtils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
- length (if length
- (min 10485760 length)
- default-bytes-per-page)
- log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included
- files-str (for [file log-files]
- (.getName file))
- reordered-files-str (conj (filter #(not= fname %) files-str) fname)
- log-string (escape-html
- (if (is-txt-file fname)
- (if start
- (page-file path start length)
- (page-file path length))
- "This is a binary file and cannot display! You may download the full file."))
- start (or start (- file-length length))]
- (if grep
- (html [:pre#logContent
- (if grep
- (->> (.split log-string "\n")
- (filter #(.contains % grep))
- (string/join "\n"))
- log-string)])
- (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length "daemonlog") nil)]
- (html (concat (search-file-form fname "yes")
- (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
- pager-data
- (daemon-download-link fname)
- [[:pre#logContent log-string]]
- pager-data)))))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
-
-(defnk set-log-file-permissions [fname root-dir]
- (let [file (.getCanonicalFile (File. root-dir fname))
- run-as-user (*STORM-CONF* SUPERVISOR-RUN-WORKER-AS-USER)
- parent (.getParent (File. root-dir fname))
- md-file (if (nil? parent) nil (get-metadata-file-for-wroker-logdir parent))
- topo-owner (if (nil? md-file) nil (get-topo-owner-from-metadata-file (.getCanonicalPath md-file)))]
- (when (and run-as-user
- (not-nil? topo-owner)
- (.exists file)
- (not (Files/isReadable (.toPath file))))
- (log-debug "Setting permissions on file " fname " with topo-owner " topo-owner)
- (ClientSupervisorUtils/processLauncherAndWait *STORM-CONF* topo-owner ["blob" (.getCanonicalPath file)] nil (str "setup group read permissions for file: " fname)))))
-
-(defnk download-log-file [fname req resp user ^String root-dir :is-daemon false]
- (let [file (.getCanonicalFile (File. root-dir fname))]
- (if (.exists file)
-
- (if (or is-daemon
- (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user fname *STORM-CONF*)))
- (-> (resp/response file)
- (resp/content-type "application/octet-stream"))
- (unauthorized-user-html user))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
-
-(def grep-max-search-size 1024)
-(def grep-buf-size 2048)
-(def grep-context-size 128)
-
-(defn logviewer-port
- []
- (int (*STORM-CONF* LOGVIEWER-PORT)))
-
-(defn url-to-match-centered-in-log-page
- [needle fname offset port]
- (let [host (Utils/hostname)
- port (logviewer-port)
- fname (clojure.string/join ServerUtils/FILE_PATH_SEPARATOR (take-last 3 (split fname (re-pattern ServerUtils/FILE_PATH_SEPARATOR))))]
- (url (str "http://" host ":" port "/log")
- {:file fname
- :start (max 0
- (- offset
- (int (/ default-bytes-per-page 2))
- (int (/ (alength needle) -2)))) ;; Addition
- :length default-bytes-per-page})))
-
-(defn url-to-match-centered-in-log-page-daemon-file
- [needle fname offset port]
- (let [host (Utils/hostname)
- port (logviewer-port)
- fname (clojure.string/join ServerUtils/FILE_PATH_SEPARATOR (take-last 1 (split fname (re-pattern ServerUtils/FILE_PATH_SEPARATOR))))]
- (url (str "http://" host ":" port "/daemonlog")
- {:file fname
- :start (max 0
- (- offset
- (int (/ default-bytes-per-page 2))
- (int (/ (alength needle) -2)))) ;; Addition
- :length default-bytes-per-page})))
-
-(defnk mk-match-data
- [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname
- :is-daemon false :before-bytes nil :after-bytes nil]
- (let [url (if is-daemon
- (url-to-match-centered-in-log-page-daemon-file needle
- fname
- file-offset
- (*STORM-CONF* LOGVIEWER-PORT))
- (url-to-match-centered-in-log-page needle
- fname
- file-offset
- (*STORM-CONF* LOGVIEWER-PORT)))
- haystack-bytes (.array haystack)
- before-string (if (>= haystack-offset grep-context-size)
- (String. haystack-bytes
- (- haystack-offset grep-context-size)
- grep-context-size
- "UTF-8")
- (let [num-desired (max 0 (- grep-context-size
- haystack-offset))
- before-size (if before-bytes
- (alength before-bytes)
- 0)
- num-expected (min before-size num-desired)]
- (if (pos? num-expected)
- (str (String. before-bytes
- (- before-size num-expected)
- num-expected
- "UTF-8")
- (String. haystack-bytes
- 0
- haystack-offset
- "UTF-8"))
- (String. haystack-bytes
- 0
- haystack-offset
- "UTF-8"))))
- after-string (let [needle-size (alength needle)
- after-offset (+ haystack-offset needle-size)
- haystack-size (.limit haystack)]
- (if (< (+ after-offset grep-context-size) haystack-size)
- (String. haystack-bytes
- after-offset
- grep-context-size
- "UTF-8")
- (let [num-desired (- grep-context-size
- (- haystack-size after-offset))
- after-size (if after-bytes
- (alength after-bytes)
- 0)
- num-expected (min after-size num-desired)]
- (if (pos? num-expected)
- (str (String. haystack-bytes
- after-offset
- (- haystack-size after-offset)
- "UTF-8")
- (String. after-bytes 0 num-expected "UTF-8"))
- (String. haystack-bytes
- after-offset
- (- haystack-size after-offset)
- "UTF-8")))))]
- {"byteOffset" file-offset
- "beforeString" before-string
- "afterString" after-string
- "matchString" (String. needle "UTF-8")
- "logviewerURL" url}))
-
-(defn- try-read-ahead!
- "Tries once to read ahead in the stream to fill the context and resets the
- stream to its position before the call."
- [^BufferedInputStream stream haystack offset file-len bytes-read]
- (let [num-expected (min (- file-len bytes-read)
- grep-context-size)
- after-bytes (byte-array num-expected)]
- (.mark stream num-expected)
- ;; Only try reading once.
- (.read stream after-bytes 0 num-expected)
- (.reset stream)
- after-bytes))
-
-(defn offset-of-bytes
- "Searches a given byte array for a match of a sub-array of bytes. Returns
- the offset to the byte that matches, or -1 if no match was found."
- [^bytes buf ^bytes value init-offset]
- {:pre [(> (alength value) 0)
- (not (neg? init-offset))]}
- (loop [offset init-offset
- candidate-offset init-offset
- val-offset 0]
- (if-not (pos? (- (alength value) val-offset))
- ;; Found
- candidate-offset
- (if (>= offset (alength buf))
- ;; We ran out of buffer for the search.
- -1
- (if (not= (aget value val-offset) (aget buf offset))
- ;; The match at this candidate offset failed, so start over with the
- ;; next candidate byte from the buffer.
- (let [new-offset (inc candidate-offset)]
- (recur new-offset new-offset 0))
- ;; So far it matches. Keep going...
- (recur (inc offset) candidate-offset (inc val-offset)))))))
-
-(defn- buffer-substring-search!
- "As the file is read into a buffer, 1/2 the buffer's size at a time, we
- search the buffer for matches of the substring and return a list of zero or
- more matches."
- [is-daemon file file-len offset-to-buf init-buf-offset stream bytes-skipped
- bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches
- ^bytes before-bytes]
- (loop [buf-offset init-buf-offset
- matches initial-matches]
- (let [offset (offset-of-bytes (.array haystack) needle buf-offset)]
- (if (and (< (count matches) num-matches) (not (neg? offset)))
- (let [file-offset (+ offset-to-buf offset)
- bytes-needed-after-match (- (.limit haystack)
- grep-context-size
- (alength needle))
- before-arg (if (< offset grep-context-size) before-bytes)
- after-arg (if (> offset bytes-needed-after-match)
- (try-read-ahead! stream
- haystack
- offset
- file-len
- bytes-read))]
- (recur (+ offset (alength needle))
- (conj matches
- (mk-match-data needle
- haystack
- offset
- file-offset
- (.getCanonicalPath file)
- :is-daemon is-daemon
- :before-bytes before-arg
- :after-bytes after-arg))))
- (let [before-str-to-offset (min (.limit haystack)
- grep-max-search-size)
- before-str-from-offset (max 0 (- before-str-to-offset
- grep-context-size))
- new-before-bytes (Arrays/copyOfRange (.array haystack)
- before-str-from-offset
- before-str-to-offset)
- ;; It's OK if new-byte-offset is negative. This is normal if
- ;; we are out of bytes to read from a small file.
- new-byte-offset (if (>= (count matches) num-matches)
- (+ (get (last matches) "byteOffset")
- (alength needle))
- (+ bytes-skipped
- bytes-read
- (- grep-max-search-size)))]
- [matches new-byte-offset new-before-bytes])))))
-
-(defn- mk-grep-response
- "This response data only includes a next byte offset if there is more of the
- file to read."
- [search-bytes offset matches next-byte-offset]
- (merge {"searchString" (String. search-bytes "UTF-8")
- "startByteOffset" offset
- "matches" matches}
- (and next-byte-offset {"nextByteOffset" next-byte-offset})))
-
-(defn rotate-grep-buffer!
- [^ByteBuffer buf ^BufferedInputStream stream total-bytes-read file file-len]
- (let [buf-arr (.array buf)]
- ;; Copy the 2nd half of the buffer to the first half.
- (System/arraycopy buf-arr
- grep-max-search-size
- buf-arr
- 0
- grep-max-search-size)
- ;; Zero-out the 2nd half to prevent accidental matches.
- (Arrays/fill buf-arr
- grep-max-search-size
- (count buf-arr)
- (byte 0))
- ;; Fill the 2nd half with new bytes from the stream.
- (let [bytes-read (.read stream
- buf-arr
- grep-max-search-size
- (min file-len grep-max-search-size))]
- (.limit buf (+ grep-max-search-size bytes-read))
- (swap! total-bytes-read + bytes-read))))
-
-(defnk substring-search
- "Searches for a substring in a log file, starting at the given offset,
- returning the given number of matches, surrounded by the given number of
- context lines. Other information is included to be useful for progressively
- searching through a file for display in a UI. The search string must
- grep-max-search-size bytes or fewer when decoded with UTF-8."
- [file ^String search-string :is-daemon false :num-matches 10 :start-byte-offset 0]
- {:pre [(not (empty? search-string))
- (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]}
- (let [zip-file? (.endsWith (.getName file) ".gz")
- f-input-steam (FileInputStream. file)
- gzipped-input-stream (if zip-file?
- (GZIPInputStream. f-input-steam)
- f-input-steam)
- stream ^BufferedInputStream (BufferedInputStream.
- gzipped-input-stream)
- file-len (if zip-file? (ServerUtils/zipFileSize file) (.length file))
- buf ^ByteBuffer (ByteBuffer/allocate grep-buf-size)
- buf-arr ^bytes (.array buf)
- string nil
- total-bytes-read (atom 0)
- matches []
- search-bytes ^bytes (.getBytes search-string "UTF-8")
- num-matches (or num-matches 10)
- start-byte-offset (or start-byte-offset 0)]
- ;; Start at the part of the log file we are interested in.
- ;; Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files
- (if (> start-byte-offset file-len)
- (throw
- (InvalidRequestException. "Cannot search past the end of the file")))
- (when (> start-byte-offset 0)
- (skip-bytes stream start-byte-offset))
- (java.util.Arrays/fill buf-arr (byte 0))
- (let [bytes-read (.read stream buf-arr 0 (min file-len grep-buf-size))]
- (.limit buf bytes-read)
- (swap! total-bytes-read + bytes-read))
- (loop [initial-matches []
- init-buf-offset 0
- byte-offset start-byte-offset
- before-bytes nil]
- (let [[matches new-byte-offset new-before-bytes]
- (buffer-substring-search!
- is-daemon
- file
- file-len
- byte-offset
- init-buf-offset
- stream
- start-byte-offset
- @total-bytes-read
- buf
- search-bytes
- initial-matches
- num-matches
- before-bytes)]
- (if (and (< (count matches) num-matches)
- (< (+ @total-bytes-read start-byte-offset) file-len))
- (let [;; The start index is positioned to find any possible
- ;; occurrence search string that did not quite fit in the
- ;; buffer on the previous read.
- new-buf-offset (- (min (.limit ^ByteBuffer buf)
- grep-max-search-size)
- (alength search-bytes))]
- (rotate-grep-buffer! buf stream total-bytes-read file file-len)
- (when (< @total-bytes-read 0)
- (throw (InvalidRequestException. "Cannot search past the end of the file")))
- (recur matches
- new-buf-offset
- new-byte-offset
- new-before-bytes))
- (merge {"isDaemon" (if is-daemon "yes" "no")}
- (mk-grep-response search-bytes
- start-byte-offset
- matches
- (if-not (and (< (count matches) num-matches)
- (>= @total-bytes-read file-len))
- (let [next-byte-offset (+ (get (last matches)
- "byteOffset")
- (alength search-bytes))]
- (if (> file-len next-byte-offset)
- next-byte-offset))))))))))
-
-(defn- try-parse-int-param
- [nam value]
- (try
- (Integer/parseInt value)
- (catch java.lang.NumberFormatException e
- (->
- (str "Could not parse " nam " to an integer")
- (InvalidRequestException. e)
- throw))))
-
-(defn search-log-file
- [fname user ^String root-dir is-daemon search num-matches offset callback origin]
- (let [file (.getCanonicalFile (File. root-dir fname))]
- (if (.exists file)
- (if (or is-daemon
- (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user fname *STORM-CONF*)))
- (let [num-matches-int (if num-matches
- (try-parse-int-param "num-matches"
- num-matches))
- offset-int (if offset
- (try-parse-int-param "start-byte-offset" offset))]
- (try
- (if (and (not (empty? search))
- <= (count (.getBytes search "UTF-8")) grep-max-search-size)
- (json-response
- (merge {"isDaemon" (if is-daemon "yes" "no")}
- (substring-search file
- search
- :is-daemon is-daemon
- :num-matches num-matches-int
- :start-byte-offset offset-int))
-
- callback
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"})
- (throw
- (InvalidRequestException.
- (str "Search substring must be between 1 and 1024 UTF-8 "
- "bytes in size (inclusive)"))))
- (catch Exception ex
- (json-response (UIHelpers/exceptionToJson ex) callback :status 500))))
- (json-response (UIHelpers/unauthorizedUserJson user) callback :status 401))
- (json-response {"error" "Not Found"
- "errorMessage" "The file was not found on this node."}
- callback
- :status 404))))
-
-(defn find-n-matches [logs n file-offset offset search]
- (let [logs (drop file-offset logs)
- wrap-matches-fn (fn [matches]
- {"fileOffset" file-offset
- "searchString" search
- "matches" matches})]
- (loop [matches []
- logs logs
- offset offset
- file-offset file-offset
- match-count 0]
- (if (empty? logs)
- (wrap-matches-fn matches)
- (let [these-matches (try
- (log-debug "Looking through " (first logs))
- (substring-search (first logs)
- search
- :num-matches (- n match-count)
- :start-byte-offset offset)
- (catch InvalidRequestException e
- (log-error e "Can't search past end of file.")
- {}))
- file-name (get-topo-port-workerlog (first logs))
- new-matches (conj matches
- (merge these-matches
- { "fileName" file-name
- "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern ServerUtils/FILE_PATH_SEPARATOR))))}))
- new-count (+ match-count (count (these-matches "matches")))]
- (if (empty? these-matches)
- (recur matches (rest logs) 0 (+ file-offset 1) match-count)
- (if (>= new-count n)
- (wrap-matches-fn new-matches)
- (recur new-matches (rest logs) 0 (+ file-offset 1) new-count))))))))
-
-(defn logs-for-port
- "Get the filtered, authorized, sorted log files for a port."
- [user port-dir]
- (let [filter-authorized-fn (fn [user logs]
- (filter #(or
- (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user (get-topo-port-workerlog %) *STORM-CONF*)) logs))]
- (sort #(compare (.lastModified %2) (.lastModified %1))
- (filter-authorized-fn
- user
- (filter #(re-find worker-log-filename-pattern (.getName %)) (DirectoryCleaner/getFilesForDir port-dir))))))
-
-(defn deep-search-logs-for-topology
- [topology-id user ^String root-dir search num-matches port file-offset offset search-archived? callback origin]
- (json-response
- (if (or (not search) (not (.exists (File. (str root-dir ServerUtils/FILE_PATH_SEPARATOR topology-id)))))
- []
- (let [file-offset (if file-offset (Integer/parseInt file-offset) 0)
- offset (if offset (Integer/parseInt offset) 0)
- num-matches (or (Integer/parseInt num-matches) 1)
- port-dirs (vec (.listFiles (File. (str root-dir ServerUtils/FILE_PATH_SEPARATOR topology-id))))
- logs-for-port-fn (partial logs-for-port user)]
- (if (or (not port) (= "*" port))
- ;; Check for all ports
- (let [filtered-logs (filter (comp not empty?) (map logs-for-port-fn port-dirs))]
- (if search-archived?
- (map #(find-n-matches % num-matches 0 0 search)
- filtered-logs)
- (map #(find-n-matches % num-matches 0 0 search)
- (map (comp vector first) filtered-logs))))
- ;; Check just the one port
- (if (not (contains? (into #{} (map str (*STORM-CONF* SUPERVISOR-SLOTS-PORTS))) port))
- []
- (let [port-dir (File. (str root-dir ServerUtils/FILE_PATH_SEPARATOR topology-id ServerUtils/FILE_PATH_SEPARATOR port))]
- (if (or (not (.exists port-dir)) (empty? (logs-for-port user port-dir)))
- []
- (let [filtered-logs (logs-for-port user port-dir)]
- (if search-archived?
- (find-n-matches filtered-logs num-matches file-offset offset search)
- (find-n-matches [(first filtered-logs)] num-matches 0 offset search)))))))))
- callback
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"}))
-
-(defn log-template
- ([body] (log-template body nil nil))
- ([body fname user]
- (html4
- [:head
- [:title (str (escape-html fname) " - Storm Log Viewer")]
- (include-css "/css/bootstrap-3.3.1.min.css")
- (include-css "/css/jquery.dataTables.1.10.4.min.css")
- (include-css "/css/style.css")
- ]
- [:body
- (concat
- (when (not (blank? user)) [[:div.ui-user [:p "User: " user]]])
- [[:div.ui-note [:p "Note: the drop-list shows at most 1024 files for each worker directory."]]]
- [[:h3 (escape-html fname)]]
- (seq body))
- ])))
-
-(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
-
-(defn- parse-long-from-map [m k]
- (try
- (Long/parseLong (k m))
- (catch NumberFormatException ex
- (throw (InvalidRequestException.
- (str "Could not make an integer out of the query parameter '"
- (name k) "'")
- ex)))))
-
-(defn list-log-files
- [user topoId port log-root callback origin]
- (let [file-results
- (if (nil? topoId)
- (if (nil? port)
- (get-all-logs-for-rootdir (File. log-root))
- (reduce concat
- (for [topo-dir (.listFiles (File. log-root))]
- (reduce concat
- (for [port-dir (.listFiles topo-dir)]
- (if (= (str port) (.getName port-dir))
- (into [] (DirectoryCleaner/getFilesForDir port-dir))))))))
- (if (nil? port)
- (let [topo-dir (File. (str log-root ServerUtils/FILE_PATH_SEPARATOR topoId))]
- (if (.exists topo-dir)
- (reduce concat
- (for [port-dir (.listFiles topo-dir)]
- (into [] (DirectoryCleaner/getFilesForDir port-dir))))
- []))
- (let [port-dir (ConfigUtils/getWorkerDirFromRoot log-root topoId port)]
- (if (.exists port-dir)
- (into [] (DirectoryCleaner/getFilesForDir port-dir))
- []))))
- file-strs (sort (for [file file-results]
- (get-topo-port-workerlog file)))]
- (json-response file-strs
- callback
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"})))
-
-(defn get-profiler-dump-files
- [dir]
- (filter (comp not nil?)
- (for [f (DirectoryCleaner/getFilesForDir dir)]
- (let [name (.getName f)]
- (if (or
- (.endsWith name ".txt")
- (.endsWith name ".jfr")
- (.endsWith name ".bin"))
- (.getName f))))))
-
-(defroutes log-routes
- (GET "/log" [:as req & m]
- (try
- (.mark logviewer:num-log-page-http-requests)
- (let [servlet-request (:servlet-request req)
- log-root (:log-root req)
- user (.getUserName http-creds-handler servlet-request)
- start (if (:start m) (parse-long-from-map m :start))
- length (if (:length m) (parse-long-from-map m :length))
- file (URLDecoder/decode (:file m))]
- (set-log-file-permissions file log-root)
- (log-template (log-page file start length (:grep m) user log-root)
- file user))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/dumps/:topo-id/:host-port/:filename"
- [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port filename &m]
- (let [user (.getUserName http-creds-handler servlet-request)
- port (second (split host-port #":"))
- dir (File. (str log-root
- ServerUtils/FILE_PATH_SEPARATOR
- topo-id
- ServerUtils/FILE_PATH_SEPARATOR
- port))
- file (File. (str log-root
- ServerUtils/FILE_PATH_SEPARATOR
- topo-id
- ServerUtils/FILE_PATH_SEPARATOR
- port
- ServerUtils/FILE_PATH_SEPARATOR
- filename))]
- (if (and (.exists dir) (.exists file))
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user
- (str topo-id ServerUtils/FILE_PATH_SEPARATOR port ServerUtils/FILE_PATH_SEPARATOR "worker.log")
- *STORM-CONF*))
- (-> (resp/response file)
- (resp/content-type "application/octet-stream"))
- (unauthorized-user-html user))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
- (GET "/dumps/:topo-id/:host-port"
- [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port &m]
- (let [user (.getUserName http-creds-handler servlet-request)
- port (second (split host-port #":"))
- dir (File. (str log-root
- ServerUtils/FILE_PATH_SEPARATOR
- topo-id
- ServerUtils/FILE_PATH_SEPARATOR
- port))]
- (if (.exists dir)
- (if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user
- (str topo-id ServerUtils/FILE_PATH_SEPARATOR port ServerUtils/FILE_PATH_SEPARATOR "worker.log")
- *STORM-CONF*))
- (html4
- [:head
- [:title "File Dumps - Storm Log Viewer"]
- (include-css "/css/bootstrap-3.3.1.min.css")
- (include-css "/css/jquery.dataTables.1.10.4.min.css")
- (include-css "/css/style.css")]
- [:body
- [:ul
- (for [file (get-profiler-dump-files dir)]
- [:li
- [:a {:href (str "/dumps/" topo-id "/" host-port "/" file)} file ]])]])
- (unauthorized-user-html user))
- (-> (resp/response "Page not found")
- (resp/status 404)))))
- (GET "/daemonlog" [:as req & m]
- (try
- (.mark logviewer:num-daemonlog-page-http-requests)
- (let [servlet-request (:servlet-request req)
- daemonlog-root (:daemonlog-root req)
- user (.getUserName http-creds-handler servlet-request)
- start (if (:start m) (parse-long-from-map m :start))
- length (if (:length m) (parse-long-from-map m :length))
- file (URLDecoder/decode (:file m))]
- (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
- file user))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/download" [:as {:keys [servlet-request servlet-response log-root]} & m]
- (try
- (.mark logviewer:num-download-log-file-http-requests)
- (let [user (.getUserName http-creds-handler servlet-request)
- file (URLDecoder/decode (:file m))]
- (set-log-file-permissions file log-root)
- (download-log-file file servlet-request servlet-response user log-root))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
- (try
- (.mark logviewer:num-download-log-daemon-file-http-requests)
- (let [user (.getUserName http-creds-handler servlet-request)]
- (download-log-file file servlet-request servlet-response user daemonlog-root :is-daemon true))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
- (GET "/search/:file" [:as {:keys [servlet-request servlet-response log-root daemonlog-root]} file & m]
- ;; We do not use servlet-response here, but do not remove it from the
- ;; :keys list, or this rule could stop working when an authentication
- ;; filter is configured.
- (try
- (let [user (.getUserName http-creds-handler servlet-request)
- is-daemon (= (:is-daemon m) "yes")]
- (search-log-file (URLDecoder/decode file)
- user
- (if is-daemon daemonlog-root log-root)
- is-daemon
- (:search-string m)
- (:num-matches m)
- (:start-byte-offset m)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400))))
- (GET "/deepSearch/:topo-id" [:as {:keys [servlet-request servlet-response log-root]} topo-id & m]
- ;; We do not use servlet-response here, but do not remove it from the
- ;; :keys list, or this rule could stop working when an authentication
- ;; filter is configured.
- (try
- (let [user (.getUserName http-creds-handler servlet-request)]
- (deep-search-logs-for-topology topo-id
- user
- log-root
- (:search-string m)
- (:num-matches m)
- (:port m)
- (:start-file-offset m)
- (:start-byte-offset m)
- (:search-archived m)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400))))
- (GET "/searchLogs" [:as req & m]
- (try
- (let [servlet-request (:servlet-request req)
- user (.getUserName http-creds-handler servlet-request)]
- (list-log-files user
- (:topoId m)
- (:port m)
- (:log-root req)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400))))
- (GET "/listLogs" [:as req & m]
- (try
- (.mark logviewer:num-list-logs-http-requests)
- (let [servlet-request (:servlet-request req)
- user (.getUserName http-creds-handler servlet-request)]
- (list-log-files user
- (:topoId m)
- (:port m)
- (:log-root req)
- (:callback m)
- (.getHeader servlet-request "Origin")))
- (catch InvalidRequestException ex
- (log-error ex)
- (json-response (UIHelpers/exceptionToJson ex) (:callback m) :status 400))))
- (route/resources "/")
- (route/not-found "Page not found"))
-
-(defn conf-middleware
- "For passing the storm configuration with each request."
- [app log-root daemonlog-root]
- (fn [req]
- (app (assoc req :log-root log-root :daemonlog-root daemonlog-root))))
-
-(defn start-logviewer! [conf log-root-dir daemonlog-root-dir]
- (try
- (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
- filter-class (conf UI-FILTER)
- filter-params (conf UI-FILTER-PARAMS)
- logapp (handler/api (-> log-routes
- requests-middleware)) ;; query params as map
- middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
- filters-confs (if (conf UI-FILTER)
- [(FilterConfiguration. filter-class (or (conf UI-FILTER-PARAMS) {}))]
- [])
- filters-confs (concat filters-confs
- [(FilterConfiguration. "org.eclipse.jetty.servlets.GzipFilter" "Gzipper" {})])
- https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0))
- keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH)
- keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD)
- keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE)
- key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD)
- truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH)
- truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD)
- truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE)
- want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH)
- need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)]
- (UIHelpers/stormRunJetty (int (conf LOGVIEWER-PORT))
- (reify IConfigurator (execute [this server]
- (UIHelpers/configSsl server
- https-port
- keystore-path
- keystore-pass
- keystore-type
- key-password
- truststore-path
- truststore-password
- truststore-type
- want-client-auth
- need-client-auth)
- (UIHelpers/configFilter server (ring.util.servlet/servlet middle) filters-confs)))))
- (catch Exception ex
- (log-error ex))))
-
-(defn -main []
- (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
- log-root (ConfigUtils/workerArtifactsRoot conf)
- daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
- (Utils/setupDefaultUncaughtExceptionHandler)
- (start-log-cleaner! conf log-root)
- (log-message "Starting logviewer server for storm version '"
- STORM-VERSION
- "'")
- (start-logviewer! conf log-root daemonlog-root)
- (StormMetricsRegistry/startMetricsReporters conf)))
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/logviewer-search-context-tests.log.gz
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/logviewer-search-context-tests.log.gz b/storm-core/src/dev/logviewer-search-context-tests.log.gz
deleted file mode 100644
index 5cf2a06..0000000
Binary files a/storm-core/src/dev/logviewer-search-context-tests.log.gz and /dev/null differ
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/logviewer-search-context-tests.log.test
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/logviewer-search-context-tests.log.test b/storm-core/src/dev/logviewer-search-context-tests.log.test
deleted file mode 100644
index 6e4d4af..0000000
--- a/storm-core/src/dev/logviewer-search-context-tests.log.test
+++ /dev/null
@@ -1 +0,0 @@
-needle needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle needle
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/small-worker.log.test
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/small-worker.log.test b/storm-core/src/dev/small-worker.log.test
deleted file mode 100644
index 27d61d1..0000000
--- a/storm-core/src/dev/small-worker.log.test
+++ /dev/null
@@ -1 +0,0 @@
-000000 needle 000000
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/test-3072.log.test
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/test-3072.log.test b/storm-core/src/dev/test-3072.log.test
deleted file mode 100644
index 56dc6f1..0000000
--- a/storm-core/src/dev/test-3072.log.test
+++ /dev/null
@@ -1,3 +0,0 @@
-This is a test log file of size 3072.
-
-.....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
.....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
.....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
....................................needle
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/dev/test-worker.log.test
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/test-worker.log.test b/storm-core/src/dev/test-worker.log.test
deleted file mode 100644
index 8fb4c53..0000000
--- a/storm-core/src/dev/test-worker.log.test
+++ /dev/null
@@ -1,380 +0,0 @@
-Test needle is near the beginning of the file.
-This file assumes a buffer size of 2048 bytes, a max search string size of 1024 bytes, and a context length of 128 UTF-8 characters.
-The early match tests the case when we find a match too close to the start of the file to give the normal before context strings.
-
-padding 5
-padding 6
-padding 7
-padding 8
-padding 9
-padding 10
-padding 11
-padding 12
-padding 13
-padding 14
-padding 15
-padding 16
-padding 17
-padding 18
-padding 19
-padding 20
-padding 21
-padding 22
-padding 23
-padding 24
-padding 25
-padding 26
-padding 27
-padding 28
-padding 29
-padding 30
-padding 31
-padding 32
-padding 33
-padding 34
-padding 35
-padding 36
-padding 37
-padding 38
-padding 39
-padding 40
-padding 41
-padding 42
-padding 43
-padding 44
-padding 45
-padding 46
-padding 47
-padding 48
-padding 49
-padding 50
-padding 51
-padding 52
-padding 53
-padding 54
-padding 55
-padding 56
-padding 57
-padding 58
-padding 59
-padding 60
-padding 61
-padding 62
-padding 63
-padding 64
-padding 65
-padding 66
-padding 67
-padding 68
-padding 69
-padding 70
-padding 71
-padding 72
-padding 73
-padding 74
-padding 75
-padding 76
-padding 77
-padding 78
-padding 79
-padding 80
-padding 81
-padding 82
-padding 83
-padding 84
-padding 85
-padding 86
-padding 87
-padding 88
-padding 89
-padding 90
-padding 91
-padding 92
-padding 93
-padding 94
-padding 95
-padding 96
-padding 97
-padding 98
-padding 99
-padding 100
-padding 101
-padding 102
-padding 103
-padding 104
-padding 105
-padding 106
-padding 107
-padding 108
-padding 109
-padding 110
-padding 111
-padding 112
-padding 113
-padding 114
-padding 115
-padding 116
-padding 117
-padding 118
-padding 119
-padding 120
-padding 121
-padding 122
-padding 123
-padding 124
-padding 125
-padding 126
-padding 127
-padding 128
-padding 129
-padding 130
-padding 131
-padding 132
-padding 133
-padding 134
-padding 135
-padding 136
-padding 137
-padding 138
-padding 139
-padding 140
-padding 141
-padding 142
-padding 143
-padding 144
-padding 145
-padding 146
-padding 147
-padding 148
-padding 149
-padding 150
-padding 151
-padding 152
-padding 153
-Near the end of a 1024 byte block, a needle.
-A needle that straddles a 1024 byte boundary should also be detected.
-
-padding 157
-padding 158
-padding 159
-padding 160
-padding 161
-padding 162
-padding 163
-padding 164
-padding 165
-padding 166
-padding 167
-padding 168
-padding 169
-padding 170
-padding 171
-padding 172
-padding 173
-padding 174
-padding 175
-padding 176
-padding 177
-padding 178
-padding 179
-padding 180
-padding 181
-padding 182
-padding 183
-padding 184
-padding 185
-padding 186
-padding 187
-padding 188
-padding 189
-padding 190
-padding 191
-padding 192
-padding 193
-padding 194
-padding 195
-padding 196
-padding 197
-padding 198
-padding 199
-padding 200
-padding 201
-padding 202
-padding 203
-padding 204
-padding 205
-padding 206
-padding 207
-padding 208
-padding 209
-padding 210
-padding 211
-padding 212
-padding 213
-padding 214
-padding 215
-padding 216
-padding 217
-padding 218
-padding 219
-padding 220
-padding 221
-padding 222
-padding 223
-padding 224
-padding 225
-padding 226
-padding 227
-padding 228
-padding 229
-padding 230
-padding 231
-padding 232
-padding 233
-padding 234
-padding 235
-
-
-Here a needle occurs just after a 1024 byte boundary. It should have the correct context.
-
-Text with two adjoining matches: needleneedle
-
-padding 243
-padding 244
-padding 245
-padding 246
-padding 247
-padding 248
-padding 249
-padding 250
-padding 251
-padding 252
-padding 253
-padding 254
-padding 255
-padding 256
-padding 257
-padding 258
-padding 259
-padding 260
-padding 261
-padding 262
-padding 263
-padding 264
-padding 265
-padding 266
-padding 267
-padding 268
-padding 269
-padding 270
-padding 271
-padding 272
-padding 273
-padding 274
-padding 275
-padding 276
-padding 277
-padding 278
-padding 279
-padding 280
-padding 281
-padding 282
-padding 283
-padding 284
-padding 285
-padding 286
-padding 287
-padding 288
-padding 289
-padding 290
-padding 291
-padding 292
-padding 293
-padding 294
-padding 295
-padding 296
-padding 297
-padding 298
-padding 299
-padding 300
-padding 301
-padding 302
-padding 303
-padding 304
-
-The following match of 1024 bytes completely fills half the byte buffer. It is a search substring of the maximum size......
-
-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
XXXXXXXXXXXXXXXXXXXXXXXXXXX
-The following max-size match straddles a 1024 byte buffer.
-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
XXXXXXXXXXXXXXXXXXXXXXXXXXX
-
-Here are four non-ascii 1-byte UTF-8 characters: αβγδε
-
-needle
-
-Here are four printable 2-byte UTF-8 characters: ¡¢£¤¥
-
-needle
-
-
-
-Here are four printable 3-byte UTF-8 characters: ऄअआइई
-
-needle
-
-Here are four printable 4-byte UTF-8 characters: 𐄇𐄈𐄉𐄋𐄌
-
-needle
-
-Here are four of the same invalid UTF-8 characters: ����������������
-
-needle
-
-padding 332
-padding 333
-padding 334
-padding 335
-padding 336
-padding 337
-padding 338
-padding 339
-padding 340
-padding 341
-padding 342
-padding 343
-padding 344
-padding 345
-padding 346
-padding 347
-padding 348
-padding 349
-padding 350
-padding 351
-padding 352
-padding 353
-padding 354
-padding 355
-padding 356
-padding 357
-padding 358
-padding 359
-padding 360
-padding 361
-padding 362
-padding 363
-padding 364
-padding 365
-padding 366
-padding 367
-padding 368
-padding 369
-padding 370
-padding 371
-padding 372
-padding 373
-padding 374
-padding 375
-
-The following tests multibyte UTF-8 Characters straddling the byte boundary: 𐄀𐄁𐄂
-
-needle
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java b/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java
deleted file mode 100644
index dc76157..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/DirectoryCleaner.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import java.io.IOException;
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.DirectoryStream;
-import java.util.Stack;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides methods to help the Logviewer clean up files in worker log directories and to
- * list the files of a directory without worrying about excessive memory usage: directories
- * are always read through a {@link DirectoryStream} instead of being loaded in full.
- */
-public class DirectoryCleaner {
-    private static final Logger LOG = LoggerFactory.getLogger(DirectoryCleaner.class);
-    // used to recognize the pattern of active log files, we may remove the "current" from this list
-    private static final Pattern ACTIVE_LOG_PATTERN = Pattern.compile(".*\\.(log|err|out|current|yaml|pid)$");
-    // used to recognize the pattern of some meta files in a worker log directory
-    private static final Pattern META_LOG_PATTERN = Pattern.compile(".*\\.(yaml|pid)$");
-    // max number of files considered for deletion in every round
-    private static final int PQ_SIZE = 1024;
-    // max rounds of scanning the dirs in a single call
-    private static final int MAX_ROUNDS = 512;
-    // max number of entries returned by getFilesForDir, to bound memory usage
-    private static final int MAX_NUM = 1024;
-
-    /**
-     * Opens a stream over the entries of {@code dir}; the caller is responsible for closing it.
-     * Deliberately not static so that it can be mocked in tests.
-     *
-     * @param dir the directory to list
-     * @return a stream over the entries of {@code dir}
-     * @throws IOException if the directory cannot be opened
-     */
-    public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException {
-        return Files.newDirectoryStream(dir.toPath());
-    }
-
-    /**
-     * If the total size of the deletable files exceeds either the per-worker quota or the global
-     * quota, deletes the oldest deletable files (by last-modified time) in a worker directory or
-     * in all worker dirs. The parameter forPerDir switches between the two deletion modes.
-     * Files protected by the skip rules are neither counted towards the total nor deleted.
-     *
-     * <p>Deletion happens in rounds. Each round rescans every directory, keeps the PQ_SIZE oldest
-     * candidates and deletes them oldest-first until the quota is met. The loop stops when the
-     * quota is met, when a round deletes nothing (no candidates left, or every delete failed), or
-     * after MAX_ROUNDS rounds. Anything left over is handled in the next interval.
-     *
-     * @param dirs the list of directories to be scanned for deletion
-     * @param quota the per-dir quota or the total quota for all the directories, in bytes
-     * @param forPerDir if true, deletion happens for a single dir; otherwise, for all directories globally
-     * @param activeDirs only used for global deletion: directories whose active logs must be kept.
-     *     Matched against {@code dir.getCanonicalPath()}, so entries must be canonical paths.
-     * @return number of files actually deleted; files whose deletion failed are not counted
-     * @throws IOException if a directory cannot be listed or a canonical path cannot be resolved
-     */
-    public int deleteOldestWhileTooLarge(List<File> dirs,
-            long quota, boolean forPerDir, Set<String> activeDirs) throws IOException {
-        long totalSize = 0;
-        for (File dir : dirs) {
-            try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
-                for (Path path : stream) {
-                    File file = path.toFile();
-                    if (!isFileEligibleToSkipDelete(forPerDir, activeDirs, dir, file)) {
-                        totalSize += file.length();
-                    }
-                }
-            }
-        }
-
-        LOG.debug("totalSize: {} quota: {}", totalSize, quota);
-        long toDeleteSize = totalSize - quota;
-        int deletedFiles = 0;
-        if (toDeleteSize <= 0) {
-            return deletedFiles;
-        }
-
-        // Newest-first ordering: the root of the queue is the newest of the retained candidates,
-        // i.e. the one evicted when an older file is found. Long.compare keeps the Comparator
-        // contract (antisymmetric, and 0 for equal timestamps).
-        Comparator<File> newestFirst = new Comparator<File>() {
-            @Override
-            public int compare(File f1, File f2) {
-                return Long.compare(f2.lastModified(), f1.lastModified());
-            }
-        };
-        // holds the PQ_SIZE oldest candidates found across all of dirs in the current round
-        PriorityQueue<File> pq = new PriorityQueue<File>(PQ_SIZE, newestFirst);
-        int round = 0;
-        while (toDeleteSize > 0) {
-            LOG.debug("To delete size is {}, start a new round of deletion, round: {}", toDeleteSize, round);
-            for (File dir : dirs) {
-                try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
-                    for (Path path : stream) {
-                        File file = path.toFile();
-                        if (isFileEligibleToSkipDelete(forPerDir, activeDirs, dir, file)) {
-                            continue;
-                        }
-                        if (pq.size() < PQ_SIZE) {
-                            pq.offer(file);
-                        } else if (file.lastModified() < pq.peek().lastModified()) {
-                            pq.poll();
-                            pq.offer(file);
-                        }
-                    }
-                }
-            }
-            // Draining the queue yields newest-to-oldest; walk the list backwards to delete oldest first.
-            List<File> candidates = new ArrayList<File>(pq.size());
-            while (!pq.isEmpty()) {
-                candidates.add(pq.poll());
-            }
-            int deletedThisRound = 0;
-            for (int i = candidates.size() - 1; i >= 0 && toDeleteSize > 0; i--) {
-                File file = candidates.get(i);
-                long fileSize = file.length();
-                LOG.info("Delete file: {}, size: {}, lastModified: {}", file.getCanonicalPath(), fileSize, file.lastModified());
-                if (file.delete()) {
-                    // only bytes that were really freed count towards the quota
-                    toDeleteSize -= fileSize;
-                    deletedFiles++;
-                    deletedThisRound++;
-                } else {
-                    LOG.warn("Failed to delete file: {}", file.getCanonicalPath());
-                }
-            }
-            round++;
-            if (deletedThisRound == 0) {
-                // Nothing could be deleted, so another rescan would find the same candidates.
-                LOG.warn("No file could be deleted in this round, {} bytes are still over quota, "
-                        + "will retry in next interval.", toDeleteSize);
-                break;
-            }
-            if (round >= MAX_ROUNDS) {
-                if (forPerDir) {
-                    LOG.warn("Reach the MAX_ROUNDS: {} during per-dir deletion, you may have too many files in " +
-                            "a single directory : {}, will delete the rest files in next interval.",
-                            MAX_ROUNDS, dirs.get(0).getCanonicalPath());
-                } else {
-                    LOG.warn("Reach the MAX_ROUNDS: {} during global deletion, you may have too many files, " +
-                            "will delete the rest files in next interval.", MAX_ROUNDS);
-                }
-                break;
-            }
-        }
-        return deletedFiles;
-    }
-
-    /**
-     * Decides whether {@code file} (an entry of {@code dir}) is protected from deletion.
-     * In per-dir mode, every file matching ACTIVE_LOG_PATTERN is protected. In global mode,
-     * files matching ACTIVE_LOG_PATTERN are protected inside dirs listed in activeDirs. In all
-     * other dirs only the meta files (.yaml/.pid) are protected.
-     *
-     * @return true if the file must be skipped (neither counted nor deleted)
-     * @throws IOException if the canonical path of {@code dir} cannot be resolved (global mode only)
-     */
-    private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException {
-        String name = file.getName();
-        if (forPerDir) {
-            return ACTIVE_LOG_PATTERN.matcher(name).matches();
-        }
-        // global cleanup: activeDirs is compared against canonical paths
-        Pattern protectedPattern = activeDirs.contains(dir.getCanonicalPath()) ? ACTIVE_LOG_PATTERN : META_LOG_PATTERN;
-        return protectedPattern.matcher(name).matches();
-    }
-
-    /**
-     * Lists the entries of {@code dir}, stopping after MAX_NUM (1024) of them to avoid memory
-     * problems. When the directory holds more entries, the ones returned are the first
-     * encountered in the directory stream's (unspecified) iteration order.
-     *
-     * @param dir the directory to list
-     * @return at most MAX_NUM entries of {@code dir}
-     * @throws IOException if the directory cannot be opened
-     */
-    public static List<File> getFilesForDir(File dir) throws IOException {
-        List<File> files = new ArrayList<File>();
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath())) {
-            for (Path path : stream) {
-                files.add(path.toFile());
-                if (files.size() >= MAX_NUM) {
-                    break;
-                }
-            }
-        }
-        return files;
-    }
-}