You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/10/30 03:28:10 UTC

[3/6] storm git commit: Add unit tests for logviewer cleanup, filter, sorting, etc.

Add unit tests for logviewer cleanup, filter, sorting, etc.

Fix create/rm temp dir in supervisor test

Address event log dir issue and other comments

Add new web route for all daemon logs: view, list and download


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d009d67d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d009d67d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d009d67d

Branch: refs/heads/master
Commit: d009d67dd0323db4c2bd472b597a99ff9de9227a
Parents: 2c2858e
Author: zhuol <zh...@yahoo-inc.com>
Authored: Tue Oct 27 09:34:03 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 29 13:18:15 2015 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/logviewer.clj | 119 +++++--
 storm-core/src/clj/backtype/storm/ui/core.clj   |   3 +-
 storm-core/src/clj/backtype/storm/util.clj      |   2 +-
 .../storm/metric/FileBasedEventLogger.java      |   5 +
 .../src/jvm/backtype/storm/utils/Utils.java     |  11 +-
 .../test/clj/backtype/storm/logviewer_test.clj  | 312 ++++++++++++-------
 .../test/clj/backtype/storm/supervisor_test.clj |  19 +-
 7 files changed, 330 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 58561a4..1bcc0df 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -321,7 +321,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
     (if (and appender-name appender (instance? RollingFileAppender appender))
       (.getParent (File. (.getFileName appender)))
       (throw
-       (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and logback agree.")))))
+       (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j agree.")))))
 
 (defnk to-btn-link
   "Create a link that is formatted like a button"
@@ -329,8 +329,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
   [:a {:href (java.net.URI. url)
        :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
 
-(defn log-file-selection-form [log-files]
-  [[:form {:action "log" :id "list-of-files"}
+(defn log-file-selection-form [log-files type]
+  [[:form {:action type :id "list-of-files"}
     (drop-down "file" log-files )
     [:input {:type "submit" :value "Switch file"}]]])
 
@@ -364,6 +364,9 @@ Note that if anything goes wrong, this will throw an Error and exit."
 (defn- download-link [fname]
   [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
 
+(defn- daemon-download-link [fname]
+  [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full File")]])
+
 (def default-bytes-per-page 51200)
 
 (defn log-page [fname start length grep user root-dir]
@@ -372,20 +375,22 @@ Note that if anything goes wrong, this will throw an Error and exit."
     (let [file (.getCanonicalFile (File. root-dir fname))
           path (.getCanonicalPath file)
           zip-file? (.endsWith path ".gz")
-          file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
-          topo-dir (.getParentFile (.getParentFile file))
-          log-files (reduce clojure.set/union
-                            (sorted-set)
-                            (for [^File port-dir (.listFiles topo-dir)]
-                              (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
-          files-str (for [file log-files]
-                      (get-topo-port-workerlog file))
-          reordered-files-str (conj (filter #(not= fname %) files-str) fname)]
-      (if (.exists file)
-        (let [length (if length
+          topo-dir (.getParentFile (.getParentFile file))]
+      (if (and (.exists file)
+               (= (.getCanonicalFile (File. root-dir))
+                  (.getParentFile topo-dir)))
+        (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+              log-files (reduce clojure.set/union
+                          (sorted-set)
+                          (for [^File port-dir (.listFiles topo-dir)]
+                            (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
+              files-str (for [file log-files]
+                          (get-topo-port-workerlog file))
+              reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+               length (if length
                        (min 10485760 length)
                        default-bytes-per-page)
-              is-txt-file (re-find #"\.(log.*|txt|yaml)$" fname)
+              is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
               log-string (escape-html
                            (if is-txt-file
                              (if start
@@ -401,7 +406,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
                           (string/join "\n"))
                      log-string)])
             (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
-              (html (concat (log-file-selection-form reordered-files-str) ; list all files for this topology
+              (html (concat (log-file-selection-form reordered-files-str "log") ; list all files for this topology
                             pager-data
                             (download-link fname)
                             [[:pre#logContent log-string]]
@@ -413,6 +418,52 @@ Note that if anything goes wrong, this will throw an Error and exit."
         (resp/status 404))
       (unauthorized-user-html user))))
 
+(defn daemonlog-page [fname start length grep user root-dir]
+  (if (or (blank? (*STORM-CONF* UI-FILTER))
+        (authorized-log-user? user fname *STORM-CONF*)) ;; how to deal with this???????????
+    (let [file (.getCanonicalFile (File. root-dir fname))
+          file-length (.length file)
+          path (.getCanonicalPath file)
+          zip-file? (.endsWith path ".gz")]
+      (if (and (= (.getCanonicalFile (File. root-dir))
+                 (.getParentFile file))
+            (.exists file))
+        (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+              length (if length
+                       (min 10485760 length)
+                       default-bytes-per-page)
+              log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included
+              files-str (for [file log-files]
+                          (.getName file))
+              reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+              is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
+              log-string (escape-html
+                           (if is-txt-file
+                             (if start
+                               (page-file path start length)
+                               (page-file path length))
+                             "This is a binary file and cannot display! You may download the full file."))
+              start (or start (- file-length length))]
+          (if grep
+            (html [:pre#logContent
+                   (if grep
+                     (->> (.split log-string "\n")
+                       (filter #(.contains % grep))
+                       (string/join "\n"))
+                     log-string)])
+            (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+              (html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
+                      pager-data
+                      (daemon-download-link fname)
+                      [[:pre#logContent log-string]]
+                      pager-data)))))
+        (-> (resp/response "Page not found")
+          (resp/status 404))))
+    (if (nil? (get-log-user-group-whitelist fname))
+      (-> (resp/response "Page not found")
+        (resp/status 404))
+      (unauthorized-user-html user))))
+
 (defn download-log-file [fname req resp user ^String root-dir]
   (let [file (.getCanonicalFile (File. root-dir fname))]
     (if (.exists file)
@@ -496,6 +547,19 @@ Note that if anything goes wrong, this will throw an Error and exit."
       (catch InvalidRequestException ex
         (log-error ex)
         (ring-response-from-exception ex))))
+  (GET "/daemonlog" [:as req & m]
+    (try
+      (let [servlet-request (:servlet-request req)
+            daemonlog-root (:daemonlog-root req)
+            user (.getUserName http-creds-handler servlet-request)
+            start (if (:start m) (parse-long-from-map m :start))
+            length (if (:length m) (parse-long-from-map m :length))
+            file (url-decode (:file m))]
+        (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
+          file user))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
   (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
     ;; We do not use servlet-response here, but do not remove it from the
     ;; :keys list, or this rule could stop working when an authentication
@@ -506,6 +570,16 @@ Note that if anything goes wrong, this will throw an Error and exit."
       (catch InvalidRequestException ex
         (log-error ex)
         (ring-response-from-exception ex))))
+  (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
+    ;; We do not use servlet-response here, but do not remove it from the
+    ;; :keys list, or this rule could stop working when an authentication
+    ;; filter is configured.
+    (try
+      (let [user (.getUserName http-creds-handler servlet-request)]
+        (download-log-file file servlet-request servlet-response user daemonlog-root))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
   (GET "/listLogs" [:as req & m]
     (try
       (let [servlet-request (:servlet-request req)
@@ -524,17 +598,17 @@ Note that if anything goes wrong, this will throw an Error and exit."
 
 (defn conf-middleware
   "For passing the storm configuration with each request."
-  [app log-root]
+  [app log-root daemonlog-root]
   (fn [req]
-    (app (assoc req :log-root log-root))))
+    (app (assoc req :log-root log-root :daemonlog-root daemonlog-root))))
 
-(defn start-logviewer! [conf log-root-dir]
+(defn start-logviewer! [conf log-root-dir daemonlog-root-dir]
   (try
     (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
           filter-class (conf UI-FILTER)
           filter-params (conf UI-FILTER-PARAMS)
           logapp (handler/api log-routes) ;; query params as map
-          middle (conf-middleware logapp log-root-dir)
+          middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
           filters-confs (if (conf UI-FILTER)
                           [{:filter-class filter-class
                             :filter-params (or (conf UI-FILTER-PARAMS) {})}]
@@ -572,7 +646,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
 
 (defn -main []
   (let [conf (read-storm-config)
-        log-root (worker-artifacts-root conf)]
+        log-root (worker-artifacts-root conf)
+        daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
     (setup-default-uncaught-exception-handler)
     (start-log-cleaner! conf log-root)
-    (start-logviewer! conf log-root)))
+    (start-logviewer! conf log-root daemonlog-root)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index d30f496..130ea09 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -128,7 +128,8 @@
     (logviewer-link host fname secure?)))
 
 (defn nimbus-log-link [host port]
-  (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+  (url-format "http://%s:%s/daemonlog?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+
 (defn get-error-time
   [error]
   (if error

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 3284558..9ec8cd3 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -593,7 +593,7 @@
           attrs (make-array FileAttribute 0)
           abs-path (.toAbsolutePath (Paths/get path empty-array))
           abs-target (.toAbsolutePath (Paths/get target empty-array))]
-      (log-message "Creating symlink [" abs-path "] to [" abs-target "]")
+      (log-debug "Creating symlink [" abs-path "] to [" abs-target "]")
       (if (not (.exists (.toFile abs-path)))
         (Files/createSymbolicLink abs-path abs-target attrs)))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
index d7c00e1..3abb940 100644
--- a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
@@ -5,6 +5,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -82,6 +83,10 @@ public class FileBasedEventLogger implements IEventLogger {
             path = Paths.get(System.getProperty("storm.home"), logDir, "workers-artifacts",
                     stormId, Integer.toString(port), "events.log");
         }
+        File dir = path.toFile().getParentFile();
+        if (!dir.exists()) {
+             dir.mkdirs();
+        }
         initLogWriter(path);
         setUpFlushTask();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index ebf40b6..8660739 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -742,9 +742,14 @@ public class Utils {
         }
     }
 
-    //Note: Only works for zip files whose uncompressed size is less than 4 GB
-    //Otherwise returns the size module 2^32, per gzip specifications
-    //Returns a long, since that's what file lengths in Java/Clojure usually are.
+    /**
+     * Given a gzip file, returns its uncompressed size.
+     * Only works for files whose uncompressed size is less than 4 GB;
+     * otherwise returns the size modulo 2^32, per the gzip specification.
+     * @param myFile The zip file as input
+     * @throws IOException
+     * @return zip file size as a long
+     */
     public static long zipFileSize(File myFile) throws IOException{
         RandomAccessFile raf = new RandomAccessFile(myFile, "r");
         raf.seek(raf.length() - 4);

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/test/clj/backtype/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/logviewer_test.clj b/storm-core/test/clj/backtype/storm/logviewer_test.clj
index a790ddf..e9840e8 100644
--- a/storm-core/test/clj/backtype/storm/logviewer_test.clj
+++ b/storm-core/test/clj/backtype/storm/logviewer_test.clj
@@ -20,30 +20,38 @@
   (:require [conjure.core])
   (:use [clojure test])
   (:use [conjure core])
+  (:use [backtype.storm.ui helpers])
+  (:import [java.nio.file Files])
+  (:import [java.nio.file.attribute FileAttribute])
+  (:import [java.io File])
   (:import [org.mockito Mockito]))
 
 (defmulti mk-mock-File #(:type %))
 
-(defmethod mk-mock-File :file [{file-name :name mtime :mtime
-                                :or {file-name "afile" mtime 1}}]
+(defmethod mk-mock-File :file [{file-name :name mtime :mtime length :length
+                                :or {file-name "afile"
+                                     mtime 1
+                                     length (* 10 (* 1024 (* 1024 1024))) }}] ; Length 10 GB
   (let [mockFile (Mockito/mock java.io.File)]
     (. (Mockito/when (.getName mockFile)) thenReturn file-name)
     (. (Mockito/when (.lastModified mockFile)) thenReturn mtime)
     (. (Mockito/when (.isFile mockFile)) thenReturn true)
     (. (Mockito/when (.getCanonicalPath mockFile))
-       thenReturn (str "/mock/canonical/path/to/" file-name))
+      thenReturn (str "/mock/canonical/path/to/" file-name))
+    (. (Mockito/when (.length mockFile)) thenReturn length)
     mockFile))
 
-(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime
-                                     :or {dir-name "adir" mtime 1}}]
+(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime files :files
+                                     :or {dir-name "adir" mtime 1 files []}}]
   (let [mockDir (Mockito/mock java.io.File)]
     (. (Mockito/when (.getName mockDir)) thenReturn dir-name)
     (. (Mockito/when (.lastModified mockDir)) thenReturn mtime)
     (. (Mockito/when (.isFile mockDir)) thenReturn false)
+    (. (Mockito/when (.listFiles mockDir)) thenReturn (into-array File files))
     mockDir))
 
 (deftest test-mk-FileFilter-for-log-cleanup
-  (testing "log file filter selects the correct log files for purge"
+  (testing "log file filter selects the correct worker-log dirs for purge"
     (let [now-millis (current-time-millis)
           conf {LOGVIEWER-CLEANUP-AGE-MINS 60
                 LOGVIEWER-CLEANUP-INTERVAL-SECS 300}
@@ -51,120 +59,181 @@
           old-mtime-millis (- cutoff-millis 500)
           new-mtime-millis (+ cutoff-millis 500)
           matching-files (map #(mk-mock-File %)
-                              [{:name "oldlog-1-2-worker-3.log"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "oldlog-1-2-worker-3.log.8"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "foobar*_topo-1-24242-worker-2834238.log"
-                                :type :file
-                                :mtime old-mtime-millis}])
+                           [{:name "3031"
+                             :type :directory
+                             :mtime old-mtime-millis}
+                            {:name "3032"
+                             :type :directory
+                             :mtime old-mtime-millis}
+                            {:name "7077"
+                             :type :directory
+                             :mtime old-mtime-millis}])
           excluded-files (map #(mk-mock-File %)
-                              [{:name "oldlog-1-2-worker-.log"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "olddir-1-2-worker.log"
-                                :type :directory
-                                :mtime old-mtime-millis}
-                               {:name "newlog-1-2-worker.log"
-                                :type :file
-                                :mtime new-mtime-millis}
-                               {:name "some-old-file.txt"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "metadata"
-                                :type :directory
-                                :mtime old-mtime-millis}
-                               {:name "newdir-1-2-worker.log"
-                                :type :directory
-                                :mtime new-mtime-millis}
-                               {:name "newdir"
-                                :type :directory
-                                :mtime new-mtime-millis}
-                              ])
+                           [{:name "oldlog-1-2-worker-.log"
+                             :type :file
+                             :mtime old-mtime-millis}
+                            {:name "newlog-1-2-worker.log"
+                             :type :file
+                             :mtime new-mtime-millis}
+                            {:name "some-old-file.txt"
+                             :type :file
+                             :mtime old-mtime-millis}
+                            {:name "olddir-1-2-worker.log"
+                             :type :directory
+                             :mtime new-mtime-millis}
+                            {:name "metadata"
+                             :type :directory
+                             :mtime new-mtime-millis}
+                            {:name "newdir"
+                             :type :directory
+                             :mtime new-mtime-millis}
+                            ])
           file-filter (logviewer/mk-FileFilter-for-log-cleanup conf now-millis)]
-        (is   (every? #(.accept file-filter %) matching-files))
-        (is (not-any? #(.accept file-filter %) excluded-files))
+      (is   (every? #(.accept file-filter %) matching-files))
+      (is (not-any? #(.accept file-filter %) excluded-files))
       )))
 
-(deftest test-get-log-root->files-map
-  (testing "returns map of root name to list of files"
-    (let [files (vec (map #(java.io.File. %) ["log-1-2-worker-3.log"
-                                              "log-1-2-worker-3.log.1.gz"
-                                              "log-1-2-worker-3.log.err"
-                                              "log-1-2-worker-3.log.out"
-                                              "log-1-2-worker-3.log.out.1.gz"
-                                              "log-1-2-worker-3.log.1"
-                                              "log-2-4-worker-6.log.1"]))
-          expected {"log-1-2-worker-3" #{(files 0) (files 1) (files 2) (files 3) (files 4) (files 5)}
-                    "log-2-4-worker-6" #{(files 6)}}]
-      (is (= expected (logviewer/get-log-root->files-map files))))))
-
-(deftest test-identify-worker-log-files
-  (testing "Does not include metadata file when there are any log files that
-           should not be cleaned up"
-    (let [cutoff-millis 2000
-          old-logFile (mk-mock-File {:name "mock-1-1-worker-1.log.1"
-                                     :type :file
-                                     :mtime (- cutoff-millis 1000)})
-          mock-metaFile (mk-mock-File {:name "mock-1-1-worker-1.yaml"
-                                       :type :file
-                                       :mtime 1})
-          new-logFile (mk-mock-File {:name "mock-1-1-worker-1.log"
-                                     :type :file
-                                     :mtime (+ cutoff-millis 1000)})
+(deftest test-sort-worker-logs
+  (testing "cleaner sorts the log files in ascending ages for deletion"
+    (stubbing [logviewer/filter-candidate-files (fn [x _] x)]
+      (let [now-millis (current-time-millis)
+            files1 (into-array File (map #(mk-mock-File {:name (str %)
+                                                         :type :file
+                                                         :mtime (- now-millis (* 100 %))})
+                                      (range 1 6)))
+            files2 (into-array File (map #(mk-mock-File {:name (str %)
+                                                         :type :file
+                                                         :mtime (- now-millis (* 100 %))})
+                                      (range 6 11)))
+            files3 (into-array File (map #(mk-mock-File {:name (str %)
+                                                         :type :file
+                                                         :mtime (- now-millis (* 100 %))})
+                                      (range 11 16)))
+            port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+                                     :type :directory
+                                     :files files1})
+            port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
+                                     :type :directory
+                                     :files files2})
+            port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
+                                     :type :directory
+                                     :files files3})
+            topo1-files (into-array File [port1-dir port2-dir])
+            topo2-files (into-array File [port3-dir])
+            topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
+                                     :type :directory
+                                     :files topo1-files})
+            topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
+                                     :type :directory
+                                     :files topo2-files})
+            root-files (into-array File [topo1-dir topo2-dir])
+            root-dir (mk-mock-File {:name "/workers-artifacts"
+                                    :type :directory
+                                    :files root-files})
+            sorted-logs (logviewer/sorted-worker-logs root-dir)
+            sorted-ints (map #(Integer. (.getName %)) sorted-logs)]
+        (is (= (count sorted-logs) 15))
+        (is (= (count sorted-ints) 15))
+        (is (apply #'> sorted-ints))))))
+
+(deftest test-per-workerdir-cleanup
+  (testing "cleaner deletes oldest files in each worker dir if files are larger than per-dir quota."
+    (stubbing [rmr nil]
+      (let [now-millis (current-time-millis)
+            files1 (into-array File (map #(mk-mock-File {:name (str "A" %)
+                                                         :type :file
+                                                         :mtime (+ now-millis (* 100 %))
+                                                         :length 200 })
+                                      (range 0 10)))
+            files2 (into-array File (map #(mk-mock-File {:name (str "B" %)
+                                                         :type :file
+                                                         :mtime (+ now-millis (* 100 %))
+                                                         :length 200 })
+                                      (range 0 10)))
+            files3 (into-array File (map #(mk-mock-File {:name (str "C" %)
+                                                         :type :file
+                                                         :mtime (+ now-millis (* 100 %))
+                                                         :length 200 })
+                                      (range 0 10)))
+            port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+                                     :type :directory
+                                     :files files1})
+            port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
+                                     :type :directory
+                                     :files files2})
+            port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
+                                     :type :directory
+                                     :files files3})
+            topo1-files (into-array File [port1-dir port2-dir])
+            topo2-files (into-array File [port3-dir])
+            topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
+                                     :type :directory
+                                     :files topo1-files})
+            topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
+                                     :type :directory
+                                     :files topo2-files})
+            root-files (into-array File [topo1-dir topo2-dir])
+            root-dir (mk-mock-File {:name "/workers-artifacts"
+                                    :type :directory
+                                    :files root-files})
+            remaining-logs (logviewer/per-workerdir-cleanup root-dir 1200)]
+        (is (= (count (first remaining-logs)) 6))
+        (is (= (count (second remaining-logs)) 6))
+        (is (= (count (last remaining-logs)) 6))))))
+
+(deftest test-delete-oldest-log-cleanup
+  (testing "delete oldest logs deletes the oldest set of logs when the total size gets too large."
+    (stubbing [rmr nil]
+      (let [now-millis (current-time-millis)
+            files (into-array File (map #(mk-mock-File {:name (str %)
+                                                        :type :file
+                                                        :mtime (+ now-millis (* 100 %))
+                                                        :length 100 })
+                                     (range 0 20)))
+            remaining-logs (logviewer/delete-oldest-while-logs-too-large files 501)]
+        (is (= (logviewer/sum-file-size files) 2000))
+        (is (= (count remaining-logs) 5))
+        (is (= (.getName (first remaining-logs)) "15"))))))
+
+(deftest test-identify-worker-log-dirs
+  (testing "Build up workerid-workerlogdir map for the old workers' dirs"
+    (let [port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+                                   :type :directory})
+          mock-metaFile (mk-mock-File {:name "worker.yaml"
+                                       :type :file})
           exp-id "id12345"
-          exp-user "alice"
-          expected {exp-id {:owner exp-user
-                            :files #{old-logFile}}}]
+          expected {exp-id port1-dir}]
       (stubbing [supervisor/read-worker-heartbeats nil
-                logviewer/get-metadata-file-for-log-root-name mock-metaFile
-                read-dir-contents [(.getName old-logFile) (.getName new-logFile)]
-                logviewer/get-worker-id-from-metadata-file exp-id
-                logviewer/get-topo-owner-from-metadata-file exp-user]
-        (is (= expected (logviewer/identify-worker-log-files [old-logFile] "/tmp/")))))))
+                 logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
+                 logviewer/get-worker-id-from-metadata-file exp-id]
+        (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))
 
-(deftest test-get-dead-worker-files-and-owners
+(deftest test-get-dead-worker-dirs
   (testing "removes any files of workers that are still alive"
     (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
           id->hb {"42" {:time-secs 1}}
           now-secs 2
-          log-files #{:expected-file :unexpected-file}
-          exp-owner "alice"]
-      (stubbing [logviewer/identify-worker-log-files {"42" {:owner exp-owner
-                                                            :files #{:unexpected-file}}
-                                                      "007" {:owner exp-owner
-                                                             :files #{:expected-file}}}
-                 logviewer/get-topo-owner-from-metadata-file "alice"
+          unexpected-dir (mk-mock-File {:name "dir1" :type :directory})
+          expected-dir (mk-mock-File {:name "dir2" :type :directory})
+          log-dirs #{unexpected-dir expected-dir}]
+      (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir,
+                                                     "007" expected-dir}
                  supervisor/read-worker-heartbeats id->hb]
-        (is (= [{:owner exp-owner :files #{:expected-file}}]
-               (logviewer/get-dead-worker-files-and-owners conf now-secs log-files "/tmp/")))))))
+        (is (= #{expected-dir}
+              (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))
 
 (deftest test-cleanup-fn
-  (testing "cleanup function removes file as user when one is specified"
-    (let [exp-user "mock-user"
-          mockfile1 (mk-mock-File {:name "file1" :type :file})
-          mockfile2 (mk-mock-File {:name "file2" :type :file})
-          mockfile3 (mk-mock-File {:name "file3" :type :file})
-          mockyaml  (mk-mock-File {:name "foo.yaml" :type :file})
-          exp-cmd (str "rmr /mock/canonical/path/to/" (.getName mockfile3))]
-      (stubbing [logviewer/select-files-for-cleanup
-                   [(mk-mock-File {:name "throwaway" :type :file})]
-                 logviewer/get-dead-worker-files-and-owners
-                   [{:owner nil :files #{mockfile1}}
-                    {:files #{mockfile2}}
-                    {:owner exp-user :files #{mockfile3 mockyaml}}]
-                 supervisor/worker-launcher nil
+  (testing "cleanup function rmr's files of dead workers"
+    (let [mockfile1 (mk-mock-File {:name "delete-me1" :type :file})
+          mockfile2 (mk-mock-File {:name "delete-me2" :type :file})]
+      (stubbing [logviewer/select-dirs-for-cleanup nil
+                 logviewer/get-dead-worker-dirs (sorted-set mockfile1 mockfile2)
+                 logviewer/cleanup-empty-topodir nil
                  rmr nil]
-        (logviewer/cleanup-fn! "/tmp/")
-        (verify-call-times-for supervisor/worker-launcher 1)
-        (verify-first-call-args-for-indices supervisor/worker-launcher
-                                            [1 2] exp-user exp-cmd)
-        (verify-call-times-for rmr 3)
+        (logviewer/cleanup-fn! "/bogus/path")
+        (verify-call-times-for rmr 2)
         (verify-nth-call-args-for 1 rmr (.getCanonicalPath mockfile1))
-        (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))
-        (verify-nth-call-args-for 3 rmr (.getCanonicalPath mockyaml))))))
+        (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))))))
 
 (deftest test-authorized-log-user
   (testing "allow cluster admin"
@@ -212,3 +281,38 @@
       (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" {})))
       (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
       (verify-first-call-args-for logviewer/user-groups "alice"))))
+
+(deftest test-list-log-files
+  (testing "list-log-files filter selects the correct log files to return"
+    (let [attrs (make-array FileAttribute 0)
+          root-path (.getCanonicalPath (.toFile (Files/createTempDirectory "workers-artifacts" attrs)))
+          file1 (clojure.java.io/file root-path "topoA" "port1" "worker.log")
+          file2 (clojure.java.io/file root-path "topoA" "port2" "worker.log")
+          file3 (clojure.java.io/file root-path "topoB" "port1" "worker.log")
+          _ (clojure.java.io/make-parents file1)
+          _ (clojure.java.io/make-parents file2)
+          _ (clojure.java.io/make-parents file3)
+          _ (.createNewFile file1)
+          _ (.createNewFile file2)
+          _ (.createNewFile file3)
+          origin "www.origin.server.net"
+          expected-all (json-response '("topoA/port1/worker.log" "topoA/port2/worker.log"
+                                         "topoB/port1/worker.log")
+                         nil
+                         :headers {"Access-Control-Allow-Origin" origin
+                                   "Access-Control-Allow-Credentials" "true"})
+          expected-filter-port (json-response '("topoA/port1/worker.log" "topoB/port1/worker.log")
+                                 nil
+                                 :headers {"Access-Control-Allow-Origin" origin
+                                           "Access-Control-Allow-Credentials" "true"})
+          expected-filter-topoId (json-response '("topoB/port1/worker.log")
+                                   nil
+                                   :headers {"Access-Control-Allow-Origin" origin
+                                             "Access-Control-Allow-Credentials" "true"})
+          returned-all (logviewer/list-log-files "user" nil nil root-path nil origin)
+          returned-filter-port (logviewer/list-log-files "user" nil "port1" root-path nil origin)
+          returned-filter-topoId (logviewer/list-log-files "user" "topoB" nil root-path nil origin)]
+      (rmr root-path)
+      (is   (= expected-all returned-all))
+      (is   (= expected-filter-port returned-filter-port))
+      (is   (= expected-filter-topoId returned-filter-topoId)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 7acb477..04c8600 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -23,6 +23,9 @@
   (:import [backtype.storm.scheduler ISupervisor])
   (:import [backtype.storm.generated RebalanceOptions])
   (:import [java.util UUID])
+  (:import [java.io File])
+  (:import [java.nio.file Files])
+  (:import [java.nio.file.attribute FileAttribute])
   (:use [backtype.storm config testing util timer])
   (:use [backtype.storm.daemon common])
   (:require [backtype.storm.daemon [worker :as worker] [supervisor :as supervisor]]
@@ -395,12 +398,6 @@
                                                         [2]
                                                         full-env)))))))
 
-(defn rm-r [f]
-  (if (.isDirectory f)
-    (for [sub (.listFiles f)] (rm-r sub))
-    (.delete f)
-  ))
-
 (deftest test-worker-launch-command-run-as-user
   (testing "*.worker.childopts configuration"
     (let [mock-port "42"
@@ -409,7 +406,8 @@
           mock-mem-onheap 512
           mock-sensitivity "S3"
           mock-cp "mock-classpath'quote-on-purpose"
-          storm-local (str "/tmp/" (UUID/randomUUID))
+          attrs (make-array FileAttribute 0)
+          storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs)))
           worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
           exp-launch ["/bin/worker-launcher"
                       "me"
@@ -450,13 +448,13 @@
                                " '" mock-storm-id "'"
                                " '" mock-port "'"
                                " '" mock-worker-id "';"))]
-      (.mkdirs (io/file storm-local "workers" mock-worker-id))
       (try
         (testing "testing *.worker.childopts as strings with extra spaces"
           (let [string-opts "-Dfoo=bar  -Xmx1024m"
                 topo-string-opts "-Dkau=aux   -Xmx2048m"
                 exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
                                           ["-Dkau=aux" "-Xmx2048m"])
+                _ (.mkdirs (io/file storm-local "workers" mock-worker-id))
                 mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                         STORM-LOCAL-DIR storm-local
                                         SUPERVISOR-RUN-WORKER-AS-USER true
@@ -480,7 +478,8 @@
                                                           [0]
                                                           exp-launch))
             (is (= (slurp worker-script) exp-script))))
-        (finally (rm-r (io/file storm-local))))
+        (finally (rmr storm-local)))
+      (.mkdirs (io/file storm-local "workers" mock-worker-id))
       (try
         (testing "testing *.worker.childopts as list of strings, with spaces in values"
           (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
@@ -509,7 +508,7 @@
                                                           [0]
                                                           exp-launch))
             (is (= (slurp worker-script) exp-script))))
-        (finally (rm-r (io/file storm-local)))))))
+        (finally (rmr storm-local))))))
 
 (deftest test-workers-go-bananas
   ;; test that multiple workers are started for a port, and test that