You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/10/30 03:28:08 UTC

[1/6] storm git commit: Address comments

Repository: storm
Updated Branches:
  refs/heads/master b6615d5b1 -> 0bba2baf9


Address comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce97de35
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce97de35
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce97de35

Branch: refs/heads/master
Commit: ce97de35ec8f3b12f32360e7b678e7ecab5d3f85
Parents: d009d67
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Oct 29 12:58:53 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 29 13:18:15 2015 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/logviewer.clj | 23 ++++++++------------
 1 file changed, 9 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ce97de35/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 1bcc0df..353a58e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -190,7 +190,7 @@
       logs)))
 
 (defn per-workerdir-cleanup
-  "Delete the oldest files in overloaded worker log dir"
+  "Delete the oldest files in each overloaded worker log dir"
   [^File root-dir size]
   (dofor [worker-dir (get-all-worker-dirs root-dir)]
     (let [filtered-logs (filter #(not (is-active-log %)) (.listFiles worker-dir))
@@ -367,6 +367,9 @@ Note that if anything goes wrong, this will throw an Error and exit."
 (defn- daemon-download-link [fname]
   [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full File")]])
 
+(defn- is-txt-file [fname]
+  (re-find #"\.(log.*|txt|yaml|pid)$" fname))
+
 (def default-bytes-per-page 51200)
 
 (defn log-page [fname start length grep user root-dir]
@@ -390,9 +393,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
                length (if length
                        (min 10485760 length)
                        default-bytes-per-page)
-              is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
               log-string (escape-html
-                           (if is-txt-file
+                           (if (is-txt-file fname)
                              (if start
                                (page-file path start length)
                                (page-file path length))
@@ -405,7 +407,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
                           (filter #(.contains % grep))
                           (string/join "\n"))
                      log-string)])
-            (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+            (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
               (html (concat (log-file-selection-form reordered-files-str "log") ; list all files for this topology
                             pager-data
                             (download-link fname)
@@ -420,7 +422,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
 
 (defn daemonlog-page [fname start length grep user root-dir]
   (if (or (blank? (*STORM-CONF* UI-FILTER))
-        (authorized-log-user? user fname *STORM-CONF*)) ;; how to deal with this???????????
+        (authorized-log-user? user fname *STORM-CONF*))
     (let [file (.getCanonicalFile (File. root-dir fname))
           file-length (.length file)
           path (.getCanonicalPath file)
@@ -436,9 +438,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
               files-str (for [file log-files]
                           (.getName file))
               reordered-files-str (conj (filter #(not= fname %) files-str) fname)
-              is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
               log-string (escape-html
-                           (if is-txt-file
+                           (if (is-txt-file fname)
                              (if start
                                (page-file path start length)
                                (page-file path length))
@@ -451,7 +452,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
                        (filter #(.contains % grep))
                        (string/join "\n"))
                      log-string)])
-            (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+            (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
               (html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
                       pager-data
                       (daemon-download-link fname)
@@ -561,9 +562,6 @@ Note that if anything goes wrong, this will throw an Error and exit."
         (log-error ex)
         (ring-response-from-exception ex))))
   (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
-    ;; We do not use servlet-response here, but do not remove it from the
-    ;; :keys list, or this rule could stop working when an authentication
-    ;; filter is configured.
     (try
       (let [user (.getUserName http-creds-handler servlet-request)]
         (download-log-file file servlet-request servlet-response user log-root))
@@ -571,9 +569,6 @@ Note that if anything goes wrong, this will throw an Error and exit."
         (log-error ex)
         (ring-response-from-exception ex))))
   (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
-    ;; We do not use servlet-response here, but do not remove it from the
-    ;; :keys list, or this rule could stop working when an authentication
-    ;; filter is configured.
     (try
       (let [user (.getUserName http-creds-handler servlet-request)]
         (download-log-file file servlet-request servlet-response user daemonlog-root))


[3/6] storm git commit: Add unit tests for logviewer cleanup, filter, sorting, etc.

Posted by da...@apache.org.
Add unit tests for logviewer cleanup, filter, sorting, etc.

Fix create/rm temp dir in supervisor test

Address event log dir issue and other comments

Add new web route for all daemon logs: view, list and download


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d009d67d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d009d67d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d009d67d

Branch: refs/heads/master
Commit: d009d67dd0323db4c2bd472b597a99ff9de9227a
Parents: 2c2858e
Author: zhuol <zh...@yahoo-inc.com>
Authored: Tue Oct 27 09:34:03 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 29 13:18:15 2015 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/logviewer.clj | 119 +++++--
 storm-core/src/clj/backtype/storm/ui/core.clj   |   3 +-
 storm-core/src/clj/backtype/storm/util.clj      |   2 +-
 .../storm/metric/FileBasedEventLogger.java      |   5 +
 .../src/jvm/backtype/storm/utils/Utils.java     |  11 +-
 .../test/clj/backtype/storm/logviewer_test.clj  | 312 ++++++++++++-------
 .../test/clj/backtype/storm/supervisor_test.clj |  19 +-
 7 files changed, 330 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 58561a4..1bcc0df 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -321,7 +321,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
     (if (and appender-name appender (instance? RollingFileAppender appender))
       (.getParent (File. (.getFileName appender)))
       (throw
-       (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and logback agree.")))))
+       (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j agree.")))))
 
 (defnk to-btn-link
   "Create a link that is formatted like a button"
@@ -329,8 +329,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
   [:a {:href (java.net.URI. url)
        :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
 
-(defn log-file-selection-form [log-files]
-  [[:form {:action "log" :id "list-of-files"}
+(defn log-file-selection-form [log-files type]
+  [[:form {:action type :id "list-of-files"}
     (drop-down "file" log-files )
     [:input {:type "submit" :value "Switch file"}]]])
 
@@ -364,6 +364,9 @@ Note that if anything goes wrong, this will throw an Error and exit."
 (defn- download-link [fname]
   [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
 
+(defn- daemon-download-link [fname]
+  [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full File")]])
+
 (def default-bytes-per-page 51200)
 
 (defn log-page [fname start length grep user root-dir]
@@ -372,20 +375,22 @@ Note that if anything goes wrong, this will throw an Error and exit."
     (let [file (.getCanonicalFile (File. root-dir fname))
           path (.getCanonicalPath file)
           zip-file? (.endsWith path ".gz")
-          file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
-          topo-dir (.getParentFile (.getParentFile file))
-          log-files (reduce clojure.set/union
-                            (sorted-set)
-                            (for [^File port-dir (.listFiles topo-dir)]
-                              (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
-          files-str (for [file log-files]
-                      (get-topo-port-workerlog file))
-          reordered-files-str (conj (filter #(not= fname %) files-str) fname)]
-      (if (.exists file)
-        (let [length (if length
+          topo-dir (.getParentFile (.getParentFile file))]
+      (if (and (.exists file)
+               (= (.getCanonicalFile (File. root-dir))
+                  (.getParentFile topo-dir)))
+        (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+              log-files (reduce clojure.set/union
+                          (sorted-set)
+                          (for [^File port-dir (.listFiles topo-dir)]
+                            (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
+              files-str (for [file log-files]
+                          (get-topo-port-workerlog file))
+              reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+               length (if length
                        (min 10485760 length)
                        default-bytes-per-page)
-              is-txt-file (re-find #"\.(log.*|txt|yaml)$" fname)
+              is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
               log-string (escape-html
                            (if is-txt-file
                              (if start
@@ -401,7 +406,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
                           (string/join "\n"))
                      log-string)])
             (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
-              (html (concat (log-file-selection-form reordered-files-str) ; list all files for this topology
+              (html (concat (log-file-selection-form reordered-files-str "log") ; list all files for this topology
                             pager-data
                             (download-link fname)
                             [[:pre#logContent log-string]]
@@ -413,6 +418,52 @@ Note that if anything goes wrong, this will throw an Error and exit."
         (resp/status 404))
       (unauthorized-user-html user))))
 
+(defn daemonlog-page [fname start length grep user root-dir]
+  (if (or (blank? (*STORM-CONF* UI-FILTER))
+        (authorized-log-user? user fname *STORM-CONF*)) ;; how to deal with this???????????
+    (let [file (.getCanonicalFile (File. root-dir fname))
+          file-length (.length file)
+          path (.getCanonicalPath file)
+          zip-file? (.endsWith path ".gz")]
+      (if (and (= (.getCanonicalFile (File. root-dir))
+                 (.getParentFile file))
+            (.exists file))
+        (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+              length (if length
+                       (min 10485760 length)
+                       default-bytes-per-page)
+              log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included
+              files-str (for [file log-files]
+                          (.getName file))
+              reordered-files-str (conj (filter #(not= fname %) files-str) fname)
+              is-txt-file (re-find #"\.(log.*|txt|yaml|pid)$" fname)
+              log-string (escape-html
+                           (if is-txt-file
+                             (if start
+                               (page-file path start length)
+                               (page-file path length))
+                             "This is a binary file and cannot display! You may download the full file."))
+              start (or start (- file-length length))]
+          (if grep
+            (html [:pre#logContent
+                   (if grep
+                     (->> (.split log-string "\n")
+                       (filter #(.contains % grep))
+                       (string/join "\n"))
+                     log-string)])
+            (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+              (html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
+                      pager-data
+                      (daemon-download-link fname)
+                      [[:pre#logContent log-string]]
+                      pager-data)))))
+        (-> (resp/response "Page not found")
+          (resp/status 404))))
+    (if (nil? (get-log-user-group-whitelist fname))
+      (-> (resp/response "Page not found")
+        (resp/status 404))
+      (unauthorized-user-html user))))
+
 (defn download-log-file [fname req resp user ^String root-dir]
   (let [file (.getCanonicalFile (File. root-dir fname))]
     (if (.exists file)
@@ -496,6 +547,19 @@ Note that if anything goes wrong, this will throw an Error and exit."
       (catch InvalidRequestException ex
         (log-error ex)
         (ring-response-from-exception ex))))
+  (GET "/daemonlog" [:as req & m]
+    (try
+      (let [servlet-request (:servlet-request req)
+            daemonlog-root (:daemonlog-root req)
+            user (.getUserName http-creds-handler servlet-request)
+            start (if (:start m) (parse-long-from-map m :start))
+            length (if (:length m) (parse-long-from-map m :length))
+            file (url-decode (:file m))]
+        (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
+          file user))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
   (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
     ;; We do not use servlet-response here, but do not remove it from the
     ;; :keys list, or this rule could stop working when an authentication
@@ -506,6 +570,16 @@ Note that if anything goes wrong, this will throw an Error and exit."
       (catch InvalidRequestException ex
         (log-error ex)
         (ring-response-from-exception ex))))
+  (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
+    ;; We do not use servlet-response here, but do not remove it from the
+    ;; :keys list, or this rule could stop working when an authentication
+    ;; filter is configured.
+    (try
+      (let [user (.getUserName http-creds-handler servlet-request)]
+        (download-log-file file servlet-request servlet-response user daemonlog-root))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
   (GET "/listLogs" [:as req & m]
     (try
       (let [servlet-request (:servlet-request req)
@@ -524,17 +598,17 @@ Note that if anything goes wrong, this will throw an Error and exit."
 
 (defn conf-middleware
   "For passing the storm configuration with each request."
-  [app log-root]
+  [app log-root daemonlog-root]
   (fn [req]
-    (app (assoc req :log-root log-root))))
+    (app (assoc req :log-root log-root :daemonlog-root daemonlog-root))))
 
-(defn start-logviewer! [conf log-root-dir]
+(defn start-logviewer! [conf log-root-dir daemonlog-root-dir]
   (try
     (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
           filter-class (conf UI-FILTER)
           filter-params (conf UI-FILTER-PARAMS)
           logapp (handler/api log-routes) ;; query params as map
-          middle (conf-middleware logapp log-root-dir)
+          middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
           filters-confs (if (conf UI-FILTER)
                           [{:filter-class filter-class
                             :filter-params (or (conf UI-FILTER-PARAMS) {})}]
@@ -572,7 +646,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
 
 (defn -main []
   (let [conf (read-storm-config)
-        log-root (worker-artifacts-root conf)]
+        log-root (worker-artifacts-root conf)
+        daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
     (setup-default-uncaught-exception-handler)
     (start-log-cleaner! conf log-root)
-    (start-logviewer! conf log-root)))
+    (start-logviewer! conf log-root daemonlog-root)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index d30f496..130ea09 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -128,7 +128,8 @@
     (logviewer-link host fname secure?)))
 
 (defn nimbus-log-link [host port]
-  (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+  (url-format "http://%s:%s/daemonlog?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+
 (defn get-error-time
   [error]
   (if error

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 3284558..9ec8cd3 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -593,7 +593,7 @@
           attrs (make-array FileAttribute 0)
           abs-path (.toAbsolutePath (Paths/get path empty-array))
           abs-target (.toAbsolutePath (Paths/get target empty-array))]
-      (log-message "Creating symlink [" abs-path "] to [" abs-target "]")
+      (log-debug "Creating symlink [" abs-path "] to [" abs-target "]")
       (if (not (.exists (.toFile abs-path)))
         (Files/createSymbolicLink abs-path abs-target attrs)))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
index d7c00e1..3abb940 100644
--- a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
@@ -5,6 +5,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -82,6 +83,10 @@ public class FileBasedEventLogger implements IEventLogger {
             path = Paths.get(System.getProperty("storm.home"), logDir, "workers-artifacts",
                     stormId, Integer.toString(port), "events.log");
         }
+        File dir = path.toFile().getParentFile();
+        if (!dir.exists()) {
+             dir.mkdirs();
+        }
         initLogWriter(path);
         setUpFlushTask();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index ebf40b6..8660739 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -742,9 +742,14 @@ public class Utils {
         }
     }
 
-    //Note: Only works for zip files whose uncompressed size is less than 4 GB
-    //Otherwise returns the size module 2^32, per gzip specifications
-    //Returns a long, since that's what file lengths in Java/Clojure usually are.
+    /**
+     * Given a zip File input it will return its size
+     * Only works for zip files whose uncompressed size is less than 4 GB,
+     * otherwise returns the size module 2^32, per gzip specifications
+     * @param myFile The zip file as input
+     * @throws IOException
+     * @return zip file size as a long
+     */
     public static long zipFileSize(File myFile) throws IOException{
         RandomAccessFile raf = new RandomAccessFile(myFile, "r");
         raf.seek(raf.length() - 4);

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/test/clj/backtype/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/logviewer_test.clj b/storm-core/test/clj/backtype/storm/logviewer_test.clj
index a790ddf..e9840e8 100644
--- a/storm-core/test/clj/backtype/storm/logviewer_test.clj
+++ b/storm-core/test/clj/backtype/storm/logviewer_test.clj
@@ -20,30 +20,38 @@
   (:require [conjure.core])
   (:use [clojure test])
   (:use [conjure core])
+  (:use [backtype.storm.ui helpers])
+  (:import [java.nio.file Files])
+  (:import [java.nio.file.attribute FileAttribute])
+  (:import [java.io File])
   (:import [org.mockito Mockito]))
 
 (defmulti mk-mock-File #(:type %))
 
-(defmethod mk-mock-File :file [{file-name :name mtime :mtime
-                                :or {file-name "afile" mtime 1}}]
+(defmethod mk-mock-File :file [{file-name :name mtime :mtime length :length
+                                :or {file-name "afile"
+                                     mtime 1
+                                     length (* 10 (* 1024 (* 1024 1024))) }}] ; Length 10 GB
   (let [mockFile (Mockito/mock java.io.File)]
     (. (Mockito/when (.getName mockFile)) thenReturn file-name)
     (. (Mockito/when (.lastModified mockFile)) thenReturn mtime)
     (. (Mockito/when (.isFile mockFile)) thenReturn true)
     (. (Mockito/when (.getCanonicalPath mockFile))
-       thenReturn (str "/mock/canonical/path/to/" file-name))
+      thenReturn (str "/mock/canonical/path/to/" file-name))
+    (. (Mockito/when (.length mockFile)) thenReturn length)
     mockFile))
 
-(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime
-                                     :or {dir-name "adir" mtime 1}}]
+(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime files :files
+                                     :or {dir-name "adir" mtime 1 files []}}]
   (let [mockDir (Mockito/mock java.io.File)]
     (. (Mockito/when (.getName mockDir)) thenReturn dir-name)
     (. (Mockito/when (.lastModified mockDir)) thenReturn mtime)
     (. (Mockito/when (.isFile mockDir)) thenReturn false)
+    (. (Mockito/when (.listFiles mockDir)) thenReturn (into-array File files))
     mockDir))
 
 (deftest test-mk-FileFilter-for-log-cleanup
-  (testing "log file filter selects the correct log files for purge"
+  (testing "log file filter selects the correct worker-log dirs for purge"
     (let [now-millis (current-time-millis)
           conf {LOGVIEWER-CLEANUP-AGE-MINS 60
                 LOGVIEWER-CLEANUP-INTERVAL-SECS 300}
@@ -51,120 +59,181 @@
           old-mtime-millis (- cutoff-millis 500)
           new-mtime-millis (+ cutoff-millis 500)
           matching-files (map #(mk-mock-File %)
-                              [{:name "oldlog-1-2-worker-3.log"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "oldlog-1-2-worker-3.log.8"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "foobar*_topo-1-24242-worker-2834238.log"
-                                :type :file
-                                :mtime old-mtime-millis}])
+                           [{:name "3031"
+                             :type :directory
+                             :mtime old-mtime-millis}
+                            {:name "3032"
+                             :type :directory
+                             :mtime old-mtime-millis}
+                            {:name "7077"
+                             :type :directory
+                             :mtime old-mtime-millis}])
           excluded-files (map #(mk-mock-File %)
-                              [{:name "oldlog-1-2-worker-.log"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "olddir-1-2-worker.log"
-                                :type :directory
-                                :mtime old-mtime-millis}
-                               {:name "newlog-1-2-worker.log"
-                                :type :file
-                                :mtime new-mtime-millis}
-                               {:name "some-old-file.txt"
-                                :type :file
-                                :mtime old-mtime-millis}
-                               {:name "metadata"
-                                :type :directory
-                                :mtime old-mtime-millis}
-                               {:name "newdir-1-2-worker.log"
-                                :type :directory
-                                :mtime new-mtime-millis}
-                               {:name "newdir"
-                                :type :directory
-                                :mtime new-mtime-millis}
-                              ])
+                           [{:name "oldlog-1-2-worker-.log"
+                             :type :file
+                             :mtime old-mtime-millis}
+                            {:name "newlog-1-2-worker.log"
+                             :type :file
+                             :mtime new-mtime-millis}
+                            {:name "some-old-file.txt"
+                             :type :file
+                             :mtime old-mtime-millis}
+                            {:name "olddir-1-2-worker.log"
+                             :type :directory
+                             :mtime new-mtime-millis}
+                            {:name "metadata"
+                             :type :directory
+                             :mtime new-mtime-millis}
+                            {:name "newdir"
+                             :type :directory
+                             :mtime new-mtime-millis}
+                            ])
           file-filter (logviewer/mk-FileFilter-for-log-cleanup conf now-millis)]
-        (is   (every? #(.accept file-filter %) matching-files))
-        (is (not-any? #(.accept file-filter %) excluded-files))
+      (is   (every? #(.accept file-filter %) matching-files))
+      (is (not-any? #(.accept file-filter %) excluded-files))
       )))
 
-(deftest test-get-log-root->files-map
-  (testing "returns map of root name to list of files"
-    (let [files (vec (map #(java.io.File. %) ["log-1-2-worker-3.log"
-                                              "log-1-2-worker-3.log.1.gz"
-                                              "log-1-2-worker-3.log.err"
-                                              "log-1-2-worker-3.log.out"
-                                              "log-1-2-worker-3.log.out.1.gz"
-                                              "log-1-2-worker-3.log.1"
-                                              "log-2-4-worker-6.log.1"]))
-          expected {"log-1-2-worker-3" #{(files 0) (files 1) (files 2) (files 3) (files 4) (files 5)}
-                    "log-2-4-worker-6" #{(files 6)}}]
-      (is (= expected (logviewer/get-log-root->files-map files))))))
-
-(deftest test-identify-worker-log-files
-  (testing "Does not include metadata file when there are any log files that
-           should not be cleaned up"
-    (let [cutoff-millis 2000
-          old-logFile (mk-mock-File {:name "mock-1-1-worker-1.log.1"
-                                     :type :file
-                                     :mtime (- cutoff-millis 1000)})
-          mock-metaFile (mk-mock-File {:name "mock-1-1-worker-1.yaml"
-                                       :type :file
-                                       :mtime 1})
-          new-logFile (mk-mock-File {:name "mock-1-1-worker-1.log"
-                                     :type :file
-                                     :mtime (+ cutoff-millis 1000)})
+(deftest test-sort-worker-logs
+  (testing "cleaner sorts the log files in ascending ages for deletion"
+    (stubbing [logviewer/filter-candidate-files (fn [x _] x)]
+      (let [now-millis (current-time-millis)
+            files1 (into-array File (map #(mk-mock-File {:name (str %)
+                                                         :type :file
+                                                         :mtime (- now-millis (* 100 %))})
+                                      (range 1 6)))
+            files2 (into-array File (map #(mk-mock-File {:name (str %)
+                                                         :type :file
+                                                         :mtime (- now-millis (* 100 %))})
+                                      (range 6 11)))
+            files3 (into-array File (map #(mk-mock-File {:name (str %)
+                                                         :type :file
+                                                         :mtime (- now-millis (* 100 %))})
+                                      (range 11 16)))
+            port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+                                     :type :directory
+                                     :files files1})
+            port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
+                                     :type :directory
+                                     :files files2})
+            port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
+                                     :type :directory
+                                     :files files3})
+            topo1-files (into-array File [port1-dir port2-dir])
+            topo2-files (into-array File [port3-dir])
+            topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
+                                     :type :directory
+                                     :files topo1-files})
+            topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
+                                     :type :directory
+                                     :files topo2-files})
+            root-files (into-array File [topo1-dir topo2-dir])
+            root-dir (mk-mock-File {:name "/workers-artifacts"
+                                    :type :directory
+                                    :files root-files})
+            sorted-logs (logviewer/sorted-worker-logs root-dir)
+            sorted-ints (map #(Integer. (.getName %)) sorted-logs)]
+        (is (= (count sorted-logs) 15))
+        (is (= (count sorted-ints) 15))
+        (is (apply #'> sorted-ints))))))
+
+(deftest test-per-workerdir-cleanup
+  (testing "cleaner deletes oldest files in each worker dir if files are larger than per-dir quota."
+    (stubbing [rmr nil]
+      (let [now-millis (current-time-millis)
+            files1 (into-array File (map #(mk-mock-File {:name (str "A" %)
+                                                         :type :file
+                                                         :mtime (+ now-millis (* 100 %))
+                                                         :length 200 })
+                                      (range 0 10)))
+            files2 (into-array File (map #(mk-mock-File {:name (str "B" %)
+                                                         :type :file
+                                                         :mtime (+ now-millis (* 100 %))
+                                                         :length 200 })
+                                      (range 0 10)))
+            files3 (into-array File (map #(mk-mock-File {:name (str "C" %)
+                                                         :type :file
+                                                         :mtime (+ now-millis (* 100 %))
+                                                         :length 200 })
+                                      (range 0 10)))
+            port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+                                     :type :directory
+                                     :files files1})
+            port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
+                                     :type :directory
+                                     :files files2})
+            port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
+                                     :type :directory
+                                     :files files3})
+            topo1-files (into-array File [port1-dir port2-dir])
+            topo2-files (into-array File [port3-dir])
+            topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
+                                     :type :directory
+                                     :files topo1-files})
+            topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
+                                     :type :directory
+                                     :files topo2-files})
+            root-files (into-array File [topo1-dir topo2-dir])
+            root-dir (mk-mock-File {:name "/workers-artifacts"
+                                    :type :directory
+                                    :files root-files})
+            remaining-logs (logviewer/per-workerdir-cleanup root-dir 1200)]
+        (is (= (count (first remaining-logs)) 6))
+        (is (= (count (second remaining-logs)) 6))
+        (is (= (count (last remaining-logs)) 6))))))
+
+(deftest test-delete-oldest-log-cleanup
+  (testing "delete oldest logs deletes the oldest set of logs when the total size gets too large."
+    (stubbing [rmr nil]
+      (let [now-millis (current-time-millis)
+            files (into-array File (map #(mk-mock-File {:name (str %)
+                                                        :type :file
+                                                        :mtime (+ now-millis (* 100 %))
+                                                        :length 100 })
+                                     (range 0 20)))
+            remaining-logs (logviewer/delete-oldest-while-logs-too-large files 501)]
+        (is (= (logviewer/sum-file-size files) 2000))
+        (is (= (count remaining-logs) 5))
+        (is (= (.getName (first remaining-logs)) "15"))))))
+
+(deftest test-identify-worker-log-dirs
+  (testing "Build up workerid-workerlogdir map for the old workers' dirs"
+    (let [port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
+                                   :type :directory})
+          mock-metaFile (mk-mock-File {:name "worker.yaml"
+                                       :type :file})
           exp-id "id12345"
-          exp-user "alice"
-          expected {exp-id {:owner exp-user
-                            :files #{old-logFile}}}]
+          expected {exp-id port1-dir}]
       (stubbing [supervisor/read-worker-heartbeats nil
-                logviewer/get-metadata-file-for-log-root-name mock-metaFile
-                read-dir-contents [(.getName old-logFile) (.getName new-logFile)]
-                logviewer/get-worker-id-from-metadata-file exp-id
-                logviewer/get-topo-owner-from-metadata-file exp-user]
-        (is (= expected (logviewer/identify-worker-log-files [old-logFile] "/tmp/")))))))
+                 logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
+                 logviewer/get-worker-id-from-metadata-file exp-id]
+        (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))
 
-(deftest test-get-dead-worker-files-and-owners
+(deftest test-get-dead-worker-dirs
   (testing "removes any files of workers that are still alive"
     (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
           id->hb {"42" {:time-secs 1}}
           now-secs 2
-          log-files #{:expected-file :unexpected-file}
-          exp-owner "alice"]
-      (stubbing [logviewer/identify-worker-log-files {"42" {:owner exp-owner
-                                                            :files #{:unexpected-file}}
-                                                      "007" {:owner exp-owner
-                                                             :files #{:expected-file}}}
-                 logviewer/get-topo-owner-from-metadata-file "alice"
+          unexpected-dir (mk-mock-File {:name "dir1" :type :directory})
+          expected-dir (mk-mock-File {:name "dir2" :type :directory})
+          log-dirs #{unexpected-dir expected-dir}]
+      (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir,
+                                                     "007" expected-dir}
                  supervisor/read-worker-heartbeats id->hb]
-        (is (= [{:owner exp-owner :files #{:expected-file}}]
-               (logviewer/get-dead-worker-files-and-owners conf now-secs log-files "/tmp/")))))))
+        (is (= #{expected-dir}
+              (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))
 
 (deftest test-cleanup-fn
-  (testing "cleanup function removes file as user when one is specified"
-    (let [exp-user "mock-user"
-          mockfile1 (mk-mock-File {:name "file1" :type :file})
-          mockfile2 (mk-mock-File {:name "file2" :type :file})
-          mockfile3 (mk-mock-File {:name "file3" :type :file})
-          mockyaml  (mk-mock-File {:name "foo.yaml" :type :file})
-          exp-cmd (str "rmr /mock/canonical/path/to/" (.getName mockfile3))]
-      (stubbing [logviewer/select-files-for-cleanup
-                   [(mk-mock-File {:name "throwaway" :type :file})]
-                 logviewer/get-dead-worker-files-and-owners
-                   [{:owner nil :files #{mockfile1}}
-                    {:files #{mockfile2}}
-                    {:owner exp-user :files #{mockfile3 mockyaml}}]
-                 supervisor/worker-launcher nil
+  (testing "cleanup function rmr's files of dead workers"
+    (let [mockfile1 (mk-mock-File {:name "delete-me1" :type :file})
+          mockfile2 (mk-mock-File {:name "delete-me2" :type :file})]
+      (stubbing [logviewer/select-dirs-for-cleanup nil
+                 logviewer/get-dead-worker-dirs (sorted-set mockfile1 mockfile2)
+                 logviewer/cleanup-empty-topodir nil
                  rmr nil]
-        (logviewer/cleanup-fn! "/tmp/")
-        (verify-call-times-for supervisor/worker-launcher 1)
-        (verify-first-call-args-for-indices supervisor/worker-launcher
-                                            [1 2] exp-user exp-cmd)
-        (verify-call-times-for rmr 3)
+        (logviewer/cleanup-fn! "/bogus/path")
+        (verify-call-times-for rmr 2)
         (verify-nth-call-args-for 1 rmr (.getCanonicalPath mockfile1))
-        (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))
-        (verify-nth-call-args-for 3 rmr (.getCanonicalPath mockyaml))))))
+        (verify-nth-call-args-for 2 rmr (.getCanonicalPath mockfile2))))))
 
 (deftest test-authorized-log-user
   (testing "allow cluster admin"
@@ -212,3 +281,38 @@
       (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" {})))
       (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
       (verify-first-call-args-for logviewer/user-groups "alice"))))
+
+(deftest test-list-log-files
+  (testing "list-log-files filter selects the correct log files to return"
+    (let [attrs (make-array FileAttribute 0)
+          root-path (.getCanonicalPath (.toFile (Files/createTempDirectory "workers-artifacts" attrs)))
+          file1 (clojure.java.io/file root-path "topoA" "port1" "worker.log")
+          file2 (clojure.java.io/file root-path "topoA" "port2" "worker.log")
+          file3 (clojure.java.io/file root-path "topoB" "port1" "worker.log")
+          _ (clojure.java.io/make-parents file1)
+          _ (clojure.java.io/make-parents file2)
+          _ (clojure.java.io/make-parents file3)
+          _ (.createNewFile file1)
+          _ (.createNewFile file2)
+          _ (.createNewFile file3)
+          origin "www.origin.server.net"
+          expected-all (json-response '("topoA/port1/worker.log" "topoA/port2/worker.log"
+                                         "topoB/port1/worker.log")
+                         nil
+                         :headers {"Access-Control-Allow-Origin" origin
+                                   "Access-Control-Allow-Credentials" "true"})
+          expected-filter-port (json-response '("topoA/port1/worker.log" "topoB/port1/worker.log")
+                                 nil
+                                 :headers {"Access-Control-Allow-Origin" origin
+                                           "Access-Control-Allow-Credentials" "true"})
+          expected-filter-topoId (json-response '("topoB/port1/worker.log")
+                                   nil
+                                   :headers {"Access-Control-Allow-Origin" origin
+                                             "Access-Control-Allow-Credentials" "true"})
+          returned-all (logviewer/list-log-files "user" nil nil root-path nil origin)
+          returned-filter-port (logviewer/list-log-files "user" nil "port1" root-path nil origin)
+          returned-filter-topoId (logviewer/list-log-files "user" "topoB" nil root-path nil origin)]
+      (rmr root-path)
+      (is   (= expected-all returned-all))
+      (is   (= expected-filter-port returned-filter-port))
+      (is   (= expected-filter-topoId returned-filter-topoId)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d009d67d/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 7acb477..04c8600 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -23,6 +23,9 @@
   (:import [backtype.storm.scheduler ISupervisor])
   (:import [backtype.storm.generated RebalanceOptions])
   (:import [java.util UUID])
+  (:import [java.io File])
+  (:import [java.nio.file Files])
+  (:import [java.nio.file.attribute FileAttribute])
   (:use [backtype.storm config testing util timer])
   (:use [backtype.storm.daemon common])
   (:require [backtype.storm.daemon [worker :as worker] [supervisor :as supervisor]]
@@ -395,12 +398,6 @@
                                                         [2]
                                                         full-env)))))))
 
-(defn rm-r [f]
-  (if (.isDirectory f)
-    (for [sub (.listFiles f)] (rm-r sub))
-    (.delete f)
-  ))
-
 (deftest test-worker-launch-command-run-as-user
   (testing "*.worker.childopts configuration"
     (let [mock-port "42"
@@ -409,7 +406,8 @@
           mock-mem-onheap 512
           mock-sensitivity "S3"
           mock-cp "mock-classpath'quote-on-purpose"
-          storm-local (str "/tmp/" (UUID/randomUUID))
+          attrs (make-array FileAttribute 0)
+          storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs)))
           worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
           exp-launch ["/bin/worker-launcher"
                       "me"
@@ -450,13 +448,13 @@
                                " '" mock-storm-id "'"
                                " '" mock-port "'"
                                " '" mock-worker-id "';"))]
-      (.mkdirs (io/file storm-local "workers" mock-worker-id))
       (try
         (testing "testing *.worker.childopts as strings with extra spaces"
           (let [string-opts "-Dfoo=bar  -Xmx1024m"
                 topo-string-opts "-Dkau=aux   -Xmx2048m"
                 exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
                                           ["-Dkau=aux" "-Xmx2048m"])
+                _ (.mkdirs (io/file storm-local "workers" mock-worker-id))
                 mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                         STORM-LOCAL-DIR storm-local
                                         SUPERVISOR-RUN-WORKER-AS-USER true
@@ -480,7 +478,8 @@
                                                           [0]
                                                           exp-launch))
             (is (= (slurp worker-script) exp-script))))
-        (finally (rm-r (io/file storm-local))))
+        (finally (rmr storm-local)))
+      (.mkdirs (io/file storm-local "workers" mock-worker-id))
       (try
         (testing "testing *.worker.childopts as list of strings, with spaces in values"
           (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
@@ -509,7 +508,7 @@
                                                           [0]
                                                           exp-launch))
             (is (= (slurp worker-script) exp-script))))
-        (finally (rm-r (io/file storm-local)))))))
+        (finally (rmr storm-local))))))
 
 (deftest test-workers-go-bananas
   ;; test that multiple workers are started for a port, and test that


[6/6] storm git commit: Merge branch 'STORM-901'

Posted by da...@apache.org.
Merge branch 'STORM-901'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0bba2baf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0bba2baf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0bba2baf

Branch: refs/heads/master
Commit: 0bba2baf92bb4042b2c47b2266ff1e4df006a6a3
Parents: b6615d5 8cac782
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Oct 29 21:26:27 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Oct 29 21:26:27 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 conf/defaults.yaml                              |   2 +
 log4j2/worker.xml                               |  12 +-
 storm-core/src/clj/backtype/storm/config.clj    |  19 +
 .../src/clj/backtype/storm/daemon/logviewer.clj | 514 ++++++++++++++-----
 .../clj/backtype/storm/daemon/supervisor.clj    |  19 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  29 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |  26 +-
 storm-core/src/clj/backtype/storm/util.clj      |  41 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  12 +
 .../storm/metric/FileBasedEventLogger.java      |  18 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  69 +++
 .../test/clj/backtype/storm/logviewer_test.clj  | 312 +++++++----
 .../test/clj/backtype/storm/supervisor_test.clj | 210 ++++----
 14 files changed, 881 insertions(+), 403 deletions(-)
----------------------------------------------------------------------



[5/6] storm git commit: STORM-901

Posted by da...@apache.org.
STORM-901


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8cac7823
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8cac7823
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8cac7823

Branch: refs/heads/master
Commit: 8cac78238532dde607560487a8bbed786f6152b7
Parents: 62c4034
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Oct 29 21:25:57 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Oct 29 21:25:57 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8cac7823/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a422b79..2303277 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -100,6 +100,7 @@
  * STORM-958: Add config for init params of group mapping service
  * STORM-949: On the topology summary UI page, last shown error should have the time and date
  * STORM-1142: Some config validators for positive ints need to allow 0
+ * STORM-901: Worker Artifacts Directory
 
 ## 0.10.0-beta2
  * STORM-1108: Fix NPE in simulated time


[2/6] storm git commit: STORM-901 worker-artifacts for logviewer

Posted by da...@apache.org.
STORM-901 worker-artifacts for logviewer

Solve event-log issue, ui, dir, routes, listLogs

Fix supervisor test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2c2858e4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2c2858e4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2c2858e4

Branch: refs/heads/master
Commit: 2c2858e430d0c9065e98f71ba22113628612e022
Parents: b6615d5
Author: zhuol <zh...@yahoo-inc.com>
Authored: Mon Oct 26 11:37:43 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 29 13:18:15 2015 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +
 log4j2/worker.xml                               |  12 +-
 storm-core/src/clj/backtype/storm/config.clj    |  19 +
 .../src/clj/backtype/storm/daemon/logviewer.clj | 434 +++++++++++++------
 .../clj/backtype/storm/daemon/supervisor.clj    |  19 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  26 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |  26 +-
 storm-core/src/clj/backtype/storm/util.clj      |  41 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  12 +
 .../storm/metric/FileBasedEventLogger.java      |  13 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  64 +++
 .../test/clj/backtype/storm/supervisor_test.clj | 197 +++++----
 12 files changed, 579 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f4c189c..9150ca4 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -87,6 +87,8 @@ logviewer.port: 8000
 logviewer.childopts: "-Xmx128m"
 logviewer.cleanup.age.mins: 10080
 logviewer.appender.name: "A1"
+logviewer.max.sum.worker.logs.size.mb: 4096
+logviewer.max.per.worker.logs.size.mb: 2048
 
 logs.users: null
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/log4j2/worker.xml
----------------------------------------------------------------------
diff --git a/log4j2/worker.xml b/log4j2/worker.xml
index 2017699..ee7b7a1 100644
--- a/log4j2/worker.xml
+++ b/log4j2/worker.xml
@@ -23,8 +23,8 @@
 </properties>
 <appenders>
     <RollingFile name="A1"
-                 fileName="${sys:storm.log.dir}/${sys:logfile.name}"
-                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.%i.gz">
+		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
+		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
         <PatternLayout>
             <pattern>${pattern}</pattern>
         </PatternLayout>
@@ -34,8 +34,8 @@
         <DefaultRolloverStrategy max="9"/>
     </RollingFile>
     <RollingFile name="STDOUT"
-                 fileName="${sys:storm.log.dir}/${sys:logfile.name}.out"
-                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.out.%i.gz">
+		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
+		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
         <PatternLayout>
             <pattern>${patternNoTime}</pattern>
         </PatternLayout>
@@ -45,8 +45,8 @@
         <DefaultRolloverStrategy max="4"/>
     </RollingFile>
     <RollingFile name="STDERR"
-                 fileName="${sys:storm.log.dir}/${sys:logfile.name}.err"
-                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.err.%i.gz">
+		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
+		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
         <PatternLayout>
             <pattern>${patternNoTime}</pattern>
         </PatternLayout>

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index e1cafec..f06f6e9 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -221,6 +221,25 @@
   (log-message "REMOVE worker-user " worker-id)
   (.delete (File. (worker-user-file conf worker-id))))
 
+(defn worker-artifacts-root
+  ([conf]
+   (str (absolute-storm-local-dir conf) file-path-separator "workers-artifacts"))
+  ([conf id]
+   (str (worker-artifacts-root conf) file-path-separator id))
+  ([conf id port]
+   (str (worker-artifacts-root conf id) file-path-separator port)))
+
+(defn get-log-metadata-file
+  ([fname]
+    (let [[id port & _] (str/split fname (re-pattern file-path-separator))]
+      (get-log-metadata-file (read-storm-config) id port)))
+  ([conf id port]
+    (clojure.java.io/file (str (worker-artifacts-root conf id) file-path-separator port file-path-separator) "worker.yaml")))
+
+(defn get-worker-dir-from-root
+  [log-root id port]
+  (clojure.java.io/file (str log-root file-path-separator id file-path-separator port)))
+
 (defn worker-root
   ([conf]
    (str (absolute-storm-local-dir conf) file-path-separator "workers"))

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 2a8a40c..58561a4 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -16,12 +16,14 @@
 (ns backtype.storm.daemon.logviewer
   (:use compojure.core)
   (:use [clojure.set :only [difference intersection]])
-  (:use [clojure.string :only [blank?]])
-  (:use [hiccup core page-helpers])
+  (:use [clojure.string :only [blank? split]])
+  (:use [hiccup core page-helpers form-helpers])
   (:use [backtype.storm config util log timer])
   (:use [backtype.storm.ui helpers])
+  (:import [backtype.storm.utils Utils])
   (:import [org.slf4j LoggerFactory])
-  (:import [java.io File FileFilter FileInputStream])
+  (:import [java.io File FileFilter FileInputStream InputStream])
+  (:import [java.util.zip GZIPInputStream])
   (:import [org.apache.logging.log4j LogManager])
   (:import [org.apache.logging.log4j.core Appender LoggerContext])
   (:import [org.apache.logging.log4j.core.appender RollingFileAppender])
@@ -29,14 +31,12 @@
            [org.yaml.snakeyaml.constructor SafeConstructor])
   (:import [backtype.storm.ui InvalidRequestException]
            [backtype.storm.security.auth AuthUtils])
-  (:require [compojure.route :as route]
-            [compojure.handler :as handler]
-            [ring.middleware.keyword-params]
-            [ring.util.response :as resp])
   (:require [backtype.storm.daemon common [supervisor :as supervisor]])
   (:import [java.io File FileFilter])
   (:require [compojure.route :as route]
             [compojure.handler :as handler]
+            [ring.middleware.keyword-params]
+            [ring.util.codec :as codec]
             [ring.util.response :as resp]
             [clojure.string :as string])
   (:gen-class))
@@ -46,18 +46,34 @@
 (defn cleanup-cutoff-age-millis [conf now-millis]
   (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
 
-;TODO: handle cleanup of old event log files
+(defn- last-modifiedtime-worker-logdir
+  "Return the last modified time for all log files in a worker's log dir"
+  [log-dir]
+  (apply max
+         (.lastModified log-dir)
+         (for [^File file (.listFiles log-dir)]
+           (.lastModified file))))
+
 (defn mk-FileFilter-for-log-cleanup [conf now-millis]
   (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
     (reify FileFilter (^boolean accept [this ^File file]
                         (boolean (and
-                          (.isFile file)
-                          (re-find worker-log-filename-pattern (.getName file))
-                          (<= (.lastModified file) cutoff-age-millis)))))))
+                                   (not (.isFile file))
+                                   (<= (last-modifiedtime-worker-logdir file) cutoff-age-millis)))))))
 
-(defn select-files-for-cleanup [conf now-millis root-dir]
+(defn select-dirs-for-cleanup [conf now-millis root-dir]
   (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
-    (.listFiles (File. root-dir) file-filter)))
+    (reduce clojure.set/union
+            (sorted-set)
+            (for [^File topo-dir (.listFiles (File. root-dir))]
+              (into [] (.listFiles topo-dir file-filter))))))
+
+(defn get-topo-port-workerlog
+  "Return the path of the worker log with the format of topoId/port/worker.log.*"
+  [^File file]
+  (clojure.string/join file-path-separator
+                       (take-last 3
+                                  (split (.getCanonicalPath file) (re-pattern file-path-separator)))))
 
 (defn get-metadata-file-for-log-root-name [root-name root-dir]
   (let [metaFile (clojure.java.io/file root-dir "metadata"
@@ -69,116 +85,202 @@
                   " to clean up for " root-name)
         nil))))
 
+(defn get-metadata-file-for-wroker-logdir [logdir]
+  (let [metaFile (clojure.java.io/file logdir "worker.yaml")]
+    (if (.exists metaFile)
+      metaFile
+      (do
+        (log-warn "Could not find " (.getCanonicalPath metaFile)
+                  " to clean up for " logdir)
+        nil))))
+
 (defn get-worker-id-from-metadata-file [metaFile]
   (get (clojure-from-yaml-file metaFile) "worker-id"))
 
 (defn get-topo-owner-from-metadata-file [metaFile]
   (get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
 
-(defn get-log-root->files-map [log-files]
-  "Returns a map of \"root name\" to a the set of files in log-files having the
-  root name.  The \"root name\" of a log file is the part of the name preceding
-  the extension."
-  (reduce #(assoc %1                                      ;; The accumulated map so far
-                  (first %2)                              ;; key: The root name of the log file
-                  (conj (%1 (first %2) #{}) (second %2))) ;; val: The set of log files with the root name
-          {}                                              ;; initial (empty) map
-          (map #(list
-                  (second (re-find worker-log-filename-pattern (.getName %))) ;; The root name of the log file
-                  %)                                                          ;; The log file
-               log-files)))
-
-(defn identify-worker-log-files [log-files root-dir]
-  (into {} (for [log-root-entry (get-log-root->files-map log-files)
-                 :let [metaFile (get-metadata-file-for-log-root-name
-                                  (key log-root-entry) root-dir)
-                       log-root (key log-root-entry)
-                       files (val log-root-entry)]
+(defn identify-worker-log-dirs [log-dirs]
+  "return the workerid to worker-log-dir map"
+  (into {} (for [logdir log-dirs
+                 :let [metaFile (get-metadata-file-for-wroker-logdir logdir)]
                  :when metaFile]
-             {(get-worker-id-from-metadata-file metaFile)
-              {:owner (get-topo-owner-from-metadata-file metaFile)
-               :files
-                 ;; If each log for this root name is to be deleted, then
-                 ;; include the metadata file also.
-                 (if (empty? (difference
-                                  (set (filter #(re-find (re-pattern log-root) %)
-                                               (read-dir-contents root-dir)))
-                                  (set (map #(.getName %) files))))
-                  (conj files metaFile)
-                  ;; Otherwise, keep the list of files as it is.
-                  files)}})))
-
-(defn get-dead-worker-files-and-owners [conf now-secs log-files root-dir]
-  (if (empty? log-files)
-    {}
-    (let [id->heartbeat (supervisor/read-worker-heartbeats conf)
-          alive-ids (keys (remove
-                            #(or (not (val %))
-                                 (supervisor/is-worker-hb-timed-out? now-secs (val %) conf))
-                            id->heartbeat))
-          id->entries (identify-worker-log-files log-files root-dir)]
-      (for [[id {:keys [owner files]}] id->entries
-            :when (not (contains? (set alive-ids) id))]
-        {:owner owner
-         :files files}))))
-
-(defn cleanup-fn! [log-root-dir]
+             {(get-worker-id-from-metadata-file metaFile) logdir})))
+
+(defn get-alive-ids
+  [conf now-secs]
+  (->>
+    (supervisor/read-worker-heartbeats conf)
+    (remove
+      #(or (not (val %))
+           (supervisor/is-worker-hb-timed-out? now-secs
+                                               (val %)
+                                               conf)))
+    keys
+    set))
+
+(defn get-dead-worker-dirs
+  "Return a sorted set of java.io.Files that were written by workers that are
+  now dead"
+  [conf now-secs log-dirs]
+  (if (empty? log-dirs)
+    (sorted-set)
+    (let [alive-ids (get-alive-ids conf now-secs)
+          id->dir (identify-worker-log-dirs log-dirs)]
+      (apply sorted-set
+             (for [[id dir] id->dir
+                   :when (not (contains? alive-ids id))]
+               dir)))))
+
+(defn get-all-worker-dirs [^File root-dir]
+  (reduce clojure.set/union
+          (sorted-set)
+          (for [^File topo-dir (.listFiles root-dir)]
+            (into [] (.listFiles topo-dir)))))
+
+(defn get-alive-worker-dirs
+  "Return a sorted set of java.io.Files that were written by workers that are
+  now active"
+  [conf root-dir]
+  (let [alive-ids (get-alive-ids conf (current-time-secs))
+        log-dirs (get-all-worker-dirs root-dir)
+        id->dir (identify-worker-log-dirs log-dirs)]
+    (apply sorted-set
+           (for [[id dir] id->dir
+                 :when (contains? alive-ids id)]
+             (.getCanonicalPath dir)))))
+
+(defn get-all-logs-for-rootdir [^File log-dir]
+  (reduce concat
+          (for [port-dir (get-all-worker-dirs log-dir)]
+            (into [] (.listFiles port-dir)))))
+
+(defn is-active-log [^File file]
+  (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file)))
+
+(defn filter-candidate-files
+  "Filter candidate files for global cleanup"
+  [logs log-dir]
+  (let [alive-worker-dirs (get-alive-worker-dirs *STORM-CONF* log-dir)]
+    (filter #(and (not= (.getName %) "worker.yaml")  ; exclude metadata file
+                  (not (and (contains? alive-worker-dirs (.getCanonicalPath (.getParentFile %)))
+                            (is-active-log %)))) ; exclude active workers' active logs
+            logs)))
+
+(defn sorted-worker-logs
+  "Collect the wroker log files recursively, sorted by decreasing age."
+  [^File root-dir]
+  (let [files (get-all-logs-for-rootdir root-dir)
+        logs (filter-candidate-files files root-dir)]
+    (sort-by #(.lastModified %) logs)))
+
+(defn sum-file-size
+  "Given a sequence of Files, sum their sizes."
+  [files]
+  (reduce #(+ %1 (.length %2)) 0 files))
+
+(defn delete-oldest-while-logs-too-large [logs_ size]
+  (loop [logs logs_]
+    (if (> (sum-file-size logs) size)
+      (do
+        (log-message "Log sizes too high. Going to delete: " (.getName (first logs)))
+        (try (rmr (.getCanonicalPath (first logs)))
+             (catch Exception ex (log-error ex)))
+        (recur (rest logs)))
+      logs)))
+
+(defn per-workerdir-cleanup
+  "Delete the oldest files in overloaded worker log dir"
+  [^File root-dir size]
+  (dofor [worker-dir (get-all-worker-dirs root-dir)]
+    (let [filtered-logs (filter #(not (is-active-log %)) (.listFiles worker-dir))
+          sorted-logs (sort-by #(.lastModified %) filtered-logs)]
+      (delete-oldest-while-logs-too-large sorted-logs size))))
+
+(defn cleanup-empty-topodir
+  "Delete the topo dir if it contains zero port dirs"
+  [^File dir]
+  (let [topodir (.getParentFile dir)]
+    (if (empty? (.listFiles topodir))
+      (rmr (.getCanonicalPath topodir)))))
+
+(defn cleanup-fn!
+  "Delete old log dirs for which the workers are no longer alive"
+  [log-root-dir]
   (let [now-secs (current-time-secs)
-        old-log-files (select-files-for-cleanup *STORM-CONF* (* now-secs 1000) log-root-dir)
-        dead-worker-files (get-dead-worker-files-and-owners *STORM-CONF* now-secs old-log-files log-root-dir)]
+        old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
+                                              (* now-secs 1000)
+                                              log-root-dir)
+        total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB)
+        per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB)
+        per-dir-size (min per-dir-size (* total-size 0.5))
+        dead-worker-dirs (get-dead-worker-dirs *STORM-CONF*
+                                               now-secs
+                                               old-log-dirs)]
     (log-debug "log cleanup: now=" now-secs
-               " old log files " (pr-str (map #(.getName %) old-log-files))
-               " dead worker files " (->> dead-worker-files
-                                          (mapcat (fn [{l :files}] l))
-                                          (map #(.getName %))
-                                          (pr-str)))
-    (dofor [{:keys [owner files]} dead-worker-files
-            file files]
-      (let [path (.getCanonicalPath file)]
-        (log-message "Cleaning up: Removing " path)
-        (try
-          (if (or (blank? owner) (re-matches #".*\.yaml$" path))
-            (rmr path)
-            ;; worker-launcher does not actually launch a worker process.  It
-            ;; merely executes one of a prescribed set of commands.  In this case, we ask it
-            ;; to delete a file as the owner of that file.
-            (supervisor/worker-launcher *STORM-CONF* owner (str "rmr " path)))
-          (catch Exception ex
-            (log-error ex)))))))
+               " old log dirs " (pr-str (map #(.getName %) old-log-dirs))
+               " dead worker dirs " (pr-str
+                                       (map #(.getName %) dead-worker-dirs)))
+    (dofor [dir dead-worker-dirs]
+           (let [path (.getCanonicalPath dir)]
+             (log-message "Cleaning up: Removing " path)
+             (try (rmr path)
+                  (cleanup-empty-topodir dir)
+                  (catch Exception ex (log-error ex)))))
+    (per-workerdir-cleanup (File. log-root-dir) (* per-dir-size (* 1024 1024)))
+    (let [all-logs (sorted-worker-logs (File. log-root-dir))
+          size (* total-size (*  1024 1024))]
+      (delete-oldest-while-logs-too-large all-logs size))))
 
 (defn start-log-cleaner! [conf log-root-dir]
   (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
     (when interval-secs
       (log-debug "starting log cleanup thread at interval: " interval-secs)
-      (schedule-recurring (mk-timer :thread-name "logviewer-cleanup")
+      (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
+                                    :kill-fn (fn [t]
+                                               (log-error t "Error when doing logs cleanup")
+                                               (exit-process! 20 "Error when doing log cleanup")))
                           0 ;; Start immediately.
                           interval-secs
                           (fn [] (cleanup-fn! log-root-dir))))))
 
+(defn- skip-bytes
+  "FileInputStream#skip may not work the first time, so ensure it successfully
+  skips the given number of bytes."
+  [^InputStream stream n]
+  (loop [skipped 0]
+    (let [skipped (+ skipped (.skip stream (- n skipped)))]
+      (if (< skipped n) (recur skipped)))))
+
+(defn logfile-matches-filter?
+  [log-file-name]
+  (let [regex-string (str "worker.log.*")
+        regex-pattern (re-pattern regex-string)]
+    (not= (re-seq regex-pattern (.toString log-file-name)) nil)))
+
 (defn page-file
   ([path tail]
-    (let [flen (.length (clojure.java.io/file path))
+    (let [zip-file? (.endsWith path ".gz")
+          flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
           skip (- flen tail)]
       (page-file path skip tail)))
   ([path start length]
-    (with-open [input (FileInputStream. path)
-                output (java.io.ByteArrayOutputStream.)]
-      (if (>= start (.length (clojure.java.io/file path)))
-        (throw
-          (InvalidRequestException. "Cannot start past the end of the file")))
-      (if (> start 0)
-        ;; FileInputStream#skip may not work the first time.
-        (loop [skipped 0]
-          (let [skipped (+ skipped (.skip input (- start skipped)))]
-            (if (< skipped start) (recur skipped)))))
-      (let [buffer (make-array Byte/TYPE 1024)]
-        (loop []
-          (when (< (.size output) length)
-            (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
-              (when (pos? size)
-                (.write output buffer 0 size)
-                (recur)))))
-      (.toString output)))))
+    (let [zip-file? (.endsWith path ".gz")
+          flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))]
+      (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. path)) (FileInputStream. path))
+                  output (java.io.ByteArrayOutputStream.)]
+        (if (>= start flen)
+          (throw
+            (InvalidRequestException. "Cannot start past the end of the file")))
+        (if (> start 0) (skip-bytes input start))
+        (let [buffer (make-array Byte/TYPE 1024)]
+          (loop []
+            (when (< (.size output) length)
+              (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
+                (when (pos? size)
+                  (.write output buffer 0 size)
+                  (recur)))))
+        (.toString output))))))
 
 (defn get-log-user-group-whitelist [fname]
   (let [wl-file (get-log-metadata-file fname)
@@ -219,7 +321,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
     (if (and appender-name appender (instance? RollingFileAppender appender))
       (.getParent (File. (.getFileName appender)))
       (throw
-       (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j2 agree.")))))
+       (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and logback agree.")))))
 
 (defnk to-btn-link
   "Create a link that is formatted like a button"
@@ -227,6 +329,11 @@ Note that if anything goes wrong, this will throw an Error and exit."
   [:a {:href (java.net.URI. url)
        :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
 
+(defn log-file-selection-form [log-files]
+  [[:form {:action "log" :id "list-of-files"}
+    (drop-down "file" log-files )
+    [:input {:type "submit" :value "Switch file"}]]])
+
 (defn pager-links [fname start length file-size]
   (let [prev-start (max 0 (- start length))
         next-start (if (> file-size 0)
@@ -255,25 +362,36 @@ Note that if anything goes wrong, this will throw an Error and exit."
                         "Next" :enabled (> next-start start))])]]))
 
 (defn- download-link [fname]
-  [[:p (link-to (url-format "/download/%s" fname) "Download Full Log")]])
+  [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])
+
+(def default-bytes-per-page 51200)
 
 (defn log-page [fname start length grep user root-dir]
   (if (or (blank? (*STORM-CONF* UI-FILTER))
           (authorized-log-user? user fname *STORM-CONF*))
     (let [file (.getCanonicalFile (File. root-dir fname))
-          file-length (.length file)
-          path (.getCanonicalPath file)]
-      (if (and (= (.getCanonicalFile (File. root-dir))
-                  (.getParentFile file))
-               (.exists file))
-        (let [default-length 51200
-              length (if length
+          path (.getCanonicalPath file)
+          zip-file? (.endsWith path ".gz")
+          file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
+          topo-dir (.getParentFile (.getParentFile file))
+          log-files (reduce clojure.set/union
+                            (sorted-set)
+                            (for [^File port-dir (.listFiles topo-dir)]
+                              (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
+          files-str (for [file log-files]
+                      (get-topo-port-workerlog file))
+          reordered-files-str (conj (filter #(not= fname %) files-str) fname)]
+      (if (.exists file)
+        (let [length (if length
                        (min 10485760 length)
-                     default-length)
+                       default-bytes-per-page)
+              is-txt-file (re-find #"\.(log.*|txt|yaml)$" fname)
               log-string (escape-html
-                           (if start
-                             (page-file path start length)
-                             (page-file path length)))
+                           (if is-txt-file
+                             (if start
+                               (page-file path start length)
+                               (page-file path length))
+                             "This is a binary file and cannot display! You may download the full file."))
               start (or start (- file-length length))]
           (if grep
             (html [:pre#logContent
@@ -282,8 +400,9 @@ Note that if anything goes wrong, this will throw an Error and exit."
                           (filter #(.contains % grep))
                           (string/join "\n"))
                      log-string)])
-            (let [pager-data (pager-links fname start length file-length)]
-              (html (concat pager-data
+            (let [pager-data (if is-txt-file (pager-links fname start length file-length) nil)]
+              (html (concat (log-file-selection-form reordered-files-str) ; list all files for this topology
+                            pager-data
                             (download-link fname)
                             [[:pre#logContent log-string]]
                             pager-data)))))
@@ -296,7 +415,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
 
 (defn download-log-file [fname req resp user ^String root-dir]
   (let [file (.getCanonicalFile (File. root-dir fname))]
-    (if (= (File. root-dir) (.getParentFile file))
+    (if (.exists file)
       (if (or (blank? (*STORM-CONF* UI-FILTER))
               (authorized-log-user? user fname *STORM-CONF*))
         (-> (resp/response file)
@@ -333,26 +452,73 @@ Note that if anything goes wrong, this will throw an Error and exit."
                     (name k) "'")
                ex)))))
 
+(defn list-log-files
+  [user topoId port log-root callback origin]
+  (let [file-results
+        (if (nil? topoId)
+          (if (nil? port)
+            (get-all-logs-for-rootdir (File. log-root))
+            (reduce concat
+              (for [topo-dir (.listFiles (File. log-root))]
+                (reduce concat
+                  (for [port-dir (.listFiles topo-dir)]
+                    (if (= (str port) (.getName port-dir))
+                      (into [] (.listFiles port-dir))))))))
+          (if (nil? port)
+            (let [topo-dir (File. (str log-root file-path-separator topoId))]
+              (if (.exists topo-dir)
+                (reduce concat
+                  (for [port-dir (.listFiles topo-dir)]
+                    (into [] (.listFiles port-dir))))
+                []))
+            (let [port-dir (get-worker-dir-from-root log-root topoId port)]
+              (if (.exists port-dir)
+                (into [] (.listFiles port-dir))
+                []))))
+        file-strs (sort (for [file file-results]
+                          (get-topo-port-workerlog file)))]
+    (json-response file-strs
+      callback
+      :headers {"Access-Control-Allow-Origin" origin
+                "Access-Control-Allow-Credentials" "true"})))
+
 (defroutes log-routes
   (GET "/log" [:as req & m]
-       (try
-         (let [servlet-request (:servlet-request req)
-               log-root (:log-root req)
-               user (.getUserName http-creds-handler servlet-request)
-               start (if (:start m) (parse-long-from-map m :start))
-               length (if (:length m) (parse-long-from-map m :length))]
-           (log-template (log-page (:file m) start length (:grep m) user log-root)
-                         (:file m) user))
-         (catch InvalidRequestException ex
-           (log-error ex)
-           (ring-response-from-exception ex))))
+    (try
+      (let [servlet-request (:servlet-request req)
+            log-root (:log-root req)
+            user (.getUserName http-creds-handler servlet-request)
+            start (if (:start m) (parse-long-from-map m :start))
+            length (if (:length m) (parse-long-from-map m :length))
+            file (url-decode (:file m))]
+        (log-template (log-page file start length (:grep m) user log-root)
+          file user))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
   (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
-       (try
-         (let [user (.getUserName http-creds-handler servlet-request)]
-           (download-log-file file servlet-request servlet-response user log-root))
-         (catch InvalidRequestException ex
-           (log-error ex)
-           (ring-response-from-exception ex))))
+    ;; We do not use servlet-response here, but do not remove it from the
+    ;; :keys list, or this rule could stop working when an authentication
+    ;; filter is configured.
+    (try
+      (let [user (.getUserName http-creds-handler servlet-request)]
+        (download-log-file file servlet-request servlet-response user log-root))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (ring-response-from-exception ex))))
+  (GET "/listLogs" [:as req & m]
+    (try
+      (let [servlet-request (:servlet-request req)
+            user (.getUserName http-creds-handler servlet-request)]
+        (list-log-files user
+          (:topoId m)
+          (:port m)
+          (:log-root req)
+          (:callback m)
+          (.getHeader servlet-request "Origin")))
+      (catch InvalidRequestException ex
+        (log-error ex)
+        (json-response (exception->json ex) (:callback m) :status 400))))
   (route/resources "/")
   (route/not-found "Page not found"))
 
@@ -406,7 +572,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
 
 (defn -main []
   (let [conf (read-storm-config)
-        log-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
+        log-root (worker-artifacts-root conf)]
     (setup-default-uncaught-exception-handler)
     (start-log-cleaner! conf log-root)
     (start-logviewer! conf log-root)))

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 0461cd7..52b2057 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -605,7 +605,7 @@
         (log-message "Finished downloading code for storm id " storm-id " from " master-code-dir))))
 
 (defn write-log-metadata-to-yaml-file! [storm-id port data conf]
-  (let [file (get-log-metadata-file storm-id port)]
+  (let [file (get-log-metadata-file conf storm-id port)]
     ;;run worker as user needs the directory to have special permissions
     ;; or it is insecure
     (when (not (.exists (.getParentFile file)))
@@ -614,7 +614,7 @@
             (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file))))
         (.mkdirs (.getParentFile file))))
     (let [writer (java.io.FileWriter. file)
-        yaml (Yaml.)]
+          yaml (Yaml.)]
       (try
         (.dump yaml data writer)
         (finally
@@ -669,6 +669,15 @@
       (str java-home file-path-separator "bin" file-path-separator "java")
       )))
 
+(defn create-artifacts-link
+  "Create a symlink from workder directory to its port artifacts directory"
+  [conf storm-id port worker-id]
+  (let [worker-dir (worker-root conf worker-id)
+        topo-dir (worker-artifacts-root conf storm-id)]
+    (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+                 storm-id " to its port artifacts directory")
+    (if (.exists (File. worker-dir))
+      (create-symlink! worker-dir topo-dir "artifacts" port))))
 
 (defmethod launch-worker
     :distributed [supervisor storm-id port worker-id mem-onheap]
@@ -701,8 +710,9 @@
           gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap)
           topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS)
           user (storm-conf TOPOLOGY-SUBMITTER-USER)
+          logfilename "worker.log"
+          workers-artifacts (worker-artifacts-root conf)
           logging-sensitivity (storm-conf TOPOLOGY-LOGGING-SENSITIVITY "S3")
-          logfilename (logs-filename storm-id port)
           worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
                              (substitute-childopts s worker-id storm-id port mem-onheap))
           topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
@@ -715,6 +725,7 @@
                      topo-worker-logwriter-childopts
                      (str "-Dlogfile.name=" logfilename)
                      (str "-Dstorm.home=" storm-home)
+                     (str "-Dworkers.artifacts=" workers-artifacts)
                      (str "-Dstorm.id=" storm-id)
                      (str "-Dworker.id=" worker-id)
                      (str "-Dworker.port=" port)
@@ -729,6 +740,7 @@
                     [(str "-Djava.library.path=" jlp)
                      (str "-Dlogfile.name=" logfilename)
                      (str "-Dstorm.home=" storm-home)
+                     (str "-Dworkers.artifacts=" workers-artifacts)
                      (str "-Dstorm.conf.file=" storm-conf-file)
                      (str "-Dstorm.options=" storm-options)
                      (str "-Dstorm.log.dir=" storm-log-dir)
@@ -748,6 +760,7 @@
       (log-message "Launching worker with command: " (shell-cmd command))
       (write-log-metadata! storm-conf user worker-id storm-id port conf)
       (set-worker-user! conf worker-id user)
+      (create-artifacts-link conf storm-id port worker-id)
       (let [log-prefix (str "Worker Process " worker-id)
             callback (fn [exit-code]
                        (log-message log-prefix " exited with code: " exit-code)

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index cb5dbbe..d30f496 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -117,8 +117,7 @@
     (url-format "http://%s:%s/log?file=%s"
       host
       (*STORM-CONF* LOGVIEWER-PORT)
-      fname))
-  )
+      fname)))
 
 (defn event-log-link
   [topology-id component-id host port secure?]
@@ -804,21 +803,6 @@
   [sys?]
   (if (or (nil? sys?) (= "false" sys?)) false true))
 
-(defn wrap-json-in-callback [callback response]
-  (str callback "(" response ");"))
-
-(defnk json-response
-  [data callback :serialize-fn to-json :status 200]
-     {:status status
-      :headers (merge {"Cache-Control" "no-cache, no-store"
-                       "Access-Control-Allow-Origin" "*"
-                       "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, Authorization, X-Requested-With"}
-                      (if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"}
-                          {"Content-Type" "application/json;charset=utf-8"}))
-      :body (if (not-nil? callback)
-              (wrap-json-in-callback callback (serialize-fn data))
-              (serialize-fn data))})
-
 (def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
 
 (defn populate-context!
@@ -992,14 +976,6 @@
   (route/resources "/")
   (route/not-found "Page not found"))
 
-(defn exception->json
-  [ex]
-  {"error" "Internal Server Error"
-   "errorMessage"
-   (let [sw (java.io.StringWriter.)]
-     (.printStackTrace ex (java.io.PrintWriter. sw))
-     (.toString sw))})
-
 (defn catch-errors
   [handler]
   (fn [request]

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
index cbbff85..9b82aaa 100644
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ b/storm-core/src/clj/backtype/storm/ui/helpers.clj
@@ -20,7 +20,7 @@
          [string :only [blank? join]]
          [walk :only [keywordize-keys]]])
   (:use [backtype.storm config log])
-  (:use [backtype.storm.util :only [clojurify-structure uuid defnk url-encode not-nil?]])
+  (:use [backtype.storm.util :only [clojurify-structure uuid defnk to-json url-encode not-nil?]])
   (:use [clj-time coerce format])
   (:import [backtype.storm.generated ExecutorInfo ExecutorSummary])
   (:import [backtype.storm.logging.filters AccessLoggingFilter])
@@ -200,3 +200,27 @@
         configurator (:configurator config)]
     (configurator s)
     (.start s)))
+
+(defn wrap-json-in-callback [callback response]
+  (str callback "(" response ");"))
+
+(defnk json-response
+  [data callback :serialize-fn to-json :status 200 :headers {}]
+  {:status status
+   :headers (merge {"Cache-Control" "no-cache, no-store"
+                    "Access-Control-Allow-Origin" "*"
+                    "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, X-Csrf-Token, Authorization, X-Requested-With"}
+              (if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"}
+                {"Content-Type" "application/json;charset=utf-8"})
+              headers)
+   :body (if (not-nil? callback)
+           (wrap-json-in-callback callback (serialize-fn data))
+           (serialize-fn data))})
+
+(defn exception->json
+  [ex]
+  {"error" "Internal Server Error"
+   "errorMessage"
+   (let [sw (java.io.StringWriter.)]
+     (.printStackTrace ex (java.io.PrintWriter. sw))
+     (.toString sw))})

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 19e52a6..3284558 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -26,6 +26,8 @@
   (:import [java.util.zip ZipFile])
   (:import [java.util.concurrent.locks ReentrantReadWriteLock])
   (:import [java.util.concurrent Semaphore])
+  (:import [java.nio.file Files Paths])
+  (:import [java.nio.file.attribute FileAttribute])
   (:import [java.io File FileOutputStream RandomAccessFile StringWriter
             PrintWriter BufferedReader InputStreamReader IOException])
   (:import [java.lang.management ManagementFactory])
@@ -580,6 +582,21 @@
     (when-not success?
       (throw (RuntimeException. (str "Failed to touch " path))))))
 
+(defn create-symlink!
+  "Create symlink is to the target"
+  ([path-dir target-dir file-name]
+    (create-symlink! path-dir target-dir file-name file-name))
+  ([path-dir target-dir from-file-name to-file-name]
+    (let [path (str path-dir file-path-separator from-file-name)
+          target (str target-dir file-path-separator to-file-name)
+          empty-array (make-array String 0)
+          attrs (make-array FileAttribute 0)
+          abs-path (.toAbsolutePath (Paths/get path empty-array))
+          abs-target (.toAbsolutePath (Paths/get target empty-array))]
+      (log-message "Creating symlink [" abs-path "] to [" abs-target "]")
+      (if (not (.exists (.toFile abs-path)))
+        (Files/createSymbolicLink abs-path abs-target attrs)))))
+
 (defn read-dir-contents
   [dir]
   (if (exists-file? dir)
@@ -1019,27 +1036,13 @@
   (.getCanonicalPath 
                 (clojure.java.io/file (System/getProperty "storm.home") "logs")))
 
-(defn- logs-rootname
-  ([storm-id port] (logs-rootname storm-id port "-worker-"))
-  ([storm-id port type] (str storm-id type port)))
-
 (defn logs-filename
-  ([storm-id port] (str (logs-rootname storm-id port) ".log"))
-  ([storm-id port type] (str (logs-rootname storm-id port type) ".log")))
-
-(defn event-logs-filename [storm-id port] (logs-filename storm-id port "-events-"))
-
-(defn logs-metadata-filename [storm-id port]
-  (str (logs-rootname storm-id port) ".yaml"))
-
-(def worker-log-filename-pattern #"^((.*-\d+-\d+)-worker-(\d+))\.log")
+  [storm-id port]
+  (str storm-id file-path-separator port file-path-separator "worker.log"))
 
-(defn get-log-metadata-file
-  ([fname]
-    (if-let [[_ _ id port] (re-matches worker-log-filename-pattern fname)]
-      (get-log-metadata-file id port)))
-  ([id port]
-    (clojure.java.io/file LOG-DIR "metadata" (logs-metadata-filename id port))))
+(defn event-logs-filename
+  [storm-id port]
+  (str storm-id file-path-separator port file-path-separator "events.log"))
 
 (defn clojure-from-yaml-file [yamlFile]
   (try

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 2ee423e..764497d 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -577,6 +577,18 @@ public class Config extends HashMap<String, Object> {
     public static final String LOGVIEWER_CLEANUP_AGE_MINS = "logviewer.cleanup.age.mins";
 
     /**
+     * The maximum number of bytes all worker log files can take up in MB
+     */
+    @isPositiveNumber
+    public static final String LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB = "logviewer.max.sum.worker.logs.size.mb";
+
+    /**
+     * The maximum number of bytes per worker's files can take up in MB
+     */
+    @isPositiveNumber
+    public static final String LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB = "logviewer.max.per.worker.logs.size.mb";
+
+    /**
      * Storm Logviewer HTTPS port
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
index 3834c55..d7c00e1 100644
--- a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
@@ -61,11 +61,11 @@ public class FileBasedEventLogger implements IEventLogger {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context) {
-        String logDir;
+        String logDir; // storm local directory
         String stormId = context.getStormId();
         int port = context.getThisWorkerPort();
-        if((logDir = System.getProperty("storm.log.dir")) == null
-                && (logDir = System.getProperty("java.io.tmpdir")) == null) {
+        if ((logDir = System.getProperty("storm.local.dir")) == null &&
+                (logDir = (String)stormConf.get("storm.local.dir")) == null) {
             String msg = "Could not determine the directory to log events.";
             LOG.error(msg);
             throw new RuntimeException(msg);
@@ -77,7 +77,12 @@ public class FileBasedEventLogger implements IEventLogger {
          * Include the topology name & worker port in the file name so that
          * multiple event loggers can log independently.
          */
-        initLogWriter(Paths.get(logDir, String.format("%s-events-%d.log", stormId, port)));
+        Path path = Paths.get(logDir, "workers-artifacts", stormId, Integer.toString(port), "events.log");
+        if (!path.isAbsolute()) {
+            path = Paths.get(System.getProperty("storm.home"), logDir, "workers-artifacts",
+                    stormId, Integer.toString(port), "events.log");
+        }
+        initLogWriter(path);
         setUpFlushTask();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index c852306..ebf40b6 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -54,8 +54,10 @@ import java.io.ByteArrayInputStream;
 import java.io.OutputStreamWriter;
 import java.io.InputStreamReader;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.FileOutputStream;
 import java.io.BufferedReader;
+import java.io.RandomAccessFile;
 import java.io.Serializable;
 import java.io.IOException;
 import java.util.Map;
@@ -69,6 +71,8 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
 
 public class Utils {
     private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -692,5 +696,65 @@ public class Utils {
             }
         }
     }
+
+    /**
+     * Given a File input it will unzip the file in a the unzip directory
+     * passed as the second parameter
+     * @param inFile The zip file as input
+     * @param unzipDir The unzip directory where to unzip the zip file.
+     * @throws IOException
+     */
+    public static void unZip(File inFile, File unzipDir) throws IOException {
+        Enumeration<? extends ZipEntry> entries;
+        ZipFile zipFile = new ZipFile(inFile);
+
+        try {
+            entries = zipFile.entries();
+            while (entries.hasMoreElements()) {
+                ZipEntry entry = entries.nextElement();
+                if (!entry.isDirectory()) {
+                    InputStream in = zipFile.getInputStream(entry);
+                    try {
+                        File file = new File(unzipDir, entry.getName());
+                        if (!file.getParentFile().mkdirs()) {
+                            if (!file.getParentFile().isDirectory()) {
+                                throw new IOException("Mkdirs failed to create " +
+                                        file.getParentFile().toString());
+                            }
+                        }
+                        OutputStream out = new FileOutputStream(file);
+                        try {
+                            byte[] buffer = new byte[8192];
+                            int i;
+                            while ((i = in.read(buffer)) != -1) {
+                                out.write(buffer, 0, i);
+                            }
+                        } finally {
+                            out.close();
+                        }
+                    } finally {
+                        in.close();
+                    }
+                }
+            }
+        } finally {
+            zipFile.close();
+        }
+    }
+
+    //Note: Only works for zip files whose uncompressed size is less than 4 GB
+    //Otherwise returns the size module 2^32, per gzip specifications
+    //Returns a long, since that's what file lengths in Java/Clojure usually are.
+    public static long zipFileSize(File myFile) throws IOException{
+        RandomAccessFile raf = new RandomAccessFile(myFile, "r");
+        raf.seek(raf.length() - 4);
+        long b4 = raf.read();
+        long b3 = raf.read();
+        long b2 = raf.read();
+        long b1 = raf.read();
+        long val = (b1 << 24) | (b2 << 16) + (b3 << 8) + b4;
+        raf.close();
+        return val;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2c2858e4/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index fab401a..7acb477 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -273,12 +273,13 @@
           mock-sensitivity "S3"
           mock-cp "/base:/stormjar.jar"
           exp-args-fn (fn [opts topo-opts classpath]
-                       (concat [(supervisor/java-cmd) "-cp" classpath 
-                               (str "-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log")
+                       (concat [(supervisor/java-cmd) "-cp" classpath
+                               (str "-Dlogfile.name=" "worker.log")
                                "-Dstorm.home="
-                                (str "-Dstorm.id=" mock-storm-id)
-                                (str "-Dworker.id=" mock-worker-id)
-                                (str "-Dworker.port=" mock-port)
+                               (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
+                               (str "-Dstorm.id=" mock-storm-id)
+                               (str "-Dworker.id=" mock-worker-id)
+                               (str "-Dworker.port=" mock-port)
                                "-Dstorm.log.dir=/logs"
                                "-Dlog4j.configurationFile=/log4j2/worker.xml"
                                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
@@ -287,8 +288,9 @@
                                opts
                                topo-opts
                                ["-Djava.library.path="
-                                (str "-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log")
+                                (str "-Dlogfile.name=" "worker.log")
                                 "-Dstorm.home="
+                                "-Dworkers.artifacts=/tmp/workers-artifacts"
                                 "-Dstorm.conf.file="
                                 "-Dstorm.options="
                                 (str "-Dstorm.log.dir=" file-path-separator "logs")
@@ -318,6 +320,7 @@
                      launch-process nil
                      set-worker-user! nil
                      supervisor/jlp nil
+                     worker-artifacts-root "/tmp/workers-artifacts"
                      supervisor/write-log-metadata! nil]
             (supervisor/launch-worker mock-supervisor
                                       mock-storm-id
@@ -340,6 +343,7 @@
                      launch-process nil
                      set-worker-user! nil
                      supervisor/jlp nil
+                     worker-artifacts-root "/tmp/workers-artifacts"
                      supervisor/write-log-metadata! nil]
             (supervisor/launch-worker mock-supervisor
                                       mock-storm-id
@@ -356,6 +360,7 @@
           (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
                      supervisor-stormdist-root nil
                      supervisor/jlp nil
+                     worker-artifacts-root "/tmp/workers-artifacts"
                      set-worker-user! nil
                      supervisor/write-log-metadata! nil
                      launch-process nil
@@ -376,6 +381,7 @@
           (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
                      supervisor-stormdist-root nil
                      supervisor/jlp nil
+                     worker-artifacts-root "/tmp/workers-artifacts"
                      launch-process nil
                      set-worker-user! nil
                      supervisor/write-log-metadata! nil
@@ -411,96 +417,99 @@
                       (str storm-local "/workers/" mock-worker-id)
                       worker-script]
           exp-script-fn (fn [opts topo-opts]
-                       (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
-                                " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
-                                " '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
-                                " '-Dstorm.home='"
-                                " '-Dstorm.id=" mock-storm-id "'"
-                                " '-Dworker.id=" mock-worker-id "'"
-                                " '-Dworker.port=" mock-port "'"
-                                " '-Dstorm.log.dir=/logs'"
-                                " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
-                                " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
-                                " 'backtype.storm.LogWriter'"
-                                " 'java' '-server'"
-                                " " (shell-cmd opts)
-                                " " (shell-cmd topo-opts)
-                                " '-Djava.library.path='"
-                                " '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
-                                " '-Dstorm.home='"
-                                " '-Dstorm.conf.file='"
-                                " '-Dstorm.options='"
-                                " '-Dstorm.log.dir=/logs'"
-                                " '-Dlogging.sensitivity=" mock-sensitivity "'"
-                                " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
-                                " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
-                                " '-Dstorm.id=" mock-storm-id "'"
-                                " '-Dworker.id=" mock-worker-id "'"
-                                " '-Dworker.port=" mock-port "'"
-                                " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
-                                " 'backtype.storm.daemon.worker'"
-                                " '" mock-storm-id "'"
-                                " '" mock-port "'"
-                                " '" mock-worker-id "';"))]
+                          (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
+                               " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
+                               " '-Dlogfile.name=" "worker.log'"
+                               " '-Dstorm.home='"
+                               " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
+                               " '-Dstorm.id=" mock-storm-id "'"
+                               " '-Dworker.id=" mock-worker-id "'"
+                               " '-Dworker.port=" mock-port "'"
+                               " '-Dstorm.log.dir=/logs'"
+                               " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
+                               " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
+                               " 'backtype.storm.LogWriter'"
+                               " 'java' '-server'"
+                               " " (shell-cmd opts)
+                               " " (shell-cmd topo-opts)
+                               " '-Djava.library.path='"
+                               " '-Dlogfile.name=" "worker.log'"
+                               " '-Dstorm.home='"
+                               " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
+                               " '-Dstorm.conf.file='"
+                               " '-Dstorm.options='"
+                               " '-Dstorm.log.dir=/logs'"
+                               " '-Dlogging.sensitivity=" mock-sensitivity "'"
+                               " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
+                               " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
+                               " '-Dstorm.id=" mock-storm-id "'"
+                               " '-Dworker.id=" mock-worker-id "'"
+                               " '-Dworker.port=" mock-port "'"
+                               " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
+                               " 'backtype.storm.daemon.worker'"
+                               " '" mock-storm-id "'"
+                               " '" mock-port "'"
+                               " '" mock-worker-id "';"))]
       (.mkdirs (io/file storm-local "workers" mock-worker-id))
       (try
-      (testing "testing *.worker.childopts as strings with extra spaces"
-        (let [string-opts "-Dfoo=bar  -Xmx1024m"
-              topo-string-opts "-Dkau=aux   -Xmx2048m"
-              exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
-                                    ["-Dkau=aux" "-Xmx2048m"])
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                      STORM-LOCAL-DIR storm-local
-                                      SUPERVISOR-RUN-WORKER-AS-USER true
-                                      WORKER-CHILDOPTS string-opts}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                 topo-string-opts
-                                                 TOPOLOGY-SUBMITTER-USER "me"}
-                     add-to-classpath mock-cp
-                     supervisor-stormdist-root nil
-                     launch-process nil
-                     set-worker-user! nil
-                     supervisor/java-cmd "java"
-                     supervisor/jlp nil
-                     supervisor/write-log-metadata! nil]
-            (supervisor/launch-worker mock-supervisor
-                                      mock-storm-id
-                                      mock-port
-                                      mock-worker-id
-                                      mock-mem-onheap)
-            (verify-first-call-args-for-indices launch-process
-                                                [0]
-                                                exp-launch))
-          (is (= (slurp worker-script) exp-script))))
-      (testing "testing *.worker.childopts as list of strings, with spaces in values"
-        (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
-              topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
-              exp-script (exp-script-fn list-opts topo-list-opts)
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                      STORM-LOCAL-DIR storm-local
-                                      SUPERVISOR-RUN-WORKER-AS-USER true
-                                      WORKER-CHILDOPTS list-opts}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                 topo-list-opts
-                                                 TOPOLOGY-SUBMITTER-USER "me"}
-                     add-to-classpath mock-cp
-                     supervisor-stormdist-root nil
-                     launch-process nil
-                     set-worker-user! nil
-                     supervisor/java-cmd "java"
-                     supervisor/jlp nil
-                     supervisor/write-log-metadata! nil]
-            (supervisor/launch-worker mock-supervisor
-                                      mock-storm-id
-                                      mock-port
-                                      mock-worker-id
-                                      mock-mem-onheap)
-            (verify-first-call-args-for-indices launch-process
-                                                [0]
-                                                exp-launch))
-          (is (= (slurp worker-script) exp-script))))
-(finally (rm-r (io/file storm-local)))
-))))
+        (testing "testing *.worker.childopts as strings with extra spaces"
+          (let [string-opts "-Dfoo=bar  -Xmx1024m"
+                topo-string-opts "-Dkau=aux   -Xmx2048m"
+                exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
+                                          ["-Dkau=aux" "-Xmx2048m"])
+                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+                                        STORM-LOCAL-DIR storm-local
+                                        SUPERVISOR-RUN-WORKER-AS-USER true
+                                        WORKER-CHILDOPTS string-opts}}]
+            (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                                   topo-string-opts
+                                                   TOPOLOGY-SUBMITTER-USER "me"}
+                       add-to-classpath mock-cp
+                       supervisor-stormdist-root nil
+                       launch-process nil
+                       set-worker-user! nil
+                       supervisor/java-cmd "java"
+                       supervisor/jlp nil
+                       supervisor/write-log-metadata! nil]
+                      (supervisor/launch-worker mock-supervisor
+                                                mock-storm-id
+                                                mock-port
+                                                mock-worker-id
+                                                mock-mem-onheap)
+                      (verify-first-call-args-for-indices launch-process
+                                                          [0]
+                                                          exp-launch))
+            (is (= (slurp worker-script) exp-script))))
+        (finally (rm-r (io/file storm-local))))
+      (try
+        (testing "testing *.worker.childopts as list of strings, with spaces in values"
+          (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
+                topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
+                exp-script (exp-script-fn list-opts topo-list-opts)
+                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+                                        STORM-LOCAL-DIR storm-local
+                                        SUPERVISOR-RUN-WORKER-AS-USER true
+                                        WORKER-CHILDOPTS list-opts}}]
+            (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                                   topo-list-opts
+                                                   TOPOLOGY-SUBMITTER-USER "me"}
+                       add-to-classpath mock-cp
+                       supervisor-stormdist-root nil
+                       launch-process nil
+                       set-worker-user! nil
+                       supervisor/java-cmd "java"
+                       supervisor/jlp nil
+                       supervisor/write-log-metadata! nil]
+                      (supervisor/launch-worker mock-supervisor
+                                                mock-storm-id
+                                                mock-port
+                                                mock-worker-id
+                                                mock-mem-onheap)
+                      (verify-first-call-args-for-indices launch-process
+                                                          [0]
+                                                          exp-launch))
+            (is (= (slurp worker-script) exp-script))))
+        (finally (rm-r (io/file storm-local)))))))
 
 (deftest test-workers-go-bananas
   ;; test that multiple workers are started for a port, and test that


[4/6] storm git commit: Merge branch '901' of https://github.com/zhuoliu/storm into STORM-901

Posted by da...@apache.org.
Merge branch '901' of https://github.com/zhuoliu/storm into STORM-901


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62c40348
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62c40348
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62c40348

Branch: refs/heads/master
Commit: 62c40348ffdac1844067307f6b595633b1addac7
Parents: b6615d5 ce97de3
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Oct 29 21:23:40 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Oct 29 21:23:40 2015 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +
 log4j2/worker.xml                               |  12 +-
 storm-core/src/clj/backtype/storm/config.clj    |  19 +
 .../src/clj/backtype/storm/daemon/logviewer.clj | 514 ++++++++++++++-----
 .../clj/backtype/storm/daemon/supervisor.clj    |  19 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  29 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |  26 +-
 storm-core/src/clj/backtype/storm/util.clj      |  41 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  12 +
 .../storm/metric/FileBasedEventLogger.java      |  18 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  69 +++
 .../test/clj/backtype/storm/logviewer_test.clj  | 312 +++++++----
 .../test/clj/backtype/storm/supervisor_test.clj | 210 ++++----
 13 files changed, 880 insertions(+), 403 deletions(-)
----------------------------------------------------------------------