You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/02/11 20:10:48 UTC
[05/15] storm git commit: Squashing util conversion changes.
Squashing util conversion changes.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3befae32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3befae32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3befae32
Branch: refs/heads/master
Commit: 3befae326c24b10cea1b1e6992ef808d481134c2
Parents: 00c18c9
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Feb 8 11:46:51 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Feb 8 11:46:51 2016 -0600
----------------------------------------------------------------------
pom.xml | 6 +
storm-core/pom.xml | 4 +
.../src/clj/org/apache/storm/LocalCluster.clj | 2 +-
storm-core/src/clj/org/apache/storm/clojure.clj | 8 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 25 +-
.../cluster_state/zookeeper_state_factory.clj | 11 +-
.../clj/org/apache/storm/command/blobstore.clj | 9 +-
.../org/apache/storm/command/dev_zookeeper.clj | 6 +-
.../clj/org/apache/storm/command/get_errors.clj | 12 +-
.../apache/storm/command/shell_submission.clj | 3 +-
storm-core/src/clj/org/apache/storm/config.clj | 18 +-
.../src/clj/org/apache/storm/converter.clj | 17 +-
.../src/clj/org/apache/storm/daemon/acker.clj | 13 +-
.../src/clj/org/apache/storm/daemon/common.clj | 29 +-
.../src/clj/org/apache/storm/daemon/drpc.clj | 20 +-
.../clj/org/apache/storm/daemon/executor.clj | 80 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 68 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 155 ++-
.../clj/org/apache/storm/daemon/supervisor.clj | 200 ++--
.../src/clj/org/apache/storm/daemon/task.clj | 2 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 64 +-
.../src/clj/org/apache/storm/disruptor.clj | 10 +-
storm-core/src/clj/org/apache/storm/event.clj | 2 +-
.../src/clj/org/apache/storm/local_state.clj | 9 +-
.../org/apache/storm/pacemaker/pacemaker.clj | 7 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 24 +-
.../clj/org/apache/storm/process_simulator.clj | 2 -
.../apache/storm/scheduler/DefaultScheduler.clj | 7 +-
.../apache/storm/scheduler/EvenScheduler.clj | 23 +-
.../storm/scheduler/IsolationScheduler.clj | 29 +-
storm-core/src/clj/org/apache/storm/stats.clj | 82 +-
storm-core/src/clj/org/apache/storm/testing.clj | 81 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 6 +-
storm-core/src/clj/org/apache/storm/timer.clj | 12 +-
.../clj/org/apache/storm/trident/testing.clj | 9 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 97 +-
.../src/clj/org/apache/storm/ui/helpers.clj | 14 +-
storm-core/src/clj/org/apache/storm/util.clj | 921 +--------------
.../src/clj/org/apache/storm/zookeeper.clj | 1 -
.../serialization/SerializationFactory.java | 3 +-
.../staticmocking/MockedConfigUtils.java | 31 -
.../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +-
.../jvm/org/apache/storm/utils/IPredicate.java | 22 +
.../org/apache/storm/utils/StaticMockable.java | 21 +
.../jvm/org/apache/storm/utils/TestUtils.java | 34 -
.../src/jvm/org/apache/storm/utils/Time.java | 25 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 1112 +++++++++++++++++-
.../org/apache/storm/integration_test.clj | 98 +-
.../org/apache/storm/testing4j_test.clj | 35 +-
.../apache/storm/trident/integration_test.clj | 12 +
.../test/clj/org/apache/storm/cluster_test.clj | 20 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../clj/org/apache/storm/logviewer_test.clj | 267 +++--
.../test/clj/org/apache/storm/nimbus_test.clj | 131 ++-
.../scheduler/resource_aware_scheduler_test.clj | 21 +-
.../apache/storm/security/auth/auth_test.clj | 11 +-
.../BlowfishTupleSerializer_test.clj | 1 -
.../clj/org/apache/storm/serialization_test.clj | 23 +-
.../clj/org/apache/storm/supervisor_test.clj | 645 +++++-----
.../clj/org/apache/storm/transactional_test.clj | 18 +
.../clj/org/apache/storm/trident/state_test.clj | 3 +-
.../clj/org/apache/storm/trident/tuple_test.clj | 12 +
.../test/clj/org/apache/storm/utils_test.clj | 14 +-
.../staticmocking/ConfigUtilsInstaller.java | 38 +
.../utils/staticmocking/UtilsInstaller.java | 38 +
.../storm/utils/staticmocking/package-info.java | 95 ++
66 files changed, 2868 insertions(+), 1993 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 831059a..37dbb19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,6 +215,7 @@
<jgrapht.version>0.9.0</jgrapht.version>
<guava.version>16.0.1</guava.version>
<netty.version>3.9.0.Final</netty.version>
+ <sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
<log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
<log4j.version>2.1</log4j.version>
<slf4j.version>1.7.7</slf4j.version>
@@ -829,6 +830,11 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>uk.org.lidalia</groupId>
+ <artifactId>sysout-over-slf4j</artifactId>
+ <version>${sysout-over-slf4j.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 8de2461..9dcad96 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -42,6 +42,10 @@
This is here as a work around to place it at the beginning of the classpath
even though maven does not officially support ordering of the classpath.-->
<dependency>
+ <groupId>uk.org.lidalia</groupId>
+ <artifactId>sysout-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/LocalCluster.clj b/storm-core/src/clj/org/apache/storm/LocalCluster.clj
index df3c180..8397707 100644
--- a/storm-core/src/clj/org/apache/storm/LocalCluster.clj
+++ b/storm-core/src/clj/org/apache/storm/LocalCluster.clj
@@ -48,7 +48,7 @@
[this name conf topology]
(submit-local-topology
(:nimbus (. this state)) name conf topology)
- (let [hook (get-configured-class conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
+ (let [hook (Utils/getConfiguredClass conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
(when hook (submit-hook hook name conf topology))))
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/clojure.clj b/storm-core/src/clj/org/apache/storm/clojure.clj
index ff33829..9e1836f 100644
--- a/storm-core/src/clj/org/apache/storm/clojure.clj
+++ b/storm-core/src/clj/org/apache/storm/clojure.clj
@@ -23,7 +23,7 @@
(:import [org.apache.storm.spout SpoutOutputCollector ISpout])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
- (:import [java.util List])
+ (:import [java.util Collection List])
(:require [org.apache.storm [thrift :as thrift]]))
(defn direct-stream [fields]
@@ -153,6 +153,12 @@
(tuple-values [this collector stream]
this))
+(defn- collectify
+ [obj]
+ (if (or (sequential? obj) (instance? Collection obj))
+ obj
+ [obj]))
+
(defnk emit-bolt! [collector values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index 152423a..2ecae72 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -18,10 +18,11 @@
(:import [org.apache.zookeeper.data Stat ACL Id]
[org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
LogConfig ProfileAction ProfileRequest NodeInfo]
- [java.io Serializable])
+ [java.io Serializable StringWriter PrintWriter]
+ [java.net URLEncoder])
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
(:import [org.apache.curator.framework CuratorFramework])
- (:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm.utils Utils Time])
(:import [org.apache.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState])
(:import [java.security MessageDigest])
(:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
@@ -176,7 +177,7 @@
(defn error-path
[storm-id component-id]
- (str (error-storm-root storm-id) "/" (url-encode component-id)))
+ (str (error-storm-root storm-id) "/" (URLEncoder/encode component-id)))
(def last-error-path-seg "last-error")
@@ -184,7 +185,7 @@
[storm-id component-id]
(str (error-storm-root storm-id)
"/"
- (url-encode component-id)
+ (URLEncoder/encode component-id)
"-"
last-error-path-seg))
@@ -240,6 +241,12 @@
:stats (get executor-stats t)}})))
(into {}))))
+(defn- stringify-error [error]
+ (let [result (StringWriter.)
+ printer (PrintWriter. result)]
+ (.printStackTrace error printer)
+ (.toString result)))
+
;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
(defnk mk-storm-cluster-state
[cluster-state-spec :acls nil :context (ClusterStateContext.)]
@@ -259,7 +266,7 @@
state-id (.register
cluster-state
(fn [type path]
- (let [[subtree & args] (tokenize-path path)]
+ (let [[subtree & args] (Utils/tokenizePath path)]
(condp = subtree
ASSIGNMENTS-ROOT (if (empty? args)
(issue-callback! assignments-callback)
@@ -274,7 +281,9 @@
LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args))
BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
;; this should never happen
- (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
+ ;(exit-process! 30 "Unknown callback for subtree " subtree args)
+ (Utils/exitProcess 30 ["Unknown callback for subtree " subtree args])
+ ))))]
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
LOGCONFIG-SUBTREE]]
(.mkdirs cluster-state p acls))
@@ -381,7 +390,7 @@
;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
;; we avoid situations like that
- (let [node+port->executors (reverse-map executor->node+port)
+ (let [node+port->executors (clojurify-structure (Utils/reverseMap executor->node+port))
all-heartbeats (for [[[node port] executors] node+port->executors]
(->> (get-worker-heartbeat this storm-id node port)
(convert-executor-beats executors)
@@ -580,7 +589,7 @@
[this storm-id component-id node port error]
(let [path (error-path storm-id component-id)
last-error-path (last-error-path storm-id component-id)
- data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
+ data (thriftify-error {:time-secs (Time/currentTimeSecs) :error (stringify-error error) :host node :port port})
_ (.mkdirs cluster-state path acls)
ser-data (Utils/serialize data)
_ (.mkdirs cluster-state path acls)
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
index dcfa8d8..9594aab 100644
--- a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
@@ -16,9 +16,10 @@
(ns org.apache.storm.cluster-state.zookeeper-state-factory
(:import [org.apache.curator.framework.state ConnectionStateListener]
- [org.apache.storm.zookeeper Zookeeper])
+ [org.apache.storm.zookeeper Zookeeper]
+ [org.apache.storm.utils Utils])
(:import [org.apache.zookeeper KeeperException$NoNodeException CreateMode
- Watcher$Event$EventType Watcher$Event$KeeperState]
+ Watcher$Event$EventType Watcher$Event$KeeperState]
[org.apache.storm.cluster ClusterState DaemonType])
(:use [org.apache.storm cluster config log util])
(:require [org.apache.storm [zookeeper :as zk]])
@@ -63,7 +64,7 @@
(register
[this callback]
- (let [id (uuid)]
+ (let [id (Utils/uuid)]
(swap! callbacks assoc id callback)
id))
@@ -73,7 +74,7 @@
(set-ephemeral-node
[this path data acls]
- (Zookeeper/mkdirs zk-writer (parent-path path) acls)
+ (Zookeeper/mkdirs zk-writer (Utils/parentPath path) acls)
(if (Zookeeper/exists zk-writer path false)
(try-cause
(Zookeeper/setData zk-writer path data) ; should verify that it's ephemeral
@@ -92,7 +93,7 @@
(if (Zookeeper/exists zk-writer path false)
(Zookeeper/setData zk-writer path data)
(do
- (Zookeeper/mkdirs zk-writer (parent-path path) acls)
+ (Zookeeper/mkdirs zk-writer (Utils/parentPath path) acls)
(Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT acls))))
(set-worker-hb
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/blobstore.clj b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
index b1496db..76d8afb 100644
--- a/storm-core/src/clj/org/apache/storm/command/blobstore.clj
+++ b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
@@ -17,7 +17,8 @@
(:import [java.io InputStream OutputStream]
[org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException
KeyNotFoundException]
- [org.apache.storm.blobstore BlobStoreAclHandler])
+ [org.apache.storm.blobstore BlobStoreAclHandler]
+ [org.apache.storm.utils Utils])
(:use [org.apache.storm config]
[clojure.string :only [split]]
[clojure.tools.cli :only [cli]]
@@ -88,10 +89,10 @@
(defn create-cli [args]
(let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
["-a" "--acl" :default [] :parse-fn as-acl]
- ["-r" "--replication-factor" :default -1 :parse-fn parse-int])
+ ["-r" "--replication-factor" :default -1 :parse-fn #(Integer/parseInt %)])
meta (doto (SettableBlobMeta. acl)
(.set_replication_factor replication-factor))]
- (validate-key-name! key)
+ (Utils/validateKeyName key)
(log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
(if file
(with-open [f (input-stream file)]
@@ -140,7 +141,7 @@
(log-message "Current replication factor " blob-replication)
blob-replication)
"--update" (let [[{replication-factor :replication-factor} [key] _]
- (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])]
+ (cli new-args ["-r" "--replication-factor" :parse-fn #(Integer/parseInt %)])]
(if (nil? replication-factor)
(throw (RuntimeException. (str "Please set the replication factor")))
(let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
index ef9ecbb..657e242 100644
--- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
@@ -14,6 +14,7 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.command.dev-zookeeper
+ (:import [org.apache.storm.utils Utils])
(:use [org.apache.storm zookeeper util config])
(:import [org.apache.storm.utils ConfigUtils])
(:import [org.apache.storm.zookeeper Zookeeper])
@@ -23,6 +24,5 @@
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))
port (conf STORM-ZOOKEEPER-PORT)
localpath (conf DEV-ZOOKEEPER-PATH)]
- (rmr localpath)
- (Zookeeper/mkInprocessZookeeper localpath port)
- ))
+ (Utils/forceDelete localpath)
+ (Zookeeper/mkInprocessZookeeper localpath port)))
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/get_errors.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/get_errors.clj b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
index c267390..615a5f3 100644
--- a/storm-core/src/clj/org/apache/storm/command/get_errors.clj
+++ b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
@@ -21,7 +21,8 @@
[nimbus :as nimbus]
[common :as common]])
(:import [org.apache.storm.generated GetInfoOptions NumErrorsChoice
- TopologySummary ErrorInfo])
+ TopologySummary ErrorInfo]
+ [org.json.simple JSONValue])
(:gen-class))
(defn get-topology-id [name topologies]
@@ -44,9 +45,10 @@
topo-id (get-topology-id name topologies)
topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus topo-id opts))]
(if (or (nil? topo-id) (nil? topo-info))
- (println (to-json {"Failure" (str "No topologies running with name " name)}))
+ (println (JSONValue/toJSONString {"Failure" (str "No topologies running with name " name)}))
(let [topology-name (.get_name topo-info)
topology-errors (.get_errors topo-info)]
- (println (to-json (hash-map
- "Topology Name" topology-name
- "Comp-Errors" (get-component-errors topology-errors)))))))))
+ (println (JSONValue/toJSONString
+ (hash-map
+ "Topology Name" topology-name
+ "Comp-Errors" (get-component-errors topology-errors)))))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 8a5eb21..0d5783b 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -15,6 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.command.shell-submission
(:import [org.apache.storm StormSubmitter]
+ [org.apache.storm.utils Utils]
[org.apache.storm.zookeeper Zookeeper])
(:use [org.apache.storm thrift util config log zookeeper])
(:require [clojure.string :as str])
@@ -31,5 +32,5 @@
no-op (.close zk-leader-elector)
jarpath (StormSubmitter/submitJar conf tmpjarpath)
args (concat args [host port jarpath])]
- (exec-command! (str/join " " args))
+ (Utils/execCommand (str/join " " args))
))
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index 3666e13..e50f023 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -18,7 +18,7 @@
(:import [java.io FileReader File IOException]
[org.apache.storm.generated StormTopology])
(:import [org.apache.storm Config])
- (:import [org.apache.storm.utils Utils LocalState ConfigUtils])
+ (:import [org.apache.storm.utils Utils LocalState ConfigUtils MutableInt])
(:import [org.apache.storm.validation ConfigValidation])
(:import [org.apache.commons.io FileUtils])
(:require [clojure [string :as str]])
@@ -49,6 +49,22 @@
(/ 1)
int))
+(defn- even-sampler
+ [freq]
+ (let [freq (int freq)
+ start (int 0)
+ r (java.util.Random.)
+ curr (MutableInt. -1)
+ target (MutableInt. (.nextInt r freq))]
+ (with-meta
+ (fn []
+ (let [i (.increment curr)]
+ (when (>= i freq)
+ (.set curr start)
+ (.set target (.nextInt r freq))))
+ (= (.get curr) (.get target)))
+ {:rate freq})))
+
;; TODO this function together with sampling-rate are to be replaced with Java version when util.clj is in
(defn mk-stats-sampler
[conf]
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index bb2dc87..23e7452 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -16,7 +16,8 @@
(ns org.apache.storm.converter
(:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
- TopologyActionOptions DebugOptions ProfileRequest])
+ TopologyActionOptions DebugOptions ProfileRequest]
+ [org.apache.storm.utils Utils])
(:use [org.apache.storm util stats log])
(:require [org.apache.storm.daemon [common :as common]]))
@@ -71,6 +72,8 @@
(:worker->resources assignment)))))
thrift-assignment))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
(defn clojurify-executor->node_port [executor->node_port]
(into {}
(map-val
@@ -90,6 +93,7 @@
[(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
worker->resources)))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn clojurify-assignment [^Assignment assignment]
(if assignment
(org.apache.storm.daemon.common.Assignment.
@@ -117,12 +121,17 @@
:killed TopologyStatus/KILLED
nil)))
+(defn assoc-non-nil
+ [m k v]
+ (if v (assoc m k v) m))
+
(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options]
(-> {:action :rebalance}
(assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
(assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
(assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn thriftify-rebalance-options [rebalance-options]
(if rebalance-options
(let [thrift-rebalance-options (RebalanceOptions.)]
@@ -178,6 +187,7 @@
(.set_enable (get options :enable false))
(.set_samplingpct (get options :samplingpct 10))))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn thriftify-storm-base [storm-base]
(doto (StormBase.)
(.set_name (:storm-name storm-base))
@@ -190,6 +200,7 @@
(.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
(.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn clojurify-storm-base [^StormBase storm-base]
(if storm-base
(org.apache.storm.daemon.common.StormBase.
@@ -203,6 +214,8 @@
(convert-to-symbol-from-status (.get_prev_status storm-base))
(map-val clojurify-debugoptions (.get_component_debug storm-base)))))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn thriftify-stats [stats]
(if stats
(map-val thriftify-executor-stats
@@ -210,6 +223,8 @@
stats))
{}))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn clojurify-stats [stats]
(if stats
(map-val clojurify-executor-stats
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 7c4d614..58d8e7a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -14,9 +14,10 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.daemon.acker
- (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+ (:import [org.apache.storm.task OutputCollector TopologyContext IBolt]
+ [org.apache.storm.utils Utils])
(:import [org.apache.storm.tuple Tuple Fields])
- (:import [org.apache.storm.utils RotatingMap MutableObject])
+ (:import [org.apache.storm.utils Container RotatingMap MutableObject])
(:import [java.util List Map])
(:import [org.apache.storm Constants])
(:use [org.apache.storm config util log])
@@ -88,20 +89,20 @@
)))
(defn -init []
- [[] (container)])
+ [[] (Container.)])
(defn -prepare [this conf context collector]
(let [^IBolt ret (mk-acker-bolt)]
- (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
+ (Utils/containerSet (.state ^org.apache.storm.daemon.acker this) ret)
(.prepare ret conf context collector)
))
(defn -execute [this tuple]
- (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+ (let [^IBolt delegate (Utils/containerGet (.state ^org.apache.storm.daemon.acker this))]
(.execute delegate tuple)
))
(defn -cleanup [this]
- (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+ (let [^IBolt delegate (Utils/containerGet (.state ^org.apache.storm.daemon.acker this))]
(.cleanup delegate)
))
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index d0f8dd9..3dc2ee5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -17,17 +17,17 @@
(:use [org.apache.storm log config util])
(:import [org.apache.storm.generated StormTopology
InvalidTopologyException GlobalStreamId]
- [org.apache.storm.utils ThriftTopologyUtils]
+ [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils]
[org.apache.storm.daemon.metrics.reporters PreparableReporter]
[com.codahale.metrics MetricRegistry])
- (:import [org.apache.storm.utils Utils ConfigUtils])
(:import [org.apache.storm.daemon.metrics MetricsUtils])
(:import [org.apache.storm.task WorkerTopologyContext])
(:import [org.apache.storm Constants])
(:import [org.apache.storm.metric SystemBolt])
(:import [org.apache.storm.metric EventLoggerBolt])
- (:import [org.apache.storm.security.auth IAuthorizer])
- (:import [java.io InterruptedIOException])
+ (:import [org.apache.storm.security.auth IAuthorizer])
+ (:import [java.io InterruptedIOException]
+ [org.json.simple JSONValue])
(:require [clojure.set :as set])
(:require [org.apache.storm.daemon.acker :as acker])
(:require [org.apache.storm.thrift :as thrift])
@@ -84,10 +84,9 @@
(ExecutorStats. 0 0 0 0 0))
(defn get-storm-id [storm-cluster-state storm-name]
- (let [active-storms (.active-storms storm-cluster-state)]
- (find-first
- #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
- active-storms)
+ (let [active-storms (.active-storms storm-cluster-state)
+ pred (reify IPredicate (test [this x] (= storm-name (:storm-name (.storm-base storm-cluster-state x nil)))))]
+ (Utils/findFirst pred active-storms)
))
(defn topology-bases [storm-cluster-state]
@@ -114,12 +113,12 @@
(throw e#))
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(str name))
- (exit-process! 13 "Error on initialization")
+ (Utils/exitProcess 13 "Error on initialization")
)))))
(defn- validate-ids! [^StormTopology topology]
(let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
- offending (apply any-intersection sets)]
+ offending (apply set/intersection sets)]
(if-not (empty? offending)
(throw (InvalidTopologyException.
(str "Duplicate component ids: " offending))))
@@ -145,9 +144,10 @@
(defn component-conf [component]
(->> component
- .get_common
- .get_json_conf
- from-json))
+ .get_common
+ .get_json_conf
+ (#(if % (JSONValue/parse %)))
+ clojurify-structure))
(defn validate-basic! [^StormTopology topology]
(validate-ids! topology)
@@ -238,7 +238,7 @@
{TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
(do
;; this set up tick tuples to cause timeouts to be triggered
- (.set_json_conf common (to-json spout-conf))
+ (.set_json_conf common (JSONValue/toJSONString spout-conf))
(.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
(.put_to_inputs common
(GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
@@ -363,6 +363,7 @@
(defn num-start-executors [component]
(thrift/parallelism-hint (.get_common component)))
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn storm-task-info
"Returns map from task -> component id"
[^StormTopology user-topology storm-conf]
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index a07b9ef..7e5965b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -17,12 +17,14 @@
(ns org.apache.storm.daemon.drpc
(:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftConnectionType ReqContext])
(:import [org.apache.storm.security.auth.authorizer DRPCAuthorizerBase])
+ (:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
DistributedRPCInvocations$Processor])
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
- (:import [org.apache.storm.daemon Shutdownable])
+ (:import [org.apache.storm.daemon Shutdownable]
+ [org.apache.storm.utils Time])
(:import [java.net InetAddress])
(:import [org.apache.storm.generated AuthorizationException]
[org.apache.storm.utils VersionInfo ConfigUtils])
@@ -57,7 +59,7 @@
(defn check-authorization
([aclHandler mapping operation context]
(if (not-nil? context)
- (log-thrift-access (.requestID context) (.remoteAddress context) (.principal context) operation))
+ (Utils/logThriftAccess (.requestID context) (.remoteAddress context) (.principal context) operation))
(if aclHandler
(let [context (or context (ReqContext/context))]
(if-not (.permit aclHandler context operation mapping)
@@ -85,10 +87,10 @@
(swap! id->request dissoc id)
(swap! id->start dissoc id))
my-ip (.getHostAddress (InetAddress/getLocalHost))
- clear-thread (async-loop
+ clear-thread (Utils/asyncLoop
(fn []
(doseq [[id start] @id->start]
- (when (> (time-delta start) (conf DRPC-REQUEST-TIMEOUT-SECS))
+ (when (> (Time/delta start) (conf DRPC-REQUEST-TIMEOUT-SECS))
(when-let [sem (@id->sem id)]
(.remove (acquire-queue request-queues (@id->function id)) (@id->request id))
(log-warn "Timeout DRPC request id: " id " start at " start)
@@ -107,7 +109,7 @@
^Semaphore sem (Semaphore. 0)
req (DRPCRequest. args id)
^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
- (swap! id->start assoc id (current-time-secs))
+ (swap! id->start assoc id (Time/currentTimeSecs))
(swap! id->sem assoc id sem)
(swap! id->function assoc id function)
(swap! id->request assoc id req)
@@ -227,9 +229,9 @@
(DistributedRPCInvocations$Processor. drpc-service-handler)
ThriftConnectionType/DRPC_INVOCATIONS)
http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
- (add-shutdown-hook-with-force-kill-in-1-sec (fn []
- (if handler-server (.stop handler-server))
- (.stop invoke-server)))
+ (Utils/addShutdownHookWithForceKillIn1Sec (fn []
+ (if handler-server (.stop handler-server))
+ (.stop invoke-server)))
(log-message "Starting Distributed RPC servers...")
(future (.serve invoke-server))
(when (> drpc-http-port 0)
@@ -270,5 +272,5 @@
(.serve handler-server)))))
(defn -main []
- (setup-default-uncaught-exception-handler)
+ (Utils/setupDefaultUncaughtExceptionHandler)
(launch-server!))
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index ab0c8aa..2415d5b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -36,7 +36,9 @@
(:import [org.apache.storm Config Constants])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
(:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
- (:import [java.util.concurrent ConcurrentLinkedQueue])
+ (:import [java.lang Thread Thread$UncaughtExceptionHandler]
+ [java.util.concurrent ConcurrentLinkedQueue]
+ [org.json.simple JSONValue])
(:require [org.apache.storm [thrift :as thrift]
[cluster :as cluster] [disruptor :as disruptor] [stats :as stats]])
(:require [org.apache.storm.daemon [task :as task]])
@@ -109,6 +111,7 @@
:direct
)))
+;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
(defn- outbound-groupings
[^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping topo-conf]
(->> component->grouping
@@ -151,7 +154,7 @@
bolts (.get_bolts topology)]
(cond (contains? spouts component-id) :spout
(contains? bolts component-id) :bolt
- :else (throw-runtime "Could not find " component-id " in topology " topology))))
+ :else (Utils/throwRuntime ["Could not find " component-id " in topology " topology]))))
(defn executor-selector [executor-data & _] (:type executor-data))
@@ -181,7 +184,8 @@
spec-conf (-> general-context
(.getComponentCommon component-id)
.get_json_conf
- from-json)]
+ (#(if % (JSONValue/parse %)))
+ clojurify-structure)]
(merge storm-conf (apply dissoc spec-conf to-remove))
))
@@ -195,20 +199,20 @@
(let [storm-conf (:storm-conf executor)
error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
- interval-start-time (atom (current-time-secs))
+ interval-start-time (atom (Time/currentTimeSecs))
interval-errors (atom 0)
]
(fn [error]
(log-error error)
- (when (> (time-delta @interval-start-time)
+ (when (> (Time/delta @interval-start-time)
error-interval-secs)
(reset! interval-errors 0)
- (reset! interval-start-time (current-time-secs)))
+ (reset! interval-start-time (Time/currentTimeSecs)))
(swap! interval-errors inc)
(when (<= @interval-errors max-per-interval)
(cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
- (hostname storm-conf)
+ (Utils/hostname storm-conf)
(.getThisWorkerPort (:worker-context executor)) error)
))))
@@ -262,13 +266,16 @@
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id storm-conf)
:report-error (throttled-report-error-fn <>)
- :report-error-and-die (fn [error]
- ((:report-error <>) error)
- (if (or
- (exception-cause? InterruptedException error)
- (exception-cause? java.io.InterruptedIOException error))
- (log-message "Got interrupted excpetion shutting thread down...")
- ((:suicide-fn <>))))
+ :report-error-and-die (reify
+ Thread$UncaughtExceptionHandler
+ (uncaughtException [this _ error]
+ (fn [error]
+ ((:report-error <>) error)
+ (if (or
+ (Utils/exceptionCauseIsInstanceOf InterruptedException error)
+ (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
+ (log-message "Got interrupted excpetion shutting thread down...")
+ ((:suicide-fn <>))))))
:sampler (mk-stats-sampler storm-conf)
:backpressure (atom false)
:spout-throttling-metrics (if (= executor-type :spout)
@@ -329,7 +336,7 @@
task-id (:task-id task-data)
name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
task-info (IMetricsConsumer$TaskInfo.
- (hostname (:storm-conf executor-data))
+ (Utils/hostname (:storm-conf executor-data))
(.getThisWorkerPort worker-context)
(:component-id executor-data)
task-id
@@ -386,8 +393,9 @@
;; doesn't block (because it's a single threaded queue and the caching/consumer started
;; trick isn't thread-safe)
system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
- handlers (with-error-reaction report-error-and-die
- (mk-threads executor-data task-datas initial-credentials))
+ handlers (try
+ (mk-threads executor-data task-datas initial-credentials)
+ (catch Throwable t (report-error-and-die t)))
threads (concat handlers system-threads)]
(setup-ticks! worker executor-data)
@@ -472,7 +480,7 @@
(if p (* p num-tasks))))
(defn init-spout-wait-strategy [storm-conf]
- (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) new-instance)]
+ (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) Utils/newInstance)]
(.prepare ret storm-conf)
ret
))
@@ -491,6 +499,10 @@
EVENTLOGGER-STREAM-ID
[component-id message-id (System/currentTimeMillis) values]))))
+(defn- bit-xor-vals
+ [vals]
+ (reduce bit-xor 0 vals))
+
(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
(let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
@@ -506,7 +518,7 @@
2 ;; microoptimize for performance of .size method
(reify RotatingMap$ExpiredCallback
(expire [this id [task-id spout-id tuple-info start-time-ms]]
- (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
+ (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
(fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
@@ -523,8 +535,8 @@
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
(when-not (= stored-task-id task-id)
- (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
- (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
+ (Utils/throwRuntime ["Fatal error, mismatched task ids: " task-id " " stored-task-id]))
+ (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
(condp = stream-id
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta id)
@@ -540,7 +552,7 @@
emitted-count (MutableLong. 0)
empty-emit-streak (MutableLong. 0)]
- [(async-loop
+ [(Utils/asyncLoop
(fn []
;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
(while (not @(:storm-active-atom executor-data))
@@ -661,19 +673,22 @@
(.set empty-emit-streak 0)
))
0))
- :kill-fn (:report-error-and-die executor-data)
- :factory? true
- :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+ false ; isDaemon
+ (:report-error-and-die executor-data)
+ Thread/NORM_PRIORITY
+ true ; isFactory
+ true ; startImmediately
+ (str component-id "-executor" (:executor-id executor-data)))]))
(defn- tuple-time-delta! [^TupleImpl tuple]
(let [ms (.getProcessSampleStartTime tuple)]
(if ms
- (time-delta-ms ms))))
+ (Time/deltaMs ms))))
(defn- tuple-execute-time-delta! [^TupleImpl tuple]
(let [ms (.getExecuteSampleStartTime tuple)]
(if ms
- (time-delta-ms ms))))
+ (Time/deltaMs ms))))
(defn put-xor! [^Map pending key id]
(let [curr (or (.get pending key) (long 0))]
@@ -738,7 +753,7 @@
;; TODO: can get any SubscribedState objects out of the context now
- [(async-loop
+ [(Utils/asyncLoop
(fn []
;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
(while (not @(:storm-active-atom executor-data))
@@ -840,9 +855,12 @@
(fn []
(disruptor/consume-batch-when-available receive-queue event-handler)
0)))
- :kill-fn (:report-error-and-die executor-data)
- :factory? true
- :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+ false ; isDaemon
+ (:report-error-and-die executor-data)
+ Thread/NORM_PRIORITY
+ true ; isFactory
+ true ; startImmediately
+ (str component-id "-executor" (:executor-id executor-data)))]))
(defmethod close-component :spout [executor-data spout]
(.close spout))
http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 0edfe08..6ca1759 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -20,7 +20,7 @@
(:use [hiccup core page-helpers form-helpers])
(:use [org.apache.storm config util log timer])
(:use [org.apache.storm.ui helpers])
- (:import [org.apache.storm.utils Utils VersionInfo ConfigUtils])
+ (:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils])
(:import [org.slf4j LoggerFactory])
(:import [java.util Arrays ArrayList HashSet])
(:import [java.util.zip GZIPInputStream])
@@ -28,10 +28,10 @@
(:import [org.apache.logging.log4j.core Appender LoggerContext])
(:import [org.apache.logging.log4j.core.appender RollingFileAppender])
(:import [java.io BufferedInputStream File FileFilter FileInputStream
- InputStream InputStreamReader])
+ InputStream InputStreamReader]
+ [java.net URLDecoder])
(:import [java.nio.file Files Path Paths DirectoryStream])
(:import [java.nio ByteBuffer])
- (:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.daemon DirectoryCleaner])
(:import [org.yaml.snakeyaml Yaml]
[org.yaml.snakeyaml.constructor SafeConstructor])
@@ -51,6 +51,8 @@
(def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
(def STORM-VERSION (VersionInfo/getVersion))
+(def worker-log-filename-pattern #"^worker.log(.*)")
+
(defmeter logviewer:num-log-page-http-requests)
(defmeter logviewer:num-daemonlog-page-http-requests)
(defmeter logviewer:num-download-log-file-http-requests)
@@ -117,9 +119,9 @@
(defn get-topo-port-workerlog
"Return the path of the worker log with the format of topoId/port/worker.log.*"
[^File file]
- (clojure.string/join file-path-separator
+ (clojure.string/join Utils/FILE_PATH_SEPARATOR
(take-last 3
- (split (.getCanonicalPath file) (re-pattern file-path-separator)))))
+ (split (.getCanonicalPath file) (re-pattern Utils/FILE_PATH_SEPARATOR)))))
(defn get-metadata-file-for-log-root-name [root-name root-dir]
(let [metaFile (clojure.java.io/file root-dir "metadata"
@@ -141,10 +143,10 @@
nil))))
(defn get-worker-id-from-metadata-file [metaFile]
- (get (clojure-from-yaml-file metaFile) "worker-id"))
+ (get (clojurify-structure (Utils/readYamlFile metaFile)) "worker-id"))
(defn get-topo-owner-from-metadata-file [metaFile]
- (get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
+ (get (clojurify-structure (Utils/readYamlFile metaFile)) TOPOLOGY-SUBMITTER-USER))
(defn identify-worker-log-dirs [log-dirs]
"return the workerid to worker-log-dir map"
@@ -188,7 +190,7 @@
"Return a sorted set of java.io.Files that were written by workers that are
now active"
[conf root-dir]
- (let [alive-ids (get-alive-ids conf (current-time-secs))
+ (let [alive-ids (get-alive-ids conf (Time/currentTimeSecs))
log-dirs (get-all-worker-dirs root-dir)
id->dir (identify-worker-log-dirs log-dirs)]
(apply sorted-set
@@ -227,12 +229,12 @@
[^File dir]
(let [topodir (.getParentFile dir)]
(if (empty? (.listFiles topodir))
- (rmr (.getCanonicalPath topodir)))))
+ (Utils/forceDelete (.getCanonicalPath topodir)))))
(defn cleanup-fn!
"Delete old log dirs for which the workers are no longer alive"
[log-root-dir]
- (let [now-secs (current-time-secs)
+ (let [now-secs (Time/currentTimeSecs)
old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
(* now-secs 1000)
log-root-dir)
@@ -250,7 +252,7 @@
(dofor [dir dead-worker-dirs]
(let [path (.getCanonicalPath dir)]
(log-message "Cleaning up: Removing " path)
- (try (rmr path)
+ (try (Utils/forceDelete path)
(cleanup-empty-topodir! dir)
(catch Exception ex (log-error ex)))))
(per-workerdir-cleanup! (File. log-root-dir) (* per-dir-size (* 1024 1024)) cleaner)
@@ -264,7 +266,7 @@
(schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
:kill-fn (fn [t]
(log-error t "Error when doing logs cleanup")
- (exit-process! 20 "Error when doing log cleanup")))
+ (Utils/exitProcess 20 "Error when doing log cleanup")))
0 ;; Start immediately.
interval-secs
(fn [] (cleanup-fn! log-root-dir))))))
@@ -309,7 +311,7 @@
(defn get-log-user-group-whitelist [fname]
(let [wl-file (ConfigUtils/getLogMetaDataFile fname)
- m (clojure-from-yaml-file wl-file)]
+ m (clojurify-structure (Utils/readYamlFile wl-file))]
(if (not-nil? m)
(do
(let [user-wl (.get m LOGS-USERS)
@@ -514,9 +516,9 @@
(defn url-to-match-centered-in-log-page
[needle fname offset port]
- (let [host (local-hostname)
+ (let [host (Utils/localHostname)
port (logviewer-port)
- fname (clojure.string/join file-path-separator (take-last 3 (split fname (re-pattern file-path-separator))))]
+ fname (clojure.string/join Utils/FILE_PATH_SEPARATOR (take-last 3 (split fname (re-pattern Utils/FILE_PATH_SEPARATOR))))]
(url (str "http://" host ":" port "/log")
{:file fname
:start (max 0
@@ -851,7 +853,7 @@
new-matches (conj matches
(merge these-matches
{ "fileName" file-name
- "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern file-path-separator))))}))
+ "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern Utils/FILE_PATH_SEPARATOR))))}))
new-count (+ match-count (count (these-matches "matches")))]
(if (empty? these-matches)
(recur matches (rest logs) 0 (+ file-offset 1) match-count)
@@ -874,12 +876,12 @@
(defn deep-search-logs-for-topology
[topology-id user ^String root-dir search num-matches port file-offset offset search-archived? callback origin]
(json-response
- (if (or (not search) (not (.exists (File. (str root-dir file-path-separator topology-id)))))
+ (if (or (not search) (not (.exists (File. (str root-dir Utils/FILE_PATH_SEPARATOR topology-id)))))
[]
(let [file-offset (if file-offset (Integer/parseInt file-offset) 0)
offset (if offset (Integer/parseInt offset) 0)
num-matches (or (Integer/parseInt num-matches) 1)
- port-dirs (vec (.listFiles (File. (str root-dir file-path-separator topology-id))))
+ port-dirs (vec (.listFiles (File. (str root-dir Utils/FILE_PATH_SEPARATOR topology-id))))
logs-for-port-fn (partial logs-for-port user)]
(if (or (not port) (= "*" port))
;; Check for all ports
@@ -892,7 +894,7 @@
;; Check just the one port
(if (not (contains? (into #{} (map str (*STORM-CONF* SUPERVISOR-SLOTS-PORTS))) port))
[]
- (let [port-dir (File. (str root-dir file-path-separator topology-id file-path-separator port))]
+ (let [port-dir (File. (str root-dir Utils/FILE_PATH_SEPARATOR topology-id Utils/FILE_PATH_SEPARATOR port))]
(if (or (not (.exists port-dir)) (empty? (logs-for-port user port-dir)))
[]
(let [filtered-logs (logs-for-port user port-dir)]
@@ -945,7 +947,7 @@
(if (= (str port) (.getName port-dir))
(into [] (DirectoryCleaner/getFilesForDir port-dir))))))))
(if (nil? port)
- (let [topo-dir (File. (str log-root file-path-separator topoId))]
+ (let [topo-dir (File. (str log-root Utils/FILE_PATH_SEPARATOR topoId))]
(if (.exists topo-dir)
(reduce concat
(for [port-dir (.listFiles topo-dir)]
@@ -982,7 +984,7 @@
user (.getUserName http-creds-handler servlet-request)
start (if (:start m) (parse-long-from-map m :start))
length (if (:length m) (parse-long-from-map m :length))
- file (url-decode (:file m))]
+ file (URLDecoder/decode (:file m))]
(log-template (log-page file start length (:grep m) user log-root)
file user))
(catch InvalidRequestException ex
@@ -993,21 +995,21 @@
(let [user (.getUserName http-creds-handler servlet-request)
port (second (split host-port #":"))
dir (File. (str log-root
- file-path-separator
+ Utils/FILE_PATH_SEPARATOR
topo-id
- file-path-separator
+ Utils/FILE_PATH_SEPARATOR
port))
file (File. (str log-root
- file-path-separator
+ Utils/FILE_PATH_SEPARATOR
topo-id
- file-path-separator
+ Utils/FILE_PATH_SEPARATOR
port
- file-path-separator
+ Utils/FILE_PATH_SEPARATOR
filename))]
(if (and (.exists dir) (.exists file))
(if (or (blank? (*STORM-CONF* UI-FILTER))
(authorized-log-user? user
- (str topo-id file-path-separator port file-path-separator "worker.log")
+ (str topo-id Utils/FILE_PATH_SEPARATOR port Utils/FILE_PATH_SEPARATOR "worker.log")
*STORM-CONF*))
(-> (resp/response file)
(resp/content-type "application/octet-stream"))
@@ -1019,14 +1021,14 @@
(let [user (.getUserName http-creds-handler servlet-request)
port (second (split host-port #":"))
dir (File. (str log-root
- file-path-separator
+ Utils/FILE_PATH_SEPARATOR
topo-id
- file-path-separator
+ Utils/FILE_PATH_SEPARATOR
port))]
(if (.exists dir)
(if (or (blank? (*STORM-CONF* UI-FILTER))
(authorized-log-user? user
- (str topo-id file-path-separator port file-path-separator "worker.log")
+ (str topo-id Utils/FILE_PATH_SEPARATOR port Utils/FILE_PATH_SEPARATOR "worker.log")
*STORM-CONF*))
(html4
[:head
@@ -1050,7 +1052,7 @@
user (.getUserName http-creds-handler servlet-request)
start (if (:start m) (parse-long-from-map m :start))
length (if (:length m) (parse-long-from-map m :length))
- file (url-decode (:file m))]
+ file (URLDecoder/decode (:file m))]
(log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
file user))
(catch InvalidRequestException ex
@@ -1078,7 +1080,7 @@
;; filter is configured.
(try
(let [user (.getUserName http-creds-handler servlet-request)]
- (search-log-file (url-decode file)
+ (search-log-file (URLDecoder/decode file)
user
(if (= (:is-daemon m) "yes") daemonlog-root log-root)
(:search-string m)
@@ -1192,7 +1194,7 @@
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))
log-root (ConfigUtils/workerArtifactsRoot conf)
daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
- (setup-default-uncaught-exception-handler)
+ (Utils/setupDefaultUncaughtExceptionHandler)
(start-log-cleaner! conf log-root)
(log-message "Starting logviewer server for storm version '"
STORM-VERSION