You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/12 23:11:02 UTC
[04/14] git commit: move towards idiomatic Clojure style
move towards idiomatic Clojure style
Summary:
* When using defn, put function arguments on a separate line.
* Remove dangling closing delimiters — ), ], and } — left on their own lines
* Try to keep lines at 80 characters or fewer
Going forward, I’d like to see:
* more docstrings
* most lines under 80 characters
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2278fc96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2278fc96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2278fc96
Branch: refs/heads/security
Commit: 2278fc9623d5b71bca9ece3857e645e85ab1562f
Parents: eee8b24
Author: David James <da...@bluemontlabs.com>
Authored: Sat May 31 13:55:54 2014 -0400
Committer: David James <da...@bluemontlabs.com>
Committed: Sat May 31 13:55:54 2014 -0400
----------------------------------------------------------------------
.../src/clj/backtype/storm/LocalCluster.clj | 71 +-
storm-core/src/clj/backtype/storm/LocalDRPC.clj | 5 +-
storm-core/src/clj/backtype/storm/bootstrap.clj | 9 +-
storm-core/src/clj/backtype/storm/clojure.clj | 10 +-
storm-core/src/clj/backtype/storm/cluster.clj | 427 +++---
storm-core/src/clj/backtype/storm/config.clj | 145 +-
storm-core/src/clj/backtype/storm/disruptor.clj | 83 +-
storm-core/src/clj/backtype/storm/event.clj | 49 +-
storm-core/src/clj/backtype/storm/log.clj | 22 +-
.../clj/backtype/storm/process_simulator.clj | 27 +-
storm-core/src/clj/backtype/storm/stats.clj | 289 ++--
storm-core/src/clj/backtype/storm/testing.clj | 523 ++++----
storm-core/src/clj/backtype/storm/testing4j.clj | 58 +-
storm-core/src/clj/backtype/storm/thrift.clj | 255 ++--
storm-core/src/clj/backtype/storm/timer.clj | 94 +-
storm-core/src/clj/backtype/storm/tuple.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 1262 +++++++++---------
storm-core/src/clj/backtype/storm/util.clj | 732 +++++-----
storm-core/src/clj/backtype/storm/zookeeper.clj | 158 ++-
19 files changed, 2205 insertions(+), 2021 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalCluster.clj b/storm-core/src/clj/backtype/storm/LocalCluster.clj
index 77f3b3f..dc8214d 100644
--- a/storm-core/src/clj/backtype/storm/LocalCluster.clj
+++ b/storm-core/src/clj/backtype/storm/LocalCluster.clj
@@ -13,66 +13,75 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.LocalCluster
(:use [backtype.storm testing config])
(:import [java.util Map])
(:gen-class
- :init init
- :implements [backtype.storm.ILocalCluster]
- :constructors {[] [] [java.util.Map] []}
- :state state ))
+ :init init
+ :implements [backtype.storm.ILocalCluster]
+ :constructors {[] [] [java.util.Map] []}
+ :state state))
(defn -init
([]
- (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
- [[] ret]
- ))
+ (let [ret (mk-local-storm-cluster
+ :daemon-conf
+ {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
+ [[] ret]))
([^Map stateMap]
- [[] stateMap]))
+ [[] stateMap]))
-(defn -submitTopology [this name conf topology]
- (submit-local-topology (:nimbus (. this state))
- name
- conf
- topology))
+(defn -submitTopology
+ [this name conf topology]
+ (submit-local-topology
+ (:nimbus (. this state)) name conf topology))
-(defn -submitTopologyWithOpts [this name conf topology submit-opts]
- (submit-local-topology-with-opts (:nimbus (. this state))
- name
- conf
- topology
- submit-opts))
+(defn -submitTopologyWithOpts
+ [this name conf topology submit-opts]
+ (submit-local-topology-with-opts
+ (:nimbus (. this state)) name conf topology submit-opts))
-(defn -shutdown [this]
+(defn -shutdown
+ [this]
(kill-local-storm-cluster (. this state)))
-(defn -killTopology [this name]
+(defn -killTopology
+ [this name]
(.killTopology (:nimbus (. this state)) name))
-(defn -getTopologyConf [this id]
+(defn -getTopologyConf
+ [this id]
(.getTopologyConf (:nimbus (. this state)) id))
-(defn -getTopology [this id]
+(defn -getTopology
+ [this id]
(.getTopology (:nimbus (. this state)) id))
-(defn -getClusterInfo [this]
+(defn -getClusterInfo
+ [this]
(.getClusterInfo (:nimbus (. this state))))
-(defn -getTopologyInfo [this id]
+(defn -getTopologyInfo
+ [this id]
(.getTopologyInfo (:nimbus (. this state)) id))
-(defn -killTopologyWithOpts [this name opts]
+(defn -killTopologyWithOpts
+ [this name opts]
(.killTopologyWithOpts (:nimbus (. this state)) name opts))
-(defn -activate [this name]
+(defn -activate
+ [this name]
(.activate (:nimbus (. this state)) name))
-(defn -deactivate [this name]
+(defn -deactivate
+ [this name]
(.deactivate (:nimbus (. this state)) name))
-(defn -rebalance [this name opts]
+(defn -rebalance
+ [this name opts]
(.rebalance (:nimbus (. this state)) name opts))
-(defn -getState [this]
+(defn -getState
+ [this]
(.state this))
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/LocalDRPC.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalDRPC.clj b/storm-core/src/clj/backtype/storm/LocalDRPC.clj
index a6dab95..daead2e 100644
--- a/storm-core/src/clj/backtype/storm/LocalDRPC.clj
+++ b/storm-core/src/clj/backtype/storm/LocalDRPC.clj
@@ -13,6 +13,7 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.LocalDRPC
(:require [backtype.storm.daemon [drpc :as drpc]])
(:use [backtype.storm util])
@@ -45,9 +46,9 @@
(defn -failRequest [this id]
(.failRequest (:handler (. this state)) id)
)
-
+
(defn -getServiceId [this]
- (:service-id (. this state)))
+ (:service-id (. this state)))
(defn -shutdown [this]
(ServiceRegistry/unregisterService (:service-id (. this state)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/bootstrap.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj
index 5f34ff1..3ee6f76 100644
--- a/storm-core/src/clj/backtype/storm/bootstrap.clj
+++ b/storm-core/src/clj/backtype/storm/bootstrap.clj
@@ -13,9 +13,11 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.bootstrap)
-(defmacro bootstrap []
+(defmacro bootstrap
+ []
'(do
(import (quote [backtype.storm Constants]))
(import (quote [backtype.storm.testing FeederSpout TestPlannerBolt TestPlannerSpout
@@ -32,7 +34,7 @@
(import (quote [backtype.storm.task IBolt IOutputCollector
OutputCollector TopologyContext ShellBolt
GeneralTopologyContext WorkerTopologyContext]))
- (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs
+ (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs
IBatchBolt BatchBoltExecutor]))
(import (quote [backtype.storm.drpc KeyedFairBolt]))
(import (quote [backtype.storm.daemon Shutdownable]))
@@ -58,5 +60,4 @@
(import (quote [backtype.storm.grouping CustomStreamGrouping]))
(import (quote [java.io File FileOutputStream FileInputStream]))
(import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList]))
- (import (quote [org.apache.commons.io FileUtils]))
- ))
+ (import (quote [org.apache.commons.io FileUtils]))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/clojure.clj b/storm-core/src/clj/backtype/storm/clojure.clj
index 33d204b..fdae7cb 100644
--- a/storm-core/src/clj/backtype/storm/clojure.clj
+++ b/storm-core/src/clj/backtype/storm/clojure.clj
@@ -13,6 +13,7 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.clojure
(:use [backtype.storm bootstrap util])
(:import [backtype.storm StormSubmitter])
@@ -25,7 +26,6 @@
(:import [java.util List])
(:require [backtype.storm [thrift :as thrift]]))
-
(defn direct-stream [fields]
(StreamInfo. fields true))
@@ -145,9 +145,9 @@
(tuple-values [this collector ^String stream]
(let [^TopologyContext context (:context collector)
fields (.. context (getThisOutputFields stream) toList) ]
- (vec (map (into
- (empty this) (for [[k v] this]
- [(if (keyword? k) (name k) k) v]))
+ (vec (map (into
+ (empty this) (for [[k v] this]
+ [(if (keyword? k) (name k) k) v]))
fields))))
java.util.List
(tuple-values [this collector stream]
@@ -195,7 +195,7 @@
(defn submit-remote-topology [name conf topology]
(StormSubmitter/submitTopology name conf topology))
-(defn local-cluster []
+(defn local-cluster []
;; do this to avoid a cyclic dependency of
;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
(eval '(new backtype.storm.LocalCluster)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index b5c1e3b..e370b97 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -13,30 +13,30 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.cluster
(:import [org.apache.zookeeper.data Stat])
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException])
(:import [backtype.storm.utils Utils])
(:use [backtype.storm util log config])
(:require [backtype.storm [zookeeper :as zk]])
- (:require [backtype.storm.daemon [common :as common]])
-
- )
+ (:require [backtype.storm.daemon [common :as common]]))
(defprotocol ClusterState
(set-ephemeral-node [this path data])
(delete-node [this path])
(create-sequential [this path data])
- (set-data [this path data]) ;; if node does not exist, create persistent with this data
+ ;; if node does not exist, create persistent with this data
+ (set-data [this path data])
(get-data [this path watch?])
(get-children [this path watch?])
(mkdirs [this path])
(close [this])
(register [this callback])
- (unregister [this id])
- )
+ (unregister [this id]))
-(defn mk-distributed-cluster-state [conf]
+(defn mk-distributed-cluster-state
+ [conf]
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
(.close zk))
@@ -48,83 +48,85 @@
:auth-conf conf
:root (conf STORM-ZOOKEEPER-ROOT)
:watcher (fn [state type path]
- (when @active
- (when-not (= :connected state)
- (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
- (when-not (= :none type)
- (doseq [callback (vals @callbacks)]
- (callback type path))))
- ))]
+ (when @active
+ (when-not (= :connected state)
+ (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
+ (when-not (= :none type)
+ (doseq [callback (vals @callbacks)]
+ (callback type path))))))]
(reify
ClusterState
- (register [this callback]
- (let [id (uuid)]
- (swap! callbacks assoc id callback)
- id
- ))
- (unregister [this id]
- (swap! callbacks dissoc id))
-
- (set-ephemeral-node [this path data]
- (zk/mkdirs zk (parent-path path))
- (if (zk/exists zk path false)
- (try-cause
- (zk/set-data zk path data) ; should verify that it's ephemeral
- (catch KeeperException$NoNodeException e
- (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
- (zk/create-node zk path data :ephemeral)
- ))
- (zk/create-node zk path data :ephemeral)
- ))
-
- (create-sequential [this path data]
+
+ (register
+ [this callback]
+ (let [id (uuid)]
+ (swap! callbacks assoc id callback)
+ id))
+
+ (unregister
+ [this id]
+ (swap! callbacks dissoc id))
+
+ (set-ephemeral-node
+ [this path data]
+ (zk/mkdirs zk (parent-path path))
+ (if (zk/exists zk path false)
+ (try-cause
+ (zk/set-data zk path data) ; should verify that it's ephemeral
+ (catch KeeperException$NoNodeException e
+ (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
+ (zk/create-node zk path data :ephemeral)
+ ))
+ (zk/create-node zk path data :ephemeral)))
+
+ (create-sequential
+ [this path data]
(zk/create-node zk path data :sequential))
-
- (set-data [this path data]
- ;; note: this does not turn off any existing watches
- (if (zk/exists zk path false)
- (zk/set-data zk path data)
- (do
- (zk/mkdirs zk (parent-path path))
- (zk/create-node zk path data :persistent)
- )))
-
- (delete-node [this path]
- (zk/delete-recursive zk path)
- )
-
- (get-data [this path watch?]
- (zk/get-data zk path watch?)
- )
-
- (get-children [this path watch?]
- (zk/get-children zk path watch?))
-
- (mkdirs [this path]
- (zk/mkdirs zk path))
-
- (close [this]
- (reset! active false)
- (.close zk))
- )))
+
+ (set-data
+ [this path data]
+ ;; note: this does not turn off any existing watches
+ (if (zk/exists zk path false)
+ (zk/set-data zk path data)
+ (do
+ (zk/mkdirs zk (parent-path path))
+ (zk/create-node zk path data :persistent))))
+
+ (delete-node
+ [this path]
+ (zk/delete-recursive zk path))
+
+ (get-data
+ [this path watch?]
+ (zk/get-data zk path watch?))
+
+ (get-children
+ [this path watch?]
+ (zk/get-children zk path watch?))
+
+ (mkdirs
+ [this path]
+ (zk/mkdirs zk path))
+
+ (close
+ [this]
+ (reset! active false)
+ (.close zk)))))
(defprotocol StormClusterState
(assignments [this callback])
(assignment-info [this storm-id callback])
(active-storms [this])
(storm-base [this storm-id callback])
-
(get-worker-heartbeat [this storm-id node port])
(executor-beats [this storm-id executor->node+port])
(supervisors [this callback])
- (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
-
+ (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
(setup-heartbeats! [this storm-id])
(teardown-heartbeats! [this storm-id])
(teardown-topology-errors! [this storm-id])
(heartbeat-storms [this])
(error-topologies [this])
-
(worker-heartbeat! [this storm-id node port info])
(remove-worker-heartbeat! [this storm-id node port])
(supervisor-heartbeat! [this supervisor-id info])
@@ -135,10 +137,7 @@
(remove-storm! [this storm-id])
(report-error [this storm-id task-id error])
(errors [this storm-id task-id])
-
- (disconnect [this])
- )
-
+ (disconnect [this]))
(def ASSIGNMENTS-ROOT "assignments")
(def CODE-ROOT "code")
@@ -153,64 +152,75 @@
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
-(defn supervisor-path [id]
+(defn supervisor-path
+ [id]
(str SUPERVISORS-SUBTREE "/" id))
-(defn assignment-path [id]
+(defn assignment-path
+ [id]
(str ASSIGNMENTS-SUBTREE "/" id))
-(defn storm-path [id]
+(defn storm-path
+ [id]
(str STORMS-SUBTREE "/" id))
-(defn workerbeat-storm-root [storm-id]
+(defn workerbeat-storm-root
+ [storm-id]
(str WORKERBEATS-SUBTREE "/" storm-id))
-(defn workerbeat-path [storm-id node port]
+(defn workerbeat-path
+ [storm-id node port]
(str (workerbeat-storm-root storm-id) "/" node "-" port))
-(defn error-storm-root [storm-id]
+(defn error-storm-root
+ [storm-id]
(str ERRORS-SUBTREE "/" storm-id))
-(defn error-path [storm-id component-id]
+(defn error-path
+ [storm-id component-id]
(str (error-storm-root storm-id) "/" (url-encode component-id)))
-(defn- issue-callback! [cb-atom]
+(defn- issue-callback!
+ [cb-atom]
(let [cb @cb-atom]
(reset! cb-atom nil)
(when cb
- (cb))
- ))
+ (cb))))
-(defn- issue-map-callback! [cb-atom id]
+(defn- issue-map-callback!
+ [cb-atom id]
(let [cb (@cb-atom id)]
(swap! cb-atom dissoc id)
(when cb
- (cb id))
- ))
+ (cb id))))
-(defn- maybe-deserialize [ser]
+(defn- maybe-deserialize
+ [ser]
(when ser
(Utils/deserialize ser)))
(defstruct TaskError :error :time-secs)
-(defn- parse-error-path [^String p]
+(defn- parse-error-path
+ [^String p]
(Long/parseLong (.substring p 1)))
-
-(defn convert-executor-beats [executors worker-hb]
- ;; ensures that we only return heartbeats for executors assigned to this worker
+(defn convert-executor-beats
+ "Ensures that we only return heartbeats for executors assigned to
+ this worker."
+ [executors worker-hb]
(let [executor-stats (:executor-stats worker-hb)]
(->> executors
- (map (fn [t]
- (if (contains? executor-stats t)
- {t {:time-secs (:time-secs worker-hb)
- :uptime (:uptime worker-hb)
- :stats (get executor-stats t)}})))
- (into {}))))
+ (map (fn [t]
+ (if (contains? executor-stats t)
+ {t {:time-secs (:time-secs worker-hb)
+ :uptime (:uptime worker-hb)
+ :stats (get executor-stats t)}})))
+ (into {}))))
;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
-(defn mk-storm-cluster-state [cluster-state-spec]
+(defn mk-storm-cluster-state
+ [cluster-state-spec]
(let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
[false cluster-state-spec]
[true (mk-distributed-cluster-state cluster-state-spec)])
@@ -219,164 +229,171 @@
assignments-callback (atom nil)
storm-base-callback (atom {})
state-id (register
- cluster-state
- (fn [type path]
- (let [[subtree & args] (tokenize-path path)]
- (condp = subtree
- ASSIGNMENTS-ROOT (if (empty? args)
- (issue-callback! assignments-callback)
- (issue-map-callback! assignment-info-callback (first args)))
- SUPERVISORS-ROOT (issue-callback! supervisors-callback)
- STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
- ;; this should never happen
- (halt-process! 30 "Unknown callback for subtree " subtree args)
- )
- )))]
+ cluster-state
+ (fn [type path]
+ (let [[subtree & args] (tokenize-path path)]
+ (condp = subtree
+ ASSIGNMENTS-ROOT (if (empty? args)
+ (issue-callback! assignments-callback)
+ (issue-map-callback! assignment-info-callback (first args)))
+ SUPERVISORS-ROOT (issue-callback! supervisors-callback)
+ STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
+ ;; this should never happen
+ (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
(mkdirs cluster-state p))
(reify
- StormClusterState
-
- (assignments [this callback]
+ StormClusterState
+
+ (assignments
+ [this callback]
(when callback
(reset! assignments-callback callback))
(get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
-
- (assignment-info [this storm-id callback]
+
+ (assignment-info
+ [this storm-id callback]
(when callback
(swap! assignment-info-callback assoc storm-id callback))
- (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))
- )
+ (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
- (active-storms [this]
- (get-children cluster-state STORMS-SUBTREE false)
- )
+ (active-storms
+ [this]
+ (get-children cluster-state STORMS-SUBTREE false))
- (heartbeat-storms [this]
- (get-children cluster-state WORKERBEATS-SUBTREE false)
- )
+ (heartbeat-storms
+ [this]
+ (get-children cluster-state WORKERBEATS-SUBTREE false))
- (error-topologies [this]
- (get-children cluster-state ERRORS-SUBTREE false)
- )
+ (error-topologies
+ [this]
+ (get-children cluster-state ERRORS-SUBTREE false))
- (get-worker-heartbeat [this storm-id node port]
+ (get-worker-heartbeat
+ [this storm-id node port]
(-> cluster-state
(get-data (workerbeat-path storm-id node port) false)
maybe-deserialize))
- (executor-beats [this storm-id executor->node+port]
- ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
+ (executor-beats
+ [this storm-id executor->node+port]
+ ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
;; we avoid situations like that
(let [node+port->executors (reverse-map executor->node+port)
all-heartbeats (for [[[node port] executors] node+port->executors]
- (->> (get-worker-heartbeat this storm-id node port)
- (convert-executor-beats executors)
- ))]
+ (->> (get-worker-heartbeat this storm-id node port)
+ (convert-executor-beats executors)
+ ))]
(apply merge all-heartbeats)))
- (supervisors [this callback]
+ (supervisors
+ [this callback]
(when callback
(reset! supervisors-callback callback))
- (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback))
- )
+ (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
- (supervisor-info [this supervisor-id]
- (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false))
- )
+ (supervisor-info
+ [this supervisor-id]
+ (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
- (worker-heartbeat! [this storm-id node port info]
+ (worker-heartbeat!
+ [this storm-id node port info]
(set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
- (remove-worker-heartbeat! [this storm-id node port]
- (delete-node cluster-state (workerbeat-path storm-id node port))
- )
+ (remove-worker-heartbeat!
+ [this storm-id node port]
+ (delete-node cluster-state (workerbeat-path storm-id node port)))
- (setup-heartbeats! [this storm-id]
+ (setup-heartbeats!
+ [this storm-id]
(mkdirs cluster-state (workerbeat-storm-root storm-id)))
- (teardown-heartbeats! [this storm-id]
+ (teardown-heartbeats!
+ [this storm-id]
(try-cause
- (delete-node cluster-state (workerbeat-storm-root storm-id))
- (catch KeeperException e
- (log-warn-error e "Could not teardown heartbeats for " storm-id)
- )))
+ (delete-node cluster-state (workerbeat-storm-root storm-id))
+ (catch KeeperException e
+ (log-warn-error e "Could not teardown heartbeats for " storm-id))))
- (teardown-topology-errors! [this storm-id]
+ (teardown-topology-errors!
+ [this storm-id]
(try-cause
- (delete-node cluster-state (error-storm-root storm-id))
- (catch KeeperException e
- (log-warn-error e "Could not teardown errors for " storm-id)
- )))
+ (delete-node cluster-state (error-storm-root storm-id))
+ (catch KeeperException e
+ (log-warn-error e "Could not teardown errors for " storm-id))))
- (supervisor-heartbeat! [this supervisor-id info]
- (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info))
- )
+ (supervisor-heartbeat!
+ [this supervisor-id info]
+ (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
- (activate-storm! [this storm-id storm-base]
- (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))
- )
+ (activate-storm!
+ [this storm-id storm-base]
+ (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
- (update-storm! [this storm-id new-elems]
+ (update-storm!
+ [this storm-id new-elems]
(let [base (storm-base this storm-id nil)
executors (:component->executors base)
new-elems (update new-elems :component->executors (partial merge executors))]
(set-data cluster-state (storm-path storm-id)
- (-> base
- (merge new-elems)
- Utils/serialize))))
+ (-> base
+ (merge new-elems)
+ Utils/serialize))))
- (storm-base [this storm-id callback]
+ (storm-base
+ [this storm-id callback]
(when callback
(swap! storm-base-callback assoc storm-id callback))
- (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)))
- )
+ (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
- (remove-storm-base! [this storm-id]
- (delete-node cluster-state (storm-path storm-id))
- )
+ (remove-storm-base!
+ [this storm-id]
+ (delete-node cluster-state (storm-path storm-id)))
- (set-assignment! [this storm-id info]
- (set-data cluster-state (assignment-path storm-id) (Utils/serialize info))
- )
+ (set-assignment!
+ [this storm-id info]
+ (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
- (remove-storm! [this storm-id]
+ (remove-storm!
+ [this storm-id]
(delete-node cluster-state (assignment-path storm-id))
(remove-storm-base! this storm-id))
- (report-error [this storm-id component-id error]
- (let [path (error-path storm-id component-id)
- data {:time-secs (current-time-secs) :error (stringify-error error)}
- _ (mkdirs cluster-state path)
- _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
- to-kill (->> (get-children cluster-state path false)
- (sort-by parse-error-path)
- reverse
- (drop 10))]
- (doseq [k to-kill]
- (delete-node cluster-state (str path "/" k)))))
-
- (errors [this storm-id component-id]
- (let [path (error-path storm-id component-id)
- _ (mkdirs cluster-state path)
- children (get-children cluster-state path false)
- errors (dofor [c children]
- (let [data (-> (get-data cluster-state (str path "/" c) false)
- maybe-deserialize)]
- (when data
- (struct TaskError (:error data) (:time-secs data))
- )))
- ]
- (->> (filter not-nil? errors)
- (sort-by (comp - :time-secs)))))
-
- (disconnect [this]
+ (report-error
+ [this storm-id component-id error]
+ (let [path (error-path storm-id component-id)
+ data {:time-secs (current-time-secs) :error (stringify-error error)}
+ _ (mkdirs cluster-state path)
+ _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
+ to-kill (->> (get-children cluster-state path false)
+ (sort-by parse-error-path)
+ reverse
+ (drop 10))]
+ (doseq [k to-kill]
+ (delete-node cluster-state (str path "/" k)))))
+
+ (errors
+ [this storm-id component-id]
+ (let [path (error-path storm-id component-id)
+ _ (mkdirs cluster-state path)
+ children (get-children cluster-state path false)
+ errors (dofor [c children]
+ (let [data (-> (get-data cluster-state (str path "/" c) false)
+ maybe-deserialize)]
+ (when data
+ (struct TaskError (:error data) (:time-secs data))
+ )))
+ ]
+ (->> (filter not-nil? errors)
+ (sort-by (comp - :time-secs)))))
+
+ (disconnect
+ [this]
(unregister cluster-state state-id)
(when solo?
- (close cluster-state)))
- )))
+ (close cluster-state))))))
;; daemons have a single thread that will respond to events
;; start with initialize event
@@ -395,9 +412,8 @@
;; everyone reads this in full to understand structure
;; /tasks/{storm id}/{task id} ; just contains bolt id
-
;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
-;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here
+;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here
;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
;; /taskbeats/{storm id}/{ephemeral task id}
@@ -406,8 +422,6 @@
;; master manipulates
;; /storms/{storm id}
-
-
;; Zookeeper flows:
;; Master:
@@ -423,7 +437,6 @@
;; 3. make new assignment to fix any problems
;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)
-
;; masters only possible watches is on ephemeral nodes and tasks, and maybe not even
;; Supervisor:
@@ -439,8 +452,6 @@
;; 1. monitor assignments, reroute when assignments change
;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off
-
-
;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name)
;; supervisor periodically checks to make sure processes are alive
;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 15be94d..14beb21 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -13,14 +13,14 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.config
(:import [java.io FileReader File])
(:import [backtype.storm Config ConfigValidation$FieldValidator])
(:import [backtype.storm.utils Utils LocalState])
(:import [org.apache.commons.io FileUtils])
(:require [clojure [string :as str]])
- (:use [backtype.storm util])
- )
+ (:use [backtype.storm util]))
(def RESOURCES-SUBDIR "resources")
@@ -32,13 +32,11 @@
(let [name (.getName f)
new-name (clojure-config-name name)]
(eval
- `(def ~(symbol new-name) (. Config ~(symbol name))))
- ))
+ `(def ~(symbol new-name) (. Config ~(symbol name))))))
(def ALL-CONFIGS
(dofor [f (seq (.getFields Config))]
- (.get f nil)
- ))
+ (.get f nil)))
(defmulti get-FieldValidator class-selector)
@@ -48,44 +46,49 @@
(defmethod get-FieldValidator
ConfigValidation$FieldValidator [validator] validator)
-(defmethod get-FieldValidator Object [klass]
+(defmethod get-FieldValidator Object
+ [klass]
{:pre [(not (nil? klass))]}
(reify ConfigValidation$FieldValidator
(validateField [this name v]
- (if (and (not (nil? v))
- (not (instance? klass v)))
- (throw (IllegalArgumentException.
- (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
+ (if (and (not (nil? v))
+ (not (instance? klass v)))
+ (throw (IllegalArgumentException.
+ (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
;; Create a mapping of config-string -> validator
;; Config fields must have a _SCHEMA field defined
(def CONFIG-SCHEMA-MAP
(->> (.getFields Config)
- (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
- (map (fn [f] [(.get f nil) (get-FieldValidator
- (-> Config
- (.getField (str (.getName f) "_SCHEMA"))
- (.get nil)))]))
- (into {})))
-
-(defn cluster-mode [conf & args]
+ (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
+ (map (fn [f] [(.get f nil)
+ (get-FieldValidator
+ (-> Config
+ (.getField (str (.getName f) "_SCHEMA"))
+ (.get nil)))]))
+ (into {})))
+
+(defn cluster-mode
+ [conf & args]
(keyword (conf STORM-CLUSTER-MODE)))
-(defn local-mode? [conf]
+(defn local-mode?
+ [conf]
(let [mode (conf STORM-CLUSTER-MODE)]
(condp = mode
"local" true
"distributed" false
(throw (IllegalArgumentException.
- (str "Illegal cluster mode in conf: " mode)))
- )))
+ (str "Illegal cluster mode in conf: " mode))))))
-(defn sampling-rate [conf]
+(defn sampling-rate
+ [conf]
(->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
(/ 1)
int))
-(defn mk-stats-sampler [conf]
+(defn mk-stats-sampler
+ [conf]
(even-sampler (sampling-rate conf)))
; storm.zookeeper.servers:
@@ -93,106 +96,124 @@
; - "server2"
; - "server3"
; nimbus.host: "master"
-;
+;
; ########### These all have default values as shown
-;
+;
; ### storm.* configs are general configurations
; # the local dir is where jars are kept
; storm.local.dir: "/mnt/storm"
; storm.zookeeper.port: 2181
; storm.zookeeper.root: "/storm"
-(defn read-default-config []
+(defn read-default-config
+ []
(clojurify-structure (Utils/readDefaultConfig)))
-(defn validate-configs-with-schemas [conf]
+(defn validate-configs-with-schemas
+ [conf]
(doseq [[k v] conf
- :let [schema (CONFIG-SCHEMA-MAP k)]]
+ :let [schema (CONFIG-SCHEMA-MAP k)]]
(if (not (nil? schema))
(.validateField schema k v))))
-(defn read-storm-config []
- (let [
- conf (clojurify-structure (Utils/readStormConfig))]
+(defn read-storm-config
+ []
+ (let [conf (clojurify-structure (Utils/readStormConfig))]
(validate-configs-with-schemas conf)
conf))
-(defn read-yaml-config [name]
+(defn read-yaml-config
+ [name]
(let [conf (clojurify-structure (Utils/findAndReadConfigFile name true))]
(validate-configs-with-schemas conf)
conf))
-(defn master-local-dir [conf]
+(defn master-local-dir
+ [conf]
(let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "nimbus")]
(FileUtils/forceMkdir (File. ret))
- ret
- ))
+ ret))
(defn master-stormdist-root
([conf]
- (str (master-local-dir conf) file-path-separator "stormdist"))
+ (str (master-local-dir conf) file-path-separator "stormdist"))
([conf storm-id]
- (str (master-stormdist-root conf) file-path-separator storm-id)))
+ (str (master-stormdist-root conf) file-path-separator storm-id)))
-(defn master-stormjar-path [stormroot]
+(defn master-stormjar-path
+ [stormroot]
(str stormroot file-path-separator "stormjar.jar"))
-(defn master-stormcode-path [stormroot]
+(defn master-stormcode-path
+ [stormroot]
(str stormroot file-path-separator "stormcode.ser"))
-(defn master-stormconf-path [stormroot]
+(defn master-stormconf-path
+ [stormroot]
(str stormroot file-path-separator "stormconf.ser"))
-(defn master-inbox [conf]
+(defn master-inbox
+ [conf]
(let [ret (str (master-local-dir conf) file-path-separator "inbox")]
(FileUtils/forceMkdir (File. ret))
ret ))
-(defn master-inimbus-dir [conf]
+(defn master-inimbus-dir
+ [conf]
(str (master-local-dir conf) file-path-separator "inimbus"))
-(defn supervisor-local-dir [conf]
+(defn supervisor-local-dir
+ [conf]
(let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "supervisor")]
(FileUtils/forceMkdir (File. ret))
- ret
- ))
+ ret))
-(defn supervisor-isupervisor-dir [conf]
+(defn supervisor-isupervisor-dir
+ [conf]
(str (supervisor-local-dir conf) file-path-separator "isupervisor"))
(defn supervisor-stormdist-root
- ([conf] (str (supervisor-local-dir conf) file-path-separator "stormdist"))
+ ([conf]
+ (str (supervisor-local-dir conf) file-path-separator "stormdist"))
([conf storm-id]
- (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id))))
+ (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id))))
-(defn supervisor-stormjar-path [stormroot]
+(defn supervisor-stormjar-path
+ [stormroot]
(str stormroot file-path-separator "stormjar.jar"))
-(defn supervisor-stormcode-path [stormroot]
+(defn supervisor-stormcode-path
+ [stormroot]
(str stormroot file-path-separator "stormcode.ser"))
-(defn supervisor-stormconf-path [stormroot]
+(defn supervisor-stormconf-path
+ [stormroot]
(str stormroot file-path-separator "stormconf.ser"))
-(defn supervisor-tmp-dir [conf]
+(defn supervisor-tmp-dir
+ [conf]
(let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
(FileUtils/forceMkdir (File. ret))
ret ))
-(defn supervisor-storm-resources-path [stormroot]
+(defn supervisor-storm-resources-path
+ [stormroot]
(str stormroot file-path-separator RESOURCES-SUBDIR))
-(defn ^LocalState supervisor-state [conf]
+(defn ^LocalState supervisor-state
+ [conf]
(LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate")))
-(defn read-supervisor-storm-conf [conf storm-id]
+(defn read-supervisor-storm-conf
+ [conf storm-id]
(let [stormroot (supervisor-stormdist-root conf storm-id)
conf-path (supervisor-stormconf-path stormroot)
topology-path (supervisor-stormcode-path stormroot)]
(merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
))
-(defn read-supervisor-topology [conf storm-id]
+(defn read-supervisor-topology
+ [conf storm-id]
(let [stormroot (supervisor-stormdist-root conf storm-id)
topology-path (supervisor-stormcode-path stormroot)]
(Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
@@ -200,15 +221,16 @@
(defn worker-root
([conf]
- (str (conf STORM-LOCAL-DIR) file-path-separator "workers"))
+ (str (conf STORM-LOCAL-DIR) file-path-separator "workers"))
([conf id]
- (str (worker-root conf) file-path-separator id)))
+ (str (worker-root conf) file-path-separator id)))
(defn worker-pids-root
[conf id]
(str (worker-root conf id) file-path-separator "pids"))
-(defn worker-pid-path [conf id pid]
+(defn worker-pid-path
+ [conf id pid]
(str (worker-pids-root conf id) file-path-separator pid))
(defn worker-heartbeats-root
@@ -218,5 +240,6 @@
;; workers heartbeat here with pid and timestamp
;; if supervisor stops receiving heartbeat, it kills and restarts the process
;; in local mode, keep a global map of ids to threads for simulating process management
-(defn ^LocalState worker-state [conf id]
+(defn ^LocalState worker-state
+ [conf id]
(LocalState. (worker-heartbeats-root conf id)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 9456d1a..d8c5c91 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -13,89 +13,90 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.disruptor
(:import [backtype.storm.utils DisruptorQueue])
(:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy
- BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
- BusySpinWaitStrategy])
+ BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
+ BusySpinWaitStrategy])
(:require [clojure [string :as str]])
(:require [clojure [set :as set]])
(:use [clojure walk])
- (:use [backtype.storm util log])
- )
+ (:use [backtype.storm util log]))
(def CLAIM-STRATEGY
{:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size)))
- :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))
- })
-
+ :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))})
+
(def WAIT-STRATEGY
{:block (fn [] (BlockingWaitStrategy.))
:yield (fn [] (YieldingWaitStrategy.))
:sleep (fn [] (SleepingWaitStrategy.))
- :spin (fn [] (BusySpinWaitStrategy.))
- })
-
+ :spin (fn [] (BusySpinWaitStrategy.))})
-(defn- mk-wait-strategy [spec]
+(defn- mk-wait-strategy
+ [spec]
(if (keyword? spec)
((WAIT-STRATEGY spec))
- (-> (str spec) new-instance)
- ))
+ (-> (str spec) new-instance)))
;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue.
;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message
-;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
+;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
;; unblocking the consumer
-(defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+(defnk disruptor-queue
+ [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
(DisruptorQueue. ((CLAIM-STRATEGY claim-strategy) buffer-size)
- (mk-wait-strategy wait-strategy)
- ))
+ (mk-wait-strategy wait-strategy)))
-(defn clojure-handler [afn]
+(defn clojure-handler
+ [afn]
(reify com.lmax.disruptor.EventHandler
- (onEvent [this o seq-id batchEnd?]
- (afn o seq-id batchEnd?)
- )))
+ (onEvent
+ [this o seq-id batchEnd?]
+ (afn o seq-id batchEnd?))))
-(defmacro handler [& args]
+(defmacro handler
+ [& args]
`(clojure-handler (fn ~@args)))
(defn publish
([^DisruptorQueue q o block?]
- (.publish q o block?))
+ (.publish q o block?))
([q o]
- (publish q o true)))
+ (publish q o true)))
-(defn try-publish [^DisruptorQueue q o]
+(defn try-publish
+ [^DisruptorQueue q o]
(.tryPublish q o))
-(defn consume-batch [^DisruptorQueue queue handler]
+(defn consume-batch
+ [^DisruptorQueue queue handler]
(.consumeBatch queue handler))
-(defn consume-batch-when-available [^DisruptorQueue queue handler]
+(defn consume-batch-when-available
+ [^DisruptorQueue queue handler]
(.consumeBatchWhenAvailable queue handler))
-(defn consumer-started! [^DisruptorQueue queue]
+(defn consumer-started!
+ [^DisruptorQueue queue]
(.consumerStarted queue))
-(defn halt-with-interrupt! [^DisruptorQueue queue]
+(defn halt-with-interrupt!
+ [^DisruptorQueue queue]
(.haltWithInterrupt queue))
-(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
- :thread-name nil]
+(defnk consume-loop*
+ [^DisruptorQueue queue handler
+ :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
+ :thread-name nil]
(let [ret (async-loop
- (fn []
- (consume-batch-when-available queue handler)
- 0 )
+ (fn [] (consume-batch-when-available queue handler) 0)
:kill-fn kill-fn
- :thread-name thread-name
- )]
- (consumer-started! queue)
- ret
- ))
+ :thread-name thread-name)]
+ (consumer-started! queue)
+ ret))
(defmacro consume-loop [queue & handler-args]
`(let [handler# (handler ~@handler-args)]
- (consume-loop* ~queue handler#)
- ))
+ (consume-loop* ~queue handler#)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/event.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/event.clj b/storm-core/src/clj/backtype/storm/event.clj
index 70ba8a6..f92c9bb 100644
--- a/storm-core/src/clj/backtype/storm/event.clj
+++ b/storm-core/src/clj/backtype/storm/event.clj
@@ -13,11 +13,11 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.event
(:use [backtype.storm log util])
(:import [backtype.storm.utils Time Utils])
- (:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
- )
+ (:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))
(defprotocol EventManager
(add [this event-fn])
@@ -32,36 +32,37 @@
^LinkedBlockingQueue queue (LinkedBlockingQueue.)
running (atom true)
runner (Thread.
- (fn []
- (try-cause
- (while @running
- (let [r (.take queue)]
- (r)
- (swap! processed inc)))
- (catch InterruptedException t
- (log-message "Event manager interrupted"))
- (catch Throwable t
- (log-error t "Error when processing event")
- (halt-process! 20 "Error when processing an event"))
- )))]
+ (fn []
+ (try-cause
+ (while @running
+ (let [r (.take queue)]
+ (r)
+ (swap! processed inc)))
+ (catch InterruptedException t
+ (log-message "Event manager interrupted"))
+ (catch Throwable t
+ (log-error t "Error when processing event")
+ (halt-process! 20 "Error when processing an event")))))]
(.setDaemon runner daemon?)
(.start runner)
(reify
EventManager
- (add [this event-fn]
+
+ (add
+ [this event-fn]
;; should keep track of total added and processed to know if this is finished yet
(when-not @running
(throw (RuntimeException. "Cannot add events to a shutdown event manager")))
(swap! added inc)
- (.put queue event-fn)
- )
- (waiting? [this]
+ (.put queue event-fn))
+
+ (waiting?
+ [this]
(or (Time/isThreadWaiting runner)
- (= @processed @added)
- ))
- (shutdown [this]
+ (= @processed @added)))
+
+ (shutdown
+ [this]
(reset! running false)
(.interrupt runner)
- (.join runner)
- )
- )))
+ (.join runner)))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/log.clj b/storm-core/src/clj/backtype/storm/log.clj
index adb2774..0fcf822 100644
--- a/storm-core/src/clj/backtype/storm/log.clj
+++ b/storm-core/src/clj/backtype/storm/log.clj
@@ -13,26 +13,34 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.log
(:require [clojure.tools [logging :as log]]))
-(defmacro log-message [& args]
+(defmacro log-message
+ [& args]
`(log/info (str ~@args)))
-(defmacro log-error [e & args]
+(defmacro log-error
+ [e & args]
`(log/log :error ~e (str ~@args)))
-(defmacro log-debug [& args]
+(defmacro log-debug
+ [& args]
`(log/debug (str ~@args)))
-(defmacro log-warn-error [e & args]
+(defmacro log-warn-error
+ [e & args]
`(log/warn (str ~@args) ~e))
-(defmacro log-warn [& args]
+(defmacro log-warn
+ [& args]
`(log/warn (str ~@args)))
-(defn log-capture! [& args]
+(defn log-capture!
+ [& args]
(apply log/log-capture! args))
-(defn log-stream [& args]
+(defn log-stream
+ [& args]
(apply log/log-stream args))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/process_simulator.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/process_simulator.clj b/storm-core/src/clj/backtype/storm/process_simulator.clj
index 0446a98..e0cf6ed 100644
--- a/storm-core/src/clj/backtype/storm/process_simulator.clj
+++ b/storm-core/src/clj/backtype/storm/process_simulator.clj
@@ -13,9 +13,9 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.process-simulator
- (:use [backtype.storm log util])
- )
+ (:use [backtype.storm log util]))
(def pid-counter (mk-counter))
@@ -26,23 +26,26 @@
(defn register-process [pid shutdownable]
(swap! process-map assoc pid shutdownable))
-(defn process-handle [pid]
+(defn process-handle
+ [pid]
(@process-map pid))
-(defn all-processes []
+(defn all-processes
+ []
(vals @process-map))
-(defn kill-process [pid]
- (locking kill-lock ; in case cluster shuts down while supervisor is
- ; killing a task
+(defn kill-process
+ "Uses `locking` in case cluster shuts down while supervisor is
+ killing a task"
+ [pid]
+ (locking kill-lock
(log-message "Killing process " pid)
(let [shutdownable (process-handle pid)]
(swap! process-map dissoc pid)
(when shutdownable
- (.shutdown shutdownable))
- )))
+ (.shutdown shutdownable)))))
-(defn kill-all-processes []
+(defn kill-all-processes
+ []
(doseq [pid (keys @process-map)]
- (kill-process pid)
- ))
+ (kill-process pid)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index 944d2b6..b872c6f 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -13,6 +13,7 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.stats
(:import [backtype.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent
NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
@@ -23,43 +24,44 @@
;;TODO: consider replacing this with some sort of RRD
-(defn curr-time-bucket [^Integer time-secs ^Integer bucket-size-secs]
- (* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs))
- )
+(defn curr-time-bucket
+ [^Integer time-secs ^Integer bucket-size-secs]
+ (* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs)))
-(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
+(defrecord RollingWindow
+ [updater merger extractor bucket-size-secs num-buckets buckets])
-(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
+(defn rolling-window
+ [updater merger extractor bucket-size-secs num-buckets]
(RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
(defn update-rolling-window
([^RollingWindow rw time-secs & args]
- ;; this is 2.5x faster than using update-in...
- (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
- buckets (:buckets rw)
- curr (get buckets time-bucket)
- curr (apply (:updater rw) curr args)
- ]
- (assoc rw :buckets (assoc buckets time-bucket curr))
- )))
-
-(defn value-rolling-window [^RollingWindow rw]
+ ;; this is 2.5x faster than using update-in...
+ (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
+ buckets (:buckets rw)
+ curr (get buckets time-bucket)
+ curr (apply (:updater rw) curr args)]
+ (assoc rw :buckets (assoc buckets time-bucket curr)))))
+
+(defn value-rolling-window
+ [^RollingWindow rw]
((:extractor rw)
(let [values (vals (:buckets rw))]
- (apply (:merger rw) values)
- )))
+ (apply (:merger rw) values))))
-(defn cleanup-rolling-window [^RollingWindow rw]
+(defn cleanup-rolling-window
+ [^RollingWindow rw]
(let [buckets (:buckets rw)
cutoff (- (current-time-secs)
(* (:num-buckets rw)
(:bucket-size-secs rw)))
to-remove (filter #(< % cutoff) (keys buckets))
buckets (apply dissoc buckets to-remove)]
- (assoc rw :buckets buckets)
- ))
+ (assoc rw :buckets buckets)))
-(defn rolling-window-size [^RollingWindow rw]
+(defn rolling-window-size
+ [^RollingWindow rw]
(* (:bucket-size-secs rw) (:num-buckets rw)))
(defrecord RollingWindowSet [updater extractor windows all-time])
@@ -70,49 +72,52 @@
(defn update-rolling-window-set
([^RollingWindowSet rws & args]
- (let [now (current-time-secs)
- new-windows (dofor [w (:windows rws)]
- (apply update-rolling-window w now args))]
- (assoc rws :windows new-windows :all-time (apply (:updater rws) (:all-time rws) args))
- )))
+ (let [now (current-time-secs)
+ new-windows (dofor [w (:windows rws)]
+ (apply update-rolling-window w now args))]
+ (assoc rws
+ :windows new-windows
+ :all-time (apply (:updater rws) (:all-time rws) args)))))
(defn cleanup-rolling-window-set
([^RollingWindowSet rws]
- (let [windows (:windows rws)]
- (assoc rws :windows (map cleanup-rolling-window windows))
- )))
+ (let [windows (:windows rws)]
+ (assoc rws :windows (map cleanup-rolling-window windows)))))
-(defn value-rolling-window-set [^RollingWindowSet rws]
+(defn value-rolling-window-set
+ [^RollingWindowSet rws]
(merge
- (into {}
- (for [w (:windows rws)]
- {(rolling-window-size w) (value-rolling-window w)}
- ))
- {:all-time ((:extractor rws) (:all-time rws))}))
+ (into {}
+ (for [w (:windows rws)]
+ {(rolling-window-size w) (value-rolling-window w)}
+ ))
+ {:all-time ((:extractor rws) (:all-time rws))}))
(defn- incr-val
([amap key]
- (incr-val amap key 1))
+ (incr-val amap key 1))
([amap key amt]
- (let [val (get amap key (long 0))]
- (assoc amap key (+ val amt))
- )))
+ (let [val (get amap key (long 0))]
+ (assoc amap key (+ val amt)))))
-(defn- update-avg [curr val]
+(defn- update-avg
+ [curr val]
(if curr
[(+ (first curr) val) (inc (second curr))]
- [val (long 1)]
- ))
+ [val (long 1)]))
-(defn- merge-avg [& avg]
+(defn- merge-avg
+ [& avg]
[(apply + (map first avg))
(apply + (map second avg))
])
-(defn- extract-avg [pair]
+(defn- extract-avg
+ [pair]
(double (/ (first pair) (second pair))))
-(defn- update-keyed-avg [amap key val]
+(defn- update-keyed-avg
+ [amap key val]
(assoc amap key (update-avg (get amap key) val)))
(defn- merge-keyed-avg [& vals]
@@ -124,14 +129,16 @@
(defn- counter-extract [v]
(if v v {}))
-(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
+(defn keyed-counter-rolling-window-set
+ [num-buckets & bucket-sizes]
(apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
-(defn avg-rolling-window-set [num-buckets & bucket-sizes]
- (apply rolling-window-set update-avg merge-avg extract-avg num-buckets bucket-sizes)
- )
+(defn avg-rolling-window-set
+ [num-buckets & bucket-sizes]
+ (apply rolling-window-set update-avg merge-avg extract-avg num-buckets bucket-sizes))
-(defn keyed-avg-rolling-window-set [num-buckets & bucket-sizes]
+(defn keyed-avg-rolling-window-set
+ [num-buckets & bucket-sizes]
(apply rolling-window-set update-keyed-avg merge-keyed-avg extract-keyed-avg num-buckets bucket-sizes))
;; (defn choose-bucket [val buckets]
@@ -169,160 +176,166 @@
;; 10 minutes, 3 hours, 1 day
(def STAT-BUCKETS [30 540 4320])
-(defn- mk-common-stats [rate]
- (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- rate
- ))
-
-(defn mk-bolt-stats [rate]
- (BoltExecutorStats. (mk-common-stats rate)
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- ))
-
-(defn mk-spout-stats [rate]
- (SpoutExecutorStats. (mk-common-stats rate)
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- ))
-
-(defmacro update-executor-stat! [stats path & args]
+(defn- mk-common-stats
+ [rate]
+ (CommonStats.
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ rate))
+
+(defn mk-bolt-stats
+ [rate]
+ (BoltExecutorStats.
+ (mk-common-stats rate)
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
+
+(defn mk-spout-stats
+ [rate]
+ (SpoutExecutorStats.
+ (mk-common-stats rate)
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
+
+(defmacro update-executor-stat!
+ [stats path & args]
(let [path (collectify path)]
- `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)
- ))
+ `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))
-(defmacro stats-rate [stats]
+(defmacro stats-rate
+ [stats]
`(-> ~stats :common :rate))
-(defn emitted-tuple! [stats stream]
+(defn emitted-tuple!
+ [stats stream]
(update-executor-stat! stats [:common :emitted] stream (stats-rate stats)))
-(defn transferred-tuples! [stats stream amt]
+(defn transferred-tuples!
+ [stats stream amt]
(update-executor-stat! stats [:common :transferred] stream (* (stats-rate stats) amt)))
-(defn bolt-execute-tuple! [^BoltExecutorStats stats component stream latency-ms]
+(defn bolt-execute-tuple!
+ [^BoltExecutorStats stats component stream latency-ms]
(let [key [component stream]]
(update-executor-stat! stats :executed key (stats-rate stats))
- (update-executor-stat! stats :execute-latencies key latency-ms)
- ))
+ (update-executor-stat! stats :execute-latencies key latency-ms)))
-(defn bolt-acked-tuple! [^BoltExecutorStats stats component stream latency-ms]
+(defn bolt-acked-tuple!
+ [^BoltExecutorStats stats component stream latency-ms]
(let [key [component stream]]
(update-executor-stat! stats :acked key (stats-rate stats))
- (update-executor-stat! stats :process-latencies key latency-ms)
- ))
+ (update-executor-stat! stats :process-latencies key latency-ms)))
-(defn bolt-failed-tuple! [^BoltExecutorStats stats component stream latency-ms]
+(defn bolt-failed-tuple!
+ [^BoltExecutorStats stats component stream latency-ms]
(let [key [component stream]]
- (update-executor-stat! stats :failed key (stats-rate stats))
- ))
+ (update-executor-stat! stats :failed key (stats-rate stats))))
-(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
+(defn spout-acked-tuple!
+ [^SpoutExecutorStats stats stream latency-ms]
(update-executor-stat! stats :acked stream (stats-rate stats))
- (update-executor-stat! stats :complete-latencies stream latency-ms)
- )
+ (update-executor-stat! stats :complete-latencies stream latency-ms))
-(defn spout-failed-tuple! [^SpoutExecutorStats stats stream latency-ms]
+(defn spout-failed-tuple!
+ [^SpoutExecutorStats stats stream latency-ms]
(update-executor-stat! stats :failed stream (stats-rate stats))
)
(defn- cleanup-stat! [stat]
(swap! stat cleanup-rolling-window-set))
-(defn- cleanup-common-stats! [^CommonStats stats]
+(defn- cleanup-common-stats!
+ [^CommonStats stats]
(doseq [f COMMON-FIELDS]
- (cleanup-stat! (f stats))
- ))
+ (cleanup-stat! (f stats))))
-(defn cleanup-bolt-stats! [^BoltExecutorStats stats]
+(defn cleanup-bolt-stats!
+ [^BoltExecutorStats stats]
(cleanup-common-stats! (:common stats))
(doseq [f BOLT-FIELDS]
- (cleanup-stat! (f stats))
- ))
+ (cleanup-stat! (f stats))))
-(defn cleanup-spout-stats! [^SpoutExecutorStats stats]
+(defn cleanup-spout-stats!
+ [^SpoutExecutorStats stats]
(cleanup-common-stats! (:common stats))
(doseq [f SPOUT-FIELDS]
- (cleanup-stat! (f stats))
- ))
+ (cleanup-stat! (f stats))))
-(defn- value-stats [stats fields]
- (into
- {}
- (dofor [f fields]
- [f (value-rolling-window-set @(f stats))]
- )))
+(defn- value-stats
+ [stats fields]
+ (into {} (dofor [f fields]
+ [f (value-rolling-window-set @(f stats))])))
-(defn- value-common-stats [^CommonStats stats]
+(defn- value-common-stats
+ [^CommonStats stats]
(merge
- (value-stats stats COMMON-FIELDS)
- {:rate (:rate stats)}))
+ (value-stats stats COMMON-FIELDS)
+ {:rate (:rate stats)}))
-(defn value-bolt-stats! [^BoltExecutorStats stats]
+(defn value-bolt-stats!
+ [^BoltExecutorStats stats]
(cleanup-bolt-stats! stats)
(merge (value-common-stats (:common stats))
(value-stats stats BOLT-FIELDS)
{:type :bolt}))
-(defn value-spout-stats! [^SpoutExecutorStats stats]
+(defn value-spout-stats!
+ [^SpoutExecutorStats stats]
(cleanup-spout-stats! stats)
(merge (value-common-stats (:common stats))
(value-stats stats SPOUT-FIELDS)
{:type :spout}))
-
(defmulti render-stats! class-selector)
-(defmethod render-stats! SpoutExecutorStats [stats]
+(defmethod render-stats! SpoutExecutorStats
+ [stats]
(value-spout-stats! stats))
-(defmethod render-stats! BoltExecutorStats [stats]
+(defmethod render-stats! BoltExecutorStats
+ [stats]
(value-bolt-stats! stats))
(defmulti thriftify-specific-stats :type)
(defn window-set-converter
([stats key-fn]
- ;; make the first key a string,
- (into {}
- (for [[k v] stats]
- [(str k)
- (into {}
- (for [[k2 v2] v]
- [(key-fn k2) v2]))]
- )
- ))
+ ;; make the first key a string,
+ (into {}
+ (for [[k v] stats]
+ [(str k)
+ (into {} (for [[k2 v2] v]
+ [(key-fn k2) v2]))])))
([stats]
- (window-set-converter stats identity)))
+ (window-set-converter stats identity)))
-(defn to-global-stream-id [[component stream]]
- (GlobalStreamId. component stream)
- )
+(defn to-global-stream-id
+ [[component stream]]
+ (GlobalStreamId. component stream))
(defmethod thriftify-specific-stats :bolt
[stats]
(ExecutorSpecificStats/bolt
- (BoltStats. (window-set-converter (:acked stats) to-global-stream-id)
- (window-set-converter (:failed stats) to-global-stream-id)
- (window-set-converter (:process-latencies stats) to-global-stream-id)
- (window-set-converter (:executed stats) to-global-stream-id)
- (window-set-converter (:execute-latencies stats) to-global-stream-id)
- )))
+ (BoltStats.
+ (window-set-converter (:acked stats) to-global-stream-id)
+ (window-set-converter (:failed stats) to-global-stream-id)
+ (window-set-converter (:process-latencies stats) to-global-stream-id)
+ (window-set-converter (:executed stats) to-global-stream-id)
+ (window-set-converter (:execute-latencies stats) to-global-stream-id))))
(defmethod thriftify-specific-stats :spout
[stats]
(ExecutorSpecificStats/spout
- (SpoutStats. (window-set-converter (:acked stats))
- (window-set-converter (:failed stats))
- (window-set-converter (:complete-latencies stats)))
- ))
+ (SpoutStats. (window-set-converter (:acked stats))
+ (window-set-converter (:failed stats))
+ (window-set-converter (:complete-latencies stats)))))
-(defn thriftify-executor-stats [stats]
+(defn thriftify-executor-stats
+ [stats]
(let [specific-stats (thriftify-specific-stats stats)]
(ExecutorStats. (window-set-converter (:emitted stats))
(window-set-converter (:transferred stats))