You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/07/24 17:10:00 UTC
[07/19] storm git commit: STORM-1280 port
backtype.storm.daemon.logviewer to java
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
deleted file mode 100644
index ae9b651..0000000
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ /dev/null
@@ -1,824 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.logviewer-test
- (:use [org.apache.storm daemon-config config util])
- (:require [org.apache.storm.daemon [logviewer :as logviewer]])
- (:require [conjure.core])
- (:use [clojure test])
- (:use [conjure core])
- (:use [org.apache.storm.ui helpers])
- (:import [org.apache.storm.daemon DirectoryCleaner]
- [org.apache.storm.utils Time Utils]
- [org.apache.storm.utils.staticmocking UtilsInstaller]
- [org.apache.storm.daemon.supervisor SupervisorUtils]
- [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
- [org.apache.storm.generated LSWorkerHeartbeat])
- (:import [java.nio.file Files Path DirectoryStream])
- (:import [java.nio.file Files])
- (:import [java.nio.file.attribute FileAttribute])
- (:import [java.io File])
- (:import [java.util ArrayList])
- (:import [org.mockito Mockito]))
-
-(defn mk-mock-Path [file]
- (let [mockPath (Mockito/mock java.nio.file.Path)]
- (. (Mockito/when (.toFile mockPath)) thenReturn file)
- mockPath))
-
-(defn mk-DirectoryStream [^ArrayList list-of-paths]
- (reify DirectoryStream
- (close [this])
- (iterator [this]
- (.iterator list-of-paths))))
-
-(defmulti mk-mock-File #(:type %))
-
-(defmethod mk-mock-File :file [{file-name :name mtime :mtime length :length
- :or {file-name "afile"
- mtime 1
- length (* 10 (* 1024 (* 1024 1024))) }}] ; Length 10 GB
- (let [mockFile (Mockito/mock java.io.File)]
- (. (Mockito/when (.getName mockFile)) thenReturn file-name)
- (. (Mockito/when (.lastModified mockFile)) thenReturn mtime)
- (. (Mockito/when (.isFile mockFile)) thenReturn true)
- (. (Mockito/when (.getCanonicalPath mockFile))
- thenReturn (str "/mock/canonical/path/to/" file-name))
- (. (Mockito/when (.length mockFile)) thenReturn length)
- mockFile))
-
-(defmethod mk-mock-File :directory [{dir-name :name mtime :mtime files :files
- :or {dir-name "adir" mtime 1 files []}}]
- (let [mockDir (Mockito/mock java.io.File)]
- (. (Mockito/when (.getName mockDir)) thenReturn dir-name)
- (. (Mockito/when (.lastModified mockDir)) thenReturn mtime)
- (. (Mockito/when (.isFile mockDir)) thenReturn false)
- (. (Mockito/when (.listFiles mockDir)) thenReturn (into-array File files))
- (. (Mockito/when (.getCanonicalPath mockDir)) thenReturn dir-name)
- mockDir))
-
-(deftest test-get-size-for-logdir
- (testing "get the file sizes of a worker log directory"
- (stubbing [logviewer/get-stream-for-dir (fn [x] (map #(mk-mock-Path %) (.listFiles x)))]
- (let [now-millis (Time/currentTimeMillis)
- files1 (into-array File (map #(mk-mock-File {:name (str %)
- :type :file
- :mtime (- now-millis (* 100 %))
- :length 200})
- (range 1 6))) ;; 5 files
- port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
- :type :directory
- :files files1})]
- (is (= (logviewer/get-size-for-logdir port1-dir) 1000))))))
-
-(deftest test-mk-FileFilter-for-log-cleanup
- (testing "log file filter selects the correct worker-log dirs for purge"
- (stubbing [logviewer/get-stream-for-dir (fn [x] (map #(mk-mock-Path %) (.listFiles x)))]
- (let [now-millis (Time/currentTimeMillis)
- conf {LOGVIEWER-CLEANUP-AGE-MINS 60
- LOGVIEWER-CLEANUP-INTERVAL-SECS 300}
- cutoff-millis (logviewer/cleanup-cutoff-age-millis conf now-millis)
- old-mtime-millis (- cutoff-millis 500)
- new-mtime-millis (+ cutoff-millis 500)
- matching-files (map #(mk-mock-File %)
- [{:name "3031"
- :type :directory
- :mtime old-mtime-millis}
- {:name "3032"
- :type :directory
- :mtime old-mtime-millis}
- {:name "7077"
- :type :directory
- :mtime old-mtime-millis}])
- excluded-files (map #(mk-mock-File %)
- [{:name "oldlog-1-2-worker-.log"
- :type :file
- :mtime old-mtime-millis}
- {:name "newlog-1-2-worker.log"
- :type :file
- :mtime new-mtime-millis}
- {:name "some-old-file.txt"
- :type :file
- :mtime old-mtime-millis}
- {:name "olddir-1-2-worker.log"
- :type :directory
- :mtime new-mtime-millis}
- {:name "metadata"
- :type :directory
- :mtime new-mtime-millis}
- {:name "newdir"
- :type :directory
- :mtime new-mtime-millis}
- ])
- file-filter (logviewer/mk-FileFilter-for-log-cleanup conf now-millis)]
- (is (every? #(.accept file-filter %) matching-files))
- (is (not-any? #(.accept file-filter %) excluded-files))
- ))))
-
-(deftest test-per-workerdir-cleanup!
- (testing "cleaner deletes oldest files in each worker dir if files are larger than per-dir quota."
- (with-open [_ (UtilsInstaller. (proxy [Utils] []
- (forceDeleteImpl [path])))]
- (let [cleaner (proxy [org.apache.storm.daemon.DirectoryCleaner] []
- (getStreamForDirectory
- ([^File dir]
- (mk-DirectoryStream
- (ArrayList.
- (map #(mk-mock-Path %) (.listFiles dir)))))))
- now-millis (Time/currentTimeMillis)
- files1 (into-array File (map #(mk-mock-File {:name (str "A" %)
- :type :file
- :mtime (+ now-millis (* 100 %))
- :length 200 })
- (range 0 10)))
- files2 (into-array File (map #(mk-mock-File {:name (str "B" %)
- :type :file
- :mtime (+ now-millis (* 100 %))
- :length 200 })
- (range 0 10)))
- files3 (into-array File (map #(mk-mock-File {:name (str "C" %)
- :type :file
- :mtime (+ now-millis (* 100 %))
- :length 200 })
- (range 0 10)))
- port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
- :type :directory
- :files files1})
- port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
- :type :directory
- :files files2})
- port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
- :type :directory
- :files files3})
- topo1-files (into-array File [port1-dir port2-dir])
- topo2-files (into-array File [port3-dir])
- topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
- :type :directory
- :files topo1-files})
- topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
- :type :directory
- :files topo2-files})
- root-files (into-array File [topo1-dir topo2-dir])
- root-dir (mk-mock-File {:name "/workers-artifacts"
- :type :directory
- :files root-files})
- deletedFiles (logviewer/per-workerdir-cleanup! root-dir 1200 cleaner)]
- (is (= (first deletedFiles) 4))
- (is (= (second deletedFiles) 4))
- (is (= (last deletedFiles) 4))))))
-
-(deftest test-global-log-cleanup!
- (testing "cleaner deletes oldest when files' sizes are larger than the global quota."
- (stubbing [logviewer/get-alive-worker-dirs ["/workers-artifacts/topo1/port1"]]
- (with-open [_ (UtilsInstaller. (proxy [Utils] []
- (forceDeleteImpl [path])))]
- (let [cleaner (proxy [org.apache.storm.daemon.DirectoryCleaner] []
- (getStreamForDirectory
- ([^File dir]
- (mk-DirectoryStream
- (ArrayList.
- (map #(mk-mock-Path %) (.listFiles dir)))))))
- now-millis (Time/currentTimeMillis)
- files1 (into-array File (map #(mk-mock-File {:name (str "A" % ".log")
- :type :file
- :mtime (+ now-millis (* 100 %))
- :length 200 })
- (range 0 10)))
- files2 (into-array File (map #(mk-mock-File {:name (str "B" %)
- :type :file
- :mtime (+ now-millis (* 100 %))
- :length 200 })
- (range 0 10)))
- files3 (into-array File (map #(mk-mock-File {:name (str "C" %)
- :type :file
- :mtime (+ now-millis (* 100 %))
- :length 200 })
- (range 0 10)))
- port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
- :type :directory
- :files files1}) ;; note that port1-dir is active worker containing active logs
- port2-dir (mk-mock-File {:name "/workers-artifacts/topo1/port2"
- :type :directory
- :files files2})
- port3-dir (mk-mock-File {:name "/workers-artifacts/topo2/port3"
- :type :directory
- :files files3})
- topo1-files (into-array File [port1-dir port2-dir])
- topo2-files (into-array File [port3-dir])
- topo1-dir (mk-mock-File {:name "/workers-artifacts/topo1"
- :type :directory
- :files topo1-files})
- topo2-dir (mk-mock-File {:name "/workers-artifacts/topo2"
- :type :directory
- :files topo2-files})
- root-files (into-array File [topo1-dir topo2-dir])
- root-dir (mk-mock-File {:name "/workers-artifacts"
- :type :directory
- :files root-files})
- deletedFiles (logviewer/global-log-cleanup! root-dir 2400 cleaner)]
- (is (= deletedFiles 18)))))))
-
-(deftest test-identify-worker-log-dirs
- (testing "Build up workerid-workerlogdir map for the old workers' dirs"
- (let [port1-dir (mk-mock-File {:name "/workers-artifacts/topo1/port1"
- :type :directory})
- mock-metaFile (mk-mock-File {:name "worker.yaml"
- :type :file})
- exp-id "id12345"
- expected {exp-id port1-dir}
- supervisor-util (Mockito/mock SupervisorUtils)]
- (with-open [_ (MockedSupervisorUtils. supervisor-util)]
- (stubbing [logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
- logviewer/get-worker-id-from-metadata-file exp-id]
- (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn nil))
- (is (= expected (logviewer/identify-worker-log-dirs [port1-dir]))))))))
-
-(deftest test-get-dead-worker-dirs
- (testing "return directories for workers that are not alive"
- (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
- hb (let [lwb (LSWorkerHeartbeat.)]
- (.set_time_secs lwb (int 1)) lwb)
- id->hb {"42" hb}
- now-secs 2
- unexpected-dir1 (mk-mock-File {:name "dir1" :type :directory})
- expected-dir2 (mk-mock-File {:name "dir2" :type :directory})
- expected-dir3 (mk-mock-File {:name "dir3" :type :directory})
- log-dirs #{unexpected-dir1 expected-dir2 expected-dir3}
- supervisor-util (Mockito/mock SupervisorUtils)]
- (with-open [_ (MockedSupervisorUtils. supervisor-util)]
- (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir1,
- "007" expected-dir2,
- "" expected-dir3}] ;; this tests a directory with no yaml file thus no worker id
- (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn id->hb))
- (is (= #{expected-dir2 expected-dir3}
- (logviewer/get-dead-worker-dirs conf now-secs log-dirs))))))))
-
-(deftest test-cleanup-fn
- (testing "cleanup function forceDeletes files of dead workers"
- (let [mockfile1 (mk-mock-File {:name "delete-me1" :type :file})
- mockfile2 (mk-mock-File {:name "delete-me2" :type :file})
- forceDelete-args (atom [])
- utils-proxy (proxy [Utils] []
- (forceDeleteImpl [path]
- (swap! forceDelete-args conj path)))]
- (with-open [_ (UtilsInstaller. utils-proxy)]
- (stubbing [logviewer/select-dirs-for-cleanup nil
- logviewer/get-dead-worker-dirs (sorted-set mockfile1 mockfile2)
- logviewer/cleanup-empty-topodir! nil]
- (logviewer/cleanup-fn! "/bogus/path")
- (is (= 2 (count @forceDelete-args)))
- (is (= (.getCanonicalPath mockfile1) (get @forceDelete-args 0)))
- (is (= (.getCanonicalPath mockfile2) (get @forceDelete-args 1))))))))
-
-(deftest test-authorized-log-user
- (testing "allow cluster admin"
- (let [conf {NIMBUS-ADMINS ["alice"]}]
- (stubbing [logviewer/get-log-user-group-whitelist [[] []]
- logviewer/user-groups []]
- (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf))
- (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
- (verify-first-call-args-for logviewer/user-groups "alice"))))
-
- (testing "ignore any cluster-set topology.users topology.groups"
- (let [conf {TOPOLOGY-USERS ["alice"]
- TOPOLOGY-GROUPS ["alice-group"]}]
- (stubbing [logviewer/get-log-user-group-whitelist [[] []]
- logviewer/user-groups ["alice-group"]]
- (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" conf)))
- (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
- (verify-first-call-args-for logviewer/user-groups "alice"))))
-
- (testing "allow cluster logs user"
- (let [conf {LOGS-USERS ["alice"]}]
- (stubbing [logviewer/get-log-user-group-whitelist [[] []]
- logviewer/user-groups []]
- (is (logviewer/authorized-log-user? "alice" "non-blank-fname" conf))
- (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
- (verify-first-call-args-for logviewer/user-groups "alice"))))
-
- (testing "allow whitelisted topology user"
- (stubbing [logviewer/get-log-user-group-whitelist [["alice"] []]
- logviewer/user-groups []]
- (is (logviewer/authorized-log-user? "alice" "non-blank-fname" {}))
- (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
- (verify-first-call-args-for logviewer/user-groups "alice")))
-
- (testing "allow whitelisted topology group"
- (stubbing [logviewer/get-log-user-group-whitelist [[] ["alice-group"]]
- logviewer/user-groups ["alice-group"]]
- (is (logviewer/authorized-log-user? "alice" "non-blank-fname" {}))
- (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
- (verify-first-call-args-for logviewer/user-groups "alice")))
-
- (testing "disallow user not in nimbus admin, topo user, logs user, or whitelist"
- (stubbing [logviewer/get-log-user-group-whitelist [[] []]
- logviewer/user-groups []]
- (is (not (logviewer/authorized-log-user? "alice" "non-blank-fname" {})))
- (verify-first-call-args-for logviewer/get-log-user-group-whitelist "non-blank-fname")
- (verify-first-call-args-for logviewer/user-groups "alice"))))
-
-(deftest test-list-log-files
- (testing "list-log-files filter selects the correct log files to return"
- (let [attrs (make-array FileAttribute 0)
- root-path (.getCanonicalPath (.toFile (Files/createTempDirectory "workers-artifacts" attrs)))
- file1 (clojure.java.io/file root-path "topoA" "port1" "worker.log")
- file2 (clojure.java.io/file root-path "topoA" "port2" "worker.log")
- file3 (clojure.java.io/file root-path "topoB" "port1" "worker.log")
- _ (clojure.java.io/make-parents file1)
- _ (clojure.java.io/make-parents file2)
- _ (clojure.java.io/make-parents file3)
- _ (.createNewFile file1)
- _ (.createNewFile file2)
- _ (.createNewFile file3)
- origin "www.origin.server.net"
- expected-all (json-response '("topoA/port1/worker.log" "topoA/port2/worker.log"
- "topoB/port1/worker.log")
- nil
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"})
- expected-filter-port (json-response '("topoA/port1/worker.log" "topoB/port1/worker.log")
- nil
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"})
- expected-filter-topoId (json-response '("topoB/port1/worker.log")
- nil
- :headers {"Access-Control-Allow-Origin" origin
- "Access-Control-Allow-Credentials" "true"})
- returned-all (logviewer/list-log-files "user" nil nil root-path nil origin)
- returned-filter-port (logviewer/list-log-files "user" nil "port1" root-path nil origin)
- returned-filter-topoId (logviewer/list-log-files "user" "topoB" nil root-path nil origin)]
- (Utils/forceDelete root-path)
- (is (= expected-all returned-all))
- (is (= expected-filter-port returned-filter-port))
- (is (= expected-filter-topoId returned-filter-topoId)))))
-
-(deftest test-search-via-rest-api
- (testing "Throws if bogus file is given"
- (thrown-cause? java.lang.RuntimeException
- (logviewer/substring-search nil "a string")))
-
- (let [pattern "needle"
- expected-host "dev.null.invalid"
- expected-port 8888
- ;; When we click a link to the logviewer, we expect the match line to
- ;; be somewhere near the middle of the page. So we subtract half of
- ;; the default page length from the offset at which we found the
- ;; match.
- exp-offset-fn #(- (/ logviewer/default-bytes-per-page 2) %)]
-
- (stubbing [logviewer/logviewer-port expected-port]
- (with-open [_ (UtilsInstaller. (proxy [Utils] []
- (hostnameImpl [] expected-host)))]
- (testing "Logviewer link centers the match in the page"
- (let [expected-fname "foobar.log"]
- (is (= (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file="
- expected-fname
- "&start=1947&length="
- logviewer/default-bytes-per-page)
- (logviewer/url-to-match-centered-in-log-page (byte-array 42)
- expected-fname
- 27526
- 8888)))))
-
- (testing "Logviewer link centers the match in the page (daemon)"
- (let [expected-fname "foobar.log"]
- (is (= (str "http://"
- expected-host
- ":"
- expected-port
- "/daemonlog?file="
- expected-fname
- "&start=1947&length="
- logviewer/default-bytes-per-page)
- (logviewer/url-to-match-centered-in-log-page-daemon-file (byte-array 42)
- expected-fname
- 27526
- 8888)))))
-
- (let [file (->> "logviewer-search-context-tests.log.test"
- (clojure.java.io/file "src" "dev"))]
- (testing "returns correct before/after context"
- (is (= {"isDaemon" "no"
- "searchString" pattern
- "startByteOffset" 0
- "matches" [{"byteOffset" 0
- "beforeString" ""
- "afterString" " needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle "
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 7
- "beforeString" "needle "
- "afterString" "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle needle\n"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 127
- "beforeString" "needle needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
- "afterString" " needle\n"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 134
- "beforeString" " needle000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000needle "
- "afterString" "\n"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- ]}
- (logviewer/substring-search file pattern)))))
-
- (let [file (clojure.java.io/file "src" "dev" "small-worker.log.test")]
- (testing "a really small log file"
- (is (= {"isDaemon" "no"
- "searchString" pattern
- "startByteOffset" 0
- "matches" [{"byteOffset" 7
- "beforeString" "000000 "
- "afterString" " 000000\n"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}]}
- (logviewer/substring-search file pattern)))))
-
- (let [file (clojure.java.io/file "src" "dev" "small-worker.log.test")]
- (testing "a really small log file (daemon)"
- (is (= {"isDaemon" "yes"
- "searchString" pattern
- "startByteOffset" 0
- "matches" [{"byteOffset" 7
- "beforeString" "000000 "
- "afterString" " 000000\n"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/daemonlog?file="
- (.getName file)
- "&start=0&length=51200")}]}
- (logviewer/substring-search file pattern :is-daemon true)))))
-
- (let [file (clojure.java.io/file "src" "dev" "test-3072.log.test")]
- (testing "no offset returned when file ends on buffer offset"
- (let [expected
- {"isDaemon" "no"
- "searchString" pattern
- "startByteOffset" 0
- "matches" [{"byteOffset" 3066
- "beforeString" (->>
- (repeat 128 '.)
- clojure.string/join)
- "afterString" ""
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}]}]
- (is (= expected
- (logviewer/substring-search file pattern)))
- (is (= expected
- (logviewer/substring-search file pattern :num-matches 1))))))
-
- (let [file (clojure.java.io/file "src" "dev" "test-worker.log.test")]
-
- (testing "next byte offsets are correct for each match"
- (doseq [[num-matches-sought
- num-matches-found
- expected-next-byte-offset] [[1 1 11]
- [2 2 2042]
- [3 3 2052]
- [4 4 3078]
- [5 5 3196]
- [6 6 3202]
- [7 7 6252]
- [8 8 6321]
- [9 9 6397]
- [10 10 6476]
- [11 11 6554]
- [12 12 nil]
- [13 12 nil]]]
- (let [result
- (logviewer/substring-search file
- pattern
- :num-matches num-matches-sought)]
- (is (= expected-next-byte-offset
- (get result "nextByteOffset")))
- (is (= num-matches-found (count (get result "matches")))))))
-
- (is
- (= {"isDaemon" "no"
- "nextByteOffset" 6252
- "searchString" pattern
- "startByteOffset" 0
- "matches" [
- {"byteOffset" 5
- "beforeString" "Test "
- "afterString" " is near the beginning of the file.\nThis file assumes a buffer size of 2048 bytes, a max search string size of 1024 bytes, and a"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 2036
- "beforeString" "ng 146\npadding 147\npadding 148\npadding 149\npadding 150\npadding 151\npadding 152\npadding 153\nNear the end of a 1024 byte block, a "
- "afterString" ".\nA needle that straddles a 1024 byte boundary should also be detected.\n\npadding 157\npadding 158\npadding 159\npadding 160\npadding"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 2046
- "beforeString" "ding 147\npadding 148\npadding 149\npadding 150\npadding 151\npadding 152\npadding 153\nNear the end of a 1024 byte block, a needle.\nA "
- "afterString" " that straddles a 1024 byte boundary should also be detected.\n\npadding 157\npadding 158\npadding 159\npadding 160\npadding 161\npaddi"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 3072
- "beforeString" "adding 226\npadding 227\npadding 228\npadding 229\npadding 230\npadding 231\npadding 232\npadding 233\npadding 234\npadding 235\n\n\nHere a "
- "afterString" " occurs just after a 1024 byte boundary. It should have the correct context.\n\nText with two adjoining matches: needleneedle\n\npa"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 3190
- "beforeString" "\n\n\nHere a needle occurs just after a 1024 byte boundary. It should have the correct context.\n\nText with two adjoining matches: "
- "afterString" "needle\n\npadding 243\npadding 244\npadding 245\npadding 246\npadding 247\npadding 248\npadding 249\npadding 250\npadding 251\npadding 252\n"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 3196
- "beforeString" "e a needle occurs just after a 1024 byte boundary. It should have the correct context.\n\nText with two adjoining matches: needle"
- "afterString" "\n\npadding 243\npadding 244\npadding 245\npadding 246\npadding 247\npadding 248\npadding 249\npadding 250\npadding 251\npadding 252\npaddin"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 6246
- "beforeString" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\nHere are four non-ascii 1-byte UTF-8 characters: αβγδε\n\n"
- "afterString" "\n\nHere are four printable 2-byte UTF-8 characters: ¡¢£¤¥\n\nneedle\n\n\n\nHere are four printable 3-byte UTF-8 characters: ऄअ"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- ]}
- (logviewer/substring-search file pattern :num-matches 7)))
-
- (testing "Correct match offset is returned when skipping bytes"
- (let [start-byte-offset 3197]
- (is (= {"isDaemon" "no"
- "nextByteOffset" 6252
- "searchString" pattern
- "startByteOffset" start-byte-offset
- "matches" [{"byteOffset" 6246
- "beforeString" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\nHere are four non-ascii 1-byte UTF-8 characters: αβγδε\n\n"
- "afterString" "\n\nHere are four printable 2-byte UTF-8 characters: ¡¢£¤¥\n\nneedle\n\n\n\nHere are four printable 3-byte UTF-8 characters: ऄअ"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}]}
- (logviewer/substring-search file
- pattern
- :num-matches 1
- :start-byte-offset start-byte-offset)))))
-
- (let [pattern (clojure.string/join (repeat 1024 'X))]
- (is
- (= {"isDaemon" "no"
- "nextByteOffset" 6183
- "searchString" pattern
- "startByteOffset" 0
- "matches" [
- {"byteOffset" 4075
- "beforeString" "\n\nThe following match of 1024 bytes completely fills half the byte buffer. It is a search substring of the maximum size......\n\n"
- "afterString" "\nThe following max-size match straddles a 1024 byte buffer.\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- {"byteOffset" 5159
- "beforeString" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\nThe following max-size match straddles a 1024 byte buffer.\n"
- "afterString" "\n\nHere are four non-ascii 1-byte UTF-8 characters: αβγδε\n\nneedle\n\nHere are four printable 2-byte UTF-8 characters: ¡¢£¤"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- ]}
- (logviewer/substring-search file pattern :num-matches 2))))
-
- (let [pattern "𐄀𐄁𐄂"]
- (is
- (= {"isDaemon" "no"
- "nextByteOffset" 7176
- "searchString" pattern
- "startByteOffset" 0
- "matches" [
- {"byteOffset" 7164
- "beforeString" "padding 372\npadding 373\npadding 374\npadding 375\n\nThe following tests multibyte UTF-8 Characters straddling the byte boundary: "
- "afterString" "\n\nneedle"
- "matchString" pattern
- "logviewerURL" (str "http://"
- expected-host
- ":"
- expected-port
- "/log?file=src%2Fdev%2F"
- (.getName file)
- "&start=0&length=51200")}
- ]}
- (logviewer/substring-search file pattern :num-matches 1))))
-
- (testing "Returns 0 matches for unseen pattern"
- (let [pattern "Not There"]
- (is (= {"isDaemon" "no"
- "searchString" pattern
- "startByteOffset" 0
- "matches" []}
- (logviewer/substring-search file
- pattern
- :num-matches nil
- :start-byte-offset nil))))))))))
-
-(deftest test-find-n-matches
- (testing "find-n-matches looks through logs properly"
- (let [files [(clojure.java.io/file "src" "dev" "logviewer-search-context-tests.log.test")
- (clojure.java.io/file "src" "dev" "logviewer-search-context-tests.log.gz")]
- matches1 ((logviewer/find-n-matches files 20 0 0 "needle") "matches")
- matches2 ((logviewer/find-n-matches files 20 0 126 "needle") "matches")
- matches3 ((logviewer/find-n-matches files 20 1 0 "needle") "matches")]
-
- (is (= 2 (count matches1)))
- (is (= 4 (count ((first matches1) "matches"))))
- (is (= 4 (count ((second matches1) "matches"))))
- (is (= ((first matches1) "fileName") "src/dev/logviewer-search-context-tests.log.test"))
- (is (= ((second matches1) "fileName") "src/dev/logviewer-search-context-tests.log.gz"))
-
- (is (= 2 (count ((first matches2) "matches"))))
- (is (= 4 (count ((second matches2) "matches"))))
-
- (is (= 1 (count matches3)))
- (is (= 4 (count ((first matches3) "matches")))))))
-
-(deftest test-deep-search-logs-for-topology
- (let [files [(clojure.java.io/file "src" "dev" "logviewer-search-context-tests.log.test")
- (clojure.java.io/file "src" "dev" "logviewer-search-context-tests.log.gz")]
- attrs (make-array FileAttribute 0)
- topo-path (.getCanonicalPath (.toFile (Files/createTempDirectory "topoA" attrs)))
- _ (.createNewFile (clojure.java.io/file topo-path "6400"))
- _ (.createNewFile (clojure.java.io/file topo-path "6500"))
- _ (.createNewFile (clojure.java.io/file topo-path "6600"))
- _ (.createNewFile (clojure.java.io/file topo-path "6700"))]
- (stubbing
- [logviewer/logs-for-port files
- logviewer/find-n-matches nil]
- (testing "deep-search-logs-for-topology all-ports search-archived = true"
- (instrumenting
- [logviewer/find-n-matches
- logviewer/logs-for-port]
- (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "*" "20" "199" true nil nil)
- (verify-call-times-for logviewer/find-n-matches 4)
- (verify-call-times-for logviewer/logs-for-port 4)
- ; File offset and byte offset should always be zero when searching multiple workers (multiple ports).
- (verify-nth-call-args-for 1 logviewer/find-n-matches files 20 0 0 "search")
- (verify-nth-call-args-for 2 logviewer/find-n-matches files 20 0 0 "search")
- (verify-nth-call-args-for 3 logviewer/find-n-matches files 20 0 0 "search")
- (verify-nth-call-args-for 4 logviewer/find-n-matches files 20 0 0 "search")))
- (testing "deep-search-logs-for-topology all-ports search-archived = false"
- (instrumenting
- [logviewer/find-n-matches
- logviewer/logs-for-port]
- (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" nil "20" "199" nil nil nil)
- (verify-call-times-for logviewer/find-n-matches 4)
- (verify-call-times-for logviewer/logs-for-port 4)
- ; File offset and byte offset should always be zero when searching multiple workers (multiple ports).
- (verify-nth-call-args-for 1 logviewer/find-n-matches [(first files)] 20 0 0 "search")
- (verify-nth-call-args-for 2 logviewer/find-n-matches [(first files)] 20 0 0 "search")
- (verify-nth-call-args-for 3 logviewer/find-n-matches [(first files)] 20 0 0 "search")
- (verify-nth-call-args-for 4 logviewer/find-n-matches [(first files)] 20 0 0 "search")))
- (testing "deep-search-logs-for-topology one-port search-archived = true, no file-offset"
- (instrumenting
- [logviewer/find-n-matches
- logviewer/logs-for-port]
- (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "0" "0" true nil nil)
- (verify-call-times-for logviewer/find-n-matches 1)
- (verify-call-times-for logviewer/logs-for-port 2)
- (verify-nth-call-args-for 1 logviewer/find-n-matches files 20 0 0 "search")))
- (testing "deep-search-logs-for-topology one-port search-archived = true, file-offset = 1"
- (instrumenting
- [logviewer/find-n-matches
- logviewer/logs-for-port]
- (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "1" "0" true nil nil)
- (verify-call-times-for logviewer/find-n-matches 1)
- (verify-call-times-for logviewer/logs-for-port 2)
- (verify-nth-call-args-for 1 logviewer/find-n-matches files 20 1 0 "search")))
- (testing "deep-search-logs-for-topology one-port search-archived = false, file-offset = 1"
- (instrumenting
- [logviewer/find-n-matches
- logviewer/logs-for-port]
- (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "1" "0" nil nil nil)
- (verify-call-times-for logviewer/find-n-matches 1)
- (verify-call-times-for logviewer/logs-for-port 2)
- ; File offset should be zero, since search-archived is false.
- (verify-nth-call-args-for 1 logviewer/find-n-matches [(first files)] 20 0 0 "search")))
- (testing "deep-search-logs-for-topology one-port search-archived = true, file-offset = 1, byte-offset = 100"
- (instrumenting
- [logviewer/find-n-matches
- logviewer/logs-for-port]
- (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "6700" "1" "100" true nil nil)
- (verify-call-times-for logviewer/find-n-matches 1)
- (verify-call-times-for logviewer/logs-for-port 2)
- ; File offset and byte offset should be passed through unchanged, since search-archived is true.
- (verify-nth-call-args-for 1 logviewer/find-n-matches files 20 1 100 "search")))
- (testing "deep-search-logs-for-topology bad-port search-archived = false, file-offset = 1"
- (instrumenting
- [logviewer/find-n-matches
- logviewer/logs-for-port]
- (logviewer/deep-search-logs-for-topology "" nil topo-path "search" "20" "2700" "1" "0" nil nil nil)
- ; Called with a bad port (not in the config). No searching should be done.
- (verify-call-times-for logviewer/find-n-matches 0)
- (verify-call-times-for logviewer/logs-for-port 0)))
- (Utils/forceDelete topo-path))))
-
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java
new file mode 100644
index 0000000..612e0e2
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerConstant.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer;
+
+public class LogviewerConstant {
+ public static final int DEFAULT_BYTES_PER_PAGE = 51200;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
new file mode 100644
index 0000000..2b26702
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer;
+
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.LogCleaner;
+import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.resource.Resource;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES;
+
+public class LogviewerServer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
+    // Marked once per effective close() call. Registered under the logviewer namespace; the previous
+    // "drpc:" prefix looks like a copy-paste leftover and mislabelled this daemon's metric.
+    private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("logviewer:num-shutdown-calls");
+    /** Path used as the base resource of the servlet context. */
+    public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
+
+    /**
+     * Builds the Jetty server that hosts the logviewer static resources and REST API. The server is not started here.
+     *
+     * @param conf daemon configuration, read for the logviewer port, HTTPS settings and UI filter settings
+     * @return the configured server, or {@code null} when {@link DaemonConfig#LOGVIEWER_PORT} is unset or negative
+     * @throws RuntimeException if the static resource directory cannot be resolved
+     */
+    private static Server mkHttpServer(Map<String, Object> conf) {
+        Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT);
+        if (logviewerHttpPort == null || logviewerHttpPort < 0) {
+            return null;
+        }
+
+        LOG.info("Starting Logviewer HTTP servers...");
+        // NOTE(review): this value is read but never handed to the server below - confirm whether it should be.
+        Integer headerBufferSize = ObjectReader.getInt(conf.get(UI_HEADER_BUFFER_BYTES));
+        String filterClass = (String) (conf.get(DaemonConfig.UI_FILTER));
+        @SuppressWarnings("unchecked")
+        Map<String, String> filterParams = (Map<String, String>) (conf.get(DaemonConfig.UI_FILTER_PARAMS));
+        FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
+        final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
+
+        final Integer httpsPort = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_HTTPS_PORT), 0);
+        final String httpsKsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PATH));
+        final String httpsKsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_PASSWORD));
+        final String httpsKsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEYSTORE_TYPE));
+        final String httpsKeyPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_KEY_PASSWORD));
+        final String httpsTsPath = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PATH));
+        final String httpsTsPassword = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD));
+        final String httpsTsType = (String) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_TRUSTSTORE_TYPE));
+        final Boolean httpsWantClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_WANT_CLIENT_AUTH));
+        final Boolean httpsNeedClientAuth = (Boolean) (conf.get(DaemonConfig.LOGVIEWER_HTTPS_NEED_CLIENT_AUTH));
+
+        //TODO a better way to do this would be great.
+        LogviewerApplication.setup(conf);
+        Server server = UIHelpers.jettyCreateServer(logviewerHttpPort, null, httpsPort);
+
+        UIHelpers.configSsl(server, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword,
+                httpsTsPath, httpsTsPassword, httpsTsType, httpsNeedClientAuth, httpsWantClientAuth);
+
+        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+        try {
+            context.setBaseResource(Resource.newResource(STATIC_RESOURCE_DIRECTORY_PATH));
+        } catch (IOException e) {
+            // Keep the original IOException as the cause so the underlying reason is not lost.
+            throw new RuntimeException("Can't locate static resource directory " + STATIC_RESOURCE_DIRECTORY_PATH, e);
+        }
+
+        context.setContextPath("/");
+        server.setHandler(context);
+
+        // Default servlet is mapped to "/", the Jersey application to "/api/v1/*".
+        ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class);
+        holderPwd.setInitOrder(1);
+        context.addServlet(holderPwd, "/");
+
+        ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/api/v1/*");
+        jerseyServlet.setInitOrder(2);
+        jerseyServlet.setInitParameter("javax.ws.rs.Application", LogviewerApplication.class.getName());
+
+        UIHelpers.configFilters(context, filterConfigurations);
+        return server;
+    }
+
+    // May be null: mkHttpServer returns null when no usable logviewer port is configured.
+    private final Server httpServer;
+    // Flipped to true by the first close() call.
+    private boolean closed = false;
+
+    /**
+     * Creates the logviewer server. The HTTP server is built here but only started by {@link #start()}.
+     *
+     * @param conf Logviewer conf for the servers
+     */
+    public LogviewerServer(Map<String, Object> conf) {
+        this.httpServer = mkHttpServer(conf);
+    }
+
+    /**
+     * Starts the HTTP server if one was created; otherwise only logs the start-up message.
+     *
+     * @throws Exception whatever the underlying server throws while starting
+     */
+    @VisibleForTesting
+    void start() throws Exception {
+        LOG.info("Starting Logviewer...");
+        if (httpServer == null) {
+            return;
+        }
+        httpServer.start();
+    }
+
+    /**
+     * Blocks until the HTTP server's threads have finished. Returns immediately when no HTTP server
+     * was created (mkHttpServer yields null without a usable port), mirroring the null check in
+     * {@link #start()} instead of failing with a NullPointerException.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    @VisibleForTesting
+    void awaitTermination() throws InterruptedException {
+        if (httpServer == null) {
+            LOG.warn("Logviewer HTTP server was not created; nothing to wait for");
+            return;
+        }
+        httpServer.join();
+    }
+
+    /**
+     * Marks the shutdown meter and flags this instance as closed. Calls after the first one do nothing.
+     * The HTTP server itself is currently left untouched (see the TODO below).
+     */
+    @Override
+    public synchronized void close() {
+        if (closed) {
+            return;
+        }
+
+        //This is kind of useless...
+        meterShutdownCalls.mark();
+
+        //TODO this is causing issues...
+        //if (httpServer != null) {
+        //    httpServer.destroy();
+        //}
+
+        closed = true;
+    }
+
+    /**
+     * Reads the local port from the first connector of the HTTP server. With assertions enabled,
+     * the server is expected to have exactly one connector.
+     *
+     * @return The port the HTTP server is listening on. Not available until {@link #start()} has run.
+     */
+    public int getHttpServerPort() {
+        assert httpServer.getConnectors().length == 1;
+
+        final int localPort = httpServer.getConnectors()[0].getLocalPort();
+        return localPort;
+    }
+
+    /**
+     * Entry point of the logviewer daemon: reads the Storm configuration, starts the log cleaner,
+     * the metrics reporters and the HTTP server, then waits for the server to terminate.
+     *
+     * @param args command line arguments (not used)
+     */
+    public static void main(String [] args) throws Exception {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        final Map<String, Object> stormConf = Utils.readStormConfig();
+        final DirectoryCleaner cleaner = new DirectoryCleaner();
+
+        try (LogviewerServer logviewer = new LogviewerServer(stormConf);
+             LogCleaner logCleaner = new LogCleaner(stormConf, cleaner)) {
+            Utils.addShutdownHookWithForceKillIn1Sec(logviewer::close);
+            logCleaner.start();
+            StormMetricsRegistry.startMetricsReporters(stormConf);
+            logviewer.start();
+            logviewer.awaitTermination();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
new file mode 100644
index 0000000..b8f32f2
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogDownloadHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.handler;
+
+import org.apache.storm.daemon.logviewer.utils.LogFileDownloader;
+import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+/**
+ * Serves log download requests by delegating to a {@link LogFileDownloader}.
+ */
+public class LogviewerLogDownloadHandler {
+
+    private final LogFileDownloader downloader;
+
+    /**
+     * Constructor. All arguments are handed to the underlying {@link LogFileDownloader}.
+     *
+     * @param logRoot log root passed to the downloader
+     * @param daemonLogRoot daemon log root passed to the downloader
+     * @param resourceAuthorizer authorizer passed to the downloader
+     */
+    public LogviewerLogDownloadHandler(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) {
+        downloader = new LogFileDownloader(logRoot, daemonLogRoot, resourceAuthorizer);
+    }
+
+    /**
+     * Downloads a log file; calls the downloader with its third argument set to {@code false}.
+     *
+     * @param fileName name of the requested file
+     * @param user requesting user
+     * @return the response produced by the downloader
+     */
+    public Response downloadLogFile(String fileName, String user) throws IOException {
+        final Response response = downloader.downloadFile(fileName, user, false);
+        return response;
+    }
+
+    /**
+     * Downloads a daemon log file; calls the downloader with its third argument set to {@code true}.
+     *
+     * @param fileName name of the requested file
+     * @param user requesting user
+     * @return the response produced by the downloader
+     */
+    public Response downloadDaemonLogFile(String fileName, String user) throws IOException {
+        final Response response = downloader.downloadFile(fileName, user, true);
+        return response;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/44b268ba/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
new file mode 100644
index 0000000..0881b69
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.handler;
+
+import j2html.TagCreator;
+import j2html.tags.DomContent;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.LogviewerConstant;
+import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
+import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
+import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
+import org.apache.storm.daemon.utils.StreamUtil;
+import org.apache.storm.daemon.utils.URLBuilder;
+import org.apache.storm.ui.InvalidRequestException;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.Utils;
+import org.jooq.lambda.Unchecked;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+import javax.ws.rs.core.Response;
+
+import static j2html.TagCreator.*;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
+
+public class LogviewerLogPageHandler {
+    // Root directory under which logPage and listLogFiles resolve files.
+    private final String logRoot;
+    // Root directory under which daemonLogPage resolves files.
+    private final String daemonLogRoot;
+    // Consulted by logPage before any file is read.
+    private final ResourceAuthorizer resourceAuthorizer;
+
+    /**
+     * Constructor.
+     *
+     * @param logRoot root directory used for worker log pages and listings
+     * @param daemonLogRoot root directory used for daemon log pages
+     * @param resourceAuthorizer authorizer used to check access to worker log files
+     */
+    public LogviewerLogPageHandler(String logRoot, String daemonLogRoot,
+                                   ResourceAuthorizer resourceAuthorizer) {
+        this.resourceAuthorizer = resourceAuthorizer;
+        this.daemonLogRoot = daemonLogRoot;
+        this.logRoot = logRoot;
+    }
+
+    /**
+     * Lists log files under the log root, optionally narrowed by topology id and/or port, and returns
+     * their sorted names as a JSON response.
+     *
+     * @param user requesting user (not consulted by this method)
+     * @param port worker port to filter on, or {@code null} for all ports
+     * @param topologyId topology to filter on, or {@code null} for all topologies
+     * @param callback passed through to the JSON response builder
+     * @param origin passed through to the JSON response builder
+     * @return JSON response with the matching file names; an empty list when nothing matches
+     */
+    public Response listLogFiles(String user, Integer port, String topologyId, String callback, String origin) throws IOException {
+        List<File> matched = null;
+
+        if (topologyId == null && port == null) {
+            // no filter at all: everything below the log root
+            matched = WorkerLogs.getAllLogsForRootDir(new File(logRoot));
+        } else if (topologyId == null) {
+            // port only: look for a directory named after the port inside every topology directory
+            matched = new ArrayList<>();
+            final String wantedPortName = port.toString();
+            File[] topoDirs = new File(logRoot).listFiles();
+            if (topoDirs != null) {
+                for (File topoDir : topoDirs) {
+                    File[] children = topoDir.listFiles();
+                    if (children == null) {
+                        continue;
+                    }
+                    for (File child : children) {
+                        if (child.getName().equals(wantedPortName)) {
+                            matched.addAll(DirectoryCleaner.getFilesForDir(child));
+                        }
+                    }
+                }
+            }
+        } else if (port == null) {
+            // topology only: every port directory of that topology
+            matched = new ArrayList<>();
+            File topoDir = new File(logRoot + Utils.FILE_PATH_SEPARATOR + topologyId);
+            if (topoDir.exists()) {
+                File[] children = topoDir.listFiles();
+                if (children != null) {
+                    for (File child : children) {
+                        matched.addAll(DirectoryCleaner.getFilesForDir(child));
+                    }
+                }
+            }
+        } else {
+            // both given: exactly one worker directory; stays null when it does not exist
+            File workerDir = ConfigUtils.getWorkerDirFromRoot(logRoot, topologyId, port);
+            if (workerDir.exists()) {
+                matched = DirectoryCleaner.getFilesForDir(workerDir);
+            }
+        }
+
+        List<String> names;
+        if (matched == null) {
+            names = new ArrayList<>();
+        } else {
+            names = matched.stream()
+                    .map(WorkerLogs::getTopologyPortWorkerLog)
+                    .sorted()
+                    .collect(toList());
+        }
+
+        return LogviewerResponseBuilder.buildSuccessJsonResponse(names, callback, origin);
+    }
+
+    /**
+     * Renders one page of a worker log file as an HTML response.
+     *
+     * <p>The file must exist and sit exactly two directory levels below the log root; otherwise a
+     * "page not found" response is returned.
+     *
+     * @param fileName path of the log file relative to the log root
+     * @param start byte offset to start from, or {@code null} to show the tail of the file
+     * @param length number of bytes to show (capped at 10485760), or {@code null} for the default page size
+     * @param grep when non-empty, only lines containing this string are shown and the navigation forms are omitted
+     * @param user requesting user
+     * @return the HTML page, or a not-found / unauthorized response
+     * @throws InvalidRequestException if {@code start} lies at or beyond the end of the file
+     */
+    public Response logPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException {
+        if (!resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) {
+            // Without a whitelist there is nothing to authorize against, so report "not found";
+            // with one, the user is simply not on it. (The two outcomes used to be swapped.)
+            // NOTE(review): presumably a null whitelist means the file has no metadata - confirm against ResourceAuthorizer.
+            if (resourceAuthorizer.getLogUserGroupWhitelist(fileName) == null) {
+                return LogviewerResponseBuilder.buildResponsePageNotFound();
+            } else {
+                return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
+            }
+        }
+
+        String rootDir = logRoot;
+        File file = new File(rootDir, fileName).getCanonicalFile();
+        String path = file.getCanonicalPath();
+        boolean isZipFile = path.endsWith(".gz");
+        // Either parent may be missing for a very short canonical path; treat that as "not found" below.
+        File parentDir = file.getParentFile();
+        File topoDir = parentDir != null ? parentDir.getParentFile() : null;
+
+        if (topoDir == null || !file.exists() || !new File(rootDir).getCanonicalFile().equals(topoDir.getParentFile())) {
+            return LogviewerResponseBuilder.buildResponsePageNotFound();
+        }
+
+        long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
+
+        // File.listFiles() returns null when the directory cannot be read; fall back to an empty listing.
+        File[] portDirs = topoDir.listFiles();
+        if (portDirs == null) {
+            portDirs = new File[0];
+        }
+
+        SortedSet<File> logFiles;
+        try {
+            logFiles = Arrays.stream(portDirs)
+                    .flatMap(Unchecked.function(dir -> DirectoryCleaner.getFilesForDir(dir).stream()))
+                    .filter(File::isFile)
+                    .collect(toCollection(TreeSet::new));
+        } catch (UncheckedIOException e) {
+            // unwrap the IOException raised inside the stream pipeline
+            throw e.getCause();
+        }
+
+        List<String> filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
+                .filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList());
+
+        // the requested file goes last in the selection list
+        List<String> reorderedFilesStr = new ArrayList<>();
+        reorderedFilesStr.addAll(filesStrWithoutFileParam);
+        reorderedFilesStr.add(fileName);
+
+        length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
+
+        String logString;
+        if (isTxtFile(fileName)) {
+            logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
+        } else {
+            logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+        }
+
+        // no explicit start means the tail of the file was shown
+        start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+
+        List<DomContent> bodyContents = new ArrayList<>();
+        if (StringUtils.isNotEmpty(grep)) {
+            // NOTE(review): the filter runs on already HTML-escaped text - confirm this is intended.
+            String matchedString = String.join("\n", Arrays.stream(logString.split("\n"))
+                    .filter(str -> str.contains(grep)).collect(toList()));
+            bodyContents.add(pre(matchedString).withId("logContent"));
+        } else {
+            DomContent pagerData = null;
+            if (isTxtFile(fileName)) {
+                pagerData = pagerLinks(fileName, start, length, Long.valueOf(fileLength).intValue(), "log");
+            }
+
+            bodyContents.add(searchFileForm(fileName, "no"));
+            // list all files for this topology
+            bodyContents.add(logFileSelectionForm(reorderedFilesStr, "log"));
+            if (pagerData != null) {
+                bodyContents.add(pagerData);
+            }
+            bodyContents.add(downloadLink(fileName));
+            bodyContents.add(pre(logString).withClass("logContent"));
+            if (pagerData != null) {
+                bodyContents.add(pagerData);
+            }
+        }
+
+        String content = logTemplate(bodyContents, fileName, user).render();
+        return LogviewerResponseBuilder.buildSuccessHtmlResponse(content);
+    }
+
+ public Response daemonLogPage(String fileName, Integer start, Integer length, String grep, String user) throws IOException, InvalidRequestException {
+ String rootDir = daemonLogRoot;
+ File file = new File(rootDir, fileName).getCanonicalFile();
+ String path = file.getCanonicalPath();
+ boolean isZipFile = path.endsWith(".gz");
+
+ if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) {
+ long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
+
+ // all types of files included
+ List<File> logFiles = Arrays.stream(new File(rootDir).listFiles())
+ .filter(File::isFile)
+ .collect(toList());
+
+ List<String> filesStrWithoutFileParam = logFiles.stream()
+ .map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList());
+
+ List<String> reorderedFilesStr = new ArrayList<>();
+ reorderedFilesStr.addAll(filesStrWithoutFileParam);
+ reorderedFilesStr.add(fileName);
+
+ length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
+
+ String logString;
+ if (isTxtFile(fileName)) {
+ logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
+ } else {
+ logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+ }
+
+ start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+
+ List<DomContent> bodyContents = new ArrayList<>();
+ if (StringUtils.isNotEmpty(grep)) {
+ String matchedString = String.join("\n", Arrays.stream(logString.split("\n"))
+ .filter(str -> str.contains(grep)).collect(toList()));
+ bodyContents.add(pre(matchedString).withId("logContent"));
+ } else {
+ DomContent pagerData = null;
+ if (isTxtFile(fileName)) {
+ pagerData = pagerLinks(fileName, start, length, Long.valueOf(fileLength).intValue(), "daemonlog");
+ }
+
+ bodyContents.add(searchFileForm(fileName, "yes"));
+ // list all daemon logs
+ bodyContents.add(logFileSelectionForm(reorderedFilesStr, "daemonlog"));
+ if (pagerData != null) {
+ bodyContents.add(pagerData);
+ }
+ bodyContents.add(daemonDownloadLink(fileName));
+ bodyContents.add(pre(logString).withClass("logContent"));
+ if (pagerData != null) {
+ bodyContents.add(pagerData);
+ }
+ }
+
+ String content = logTemplate(bodyContents, fileName, user).render();
+ return LogviewerResponseBuilder.buildSuccessHtmlResponse(content);
+ } else {
+ return LogviewerResponseBuilder.buildResponsePageNotFound();
+ }
+ }
+
+ private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) {
+ List<DomContent> finalBodyContents = new ArrayList<>();
+
+ if (StringUtils.isNotBlank(user)) {
+ finalBodyContents.add(div(p("User: " + user)).withClass("ui-user"));
+ }
+
+ finalBodyContents.add(div(p("Note: the drop-list shows at most 1024 files for each worker directory.")).withClass("ui-note"));
+ finalBodyContents.add(h3(escapeHtml(fileName)));
+ finalBodyContents.addAll(bodyContents);
+
+ return html(
+ head(
+ title(escapeHtml(fileName) + " - Storm Log Viewer"),
+ link().withRel("stylesheet").withHref("/css/bootstrap-3.3.1.min.css"),
+ link().withRel("stylesheet").withHref("/css/jquery.dataTables.1.10.4.min.css"),
+ link().withRel("stylesheet").withHref("/css/style.css")
+ ),
+ body(
+ finalBodyContents.toArray(new DomContent[]{})
+ )
+ );
+ }
+
+ private DomContent downloadLink(String fileName) {
+ return p(linkTo(UIHelpers.urlFormat("/api/v1/download?file=%s", fileName), "Download Full File"));
+ }
+
+ private DomContent daemonDownloadLink(String fileName) {
+ return p(linkTo(UIHelpers.urlFormat("/api/v1/daemondownload?file=%s", fileName), "Download Full File"));
+ }
+
+ private DomContent linkTo(String url, String content) {
+ return a(content).withHref(url);
+ }
+
+ private DomContent logFileSelectionForm(List<String> logFiles, String type) {
+ return form(
+ dropDown("file", logFiles),
+ input().withType("submit").withValue("Switch file")
+ ).withAction(type).withId("list-of-files");
+ }
+
+    /**
+     * Builds a select element with one option per entry.
+     *
+     * @param name used as both the name and the id of the select element
+     * @param logFiles option texts, in display order
+     */
+    private DomContent dropDown(String name, List<String> logFiles) {
+        List<DomContent> entries = new ArrayList<>();
+        for (String logFile : logFiles) {
+            entries.add(TagCreator.option(logFile));
+        }
+        return select(entries.toArray(new DomContent[0])).withName(name).withId(name);
+    }
+
+ private DomContent searchFileForm(String fileName, String isDaemonValue) {
+ return form(
+ text("search this file:"),
+ input().withType("text").withName("search"),
+ input().withType("hidden").withName("is-daemon").withValue(isDaemonValue),
+ input().withType("hidden").withName("file").withValue(fileName),
+ input().withType("submit").withValue("Search")
+ ).withAction("/logviewer_search.html").withId("search-box");
+ }
+
+    /**
+     * Builds the Prev / First / Last / Next navigation buttons for a log page.
+     *
+     * <p>Each button's target offset is the same value that decides whether the button is enabled.
+     * Previously the "Next" URL was recomputed without the {@code fileLength <= 0} case, so an
+     * enabled "Next" button could link back to offset 0.
+     *
+     * @param fileName file the links refer to
+     * @param start offset of the page currently shown
+     * @param length page size in bytes
+     * @param fileLength total file size; when not positive, "Next" simply advances by one page
+     * @param type API endpoint name appended to {@code /api/v1/}
+     * @return a div holding the four buttons
+     */
+    private DomContent pagerLinks(String fileName, Integer start, Integer length, Integer fileLength, String type) {
+        int prevStart = Math.max(0, start - length);
+        int nextStart = fileLength > 0 ? Math.min(Math.max(0, fileLength - length), start + length) : start + length;
+
+        List<DomContent> btnLinks = new ArrayList<>();
+        btnLinks.add(toButtonLink(pagerUrl(type, fileName, prevStart, length), "Prev", prevStart < start));
+        btnLinks.add(toButtonLink(pagerUrl(type, fileName, 0, length), "First"));
+        // "Last" carries no start parameter
+        btnLinks.add(toButtonLink(pagerUrl(type, fileName, null, length), "Last"));
+        btnLinks.add(toButtonLink(pagerUrl(type, fileName, nextStart, length), "Next", nextStart > start));
+
+        return div(btnLinks.toArray(new DomContent[]{}));
+    }
+
+    /** Builds a pager URL for the endpoint; the start parameter is left out when {@code start} is null. */
+    private String pagerUrl(String type, String fileName, Integer start, Integer length) {
+        Map<String, Object> urlQueryParams = new HashMap<>();
+        urlQueryParams.put("file", fileName);
+        if (start != null) {
+            urlQueryParams.put("start", start);
+        }
+        urlQueryParams.put("length", length);
+        return URLBuilder.build("/api/v1/" + type, urlQueryParams);
+    }
+
+ private DomContent toButtonLink(String url, String text) {
+ return toButtonLink(url, text, true);
+ }
+
+ private DomContent toButtonLink(String url, String text, boolean enabled) {
+ return a(text).withHref(url).withClass("btn btn-default " + (enabled ? "enabled" : "disabled"));
+ }
+
+ private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException {
+ boolean isZipFile = path.endsWith(".gz");
+ long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
+ long skip = fileLength - tail;
+ return pageFile(path, Long.valueOf(skip).intValue(), tail);
+ }
+
+    /**
+     * Reads up to {@code length} bytes of the file starting at byte offset {@code start}. Files whose
+     * name ends in ".gz" are read through a {@link GZIPInputStream}.
+     *
+     * @param path path of the file to read
+     * @param start byte offset to start at; values {@code <= 0} read from the beginning
+     * @param length maximum number of bytes to read
+     * @return the bytes read, converted with {@code ByteArrayOutputStream.toString()}
+     * @throws InvalidRequestException if {@code start} is at or beyond the end of the file
+     * @throws IOException if the file cannot be opened or read
+     */
+    private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException {
+        boolean isZipFile = path.endsWith(".gz");
+        long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
+
+        // The raw file stream is its own resource so it is still closed when the GZIPInputStream
+        // constructor throws (for example on a corrupt header); previously it leaked in that case.
+        try (InputStream rawInput = new FileInputStream(path);
+             InputStream input = isZipFile ? new GZIPInputStream(rawInput) : rawInput;
+             ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+            if (start >= fileLength) {
+                throw new InvalidRequestException("Cannot start past the end of the file");
+            }
+            if (start > 0) {
+                StreamUtil.skipBytes(input, start);
+            }
+
+            byte[] buffer = new byte[1024];
+            while (output.size() < length) {
+                int size = input.read(buffer, 0, Math.min(1024, length - output.size()));
+                if (size > 0) {
+                    output.write(buffer, 0, size);
+                } else {
+                    // end of stream reached before the requested length
+                    break;
+                }
+            }
+
+            return output.toString();
+        }
+    }
+
+ private boolean isTxtFile(String fileName) {
+ Pattern p = Pattern.compile("\\.(log.*|txt|yaml|pid)$");
+ Matcher matcher = p.matcher(fileName);
+ return matcher.find();
+ }
+}