You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/10/30 03:28:08 UTC
[1/6] storm git commit: Address comments
Repository: storm
Updated Branches:
refs/heads/master b6615d5b1 -> 0bba2baf9
Address comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce97de35
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce97de35
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce97de35
Branch: refs/heads/master
Commit: ce97de35ec8f3b12f32360e7b678e7ecab5d3f85
Parents: d009d67
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Oct 29 12:58:53 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 29 13:18:15 2015 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/logviewer.clj | 23 ++++++++------------
1 file changed, 9 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ce97de35/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 1bcc0df..353a58e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -190,7 +190,7 @@
logs)))
(defn per-workerdir-cleanup
- "Delete the oldest files in overloaded worker log dir"
+ "Delete the oldest files in each overloaded worker log dir"
[^File root-dir size]
(dofor [worker-dir (get-all-worker-dirs root-dir)]
(let [filtered-logs (filter #(not (is-active-log %)) (.listFiles worker-dir))
@@ -367,6 +367,9 @@ Note that if anything goes wrong, this will throw an Error and exit."
(defn- daemon-download-link [fname]
[[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full File")]])
+(defn- is-txt-file [fname]
+ (re-find #"\.(log.*|txt|yaml|pid)$" fname))
+
(def default-bytes-per-page 51200)
(defn log-page [fname start length grep user root-dir]
@@ -390,9 +393,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
length (if length
(min 10485760 length)
default-bytes-per-page)
- is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
log-string (escape-html
- (if is-txt-file
+ (if (is-txt-file fname)
(if start
(page-file path start length)
(page-file path length))
@@ -405,7 +407,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(filter #(.contains % grep))
(string/join "\n"))
log-string)])
- (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+ (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
(html (concat (log-file-selection-form reordered-files-str "log") ; list all files for this topology
pager-data
(download-link fname)
@@ -420,7 +422,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(defn daemonlog-page [fname start length grep user root-dir]
(if (or (blank? (*STORM-CONF* UI-FILTER))
- (authorized-log-user? user fname *STORM-CONF*)) ;; how to deal with this???????????
+ (authorized-log-user? user fname *STORM-CONF*))
(let [file (.getCanonicalFile (File. root-dir fname))
file-length (.length file)
path (.getCanonicalPath file)
@@ -436,9 +438,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
files-str (for [file log-files]
(.getName file))
reordered-files-str (conj (filter #(not= fname %) files-str) fname)
- is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
log-string (escape-html
- (if is-txt-file
+ (if (is-txt-file fname)
(if start
(page-file path start length)
(page-file path length))
@@ -451,7 +452,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(filter #(.contains % grep))
(string/join "\n"))
log-string)])
- (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+ (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
(html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
pager-data
(daemon-download-link fname)
@@ -561,9 +562,6 @@ Note that if anything goes wrong, this will throw an Error and exit."
(log-error ex)
(ring-response-from-exception ex))))
(GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
- ;; We do not use servlet-response here, but do not remove it from the
- ;; :keys list, or this rule could stop working when an authentication
- ;; filter is configured.
(try
(let [user (.getUserName http-creds-handler servlet-request)]
(download-log-file file servlet-request servlet-response user log-root))
@@ -571,9 +569,6 @@ Note that if anything goes wrong, this will throw an Error and exit."
(log-error ex)
(ring-response-from-exception ex))))
(GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
- ;; We do not use servlet-response here, but do not remove it from the
- ;; :keys list, or this rule could stop working when an authentication
- ;; filter is configured.
(try
(let [user (.getUserName http-creds-handler servlet-request)]
(download-log-file file servlet-request servlet-response user daemonlog-root))
[3/6] storm git commit: Add unit tests for logviewer cleanup, filter,
sorting, etc.
Posted by da...@apache.org.
Add unit tests for logviewer cleanup, filter, sorting, etc.
Fix create/rm temp dir in supervisor test
Address event log dir issue and other comments
Add new web route for all daemon logs: view, list and download
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d009d67d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d009d67d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d009d67d
Branch: refs/heads/master
Commit: d009d67dd0323db4c2bd472b597a99ff9de9227a
Parents: 2c2858e
Author: zhuol <zh...@yahoo-inc.com>
Authored: Tue Oct 27 09:34:03 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 29 13:18:15 2015 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/logviewer.clj | 119 +++++--
storm-core/src/clj/backtype/storm/ui/core.clj | 3 +-
storm-core/src/clj/backtype/storm/util.clj | 2 +-
.../storm/metric/FileBasedEventLogger.java | 5 +
.../src/jvm/backtype/storm/utils/Utils.java | 11 +-
.../test/clj/backtype/storm/logviewer_test.clj | 312 ++++++++++++-------
.../test/clj/backtype/storm/supervisor_test.clj | 19 +-
7 files changed, 330 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 58561a4..1bcc0df 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -321,7 +321,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(if (and appender-name appender (instance? RollingFileAppender appender))
(.getParent (File. (.getFileName appender)))
(throw
- (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and logback agree.")))))
+ (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j agree.")))))
(defnk to-btn-link
"Create a link that is formatted like a button"
@@ -329,8 +329,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
[:a {:href (java.net.URI. url)
:class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
-(defn log-file-selection-form [log-files]
- [[:form {:action "log" :id "list-of-files"}
+(defn log-file-selection-form [log-files type]
+ [[:form {:action type :id "list-of-files"}
(drop-down "file" log-files )
[:input {:type "submit" :value "Switch file"}]]])
@@ -364,6 +364,9 @@ Note that if anything goes wrong, this will throw an Error and exit."
(defn- download-link [fname]
[[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
+(defn- daemon-download-link [fname]
+ [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full File")]])
+
(def default-bytes-per-page 51200)
(defn log-page [fname start length grep user root-dir]
@@ -372,20 +375,22 @@ Note that if anything goes wrong, this will throw an Error and exit."
(let [file (.getCanonicalFile (File. root-dir fname))
path (.getCanonicalPath file)
zip-file? (.endsWith path ".gz")
- file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
- topo-dir (.getParentFile (.getParentFile file))
- log-files (reduce clojure.set/union
- (sorted-set)
- (for [^File port-dir (.listFiles topo-dir)]
- (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
- files-str (for [file log-files]
- (get-topo-port-workerlog file))
- reordered-files-str (conj (filter #(not= fname %) files-str) fname)]
- (if (.exists file)
- (let [length (if length
+ topo-dir (.getParentFile (.getParentFile file))]
+ (if (and (.exists file)
+ (= (.getCanonicalFile (File. root-dir))
+ (.getParentFile topo-dir)))
+ (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+ log-files (reduce clojure.set/union
+ (sorted-set)
+ (for [^File port-dir (.listFiles topo-dir)]
+ (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
+ files-str (for [file log-files]
+ (get-topo-port-workerlog file))
+ reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+ length (if length
(min 10485760 length)
default-bytes-per-page)
- is-txt-file (re-find #"\.(log.*|txt|yaml)$" fname)
+ is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
log-string (escape-html
(if is-txt-file
(if start
@@ -401,7 +406,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(string/join "\n"))
log-string)])
(let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
- (html (concat (log-file-selection-form reordered-files-str) ; list all files for this topology
+ (html (concat (log-file-selection-form reordered-files-str "log") ; list all files for this topology
pager-data
(download-link fname)
[[:pre#logContent log-string]]
@@ -413,6 +418,52 @@ Note that if anything goes wrong, this will throw an Error and exit."
(resp/status 404))
(unauthorized-user-html user))))
+(defn daemonlog-page [fname start length grep user root-dir]
+ (if (or (blank? (*STORM-CONF* UI-FILTER))
+ (authorized-log-user? user fname *STORM-CONF*)) ;; how to deal with this???????????
+ (let [file (.getCanonicalFile (File. root-dir fname))
+ file-length (.length file)
+ path (.getCanonicalPath file)
+ zip-file? (.endsWith path ".gz")]
+ (if (and (= (.getCanonicalFile (File. root-dir))
+ (.getParentFile file))
+ (.exists file))
+ (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+ length (if length
+ (min 10485760 length)
+ default-bytes-per-page)
+ log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included
+ files-str (for [file log-files]
+ (.getName file))
+ reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+ is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
+ log-string (escape-html
+ (if is-txt-file
+ (if start
+ (page-file path start length)
+ (page-file path length))
+ "This is a binary file and cannot display! You may download the full file."))
+ start (or start (- file-length length))]
+ (if grep
+ (html [:pre#logContent
+ (if grep
+ (->> (.split log-string "\n")
+ (filter #(.contains % grep))
+ (string/join "\n"))
+ log-string)])
+ (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+ (html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
+ pager-data
+ (daemon-download-link fname)
+ [[:pre#logContent log-string]]
+ pager-data)))))
+ (-> (resp/response "Page not found")
+ (resp/status 404))))
+ (if (nil? (get-log-user-group-whitelist fname))
+ (-> (resp/response "Page not found")
+ (resp/status 404))
+ (unauthorized-user-html user))))
+
(defn download-log-file [fname req resp user ^String root-dir]
(let [file (.getCanonicalFile (File. root-dir fname))]
(if (.exists file)
@@ -496,6 +547,19 @@ Note that if anything goes wrong, this will throw an Error and exit."
(catch InvalidRequestException ex
(log-error ex)
(ring-response-from-exception ex))))
+ (GET "/daemonlog" [:as req & m]
+ (try
+ (let [servlet-request (:servlet-request req)
+ daemonlog-root (:daemonlog-root req)
+ user (.getUserName http-creds-handler servlet-request)
+ start (if (:start m) (parse-long-from-map m :start))
+ length (if (:length m) (parse-long-from-map m :length))
+ file (url-decode (:file m))]
+ (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
+ file user))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
(GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
;; We do not use servlet-response here, but do not remove it from the
;; :keys list, or this rule could stop working when an authentication
@@ -506,6 +570,16 @@ Note that if anything goes wrong, this will throw an Error and exit."
(catch InvalidRequestException ex
(log-error ex)
(ring-response-from-exception ex))))
+ (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
+ ;; We do not use servlet-response here, but do not remove it from the
+ ;; :keys list, or this rule could stop working when an authentication
+ ;; filter is configured.
+ (try
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (download-log-file file servlet-request servlet-response user daemonlog-root))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
(GET "/listLogs" [:as req & m]
(try
(let [servlet-request (:servlet-request req)
@@ -524,17 +598,17 @@ Note that if anything goes wrong, this will throw an Error and exit."
(defn conf-middleware
"For passing the storm configuration with each request."
- [app log-root]
+ [app log-root daemonlog-root]
(fn [req]
- (app (assoc req :log-root log-root))))
+ (app (assoc req :log-root log-root :daemonlog-root daemonlog-root))))
-(defn start-logviewer! [conf log-root-dir]
+(defn start-logviewer! [conf log-root-dir daemonlog-root-dir]
(try
(let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
filter-class (conf UI-FILTER)
filter-params (conf UI-FILTER-PARAMS)
logapp (handler/api log-routes) ;; query params as map
- middle (conf-middleware logapp log-root-dir)
+ middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
filters-confs (if (conf UI-FILTER)
[{:filter-class filter-class
:filter-params (or (conf UI-FILTER-PARAMS) {})}]
@@ -572,7 +646,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
(defn -main []
(let [conf (read-storm-config)
- log-root (worker-artifacts-root conf)]
+ log-root (worker-artifacts-root conf)
+ daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
(setup-default-uncaught-exception-handler)
(start-log-cleaner! conf log-root)
- (start-logviewer! conf log-root)))
+ (start-logviewer! conf log-root daemonlog-root)))
http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index d30f496..130ea09 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -128,7 +128,8 @@
(logviewer-link host fname secure?)))
(defn nimbus-log-link [host port]
- (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+ (url-format "http://%s:%s/daemonlog?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+
(defn get-error-time
[error]
(if error
http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 3284558..9ec8cd3 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -593,7 +593,7 @@
attrs (make-array FileAttribute 0)
abs-path (.toAbsolutePath (Paths/get path empty-array))
abs-target (.toAbsolutePath (Paths/get target empty-array))]
- (log-message "Creating symlink [" abs-path "] to [" abs-target "]")
+ (log-debug "Creating symlink [" abs-path "] to [" abs-target "]")
(if (not (.exists (.toFile abs-path)))
(Files/createSymbolicLink abs-path abs-target attrs)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
index d7c00e1..3abb940 100644
--- a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
@@ -5,6 +5,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -82,6 +83,10 @@ public class FileBasedEventLogger implements IEventLogger {
path = Paths.get(System.getProperty("storm.home"), logDir, "workers-artifacts",
stormId, Integer.toString(port), "events.log");
}
+ File dir = path.toFile().getParentFile();
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
initLogWriter(path);
setUpFlushTask();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index ebf40b6..8660739 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -742,9 +742,14 @@ public class Utils {
}
}
- //Note: Only works for zip files whose uncompressed size is less than 4 GB
- //Otherwise returns the size module 2^32, per gzip specifications
- //Returns a long, since that's what file lengths in Java/Clojure usually are.
+ /**
+ * Given a zip File input it will return its size
+ * Only works for zip files whose uncompressed size is less than 4 GB,
+ * otherwise returns the size module 2^32, per gzip specifications
+ * @param myFile The zip file as input
+ * @throws IOException
+ * @return zip file size as a long
+ */
public static long zipFileSize(File myFile) throws IOException{
RandomAccessFile raf = new RandomAccessFile(myFile, "r");
raf.seek(raf.length() - 4);
http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/test/clj/backtype/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/logviewer_test.clj b/storm-core/test/clj/backtype/storm/logviewer_test.clj
index a790ddf..e9840e8 100644
--- a/storm-core/test/clj/backtype/storm/logviewer_test.clj
+++ b/storm-core/test/clj/backtype/storm/logviewer_test.clj
@@ -20,30 +20,38 @@
(:require [conjure.core])
(:use [clojure test])
(:use [conjure core])
+ (:use [backtype.storm.ui helpers])
+ (:import [java.nio.file Files])
+ (:import [java.nio.file.attribute FileAttribute])
+ (:import [java.io File])
(:import [org.mockito Mockito]))
(defmulti mk-mock-File #(:type %))
-(defmethod mk-mock-File :file [{file-name :name mtime :mtime
- :or {file-name "afile" mtime 1}}]
+(defmethod mk-mock-File :file [{file-name :name mtime :mtime length :length
+ :or {file-name "afile"
+ mtime 1
+ length (* 10 (* 1024 (* 1024 1024))) }}] ; Length 10 GB
(let [mockFile (Mockito/mock java.io.File)]
(. (Mockito/when (.getName mockFile)) thenReturn file-name)
(. (Mockito/when (.lastModified mockFile)) thenReturn mtime)
(. (Mockito/when (.isFile mockFile)) thenReturn true)
(. (Mockito/when (.getCanonicalPath mockFile))
- thenReturn (str "/mock/canonical/path/to/" file-name))
+ thenReturn (str "/mock/canonical/path/to/" file-name))
+ (. (Mockito/when (.length mockFile)) thenReturn length)
mockFile))
-(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime
- :or {dir-name "adir" mtime 1}}]
+(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime files :files
+ :or {dir-name "adir" mtime 1 files []}}]
(let [mockDir (Mockito/mock java.io.File)]
(. (Mockito/when (.getName mockDir)) thenReturn dir-name)
(. (Mockito/when (.lastModified mockDir)) thenReturn mtime)
(. (Mockito/when (.isFile mockDir)) thenReturn false)
+ (. (Mockito/when (.listFiles mockDir)) thenReturn (into-array File files))
mockDir))
(deftest test-mk-FileFilter-for-log-cleanup
- (testing "log file filter selects the correct log files for purge"
+ (testing "log file filter selects the correct worker-log dirs for purge"
(let [now-millis (current-time-millis)
conf {LOGVIEWER-CLEANUP-AGE-MINS 60
LOGVIEWER-CLEANUP-INTERVAL-SECS 300}
@@ -51,120 +59,181 @@
old-mtime-millis (- cutoff-millis 500)
new-mtime-millis (+ cutoff-millis 500)
matching-files (map #(mk-mock-File %)
- [{:name "oldlog-1-2-worker-3.log"
- :type :file
- :mtime old-mtime-millis}
- {:name "oldlog-1-2-worker-3.log.8"
- :type :file
- :mtime old-mtime-millis}
- {:name "foobar*_topo-1-24242-worker-2834238.log"
- :type :file
- :mtime old-mtime-millis}])
+ [{:name "3031"
+ :type :directory
+ :mtime old-mtime-millis}
+ {:name "3032"
+ :type :directory
+ :mtime old-mtime-millis}
+ {:name "7077"
+ :type :directory
+ :mtime old-mtime-millis}])
excluded-files (map #(mk-mock-File %)
- [{:name "oldlog-1-2-worker-.log"
- :type :file
- :mtime old-mtime-millis}
- {:name "olddir-1-2-worker.log"
- :type :directory
- :mtime old-mtime-millis}
- {:name "newlog-1-2-worker.log"
- :type :file
- :mtime new-mtime-millis}
- {:name "some-old-file.txt"
- :type :file
- :mtime old-mtime-millis}
- {:name "metadata"
- :type :directory
- :mtime old-mtime-millis}
- {:name "newdir-1-2-worker.log"
- :type :directory
- :mtime new-mtime-millis}
- {:name "newdir"
- :type :directory
- :mtime new-mtime-millis}
- ])
+ [{:name "oldlog-1-2-worker-.log"
+ :type :file
+ :mtime old-mtime-millis}
+ {:name "newlog-1-2-worker.log"
+ :type :file
+ :mtime new-mtime-millis}
+ {:name "some-old-file.txt"
+ :type :file
+ :mtime old-mtime-millis}
+ {:name "olddir-1-2-worker.log"
+ :type :directory
+ :mtime new-mtime-millis}
+ {:name "metadata"
+ :type :directory
+ :mtime new-mtime-millis}
+ {:name "newdir"
+ :type :directory
+ :mtime new-mtime-millis}
+ ])
file-filter (logviewer/mk-FileFilter-for-log-cleanup conf now-millis)]
- (is (every? #(.accept file-filter %) matching-files))
- (is (not-any? #(.accept file-filter %) excluded-files))
+ (is (every? #(.accept file-filter %) matching-files))
+ (is (not-any? #(.accept file-filter %) excluded-files))
)))
-(deftest test-get-log-root->files-map
- (testing "returns map of root name to list of files"
- (let [files (vec (map #(java.io.File. %) ["log-1-2-worker-3.log"
- "log-1-2-worker-3.log.1.gz"
- "log-1-2-worker-3.log.err"
- "log-1-2-worker-3.log.out"
- "log-1-2-worker-3.log.out.1.gz"
- "log-1-2-worker-3.log.1"
- "log-2-4-worker-6.log.1"]))
- expected {"log-1-2-worker-3" #{(files 0) (files 1) (files 2) (files 3) (files 4) (files 5)}
- "log-2-4-worker-6" #{(files 6)}}]
- (is (= expected (logviewer/get-log-root->files-map files))))))
-
-(deftest test-identify-worker-log-files
- (testing "Does not include metadata file when there are any log files that
- should not be cleaned up"
- (let [cutoff-millis 2000
- old-logFile (mk-mock-File {:name "mock-1-1-worker-1.log.1"
- :type :file
- :mtime (- cutoff-millis 1000)})
- mock-metaFile (mk-mock-File {:name "mock-1-1-worker-1.yaml"
- :type :file
- :mtime 1})
- new-logFile (mk-mock-File {:name "mock-1-1-worker-1.log"
- :type :file
- :mtime (+ cutoff-millis 1000)})
+(deftest test-sort-worker-logs
+ (testing "cleaner sorts the log files in ascending ages for deletion"
+ (stubbing [logviewer/filter-candidate-files (fn [x _] x)]
+ (let [now-millis (current-time-millis)
+ files1 (into-array File (map #(mk-mock-File {:name (str %)
+ :type :file
+ :mtime (- now-millis (* 100 %))})
+ (range 1 6)))
+ files2 (into-array File (map #(mk-mock-File {:name (str %)
+ :type :file
+ :mtime (- now-millis (* 100 %))})
+ (range 6 11)))
+ files3 (into-array File (map #(mk-mock-File {:name (str %)
+ :type :file
+ :mtime (- now-millis (* 100 %))})
+ (range 11 16)))
+ port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+ :type :directory
+ :files files1})
+ port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
+ :type :directory
+ :files files2})
+ port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
+ :type :directory
+ :files files3})
+ topo1-files (into-array File [port1-dir port2-dir])
+ topo2-files (into-array File [port3-dir])
+ topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
+ :type :directory
+ :files topo1-files})
+ topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
+ :type :directory
+ :files topo2-files})
+ root-files (into-array File [topo1-dir topo2-dir])
+ root-dir (mk-mock-File {:name "/workers-artifacts"
+ :type :directory
+ :files root-files})
+ sorted-logs (logviewer/sorted-worker-logs root-dir)
+ sorted-ints (map #(Integer. (.getName %)) sorted-logs)]
+ (is (= (count sorted-logs) 15))
+ (is (= (count sorted-ints) 15))
+ (is (apply #'> sorted-ints))))))
+
+(deftest test-per-workerdir-cleanup
+ (testing "cleaner deletes oldest files in each worker dir if files are larger than per-dir quota."
+ (stubbing [rmr nil]
+ (let [now-millis (current-time-millis)
+ files1 (into-array File (map #(mk-mock-File {:name (str "A" %)
+ :type :file
+ :mtime (+ now-millis (* 100 %))
+ :length 200 })
+ (range 0 10)))
+ files2 (into-array File (map #(mk-mock-File {:name (str "B" %)
+ :type :file
+ :mtime (+ now-millis (* 100 %))
+ :length 200 })
+ (range 0 10)))
+ files3 (into-array File (map #(mk-mock-File {:name (str "C" %)
+ :type :file
+ :mtime (+ now-millis (* 100 %))
+ :length 200 })
+ (range 0 10)))
+ port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+ :type :directory
+ :files files1})
+ port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
+ :type :directory
+ :files files2})
+ port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
+ :type :directory
+ :files files3})
+ topo1-files (into-array File [port1-dir port2-dir])
+ topo2-files (into-array File [port3-dir])
+ topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
+ :type :directory
+ :files topo1-files})
+ topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
+ :type :directory
+ :files topo2-files})
+ root-files (into-array File [topo1-dir topo2-dir])
+ root-dir (mk-mock-File {:name "/workers-artifacts"
+ :type :directory
+ :files root-files})
+ remaining-logs (logviewer/per-workerdir-cleanup root-dir 1200)]
+ (is (= (count (first remaining-logs)) 6))
+ (is (= (count (second remaining-logs)) 6))
+ (is (= (count (last remaining-logs)) 6))))))
+
+(deftest test-delete-oldest-log-cleanup
+ (testing "delete oldest logs deletes the oldest set of logs when the total size gets too large."
+ (stubbing [rmr nil]
+ (let [now-millis (current-time-millis)
+ files (into-array File (map #(mk-mock-File {:name (str %)
+ :type :file
+ :mtime (+ now-millis (* 100 %))
+ :length 100 })
+ (range 0 20)))
+ remaining-logs (logviewer/delete-oldest-while-logs-too-large files 501)]
+ (is (= (logviewer/sum-file-size files) 2000))
+ (is (= (count remaining-logs) 5))
+ (is (= (.getName (first remaining-logs)) "15"))))))
+
+(deftest test-identify-worker-log-dirs
+ (testing "Build up workerid-workerlogdir map for the old workers' dirs"
+ (let [port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+ :type :directory})
+ mock-metaFile (mk-mock-File {:name "worker.yaml"
+ :type :file})
exp-id "id12345"
- exp-user "alice"
- expected {exp-id {:owner exp-user
- :files #{old-logFile}}}]
+ expected {exp-id port1-dir}]
(stubbing [supervisor/read-worker-heartbeats nil
- logviewer/get-metadata-file-for-log-root-name mock-metaFile
- read-dir-contents [(.getName old-logFile) (.getName new-logFile)]
- logviewer/get-worker-id-from-metadata-file exp-id
- logviewer/get-topo-owner-from-metadata-file exp-user]
- (is (= expected (logviewer/identify-worker-log-files [old-logFile] "/tmp/")))))))
+ logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
+ logviewer/get-worker-id-from-metadata-file exp-id]
+ (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))
-(deftest test-get-dead-worker-files-and-owners
+(deftest test-get-dead-worker-dirs
(testing "removes any files of workers that are still alive"
(let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
id->hb {"42" {:time-secs 1}}
now-secs 2
- log-files #{:expected-file :unexpected-file}
- exp-owner "alice"]
- (stubbing [logviewer/identify-worker-log-files {"42" {:owner exp-owner
- :files #{:unexpected-file}}
- "007" {:owner exp-owner
- :files #{:expected-file}}}
- logviewer/get-topo-owner-from-metadata-file "alice"
+ unexpected-dir (mk-mock-File {:name "dir1" :type :directory})
+ expected-dir (mk-mock-File {:name "dir2" :type :directory})
+ log-dirs #{unexpected-dir expected-dir}]
+ (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir,
+ "007" expected-dir}
supervisor/read-worker-heartbeats id->hb]
- (is (= [{:owner exp-owner :files #{:expected-file}}]
- (logviewer/get-dead-worker-files-and-owners conf now-secs log-files "/tmp/")))))))
+ (is (= #{expected-dir}
+ (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))
(deftest test-cleanup-fn
- (testing "cleanup function removes file as user when one is specified"
- (let [exp-user "mock-user"
- mockfile1 (mk-mock-File {:name "file1" :type :file})
- mockfile2 (mk-mock-File {:name "file2" :type :file})
- mockfile3 (mk-mock-File {:name "file3" :type :file})
- mockyaml (mk-mock-File {:name "foo.yaml" :type :file})
- exp-cmd (str "rmr /mock/canonical/path/to/" (.getName mockfile3))]
- (stubbing [logviewer/select-files-for-cleanup
- [(mk-mock-File {:name "throwaway" :type :file})]
- logviewer/get-dead-worker-files-and-owners
- [{:owner nil :files #{mockfile1}}
- {:files #{mockfile2}}
- {:owner exp-user :files #{mockfile3 mockyaml}}]
- supervisor/worker-launcher nil
+ (testing "cleanup function rmr's files of dead workers"
+ (let [mockfile1 (mk-mock-File {:name "delete-me1" :type :file})
+ mockfile2 (mk-mock-File {:name "delete-me2" :type :file})]
+ (stubbing [logviewer/select-dirs-for-cleanup nil
+ logviewer/get-dead-worker-dirs (sorted-set mockfile1 mockfile2)
+ logviewer/cleanup-empty-topodir nil
rmr nil]
- (logviewer/cleanup-fn! "/tmp/")
- (verify-call-times-for supervisor/worker-launcher 1)
- (verify-first-call-args-for-indices supervisor/worker-launcher
- [1 2] exp-user exp-cmd)
- (verify-call-times-for rmr 3)
+ (logviewer/cleanup-fn! "/bogus/path")
+ (verify-call-times-for rmr 2)
(verify-nth-call-args-for 1 rmr (.getCanonicalPath mockfile1))
- (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))
- (verify-nth-call-args-for 3 rmr (.getCanonicalPath mockyaml))))))
+ (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))))))
(deftest test-authorized-log-user
(testing "allow cluster admin"
@@ -212,3 +281,38 @@
(is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" {})))
(verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
(verify-first-call-args-for logviewer/user-groups "alice"))))
+
+(deftest test-list-log-files
+ (testing "list-log-files filter selects the correct log files to return"
+ (let [attrs (make-array FileAttribute 0)
+ root-path (.getCanonicalPath (.toFile (Files/createTempDirectory "workers-artifacts" attrs)))
+ file1 (clojure.java.io/file root-path "topoA" "port1" "worker.log")
+ file2 (clojure.java.io/file root-path "topoA" "port2" "worker.log")
+ file3 (clojure.java.io/file root-path "topoB" "port1" "worker.log")
+ _ (clojure.java.io/make-parents file1)
+ _ (clojure.java.io/make-parents file2)
+ _ (clojure.java.io/make-parents file3)
+ _ (.createNewFile file1)
+ _ (.createNewFile file2)
+ _ (.createNewFile file3)
+ origin "www.origin.server.net"
+ expected-all (json-response '("topoA/port1/worker.log" "topoA/port2/worker.log"
+ "topoB/port1/worker.log")
+ nil
+ :headers {"Access-Control-Allow-Origin" origin
+ "Access-Control-Allow-Credentials" "true"})
+ expected-filter-port (json-response '("topoA/port1/worker.log" "topoB/port1/worker.log")
+ nil
+ :headers {"Access-Control-Allow-Origin" origin
+ "Access-Control-Allow-Credentials" "true"})
+ expected-filter-topoId (json-response '("topoB/port1/worker.log")
+ nil
+ :headers {"Access-Control-Allow-Origin" origin
+ "Access-Control-Allow-Credentials" "true"})
+ returned-all (logviewer/list-log-files "user" nil nil root-path nil origin)
+ returned-filter-port (logviewer/list-log-files "user" nil "port1" root-path nil origin)
+ returned-filter-topoId (logviewer/list-log-files "user" "topoB" nil root-path nil origin)]
+ (rmr root-path)
+ (is (= expected-all returned-all))
+ (is (= expected-filter-port returned-filter-port))
+ (is (= expected-filter-topoId returned-filter-topoId)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 7acb477..04c8600 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -23,6 +23,9 @@
(:import [backtype.storm.scheduler ISupervisor])
(:import [backtype.storm.generated RebalanceOptions])
(:import [java.util UUID])
+ (:import [java.io File])
+ (:import [java.nio.file Files])
+ (:import [java.nio.file.attribute FileAttribute])
(:use [backtype.storm config testing util timer])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker] [supervisor :as supervisor]]
@@ -395,12 +398,6 @@
[2]
full-env)))))))
-(defn rm-r [f]
- (if (.isDirectory f)
- (for [sub (.listFiles f)] (rm-r sub))
- (.delete f)
- ))
-
(deftest test-worker-launch-command-run-as-user
(testing "*.worker.childopts configuration"
(let [mock-port "42"
@@ -409,7 +406,8 @@
mock-mem-onheap 512
mock-sensitivity "S3"
mock-cp "mock-classpath'quote-on-purpose"
- storm-local (str "/tmp/" (UUID/randomUUID))
+ attrs (make-array FileAttribute 0)
+ storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs)))
worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
exp-launch ["/bin/worker-launcher"
"me"
@@ -450,13 +448,13 @@
" '" mock-storm-id "'"
" '" mock-port "'"
" '" mock-worker-id "';"))]
- (.mkdirs (io/file storm-local "workers" mock-worker-id))
(try
(testing "testing *.worker.childopts as strings with extra spaces"
(let [string-opts "-Dfoo=bar -Xmx1024m"
topo-string-opts "-Dkau=aux -Xmx2048m"
exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
["-Dkau=aux" "-Xmx2048m"])
+ _ (.mkdirs (io/file storm-local "workers" mock-worker-id))
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
STORM-LOCAL-DIR storm-local
SUPERVISOR-RUN-WORKER-AS-USER true
@@ -480,7 +478,8 @@
[0]
exp-launch))
(is (= (slurp worker-script) exp-script))))
- (finally (rm-r (io/file storm-local))))
+ (finally (rmr storm-local)))
+ (.mkdirs (io/file storm-local "workers" mock-worker-id))
(try
(testing "testing *.worker.childopts as list of strings, with spaces in values"
(let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
@@ -509,7 +508,7 @@
[0]
exp-launch))
(is (= (slurp worker-script) exp-script))))
- (finally (rm-r (io/file storm-local)))))))
+ (finally (rmr storm-local))))))
(deftest test-workers-go-bananas
;; test that multiple workers are started for a port, and test that
[6/6] storm git commit: Merge branch 'STORM-901'
Posted by da...@apache.org.
Merge branch 'STORM-901'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0bba2baf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0bba2baf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0bba2baf
Branch: refs/heads/master
Commit: 0bba2baf92bb4042b2c47b2266ff1e4df006a6a3
Parents: b6615d5 8cac782
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Oct 29 21:26:27 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Oct 29 21:26:27 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
conf/defaults.yaml | 2 +
log4j2/worker.xml | 12 +-
storm-core/src/clj/backtype/storm/config.clj | 19 +
.../src/clj/backtype/storm/daemon/logviewer.clj | 514 ++++++++++++++-----
.../clj/backtype/storm/daemon/supervisor.clj | 19 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 29 +-
.../src/clj/backtype/storm/ui/helpers.clj | 26 +-
storm-core/src/clj/backtype/storm/util.clj | 41 +-
storm-core/src/jvm/backtype/storm/Config.java | 12 +
.../storm/metric/FileBasedEventLogger.java | 18 +-
.../src/jvm/backtype/storm/utils/Utils.java | 69 +++
.../test/clj/backtype/storm/logviewer_test.clj | 312 +++++++----
.../test/clj/backtype/storm/supervisor_test.clj | 210 ++++----
14 files changed, 881 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
[5/6] storm git commit: STORM-901
Posted by da...@apache.org.
STORM-901
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8cac7823
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8cac7823
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8cac7823
Branch: refs/heads/master
Commit: 8cac78238532dde607560487a8bbed786f6152b7
Parents: 62c4034
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Oct 29 21:25:57 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Oct 29 21:25:57 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8cac7823/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a422b79..2303277 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -100,6 +100,7 @@
* STORM-958: Add config for init params of group mapping service
* STORM-949: On the topology summary UI page, last shown error should have the time and date
* STORM-1142: Some config validators for positive ints need to allow 0
+ * STORM-901: Worker Artifacts Directory
## 0.10.0-beta2
* STORM-1108: Fix NPE in simulated time
[2/6] storm git commit: STORM-901 worker-artifacts for logviewer
Posted by da...@apache.org.
STORM-901 worker-artifacts for logviewer
Solve event-log issue, ui, dir, routes, listLogs
Fix supervisor test
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2c2858e4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2c2858e4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2c2858e4
Branch: refs/heads/master
Commit: 2c2858e430d0c9065e98f71ba22113628612e022
Parents: b6615d5
Author: zhuol <zh...@yahoo-inc.com>
Authored: Mon Oct 26 11:37:43 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 29 13:18:15 2015 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +
log4j2/worker.xml | 12 +-
storm-core/src/clj/backtype/storm/config.clj | 19 +
.../src/clj/backtype/storm/daemon/logviewer.clj | 434 +++++++++++++------
.../clj/backtype/storm/daemon/supervisor.clj | 19 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 26 +-
.../src/clj/backtype/storm/ui/helpers.clj | 26 +-
storm-core/src/clj/backtype/storm/util.clj | 41 +-
storm-core/src/jvm/backtype/storm/Config.java | 12 +
.../storm/metric/FileBasedEventLogger.java | 13 +-
.../src/jvm/backtype/storm/utils/Utils.java | 64 +++
.../test/clj/backtype/storm/supervisor_test.clj | 197 +++++----
12 files changed, 579 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f4c189c..9150ca4 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -87,6 +87,8 @@ logviewer.port: 8000
logviewer.childopts: "-Xmx128m"
logviewer.cleanup.age.mins: 10080
logviewer.appender.name: "A1"
+logviewer.max.sum.worker.logs.size.mb: 4096
+logviewer.max.per.worker.logs.size.mb: 2048
logs.users: null
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/log4j2/worker.xml
----------------------------------------------------------------------
diff --git a/log4j2/worker.xml b/log4j2/worker.xml
index 2017699..ee7b7a1 100644
--- a/log4j2/worker.xml
+++ b/log4j2/worker.xml
@@ -23,8 +23,8 @@
</properties>
<appenders>
<RollingFile name="A1"
- fileName="${sys:storm.log.dir}/${sys:logfile.name}"
- filePattern="${sys:storm.log.dir}/${sys:logfile.name}.%i.gz">
+ fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
+ filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
<PatternLayout>
<pattern>${pattern}</pattern>
</PatternLayout>
@@ -34,8 +34,8 @@
<DefaultRolloverStrategy max="9"/>
</RollingFile>
<RollingFile name="STDOUT"
- fileName="${sys:storm.log.dir}/${sys:logfile.name}.out"
- filePattern="${sys:storm.log.dir}/${sys:logfile.name}.out.%i.gz">
+ fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
+ filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
<PatternLayout>
<pattern>${patternNoTime}</pattern>
</PatternLayout>
@@ -45,8 +45,8 @@
<DefaultRolloverStrategy max="4"/>
</RollingFile>
<RollingFile name="STDERR"
- fileName="${sys:storm.log.dir}/${sys:logfile.name}.err"
- filePattern="${sys:storm.log.dir}/${sys:logfile.name}.err.%i.gz">
+ fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
+ filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
<PatternLayout>
<pattern>${patternNoTime}</pattern>
</PatternLayout>
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index e1cafec..f06f6e9 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -221,6 +221,25 @@
(log-message "REMOVE worker-user " worker-id)
(.delete (File. (worker-user-file conf worker-id))))
+(defn worker-artifacts-root
+ ([conf]
+ (str (absolute-storm-local-dir conf) file-path-separator "workers-artifacts"))
+ ([conf id]
+ (str (worker-artifacts-root conf) file-path-separator id))
+ ([conf id port]
+ (str (worker-artifacts-root conf id) file-path-separator port)))
+
+(defn get-log-metadata-file
+ ([fname]
+ (let [[id port & _] (str/split fname (re-pattern file-path-separator))]
+ (get-log-metadata-file (read-storm-config) id port)))
+ ([conf id port]
+ (clojure.java.io/file (str (worker-artifacts-root conf id) file-path-separator port file-path-separator) "worker.yaml")))
+
+(defn get-worker-dir-from-root
+ [log-root id port]
+ (clojure.java.io/file (str log-root file-path-separator id file-path-separator port)))
+
(defn worker-root
([conf]
(str (absolute-storm-local-dir conf) file-path-separator "workers"))
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 2a8a40c..58561a4 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -16,12 +16,14 @@
(ns backtype.storm.daemon.logviewer
(:use compojure.core)
(:use [clojure.set :only [difference intersection]])
- (:use [clojure.string :only [blank?]])
- (:use [hiccup core page-helpers])
+ (:use [clojure.string :only [blank? split]])
+ (:use [hiccup core page-helpers form-helpers])
(:use [backtype.storm config util log timer])
(:use [backtype.storm.ui helpers])
+ (:import [backtype.storm.utils Utils])
(:import [org.slf4j LoggerFactory])
- (:import [java.io File FileFilter FileInputStream])
+ (:import [java.io File FileFilter FileInputStream InputStream])
+ (:import [java.util.zip GZIPInputStream])
(:import [org.apache.logging.log4j LogManager])
(:import [org.apache.logging.log4j.core Appender LoggerContext])
(:import [org.apache.logging.log4j.core.appender RollingFileAppender])
@@ -29,14 +31,12 @@
[org.yaml.snakeyaml.constructor SafeConstructor])
(:import [backtype.storm.ui InvalidRequestException]
[backtype.storm.security.auth AuthUtils])
- (:require [compojure.route :as route]
- [compojure.handler :as handler]
- [ring.middleware.keyword-params]
- [ring.util.response :as resp])
(:require [backtype.storm.daemon common [supervisor :as supervisor]])
(:import [java.io File FileFilter])
(:require [compojure.route :as route]
[compojure.handler :as handler]
+ [ring.middleware.keyword-params]
+ [ring.util.codec :as codec]
[ring.util.response :as resp]
[clojure.string :as string])
(:gen-class))
@@ -46,18 +46,34 @@
(defn cleanup-cutoff-age-millis [conf now-millis]
(- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
-;TODO: handle cleanup of old event log files
+(defn- last-modifiedtime-worker-logdir
+ "Return the last modified time for all log files in a worker's log dir"
+ [log-dir]
+ (apply max
+ (.lastModified log-dir)
+ (for [^File file (.listFiles log-dir)]
+ (.lastModified file))))
+
(defn mk-FileFilter-for-log-cleanup [conf now-millis]
(let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
(reify FileFilter (^boolean accept [this ^File file]
(boolean (and
- (.isFile file)
- (re-find worker-log-filename-pattern (.getName file))
- (<= (.lastModified file) cutoff-age-millis)))))))
+ (not (.isFile file))
+ (<= (last-modifiedtime-worker-logdir file) cutoff-age-millis)))))))
-(defn select-files-for-cleanup [conf now-millis root-dir]
+(defn select-dirs-for-cleanup [conf now-millis root-dir]
(let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
- (.listFiles (File. root-dir) file-filter)))
+ (reduce clojure.set/union
+ (sorted-set)
+ (for [^File topo-dir (.listFiles (File. root-dir))]
+ (into [] (.listFiles topo-dir file-filter))))))
+
+(defn get-topo-port-workerlog
+ "Return the path of the worker log with the format of topoId/port/worker.log.*"
+ [^File file]
+ (clojure.string/join file-path-separator
+ (take-last 3
+ (split (.getCanonicalPath file) (re-pattern file-path-separator)))))
(defn get-metadata-file-for-log-root-name [root-name root-dir]
(let [metaFile (clojure.java.io/file root-dir "metadata"
@@ -69,116 +85,202 @@
" to clean up for " root-name)
nil))))
+(defn get-metadata-file-for-wroker-logdir [logdir]
+ (let [metaFile (clojure.java.io/file logdir "worker.yaml")]
+ (if (.exists metaFile)
+ metaFile
+ (do
+ (log-warn "Could not find " (.getCanonicalPath metaFile)
+ " to clean up for " logdir)
+ nil))))
+
(defn get-worker-id-from-metadata-file [metaFile]
(get (clojure-from-yaml-file metaFile) "worker-id"))
(defn get-topo-owner-from-metadata-file [metaFile]
(get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
-(defn get-log-root->files-map [log-files]
- "Returns a map of \"root name\" to a the set of files in log-files having the
- root name. The \"root name\" of a log file is the part of the name preceding
- the extension."
- (reduce #(assoc %1 ;; The accumulated map so far
- (first %2) ;; key: The root name of the log file
- (conj (%1 (first %2) #{}) (second %2))) ;; val: The set of log files with the root name
- {} ;; initial (empty) map
- (map #(list
- (second (re-find worker-log-filename-pattern (.getName %))) ;; The root name of the log file
- %) ;; The log file
- log-files)))
-
-(defn identify-worker-log-files [log-files root-dir]
- (into {} (for [log-root-entry (get-log-root->files-map log-files)
- :let [metaFile (get-metadata-file-for-log-root-name
- (key log-root-entry) root-dir)
- log-root (key log-root-entry)
- files (val log-root-entry)]
+(defn identify-worker-log-dirs [log-dirs]
+ "return the workerid to worker-log-dir map"
+ (into {} (for [logdir log-dirs
+ :let [metaFile (get-metadata-file-for-wroker-logdir logdir)]
:when metaFile]
- {(get-worker-id-from-metadata-file metaFile)
- {:owner (get-topo-owner-from-metadata-file metaFile)
- :files
- ;; If each log for this root name is to be deleted, then
- ;; include the metadata file also.
- (if (empty? (difference
- (set (filter #(re-find (re-pattern log-root) %)
- (read-dir-contents root-dir)))
- (set (map #(.getName %) files))))
- (conj files metaFile)
- ;; Otherwise, keep the list of files as it is.
- files)}})))
-
-(defn get-dead-worker-files-and-owners [conf now-secs log-files root-dir]
- (if (empty? log-files)
- {}
- (let [id->heartbeat (supervisor/read-worker-heartbeats conf)
- alive-ids (keys (remove
- #(or (not (val %))
- (supervisor/is-worker-hb-timed-out? now-secs (val %) conf))
- id->heartbeat))
- id->entries (identify-worker-log-files log-files root-dir)]
- (for [[id {:keys [owner files]}] id->entries
- :when (not (contains? (set alive-ids) id))]
- {:owner owner
- :files files}))))
-
-(defn cleanup-fn! [log-root-dir]
+ {(get-worker-id-from-metadata-file metaFile) logdir})))
+
+(defn get-alive-ids
+ [conf now-secs]
+ (->>
+ (supervisor/read-worker-heartbeats conf)
+ (remove
+ #(or (not (val %))
+ (supervisor/is-worker-hb-timed-out? now-secs
+ (val %)
+ conf)))
+ keys
+ set))
+
+(defn get-dead-worker-dirs
+ "Return a sorted set of java.io.Files that were written by workers that are
+ now dead"
+ [conf now-secs log-dirs]
+ (if (empty? log-dirs)
+ (sorted-set)
+ (let [alive-ids (get-alive-ids conf now-secs)
+ id->dir (identify-worker-log-dirs log-dirs)]
+ (apply sorted-set
+ (for [[id dir] id->dir
+ :when (not (contains? alive-ids id))]
+ dir)))))
+
+(defn get-all-worker-dirs [^File root-dir]
+ (reduce clojure.set/union
+ (sorted-set)
+ (for [^File topo-dir (.listFiles root-dir)]
+ (into [] (.listFiles topo-dir)))))
+
+(defn get-alive-worker-dirs
+ "Return a sorted set of java.io.Files that were written by workers that are
+ now active"
+ [conf root-dir]
+ (let [alive-ids (get-alive-ids conf (current-time-secs))
+ log-dirs (get-all-worker-dirs root-dir)
+ id->dir (identify-worker-log-dirs log-dirs)]
+ (apply sorted-set
+ (for [[id dir] id->dir
+ :when (contains? alive-ids id)]
+ (.getCanonicalPath dir)))))
+
+(defn get-all-logs-for-rootdir [^File log-dir]
+ (reduce concat
+ (for [port-dir (get-all-worker-dirs log-dir)]
+ (into [] (.listFiles port-dir)))))
+
+(defn is-active-log [^File file]
+ (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file)))
+
+(defn filter-candidate-files
+ "Filter candidate files for global cleanup"
+ [logs log-dir]
+ (let [alive-worker-dirs (get-alive-worker-dirs *STORM-CONF* log-dir)]
+ (filter #(and (not= (.getName %) "worker.yaml") ; exclude metadata file
+ (not (and (contains? alive-worker-dirs (.getCanonicalPath (.getParentFile %)))
+ (is-active-log %)))) ; exclude active workers' active logs
+ logs)))
+
+(defn sorted-worker-logs
+ "Collect the wroker log files recursively, sorted by decreasing age."
+ [^File root-dir]
+ (let [files (get-all-logs-for-rootdir root-dir)
+ logs (filter-candidate-files files root-dir)]
+ (sort-by #(.lastModified %) logs)))
+
+(defn sum-file-size
+ "Given a sequence of Files, sum their sizes."
+ [files]
+ (reduce #(+ %1 (.length %2)) 0 files))
+
+(defn delete-oldest-while-logs-too-large [logs_ size]
+ (loop [logs logs_]
+ (if (> (sum-file-size logs) size)
+ (do
+ (log-message "Log sizes too high. Going to delete: " (.getName (first logs)))
+ (try (rmr (.getCanonicalPath (first logs)))
+ (catch Exception ex (log-error ex)))
+ (recur (rest logs)))
+ logs)))
+
+(defn per-workerdir-cleanup
+ "Delete the oldest files in overloaded worker log dir"
+ [^File root-dir size]
+ (dofor [worker-dir (get-all-worker-dirs root-dir)]
+ (let [filtered-logs (filter #(not (is-active-log %)) (.listFiles worker-dir))
+ sorted-logs (sort-by #(.lastModified %) filtered-logs)]
+ (delete-oldest-while-logs-too-large sorted-logs size))))
+
+(defn cleanup-empty-topodir
+ "Delete the topo dir if it contains zero port dirs"
+ [^File dir]
+ (let [topodir (.getParentFile dir)]
+ (if (empty? (.listFiles topodir))
+ (rmr (.getCanonicalPath topodir)))))
+
+(defn cleanup-fn!
+ "Delete old log dirs for which the workers are no longer alive"
+ [log-root-dir]
(let [now-secs (current-time-secs)
- old-log-files (select-files-for-cleanup *STORM-CONF* (* now-secs 1000) log-root-dir)
- dead-worker-files (get-dead-worker-files-and-owners *STORM-CONF* now-secs old-log-files log-root-dir)]
+ old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
+ (* now-secs 1000)
+ log-root-dir)
+ total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB)
+ per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB)
+ per-dir-size (min per-dir-size (* total-size 0.5))
+ dead-worker-dirs (get-dead-worker-dirs *STORM-CONF*
+ now-secs
+ old-log-dirs)]
(log-debug "log cleanup: now=" now-secs
- " old log files " (pr-str (map #(.getName %) old-log-files))
- " dead worker files " (->> dead-worker-files
- (mapcat (fn [{l :files}] l))
- (map #(.getName %))
- (pr-str)))
- (dofor [{:keys [owner files]} dead-worker-files
- file files]
- (let [path (.getCanonicalPath file)]
- (log-message "Cleaning up: Removing " path)
- (try
- (if (or (blank? owner) (re-matches #".*\.yaml$" path))
- (rmr path)
- ;; worker-launcher does not actually launch a worker process. It
- ;; merely executes one of a prescribed set of commands. In this case, we ask it
- ;; to delete a file as the owner of that file.
- (supervisor/worker-launcher *STORM-CONF* owner (str "rmr " path)))
- (catch Exception ex
- (log-error ex)))))))
+ " old log dirs " (pr-str (map #(.getName %) old-log-dirs))
+ " dead worker dirs " (pr-str
+ (map #(.getName %) dead-worker-dirs)))
+ (dofor [dir dead-worker-dirs]
+ (let [path (.getCanonicalPath dir)]
+ (log-message "Cleaning up: Removing " path)
+ (try (rmr path)
+ (cleanup-empty-topodir dir)
+ (catch Exception ex (log-error ex)))))
+ (per-workerdir-cleanup (File. log-root-dir) (* per-dir-size (* 1024 1024)))
+ (let [all-logs (sorted-worker-logs (File. log-root-dir))
+ size (* total-size (* 1024 1024))]
+ (delete-oldest-while-logs-too-large all-logs size))))
(defn start-log-cleaner! [conf log-root-dir]
(let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
(when interval-secs
(log-debug "starting log cleanup thread at interval: " interval-secs)
- (schedule-recurring (mk-timer :thread-name "logviewer-cleanup")
+ (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
+ :kill-fn (fn [t]
+ (log-error t "Error when doing logs cleanup")
+ (exit-process! 20 "Error when doing log cleanup")))
0 ;; Start immediately.
interval-secs
(fn [] (cleanup-fn! log-root-dir))))))
+(defn- skip-bytes
+ "FileInputStream#skip may not work the first time, so ensure it successfully
+ skips the given number of bytes."
+ [^InputStream stream n]
+ (loop [skipped 0]
+ (let [skipped (+ skipped (.skip stream (- n skipped)))]
+ (if (< skipped n) (recur skipped)))))
+
+(defn logfile-matches-filter?
+ [log-file-name]
+ (let [regex-string (str "worker.log.*")
+ regex-pattern (re-pattern regex-string)]
+ (not= (re-seq regex-pattern (.toString log-file-name)) nil)))
+
(defn page-file
([path tail]
- (let [flen (.length (clojure.java.io/file path))
+ (let [zip-file? (.endsWith path ".gz")
+ flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
skip (- flen tail)]
(page-file path skip tail)))
([path start length]
- (with-open [input (FileInputStream. path)
- output (java.io.ByteArrayOutputStream.)]
- (if (>= start (.length (clojure.java.io/file path)))
- (throw
- (InvalidRequestException. "Cannot start past the end of the file")))
- (if (> start 0)
- ;; FileInputStream#skip may not work the first time.
- (loop [skipped 0]
- (let [skipped (+ skipped (.skip input (- start skipped)))]
- (if (< skipped start) (recur skipped)))))
- (let [buffer (make-array Byte/TYPE 1024)]
- (loop []
- (when (< (.size output) length)
- (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
- (when (pos? size)
- (.write output buffer 0 size)
- (recur)))))
- (.toString output)))))
+ (let [zip-file? (.endsWith path ".gz")
+ flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))]
+ (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. path)) (FileInputStream. path))
+ output (java.io.ByteArrayOutputStream.)]
+ (if (>= start flen)
+ (throw
+ (InvalidRequestException. "Cannot start past the end of the file")))
+ (if (> start 0) (skip-bytes input start))
+ (let [buffer (make-array Byte/TYPE 1024)]
+ (loop []
+ (when (< (.size output) length)
+ (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
+ (when (pos? size)
+ (.write output buffer 0 size)
+ (recur)))))
+ (.toString output))))))
(defn get-log-user-group-whitelist [fname]
(let [wl-file (get-log-metadata-file fname)
@@ -219,7 +321,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(if (and appender-name appender (instance? RollingFileAppender appender))
(.getParent (File. (.getFileName appender)))
(throw
- (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j2 agree.")))))
+ (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and logback agree.")))))
(defnk to-btn-link
"Create a link that is formatted like a button"
@@ -227,6 +329,11 @@ Note that if anything goes wrong, this will throw an Error and exit."
[:a {:href (java.net.URI. url)
:class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
+(defn log-file-selection-form [log-files]
+ [[:form {:action "log" :id "list-of-files"}
+ (drop-down "file" log-files )
+ [:input {:type "submit" :value "Switch file"}]]])
+
(defn pager-links [fname start length file-size]
(let [prev-start (max 0 (- start length))
next-start (if (> file-size 0)
@@ -255,25 +362,36 @@ Note that if anything goes wrong, this will throw an Error and exit."
"Next" :enabled (> next-start start))])]]))
(defn- download-link [fname]
- [[:p (link-to (url-format "/download/%s" fname) "Download Full Log")]])
+ [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
+
+(def default-bytes-per-page 51200)
(defn log-page [fname start length grep user root-dir]
(if (or (blank? (*STORM-CONF* UI-FILTER))
(authorized-log-user? user fname *STORM-CONF*))
(let [file (.getCanonicalFile (File. root-dir fname))
- file-length (.length file)
- path (.getCanonicalPath file)]
- (if (and (= (.getCanonicalFile (File. root-dir))
- (.getParentFile file))
- (.exists file))
- (let [default-length 51200
- length (if length
+ path (.getCanonicalPath file)
+ zip-file? (.endsWith path ".gz")
+ file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+ topo-dir (.getParentFile (.getParentFile file))
+ log-files (reduce clojure.set/union
+ (sorted-set)
+ (for [^File port-dir (.listFiles topo-dir)]
+ (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
+ files-str (for [file log-files]
+ (get-topo-port-workerlog file))
+ reordered-files-str (conj (filter #(not= fname %) files-str) fname)]
+ (if (.exists file)
+ (let [length (if length
(min 10485760 length)
- default-length)
+ default-bytes-per-page)
+ is-txt-file (re-find #"\.(log.*|txt|yaml)$" fname)
log-string (escape-html
- (if start
- (page-file path start length)
- (page-file path length)))
+ (if is-txt-file
+ (if start
+ (page-file path start length)
+ (page-file path length))
+ "This is a binary file and cannot display! You may download the full file."))
start (or start (- file-length length))]
(if grep
(html [:pre#logContent
@@ -282,8 +400,9 @@ Note that if anything goes wrong, this will throw an Error and exit."
(filter #(.contains % grep))
(string/join "\n"))
log-string)])
- (let [pager-data (pager-links fname start length file-length)]
- (html (concat pager-data
+ (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+ (html (concat (log-file-selection-form reordered-files-str) ; list all files for this topology
+ pager-data
(download-link fname)
[[:pre#logContent log-string]]
pager-data)))))
@@ -296,7 +415,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(defn download-log-file [fname req resp user ^String root-dir]
(let [file (.getCanonicalFile (File. root-dir fname))]
- (if (= (File. root-dir) (.getParentFile file))
+ (if (.exists file)
(if (or (blank? (*STORM-CONF* UI-FILTER))
(authorized-log-user? user fname *STORM-CONF*))
(-> (resp/response file)
@@ -333,26 +452,73 @@ Note that if anything goes wrong, this will throw an Error and exit."
(name k) "'")
ex)))))
+(defn list-log-files
+ [user topoId port log-root callback origin]
+ (let [file-results
+ (if (nil? topoId)
+ (if (nil? port)
+ (get-all-logs-for-rootdir (File. log-root))
+ (reduce concat
+ (for [topo-dir (.listFiles (File. log-root))]
+ (reduce concat
+ (for [port-dir (.listFiles topo-dir)]
+ (if (= (str port) (.getName port-dir))
+ (into [] (.listFiles port-dir))))))))
+ (if (nil? port)
+ (let [topo-dir (File. (str log-root file-path-separator topoId))]
+ (if (.exists topo-dir)
+ (reduce concat
+ (for [port-dir (.listFiles topo-dir)]
+ (into [] (.listFiles port-dir))))
+ []))
+ (let [port-dir (get-worker-dir-from-root log-root topoId port)]
+ (if (.exists port-dir)
+ (into [] (.listFiles port-dir))
+ []))))
+ file-strs (sort (for [file file-results]
+ (get-topo-port-workerlog file)))]
+ (json-response file-strs
+ callback
+ :headers {"Access-Control-Allow-Origin" origin
+ "Access-Control-Allow-Credentials" "true"})))
+
(defroutes log-routes
(GET "/log" [:as req & m]
- (try
- (let [servlet-request (:servlet-request req)
- log-root (:log-root req)
- user (.getUserName http-creds-handler servlet-request)
- start (if (:start m) (parse-long-from-map m :start))
- length (if (:length m) (parse-long-from-map m :length))]
- (log-template (log-page (:file m) start length (:grep m) user log-root)
- (:file m) user))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
+ (try
+ (let [servlet-request (:servlet-request req)
+ log-root (:log-root req)
+ user (.getUserName http-creds-handler servlet-request)
+ start (if (:start m) (parse-long-from-map m :start))
+ length (if (:length m) (parse-long-from-map m :length))
+ file (url-decode (:file m))]
+ (log-template (log-page file start length (:grep m) user log-root)
+ file user))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
(GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
- (try
- (let [user (.getUserName http-creds-handler servlet-request)]
- (download-log-file file servlet-request servlet-response user log-root))
- (catch InvalidRequestException ex
- (log-error ex)
- (ring-response-from-exception ex))))
+ ;; We do not use servlet-response here, but do not remove it from the
+ ;; :keys list, or this rule could stop working when an authentication
+ ;; filter is configured.
+ (try
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (download-log-file file servlet-request servlet-response user log-root))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (ring-response-from-exception ex))))
+ (GET "/listLogs" [:as req & m]
+ (try
+ (let [servlet-request (:servlet-request req)
+ user (.getUserName http-creds-handler servlet-request)]
+ (list-log-files user
+ (:topoId m)
+ (:port m)
+ (:log-root req)
+ (:callback m)
+ (.getHeader servlet-request "Origin")))
+ (catch InvalidRequestException ex
+ (log-error ex)
+ (json-response (exception->json ex) (:callback m) :status 400))))
(route/resources "/")
(route/not-found "Page not found"))
@@ -406,7 +572,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
(defn -main []
(let [conf (read-storm-config)
- log-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
+ log-root (worker-artifacts-root conf)]
(setup-default-uncaught-exception-handler)
(start-log-cleaner! conf log-root)
(start-logviewer! conf log-root)))
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 0461cd7..52b2057 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -605,7 +605,7 @@
(log-message "Finished downloading code for storm id " storm-id " from " master-code-dir))))
(defn write-log-metadata-to-yaml-file! [storm-id port data conf]
- (let [file (get-log-metadata-file storm-id port)]
+ (let [file (get-log-metadata-file conf storm-id port)]
;;run worker as user needs the directory to have special permissions
;; or it is insecure
(when (not (.exists (.getParentFile file)))
@@ -614,7 +614,7 @@
(setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file))))
(.mkdirs (.getParentFile file))))
(let [writer (java.io.FileWriter. file)
- yaml (Yaml.)]
+ yaml (Yaml.)]
(try
(.dump yaml data writer)
(finally
@@ -669,6 +669,15 @@
(str java-home file-path-separator "bin" file-path-separator "java")
)))
+(defn create-artifacts-link
+ "Create a symlink from workder directory to its port artifacts directory"
+ [conf storm-id port worker-id]
+ (let [worker-dir (worker-root conf worker-id)
+ topo-dir (worker-artifacts-root conf storm-id)]
+ (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+ storm-id " to its port artifacts directory")
+ (if (.exists (File. worker-dir))
+ (create-symlink! worker-dir topo-dir "artifacts" port))))
(defmethod launch-worker
:distributed [supervisor storm-id port worker-id mem-onheap]
@@ -701,8 +710,9 @@
gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap)
topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS)
user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ logfilename "worker.log"
+ workers-artifacts (worker-artifacts-root conf)
logging-sensitivity (storm-conf TOPOLOGY-LOGGING-SENSITIVITY "S3")
- logfilename (logs-filename storm-id port)
worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
(substitute-childopts s worker-id storm-id port mem-onheap))
topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
@@ -715,6 +725,7 @@
topo-worker-logwriter-childopts
(str "-Dlogfile.name=" logfilename)
(str "-Dstorm.home=" storm-home)
+ (str "-Dworkers.artifacts=" workers-artifacts)
(str "-Dstorm.id=" storm-id)
(str "-Dworker.id=" worker-id)
(str "-Dworker.port=" port)
@@ -729,6 +740,7 @@
[(str "-Djava.library.path=" jlp)
(str "-Dlogfile.name=" logfilename)
(str "-Dstorm.home=" storm-home)
+ (str "-Dworkers.artifacts=" workers-artifacts)
(str "-Dstorm.conf.file=" storm-conf-file)
(str "-Dstorm.options=" storm-options)
(str "-Dstorm.log.dir=" storm-log-dir)
@@ -748,6 +760,7 @@
(log-message "Launching worker with command: " (shell-cmd command))
(write-log-metadata! storm-conf user worker-id storm-id port conf)
(set-worker-user! conf worker-id user)
+ (create-artifacts-link conf storm-id port worker-id)
(let [log-prefix (str "Worker Process " worker-id)
callback (fn [exit-code]
(log-message log-prefix " exited with code: " exit-code)
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index cb5dbbe..d30f496 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -117,8 +117,7 @@
(url-format "http://%s:%s/log?file=%s"
host
(*STORM-CONF* LOGVIEWER-PORT)
- fname))
- )
+ fname)))
(defn event-log-link
[topology-id component-id host port secure?]
@@ -804,21 +803,6 @@
[sys?]
(if (or (nil? sys?) (= "false" sys?)) false true))
-(defn wrap-json-in-callback [callback response]
- (str callback "(" response ");"))
-
-(defnk json-response
- [data callback :serialize-fn to-json :status 200]
- {:status status
- :headers (merge {"Cache-Control" "no-cache, no-store"
- "Access-Control-Allow-Origin" "*"
- "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, Authorization, X-Requested-With"}
- (if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"}
- {"Content-Type" "application/json;charset=utf-8"}))
- :body (if (not-nil? callback)
- (wrap-json-in-callback callback (serialize-fn data))
- (serialize-fn data))})
-
(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
(defn populate-context!
@@ -992,14 +976,6 @@
(route/resources "/")
(route/not-found "Page not found"))
-(defn exception->json
- [ex]
- {"error" "Internal Server Error"
- "errorMessage"
- (let [sw (java.io.StringWriter.)]
- (.printStackTrace ex (java.io.PrintWriter. sw))
- (.toString sw))})
-
(defn catch-errors
[handler]
(fn [request]
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
index cbbff85..9b82aaa 100644
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ b/storm-core/src/clj/backtype/storm/ui/helpers.clj
@@ -20,7 +20,7 @@
[string :only [blank? join]]
[walk :only [keywordize-keys]]])
(:use [backtype.storm config log])
- (:use [backtype.storm.util :only [clojurify-structure uuid defnk url-encode not-nil?]])
+ (:use [backtype.storm.util :only [clojurify-structure uuid defnk to-json url-encode not-nil?]])
(:use [clj-time coerce format])
(:import [backtype.storm.generated ExecutorInfo ExecutorSummary])
(:import [backtype.storm.logging.filters AccessLoggingFilter])
@@ -200,3 +200,27 @@
configurator (:configurator config)]
(configurator s)
(.start s)))
+
+(defn wrap-json-in-callback [callback response]
+ (str callback "(" response ");"))
+
+(defnk json-response
+ [data callback :serialize-fn to-json :status 200 :headers {}]
+ {:status status
+ :headers (merge {"Cache-Control" "no-cache, no-store"
+ "Access-Control-Allow-Origin" "*"
+ "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, X-Csrf-Token, Authorization, X-Requested-With"}
+ (if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"}
+ {"Content-Type" "application/json;charset=utf-8"})
+ headers)
+ :body (if (not-nil? callback)
+ (wrap-json-in-callback callback (serialize-fn data))
+ (serialize-fn data))})
+
+(defn exception->json
+ [ex]
+ {"error" "Internal Server Error"
+ "errorMessage"
+ (let [sw (java.io.StringWriter.)]
+ (.printStackTrace ex (java.io.PrintWriter. sw))
+ (.toString sw))})
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 19e52a6..3284558 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -26,6 +26,8 @@
(:import [java.util.zip ZipFile])
(:import [java.util.concurrent.locks ReentrantReadWriteLock])
(:import [java.util.concurrent Semaphore])
+ (:import [java.nio.file Files Paths])
+ (:import [java.nio.file.attribute FileAttribute])
(:import [java.io File FileOutputStream RandomAccessFile StringWriter
PrintWriter BufferedReader InputStreamReader IOException])
(:import [java.lang.management ManagementFactory])
@@ -580,6 +582,21 @@
(when-not success?
(throw (RuntimeException. (str "Failed to touch " path))))))
+(defn create-symlink!
+ "Create symlink is to the target"
+ ([path-dir target-dir file-name]
+ (create-symlink! path-dir target-dir file-name file-name))
+ ([path-dir target-dir from-file-name to-file-name]
+ (let [path (str path-dir file-path-separator from-file-name)
+ target (str target-dir file-path-separator to-file-name)
+ empty-array (make-array String 0)
+ attrs (make-array FileAttribute 0)
+ abs-path (.toAbsolutePath (Paths/get path empty-array))
+ abs-target (.toAbsolutePath (Paths/get target empty-array))]
+ (log-message "Creating symlink [" abs-path "] to [" abs-target "]")
+ (if (not (.exists (.toFile abs-path)))
+ (Files/createSymbolicLink abs-path abs-target attrs)))))
+
(defn read-dir-contents
[dir]
(if (exists-file? dir)
@@ -1019,27 +1036,13 @@
(.getCanonicalPath
(clojure.java.io/file (System/getProperty "storm.home") "logs")))
-(defn- logs-rootname
- ([storm-id port] (logs-rootname storm-id port "-worker-"))
- ([storm-id port type] (str storm-id type port)))
-
(defn logs-filename
- ([storm-id port] (str (logs-rootname storm-id port) ".log"))
- ([storm-id port type] (str (logs-rootname storm-id port type) ".log")))
-
-(defn event-logs-filename [storm-id port] (logs-filename storm-id port "-events-"))
-
-(defn logs-metadata-filename [storm-id port]
- (str (logs-rootname storm-id port) ".yaml"))
-
-(def worker-log-filename-pattern #"^((.*-\d+-\d+)-worker-(\d+))\.log")
+ [storm-id port]
+ (str storm-id file-path-separator port file-path-separator "worker.log"))
-(defn get-log-metadata-file
- ([fname]
- (if-let [[_ _ id port] (re-matches worker-log-filename-pattern fname)]
- (get-log-metadata-file id port)))
- ([id port]
- (clojure.java.io/file LOG-DIR "metadata" (logs-metadata-filename id port))))
+(defn event-logs-filename
+ [storm-id port]
+ (str storm-id file-path-separator port file-path-separator "events.log"))
(defn clojure-from-yaml-file [yamlFile]
(try
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 2ee423e..764497d 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -577,6 +577,18 @@ public class Config extends HashMap<String, Object> {
public static final String LOGVIEWER_CLEANUP_AGE_MINS = "logviewer.cleanup.age.mins";
/**
+ * The maximum number of bytes all worker log files can take up in MB
+ */
+ @isPositiveNumber
+ public static final String LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB = "logviewer.max.sum.worker.logs.size.mb";
+
+ /**
+ * The maximum number of bytes per worker's files can take up in MB
+ */
+ @isPositiveNumber
+ public static final String LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB = "logviewer.max.per.worker.logs.size.mb";
+
+ /**
* Storm Logviewer HTTPS port
*/
@isInteger
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
index 3834c55..d7c00e1 100644
--- a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
@@ -61,11 +61,11 @@ public class FileBasedEventLogger implements IEventLogger {
@Override
public void prepare(Map stormConf, TopologyContext context) {
- String logDir;
+ String logDir; // storm local directory
String stormId = context.getStormId();
int port = context.getThisWorkerPort();
- if((logDir = System.getProperty("storm.log.dir")) == null
- && (logDir = System.getProperty("java.io.tmpdir")) == null) {
+ if ((logDir = System.getProperty("storm.local.dir")) == null &&
+ (logDir = (String)stormConf.get("storm.local.dir")) == null) {
String msg = "Could not determine the directory to log events.";
LOG.error(msg);
throw new RuntimeException(msg);
@@ -77,7 +77,12 @@ public class FileBasedEventLogger implements IEventLogger {
* Include the topology name & worker port in the file name so that
* multiple event loggers can log independently.
*/
- initLogWriter(Paths.get(logDir, String.format("%s-events-%d.log", stormId, port)));
+ Path path = Paths.get(logDir, "workers-artifacts", stormId, Integer.toString(port), "events.log");
+ if (!path.isAbsolute()) {
+ path = Paths.get(System.getProperty("storm.home"), logDir, "workers-artifacts",
+ stormId, Integer.toString(port), "events.log");
+ }
+ initLogWriter(path);
setUpFlushTask();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index c852306..ebf40b6 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -54,8 +54,10 @@ import java.io.ByteArrayInputStream;
import java.io.OutputStreamWriter;
import java.io.InputStreamReader;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.BufferedReader;
+import java.io.RandomAccessFile;
import java.io.Serializable;
import java.io.IOException;
import java.util.Map;
@@ -69,6 +71,8 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
public class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -692,5 +696,65 @@ public class Utils {
}
}
}
+
+ /**
+ * Given a File input it will unzip the file in a the unzip directory
+ * passed as the second parameter
+ * @param inFile The zip file as input
+ * @param unzipDir The unzip directory where to unzip the zip file.
+ * @throws IOException
+ */
+ public static void unZip(File inFile, File unzipDir) throws IOException {
+ Enumeration<? extends ZipEntry> entries;
+ ZipFile zipFile = new ZipFile(inFile);
+
+ try {
+ entries = zipFile.entries();
+ while (entries.hasMoreElements()) {
+ ZipEntry entry = entries.nextElement();
+ if (!entry.isDirectory()) {
+ InputStream in = zipFile.getInputStream(entry);
+ try {
+ File file = new File(unzipDir, entry.getName());
+ if (!file.getParentFile().mkdirs()) {
+ if (!file.getParentFile().isDirectory()) {
+ throw new IOException("Mkdirs failed to create " +
+ file.getParentFile().toString());
+ }
+ }
+ OutputStream out = new FileOutputStream(file);
+ try {
+ byte[] buffer = new byte[8192];
+ int i;
+ while ((i = in.read(buffer)) != -1) {
+ out.write(buffer, 0, i);
+ }
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
+ }
+ }
+ } finally {
+ zipFile.close();
+ }
+ }
+
+ //Note: Only works for zip files whose uncompressed size is less than 4 GB
+ //Otherwise returns the size module 2^32, per gzip specifications
+ //Returns a long, since that's what file lengths in Java/Clojure usually are.
+ public static long zipFileSize(File myFile) throws IOException{
+ RandomAccessFile raf = new RandomAccessFile(myFile, "r");
+ raf.seek(raf.length() - 4);
+ long b4 = raf.read();
+ long b3 = raf.read();
+ long b2 = raf.read();
+ long b1 = raf.read();
+ long val = (b1 << 24) | (b2 << 16) + (b3 << 8) + b4;
+ raf.close();
+ return val;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index fab401a..7acb477 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -273,12 +273,13 @@
mock-sensitivity "S3"
mock-cp "/base:/stormjar.jar"
exp-args-fn (fn [opts topo-opts classpath]
- (concat [(supervisor/java-cmd) "-cp" classpath
- (str "-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log")
+ (concat [(supervisor/java-cmd) "-cp" classpath
+ (str "-Dlogfile.name=" "worker.log")
"-Dstorm.home="
- (str "-Dstorm.id=" mock-storm-id)
- (str "-Dworker.id=" mock-worker-id)
- (str "-Dworker.port=" mock-port)
+ (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
+ (str "-Dstorm.id=" mock-storm-id)
+ (str "-Dworker.id=" mock-worker-id)
+ (str "-Dworker.port=" mock-port)
"-Dstorm.log.dir=/logs"
"-Dlog4j.configurationFile=/log4j2/worker.xml"
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
@@ -287,8 +288,9 @@
opts
topo-opts
["-Djava.library.path="
- (str "-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log")
+ (str "-Dlogfile.name=" "worker.log")
"-Dstorm.home="
+ "-Dworkers.artifacts=/tmp/workers-artifacts"
"-Dstorm.conf.file="
"-Dstorm.options="
(str "-Dstorm.log.dir=" file-path-separator "logs")
@@ -318,6 +320,7 @@
launch-process nil
set-worker-user! nil
supervisor/jlp nil
+ worker-artifacts-root "/tmp/workers-artifacts"
supervisor/write-log-metadata! nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
@@ -340,6 +343,7 @@
launch-process nil
set-worker-user! nil
supervisor/jlp nil
+ worker-artifacts-root "/tmp/workers-artifacts"
supervisor/write-log-metadata! nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
@@ -356,6 +360,7 @@
(stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
supervisor-stormdist-root nil
supervisor/jlp nil
+ worker-artifacts-root "/tmp/workers-artifacts"
set-worker-user! nil
supervisor/write-log-metadata! nil
launch-process nil
@@ -376,6 +381,7 @@
(stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
supervisor-stormdist-root nil
supervisor/jlp nil
+ worker-artifacts-root "/tmp/workers-artifacts"
launch-process nil
set-worker-user! nil
supervisor/write-log-metadata! nil
@@ -411,96 +417,99 @@
(str storm-local "/workers/" mock-worker-id)
worker-script]
exp-script-fn (fn [opts topo-opts]
- (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
- " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
- " '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
- " '-Dstorm.home='"
- " '-Dstorm.id=" mock-storm-id "'"
- " '-Dworker.id=" mock-worker-id "'"
- " '-Dworker.port=" mock-port "'"
- " '-Dstorm.log.dir=/logs'"
- " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
- " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
- " 'backtype.storm.LogWriter'"
- " 'java' '-server'"
- " " (shell-cmd opts)
- " " (shell-cmd topo-opts)
- " '-Djava.library.path='"
- " '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
- " '-Dstorm.home='"
- " '-Dstorm.conf.file='"
- " '-Dstorm.options='"
- " '-Dstorm.log.dir=/logs'"
- " '-Dlogging.sensitivity=" mock-sensitivity "'"
- " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
- " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
- " '-Dstorm.id=" mock-storm-id "'"
- " '-Dworker.id=" mock-worker-id "'"
- " '-Dworker.port=" mock-port "'"
- " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
- " 'backtype.storm.daemon.worker'"
- " '" mock-storm-id "'"
- " '" mock-port "'"
- " '" mock-worker-id "';"))]
+ (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
+ " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
+ " '-Dlogfile.name=" "worker.log'"
+ " '-Dstorm.home='"
+ " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
+ " '-Dstorm.id=" mock-storm-id "'"
+ " '-Dworker.id=" mock-worker-id "'"
+ " '-Dworker.port=" mock-port "'"
+ " '-Dstorm.log.dir=/logs'"
+ " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
+ " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
+ " 'backtype.storm.LogWriter'"
+ " 'java' '-server'"
+ " " (shell-cmd opts)
+ " " (shell-cmd topo-opts)
+ " '-Djava.library.path='"
+ " '-Dlogfile.name=" "worker.log'"
+ " '-Dstorm.home='"
+ " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
+ " '-Dstorm.conf.file='"
+ " '-Dstorm.options='"
+ " '-Dstorm.log.dir=/logs'"
+ " '-Dlogging.sensitivity=" mock-sensitivity "'"
+ " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
+ " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
+ " '-Dstorm.id=" mock-storm-id "'"
+ " '-Dworker.id=" mock-worker-id "'"
+ " '-Dworker.port=" mock-port "'"
+ " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
+ " 'backtype.storm.daemon.worker'"
+ " '" mock-storm-id "'"
+ " '" mock-port "'"
+ " '" mock-worker-id "';"))]
(.mkdirs (io/file storm-local "workers" mock-worker-id))
(try
- (testing "testing *.worker.childopts as strings with extra spaces"
- (let [string-opts "-Dfoo=bar -Xmx1024m"
- topo-string-opts "-Dkau=aux -Xmx2048m"
- exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
- ["-Dkau=aux" "-Xmx2048m"])
- mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
- STORM-LOCAL-DIR storm-local
- SUPERVISOR-RUN-WORKER-AS-USER true
- WORKER-CHILDOPTS string-opts}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-string-opts
- TOPOLOGY-SUBMITTER-USER "me"}
- add-to-classpath mock-cp
- supervisor-stormdist-root nil
- launch-process nil
- set-worker-user! nil
- supervisor/java-cmd "java"
- supervisor/jlp nil
- supervisor/write-log-metadata! nil]
- (supervisor/launch-worker mock-supervisor
- mock-storm-id
- mock-port
- mock-worker-id
- mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
- [0]
- exp-launch))
- (is (= (slurp worker-script) exp-script))))
- (testing "testing *.worker.childopts as list of strings, with spaces in values"
- (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
- topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
- exp-script (exp-script-fn list-opts topo-list-opts)
- mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
- STORM-LOCAL-DIR storm-local
- SUPERVISOR-RUN-WORKER-AS-USER true
- WORKER-CHILDOPTS list-opts}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-list-opts
- TOPOLOGY-SUBMITTER-USER "me"}
- add-to-classpath mock-cp
- supervisor-stormdist-root nil
- launch-process nil
- set-worker-user! nil
- supervisor/java-cmd "java"
- supervisor/jlp nil
- supervisor/write-log-metadata! nil]
- (supervisor/launch-worker mock-supervisor
- mock-storm-id
- mock-port
- mock-worker-id
- mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
- [0]
- exp-launch))
- (is (= (slurp worker-script) exp-script))))
-(finally (rm-r (io/file storm-local)))
-))))
+ (testing "testing *.worker.childopts as strings with extra spaces"
+ (let [string-opts "-Dfoo=bar -Xmx1024m"
+ topo-string-opts "-Dkau=aux -Xmx2048m"
+ exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
+ ["-Dkau=aux" "-Xmx2048m"])
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+ STORM-LOCAL-DIR storm-local
+ SUPERVISOR-RUN-WORKER-AS-USER true
+ WORKER-CHILDOPTS string-opts}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-string-opts
+ TOPOLOGY-SUBMITTER-USER "me"}
+ add-to-classpath mock-cp
+ supervisor-stormdist-root nil
+ launch-process nil
+ set-worker-user! nil
+ supervisor/java-cmd "java"
+ supervisor/jlp nil
+ supervisor/write-log-metadata! nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id
+ mock-mem-onheap)
+ (verify-first-call-args-for-indices launch-process
+ [0]
+ exp-launch))
+ (is (= (slurp worker-script) exp-script))))
+ (finally (rm-r (io/file storm-local))))
+ (try
+ (testing "testing *.worker.childopts as list of strings, with spaces in values"
+ (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
+ topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
+ exp-script (exp-script-fn list-opts topo-list-opts)
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+ STORM-LOCAL-DIR storm-local
+ SUPERVISOR-RUN-WORKER-AS-USER true
+ WORKER-CHILDOPTS list-opts}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-list-opts
+ TOPOLOGY-SUBMITTER-USER "me"}
+ add-to-classpath mock-cp
+ supervisor-stormdist-root nil
+ launch-process nil
+ set-worker-user! nil
+ supervisor/java-cmd "java"
+ supervisor/jlp nil
+ supervisor/write-log-metadata! nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id
+ mock-mem-onheap)
+ (verify-first-call-args-for-indices launch-process
+ [0]
+ exp-launch))
+ (is (= (slurp worker-script) exp-script))))
+ (finally (rm-r (io/file storm-local)))))))
(deftest test-workers-go-bananas
;; test that multiple workers are started for a port, and test that
[4/6] storm git commit: Merge branch '901' of
https://github.com/zhuoliu/storm into STORM-901
Posted by da...@apache.org.
Merge branch '901' of https://github.com/zhuoliu/storm into STORM-901
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62c40348
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62c40348
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62c40348
Branch: refs/heads/master
Commit: 62c40348ffdac1844067307f6b595633b1addac7
Parents: b6615d5 ce97de3
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Oct 29 21:23:40 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Oct 29 21:23:40 2015 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +
log4j2/worker.xml | 12 +-
storm-core/src/clj/backtype/storm/config.clj | 19 +
.../src/clj/backtype/storm/daemon/logviewer.clj | 514 ++++++++++++++-----
.../clj/backtype/storm/daemon/supervisor.clj | 19 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 29 +-
.../src/clj/backtype/storm/ui/helpers.clj | 26 +-
storm-core/src/clj/backtype/storm/util.clj | 41 +-
storm-core/src/jvm/backtype/storm/Config.java | 12 +
.../storm/metric/FileBasedEventLogger.java | 18 +-
.../src/jvm/backtype/storm/utils/Utils.java | 69 +++
.../test/clj/backtype/storm/logviewer_test.clj | 312 +++++++----
.../test/clj/backtype/storm/supervisor_test.clj | 210 ++++----
13 files changed, 880 insertions(+), 403 deletions(-)
----------------------------------------------------------------------