You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/12 23:11:02 UTC

[04/14] git commit: move towards idiomatic Clojure style

move towards idiomatic Clojure style

Summary:
* When using defn, put function arguments on a separate line.
* Remove dangling ), ], }
* Try to keep lines at 80 characters or less

Going forward, I’d like to see
* more docstrings
* most lines under 80 characters

Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2278fc96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2278fc96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2278fc96

Branch: refs/heads/security
Commit: 2278fc9623d5b71bca9ece3857e645e85ab1562f
Parents: eee8b24
Author: David James <da...@bluemontlabs.com>
Authored: Sat May 31 13:55:54 2014 -0400
Committer: David James <da...@bluemontlabs.com>
Committed: Sat May 31 13:55:54 2014 -0400

----------------------------------------------------------------------
 .../src/clj/backtype/storm/LocalCluster.clj     |   71 +-
 storm-core/src/clj/backtype/storm/LocalDRPC.clj |    5 +-
 storm-core/src/clj/backtype/storm/bootstrap.clj |    9 +-
 storm-core/src/clj/backtype/storm/clojure.clj   |   10 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  427 +++---
 storm-core/src/clj/backtype/storm/config.clj    |  145 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |   83 +-
 storm-core/src/clj/backtype/storm/event.clj     |   49 +-
 storm-core/src/clj/backtype/storm/log.clj       |   22 +-
 .../clj/backtype/storm/process_simulator.clj    |   27 +-
 storm-core/src/clj/backtype/storm/stats.clj     |  289 ++--
 storm-core/src/clj/backtype/storm/testing.clj   |  523 ++++----
 storm-core/src/clj/backtype/storm/testing4j.clj |   58 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |  255 ++--
 storm-core/src/clj/backtype/storm/timer.clj     |   94 +-
 storm-core/src/clj/backtype/storm/tuple.clj     |    7 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   | 1262 +++++++++---------
 storm-core/src/clj/backtype/storm/util.clj      |  732 +++++-----
 storm-core/src/clj/backtype/storm/zookeeper.clj |  158 ++-
 19 files changed, 2205 insertions(+), 2021 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalCluster.clj b/storm-core/src/clj/backtype/storm/LocalCluster.clj
index 77f3b3f..dc8214d 100644
--- a/storm-core/src/clj/backtype/storm/LocalCluster.clj
+++ b/storm-core/src/clj/backtype/storm/LocalCluster.clj
@@ -13,66 +13,75 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.LocalCluster
   (:use [backtype.storm testing config])
   (:import [java.util Map])
   (:gen-class
-   :init init
-   :implements [backtype.storm.ILocalCluster]
-   :constructors {[] [] [java.util.Map] []}
-   :state state ))
+    :init init
+    :implements [backtype.storm.ILocalCluster]
+    :constructors {[] [] [java.util.Map] []}
+    :state state))
 
 (defn -init
   ([]
-     (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
-       [[] ret]
-       ))
+   (let [ret (mk-local-storm-cluster
+               :daemon-conf
+               {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
+     [[] ret]))
   ([^Map stateMap]
-     [[] stateMap]))
+   [[] stateMap]))
 
-(defn -submitTopology [this name conf topology]
-  (submit-local-topology (:nimbus (. this state))
-                      name
-                      conf
-                      topology))
+(defn -submitTopology
+  [this name conf topology]
+  (submit-local-topology
+    (:nimbus (. this state)) name conf topology))
 
-(defn -submitTopologyWithOpts [this name conf topology submit-opts]
-  (submit-local-topology-with-opts (:nimbus (. this state))
-                      name
-                      conf
-                      topology
-                      submit-opts))
+(defn -submitTopologyWithOpts
+  [this name conf topology submit-opts]
+  (submit-local-topology-with-opts
+    (:nimbus (. this state)) name conf topology submit-opts))
 
-(defn -shutdown [this]
+(defn -shutdown
+  [this]
   (kill-local-storm-cluster (. this state)))
 
-(defn -killTopology [this name]
+(defn -killTopology
+  [this name]
   (.killTopology (:nimbus (. this state)) name))
 
-(defn -getTopologyConf [this id]
+(defn -getTopologyConf
+  [this id]
   (.getTopologyConf (:nimbus (. this state)) id))
 
-(defn -getTopology [this id]
+(defn -getTopology
+  [this id]
   (.getTopology (:nimbus (. this state)) id))
 
-(defn -getClusterInfo [this]
+(defn -getClusterInfo
+  [this]
   (.getClusterInfo (:nimbus (. this state))))
 
-(defn -getTopologyInfo [this id]
+(defn -getTopologyInfo
+  [this id]
   (.getTopologyInfo (:nimbus (. this state)) id))
 
-(defn -killTopologyWithOpts [this name opts]
+(defn -killTopologyWithOpts
+  [this name opts]
   (.killTopologyWithOpts (:nimbus (. this state)) name opts))
 
-(defn -activate [this name]
+(defn -activate
+  [this name]
   (.activate (:nimbus (. this state)) name))
 
-(defn -deactivate [this name]
+(defn -deactivate
+  [this name]
   (.deactivate (:nimbus (. this state)) name))
 
-(defn -rebalance [this name opts]
+(defn -rebalance
+  [this name opts]
   (.rebalance (:nimbus (. this state)) name opts))
 
-(defn -getState [this]
+(defn -getState
+  [this]
   (.state this))
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/LocalDRPC.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalDRPC.clj b/storm-core/src/clj/backtype/storm/LocalDRPC.clj
index a6dab95..daead2e 100644
--- a/storm-core/src/clj/backtype/storm/LocalDRPC.clj
+++ b/storm-core/src/clj/backtype/storm/LocalDRPC.clj
@@ -13,6 +13,7 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.LocalDRPC
   (:require [backtype.storm.daemon [drpc :as drpc]])
   (:use [backtype.storm util])
@@ -45,9 +46,9 @@
 (defn -failRequest [this id]
   (.failRequest (:handler (. this state)) id)
   )
-  
+
 (defn -getServiceId [this]
-  (:service-id (. this state)))  
+  (:service-id (. this state)))
 
 (defn -shutdown [this]
   (ServiceRegistry/unregisterService (:service-id (. this state)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/bootstrap.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj
index 5f34ff1..3ee6f76 100644
--- a/storm-core/src/clj/backtype/storm/bootstrap.clj
+++ b/storm-core/src/clj/backtype/storm/bootstrap.clj
@@ -13,9 +13,11 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.bootstrap)
 
-(defmacro bootstrap []
+(defmacro bootstrap
+  []
   '(do
      (import (quote [backtype.storm Constants]))
      (import (quote [backtype.storm.testing FeederSpout TestPlannerBolt TestPlannerSpout
@@ -32,7 +34,7 @@
      (import (quote [backtype.storm.task IBolt IOutputCollector
                      OutputCollector TopologyContext ShellBolt
                      GeneralTopologyContext WorkerTopologyContext]))
-     (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs 
+     (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs
                      IBatchBolt BatchBoltExecutor]))
      (import (quote [backtype.storm.drpc KeyedFairBolt]))
      (import (quote [backtype.storm.daemon Shutdownable]))
@@ -58,5 +60,4 @@
      (import (quote [backtype.storm.grouping CustomStreamGrouping]))
      (import (quote [java.io File FileOutputStream FileInputStream]))
      (import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList]))
-     (import (quote [org.apache.commons.io FileUtils]))
-     ))
+     (import (quote [org.apache.commons.io FileUtils]))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/clojure.clj b/storm-core/src/clj/backtype/storm/clojure.clj
index 33d204b..fdae7cb 100644
--- a/storm-core/src/clj/backtype/storm/clojure.clj
+++ b/storm-core/src/clj/backtype/storm/clojure.clj
@@ -13,6 +13,7 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.clojure
   (:use [backtype.storm bootstrap util])
   (:import [backtype.storm StormSubmitter])
@@ -25,7 +26,6 @@
   (:import [java.util List])
   (:require [backtype.storm [thrift :as thrift]]))
 
-
 (defn direct-stream [fields]
   (StreamInfo. fields true))
 
@@ -145,9 +145,9 @@
   (tuple-values [this collector ^String stream]
     (let [^TopologyContext context (:context collector)
           fields (..  context (getThisOutputFields stream) toList) ]
-      (vec (map (into 
-                  (empty this) (for [[k v] this] 
-                                   [(if (keyword? k) (name k) k) v])) 
+      (vec (map (into
+                  (empty this) (for [[k v] this]
+                                   [(if (keyword? k) (name k) k) v]))
                 fields))))
   java.util.List
   (tuple-values [this collector stream]
@@ -195,7 +195,7 @@
 (defn submit-remote-topology [name conf topology]
   (StormSubmitter/submitTopology name conf topology))
 
-(defn local-cluster []  
+(defn local-cluster []
   ;; do this to avoid a cyclic dependency of
   ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
   (eval '(new backtype.storm.LocalCluster)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index b5c1e3b..e370b97 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -13,30 +13,30 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.cluster
   (:import [org.apache.zookeeper.data Stat])
   (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException])
   (:import [backtype.storm.utils Utils])
   (:use [backtype.storm util log config])
   (:require [backtype.storm [zookeeper :as zk]])
-  (:require [backtype.storm.daemon [common :as common]])
-  
-  )
+  (:require [backtype.storm.daemon [common :as common]]))
 
 (defprotocol ClusterState
   (set-ephemeral-node [this path data])
   (delete-node [this path])
   (create-sequential [this path data])
-  (set-data [this path data])  ;; if node does not exist, create persistent with this data 
+  ;; if node does not exist, create persistent with this data
+  (set-data [this path data])
   (get-data [this path watch?])
   (get-children [this path watch?])
   (mkdirs [this path])
   (close [this])
   (register [this callback])
-  (unregister [this id])
-  )
+  (unregister [this id]))
 
-(defn mk-distributed-cluster-state [conf]
+(defn mk-distributed-cluster-state
+  [conf]
   (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
     (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
     (.close zk))
@@ -48,83 +48,85 @@
                          :auth-conf conf
                          :root (conf STORM-ZOOKEEPER-ROOT)
                          :watcher (fn [state type path]
-                                     (when @active
-                                       (when-not (= :connected state)
-                                         (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
-                                       (when-not (= :none type)
-                                         (doseq [callback (vals @callbacks)]
-                                           (callback type path))))
-                                       ))]
+                                    (when @active
+                                      (when-not (= :connected state)
+                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
+                                      (when-not (= :none type)
+                                        (doseq [callback (vals @callbacks)]
+                                          (callback type path))))))]
     (reify
      ClusterState
-     (register [this callback]
-               (let [id (uuid)]
-                 (swap! callbacks assoc id callback)
-                 id
-                 ))
-     (unregister [this id]
-                 (swap! callbacks dissoc id))
-
-     (set-ephemeral-node [this path data]
-                         (zk/mkdirs zk (parent-path path))
-                         (if (zk/exists zk path false)
-                           (try-cause
-                             (zk/set-data zk path data) ; should verify that it's ephemeral
-                             (catch KeeperException$NoNodeException e
-                               (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
-                               (zk/create-node zk path data :ephemeral)
-                               ))
-                           (zk/create-node zk path data :ephemeral)
-                           ))
-     
-     (create-sequential [this path data]
+
+     (register
+       [this callback]
+       (let [id (uuid)]
+         (swap! callbacks assoc id callback)
+         id))
+
+     (unregister
+       [this id]
+       (swap! callbacks dissoc id))
+
+     (set-ephemeral-node
+       [this path data]
+       (zk/mkdirs zk (parent-path path))
+       (if (zk/exists zk path false)
+         (try-cause
+           (zk/set-data zk path data) ; should verify that it's ephemeral
+           (catch KeeperException$NoNodeException e
+             (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
+             (zk/create-node zk path data :ephemeral)
+             ))
+         (zk/create-node zk path data :ephemeral)))
+
+     (create-sequential
+       [this path data]
        (zk/create-node zk path data :sequential))
-     
-     (set-data [this path data]
-               ;; note: this does not turn off any existing watches
-               (if (zk/exists zk path false)
-                 (zk/set-data zk path data)
-                 (do
-                   (zk/mkdirs zk (parent-path path))
-                   (zk/create-node zk path data :persistent)
-                   )))
-     
-     (delete-node [this path]
-                  (zk/delete-recursive zk path)
-                  )
-     
-     (get-data [this path watch?]
-               (zk/get-data zk path watch?)
-               )
-     
-     (get-children [this path watch?]
-                   (zk/get-children zk path watch?))
-     
-     (mkdirs [this path]
-             (zk/mkdirs zk path))
-     
-     (close [this]
-            (reset! active false)
-            (.close zk))
-     )))
+
+     (set-data
+       [this path data]
+       ;; note: this does not turn off any existing watches
+       (if (zk/exists zk path false)
+         (zk/set-data zk path data)
+         (do
+           (zk/mkdirs zk (parent-path path))
+           (zk/create-node zk path data :persistent))))
+
+     (delete-node
+       [this path]
+       (zk/delete-recursive zk path))
+
+     (get-data
+       [this path watch?]
+       (zk/get-data zk path watch?))
+
+     (get-children
+       [this path watch?]
+       (zk/get-children zk path watch?))
+
+     (mkdirs
+       [this path]
+       (zk/mkdirs zk path))
+
+     (close
+       [this]
+       (reset! active false)
+       (.close zk)))))
 
 (defprotocol StormClusterState
   (assignments [this callback])
   (assignment-info [this storm-id callback])
   (active-storms [this])
   (storm-base [this storm-id callback])
-
   (get-worker-heartbeat [this storm-id node port])
   (executor-beats [this storm-id executor->node+port])
   (supervisors [this callback])
-  (supervisor-info [this supervisor-id])  ;; returns nil if doesn't exist
-
+  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
   (setup-heartbeats! [this storm-id])
   (teardown-heartbeats! [this storm-id])
   (teardown-topology-errors! [this storm-id])
   (heartbeat-storms [this])
   (error-topologies [this])
-
   (worker-heartbeat! [this storm-id node port info])
   (remove-worker-heartbeat! [this storm-id node port])
   (supervisor-heartbeat! [this supervisor-id info])
@@ -135,10 +137,7 @@
   (remove-storm! [this storm-id])
   (report-error [this storm-id task-id error])
   (errors [this storm-id task-id])
-
-  (disconnect [this])
-  )
-
+  (disconnect [this]))
 
 (def ASSIGNMENTS-ROOT "assignments")
 (def CODE-ROOT "code")
@@ -153,64 +152,75 @@
 (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
 (def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
 
-(defn supervisor-path [id]
+(defn supervisor-path
+  [id]
   (str SUPERVISORS-SUBTREE "/" id))
 
-(defn assignment-path [id]
+(defn assignment-path
+  [id]
   (str ASSIGNMENTS-SUBTREE "/" id))
 
-(defn storm-path [id]
+(defn storm-path
+  [id]
   (str STORMS-SUBTREE "/" id))
 
-(defn workerbeat-storm-root [storm-id]
+(defn workerbeat-storm-root
+  [storm-id]
   (str WORKERBEATS-SUBTREE "/" storm-id))
 
-(defn workerbeat-path [storm-id node port]
+(defn workerbeat-path
+  [storm-id node port]
   (str (workerbeat-storm-root storm-id) "/" node "-" port))
 
-(defn error-storm-root [storm-id]
+(defn error-storm-root
+  [storm-id]
   (str ERRORS-SUBTREE "/" storm-id))
 
-(defn error-path [storm-id component-id]
+(defn error-path
+  [storm-id component-id]
   (str (error-storm-root storm-id) "/" (url-encode component-id)))
 
-(defn- issue-callback! [cb-atom]
+(defn- issue-callback!
+  [cb-atom]
   (let [cb @cb-atom]
     (reset! cb-atom nil)
     (when cb
-      (cb))
-    ))
+      (cb))))
 
-(defn- issue-map-callback! [cb-atom id]
+(defn- issue-map-callback!
+  [cb-atom id]
   (let [cb (@cb-atom id)]
     (swap! cb-atom dissoc id)
     (when cb
-      (cb id))
-    ))
+      (cb id))))
 
-(defn- maybe-deserialize [ser]
+(defn- maybe-deserialize
+  [ser]
   (when ser
     (Utils/deserialize ser)))
 
 (defstruct TaskError :error :time-secs)
 
-(defn- parse-error-path [^String p]
+(defn- parse-error-path
+  [^String p]
   (Long/parseLong (.substring p 1)))
 
-
-(defn convert-executor-beats [executors worker-hb]
-  ;; ensures that we only return heartbeats for executors assigned to this worker
+(defn convert-executor-beats
+  "Ensures that we only return heartbeats for executors assigned to
+  this worker."
+  [executors worker-hb]
   (let [executor-stats (:executor-stats worker-hb)]
     (->> executors
-      (map (fn [t] 
-             (if (contains? executor-stats t)
-               {t {:time-secs (:time-secs worker-hb)
-                    :uptime (:uptime worker-hb)
-                    :stats (get executor-stats t)}})))
-      (into {}))))
+         (map (fn [t]
+                (if (contains? executor-stats t)
+                  {t {:time-secs (:time-secs worker-hb)
+                      :uptime (:uptime worker-hb)
+                      :stats (get executor-stats t)}})))
+         (into {}))))
 
 ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
-(defn mk-storm-cluster-state [cluster-state-spec]
+(defn mk-storm-cluster-state
+  [cluster-state-spec]
   (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                 [false cluster-state-spec]
                                 [true (mk-distributed-cluster-state cluster-state-spec)])
@@ -219,164 +229,171 @@
         assignments-callback (atom nil)
         storm-base-callback (atom {})
         state-id (register
-                  cluster-state
-                  (fn [type path]
-                    (let [[subtree & args] (tokenize-path path)]
-                      (condp = subtree
-                          ASSIGNMENTS-ROOT (if (empty? args)
-                                             (issue-callback! assignments-callback)
-                                             (issue-map-callback! assignment-info-callback (first args)))
-                          SUPERVISORS-ROOT (issue-callback! supervisors-callback)
-                          STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
-                          ;; this should never happen
-                          (halt-process! 30 "Unknown callback for subtree " subtree args)
-                          )
-                      )))]
+                   cluster-state
+                   (fn [type path]
+                     (let [[subtree & args] (tokenize-path path)]
+                       (condp = subtree
+                         ASSIGNMENTS-ROOT (if (empty? args)
+                                            (issue-callback! assignments-callback)
+                                            (issue-map-callback! assignment-info-callback (first args)))
+                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
+                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
+                         ;; this should never happen
+                         (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
     (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
       (mkdirs cluster-state p))
     (reify
-     StormClusterState
-     
-     (assignments [this callback]
+      StormClusterState
+
+      (assignments
+        [this callback]
         (when callback
           (reset! assignments-callback callback))
         (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
-      
-      (assignment-info [this storm-id callback]
+
+      (assignment-info
+        [this storm-id callback]
         (when callback
           (swap! assignment-info-callback assoc storm-id callback))
-        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))
-        )
+        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
 
-      (active-storms [this]
-        (get-children cluster-state STORMS-SUBTREE false)
-        )
+      (active-storms
+        [this]
+        (get-children cluster-state STORMS-SUBTREE false))
 
-      (heartbeat-storms [this]
-        (get-children cluster-state WORKERBEATS-SUBTREE false)
-        )
+      (heartbeat-storms
+        [this]
+        (get-children cluster-state WORKERBEATS-SUBTREE false))
 
-      (error-topologies [this]
-         (get-children cluster-state ERRORS-SUBTREE false)
-        )
+      (error-topologies
+        [this]
+        (get-children cluster-state ERRORS-SUBTREE false))
 
-      (get-worker-heartbeat [this storm-id node port]
+      (get-worker-heartbeat
+        [this storm-id node port]
         (-> cluster-state
             (get-data (workerbeat-path storm-id node port) false)
             maybe-deserialize))
 
-      (executor-beats [this storm-id executor->node+port]
-        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a 
+      (executor-beats
+        [this storm-id executor->node+port]
+        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
         ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
         ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
         ;; we avoid situations like that
         (let [node+port->executors (reverse-map executor->node+port)
               all-heartbeats (for [[[node port] executors] node+port->executors]
-                                (->> (get-worker-heartbeat this storm-id node port)
-                                     (convert-executor-beats executors)
-                                     ))]
+                               (->> (get-worker-heartbeat this storm-id node port)
+                                    (convert-executor-beats executors)
+                                    ))]
           (apply merge all-heartbeats)))
 
-      (supervisors [this callback]
+      (supervisors
+        [this callback]
         (when callback
           (reset! supervisors-callback callback))
-        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback))
-        )
+        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
 
-      (supervisor-info [this supervisor-id]
-        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false))
-        )
+      (supervisor-info
+        [this supervisor-id]
+        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
 
-      (worker-heartbeat! [this storm-id node port info]
+      (worker-heartbeat!
+        [this storm-id node port info]
         (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
 
-      (remove-worker-heartbeat! [this storm-id node port]
-        (delete-node cluster-state (workerbeat-path storm-id node port))
-        )
+      (remove-worker-heartbeat!
+        [this storm-id node port]
+        (delete-node cluster-state (workerbeat-path storm-id node port)))
 
-      (setup-heartbeats! [this storm-id]
+      (setup-heartbeats!
+        [this storm-id]
         (mkdirs cluster-state (workerbeat-storm-root storm-id)))
 
-      (teardown-heartbeats! [this storm-id]
+      (teardown-heartbeats!
+        [this storm-id]
         (try-cause
-         (delete-node cluster-state (workerbeat-storm-root storm-id))
-         (catch KeeperException e
-           (log-warn-error e "Could not teardown heartbeats for " storm-id)
-           )))
+          (delete-node cluster-state (workerbeat-storm-root storm-id))
+          (catch KeeperException e
+            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
 
-      (teardown-topology-errors! [this storm-id]
+      (teardown-topology-errors!
+        [this storm-id]
         (try-cause
-         (delete-node cluster-state (error-storm-root storm-id))         
-         (catch KeeperException e
-           (log-warn-error e "Could not teardown errors for " storm-id)
-           )))
+          (delete-node cluster-state (error-storm-root storm-id))
+          (catch KeeperException e
+            (log-warn-error e "Could not teardown errors for " storm-id))))
 
-      (supervisor-heartbeat! [this supervisor-id info]
-        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info))
-        )
+      (supervisor-heartbeat!
+        [this supervisor-id info]
+        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
 
-      (activate-storm! [this storm-id storm-base]
-        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))
-        )
+      (activate-storm!
+        [this storm-id storm-base]
+        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
 
-      (update-storm! [this storm-id new-elems]
+      (update-storm!
+        [this storm-id new-elems]
         (let [base (storm-base this storm-id nil)
               executors (:component->executors base)
               new-elems (update new-elems :component->executors (partial merge executors))]
           (set-data cluster-state (storm-path storm-id)
-                                  (-> base
-                                      (merge new-elems)
-                                      Utils/serialize))))
+                    (-> base
+                        (merge new-elems)
+                        Utils/serialize))))
 
-      (storm-base [this storm-id callback]
+      (storm-base
+        [this storm-id callback]
         (when callback
           (swap! storm-base-callback assoc storm-id callback))
-        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)))
-        )
+        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
 
-      (remove-storm-base! [this storm-id]
-        (delete-node cluster-state (storm-path storm-id))
-        )
+      (remove-storm-base!
+        [this storm-id]
+        (delete-node cluster-state (storm-path storm-id)))
 
-      (set-assignment! [this storm-id info]
-        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info))
-        )
+      (set-assignment!
+        [this storm-id info]
+        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
 
-      (remove-storm! [this storm-id]
+      (remove-storm!
+        [this storm-id]
         (delete-node cluster-state (assignment-path storm-id))
         (remove-storm-base! this storm-id))
 
-      (report-error [this storm-id component-id error]                
-         (let [path (error-path storm-id component-id)
-               data {:time-secs (current-time-secs) :error (stringify-error error)}
-               _ (mkdirs cluster-state path)
-               _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
-               to-kill (->> (get-children cluster-state path false)
-                            (sort-by parse-error-path)
-                            reverse
-                            (drop 10))]
-           (doseq [k to-kill]
-             (delete-node cluster-state (str path "/" k)))))
-
-      (errors [this storm-id component-id]
-         (let [path (error-path storm-id component-id)
-               _ (mkdirs cluster-state path)
-               children (get-children cluster-state path false)
-               errors (dofor [c children]
-                             (let [data (-> (get-data cluster-state (str path "/" c) false)
-                                            maybe-deserialize)]
-                               (when data
-                                 (struct TaskError (:error data) (:time-secs data))
-                                 )))
-               ]
-           (->> (filter not-nil? errors)
-                (sort-by (comp - :time-secs)))))
-      
-      (disconnect [this]
+      (report-error
+        [this storm-id component-id error]
+        (let [path (error-path storm-id component-id)
+              data {:time-secs (current-time-secs) :error (stringify-error error)}
+              _ (mkdirs cluster-state path)
+              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
+              to-kill (->> (get-children cluster-state path false)
+                           (sort-by parse-error-path)
+                           reverse
+                           (drop 10))]
+          (doseq [k to-kill]
+            (delete-node cluster-state (str path "/" k)))))
+
+      (errors
+        [this storm-id component-id]
+        (let [path (error-path storm-id component-id)
+              _ (mkdirs cluster-state path)
+              children (get-children cluster-state path false)
+              errors (dofor [c children]
+                            (let [data (-> (get-data cluster-state (str path "/" c) false)
+                                           maybe-deserialize)]
+                              (when data
+                                (struct TaskError (:error data) (:time-secs data))
+                                )))
+              ]
+          (->> (filter not-nil? errors)
+               (sort-by (comp - :time-secs)))))
+
+      (disconnect
+        [this]
         (unregister cluster-state state-id)
         (when solo?
-          (close cluster-state)))
-      )))
+          (close cluster-state))))))
 
 ;; daemons have a single thread that will respond to events
 ;; start with initialize event
@@ -395,9 +412,8 @@
 ;; everyone reads this in full to understand structure
 ;; /tasks/{storm id}/{task id} ; just contains bolt id
 
-
 ;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
-;; /supervisors/status/{ephemeral node ids}  ;; node metadata such as port ranges are kept here 
+;; /supervisors/status/{ephemeral node ids}  ;; node metadata such as port ranges are kept here
 
 ;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
 ;; /taskbeats/{storm id}/{ephemeral task id}
@@ -406,8 +422,6 @@
 ;; master manipulates
 ;; /storms/{storm id}
 
-
-
 ;; Zookeeper flows:
 
 ;; Master:
@@ -423,7 +437,6 @@
 ;; 3. make new assignment to fix any problems
 ;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)
 
-
 ;; masters only possible watches is on ephemeral nodes and tasks, and maybe not even
 
 ;; Supervisor:
@@ -439,8 +452,6 @@
 ;; 1. monitor assignments, reroute when assignments change
 ;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off
 
-
-
 ;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name)
 ;; supervisor periodically checks to make sure processes are alive
 ;; {rootdir}/workers/{storm id}/{worker id}   ;; contains pid inside

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 15be94d..14beb21 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -13,14 +13,14 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.config
   (:import [java.io FileReader File])
   (:import [backtype.storm Config ConfigValidation$FieldValidator])
   (:import [backtype.storm.utils Utils LocalState])
   (:import [org.apache.commons.io FileUtils])
   (:require [clojure [string :as str]])
-  (:use [backtype.storm util])
-  )
+  (:use [backtype.storm util]))
 
 (def RESOURCES-SUBDIR "resources")
 
@@ -32,13 +32,11 @@
   (let [name (.getName f)
         new-name (clojure-config-name name)]
     (eval
-      `(def ~(symbol new-name) (. Config ~(symbol name))))
-      ))
+      `(def ~(symbol new-name) (. Config ~(symbol name))))))
 
 (def ALL-CONFIGS
   (dofor [f (seq (.getFields Config))]
-         (.get f nil)
-         ))
+         (.get f nil)))
 
 (defmulti get-FieldValidator class-selector)
 
@@ -48,44 +46,49 @@
 (defmethod get-FieldValidator
   ConfigValidation$FieldValidator [validator] validator)
 
-(defmethod get-FieldValidator Object [klass]
+(defmethod get-FieldValidator Object
+  [klass]
   {:pre [(not (nil? klass))]}
   (reify ConfigValidation$FieldValidator
     (validateField [this name v]
-      (if (and (not (nil? v))
-               (not (instance? klass v)))
-        (throw (IllegalArgumentException.
-                 (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
+                   (if (and (not (nil? v))
+                            (not (instance? klass v)))
+                     (throw (IllegalArgumentException.
+                              (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
 
 ;; Create a mapping of config-string -> validator
 ;; Config fields must have a _SCHEMA field defined
 (def CONFIG-SCHEMA-MAP
   (->> (.getFields Config)
-          (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
-          (map (fn [f] [(.get f nil) (get-FieldValidator
-                                       (-> Config
-                                         (.getField (str (.getName f) "_SCHEMA"))
-                                         (.get nil)))]))
-          (into {})))
-
-(defn cluster-mode [conf & args]
+       (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
+       (map (fn [f] [(.get f nil)
+                     (get-FieldValidator
+                       (-> Config
+                           (.getField (str (.getName f) "_SCHEMA"))
+                           (.get nil)))]))
+       (into {})))
+
+(defn cluster-mode
+  [conf & args]
   (keyword (conf STORM-CLUSTER-MODE)))
 
-(defn local-mode? [conf]
+(defn local-mode?
+  [conf]
   (let [mode (conf STORM-CLUSTER-MODE)]
     (condp = mode
       "local" true
       "distributed" false
       (throw (IllegalArgumentException.
-                (str "Illegal cluster mode in conf: " mode)))
-      )))
+               (str "Illegal cluster mode in conf: " mode))))))
 
-(defn sampling-rate [conf]
+(defn sampling-rate
+  [conf]
   (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
        (/ 1)
        int))
 
-(defn mk-stats-sampler [conf]
+(defn mk-stats-sampler
+  [conf]
   (even-sampler (sampling-rate conf)))
 
 ; storm.zookeeper.servers:
@@ -93,106 +96,124 @@
 ;     - "server2"
 ;     - "server3"
 ; nimbus.host: "master"
-; 
+;
 ; ########### These all have default values as shown
-; 
+;
 ; ### storm.* configs are general configurations
 ; # the local dir is where jars are kept
 ; storm.local.dir: "/mnt/storm"
 ; storm.zookeeper.port: 2181
 ; storm.zookeeper.root: "/storm"
 
-(defn read-default-config []
+(defn read-default-config
+  []
   (clojurify-structure (Utils/readDefaultConfig)))
 
-(defn validate-configs-with-schemas [conf]
+(defn validate-configs-with-schemas
+  [conf]
   (doseq [[k v] conf
-         :let [schema (CONFIG-SCHEMA-MAP k)]]
+          :let [schema (CONFIG-SCHEMA-MAP k)]]
     (if (not (nil? schema))
       (.validateField schema k v))))
 
-(defn read-storm-config []
-  (let [
-        conf (clojurify-structure (Utils/readStormConfig))]
+(defn read-storm-config
+  []
+  (let [conf (clojurify-structure (Utils/readStormConfig))]
     (validate-configs-with-schemas conf)
     conf))
 
-(defn read-yaml-config [name]
+(defn read-yaml-config
+  [name]
   (let [conf (clojurify-structure (Utils/findAndReadConfigFile name true))]
     (validate-configs-with-schemas conf)
     conf))
 
-(defn master-local-dir [conf]
+(defn master-local-dir
+  [conf]
   (let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "nimbus")]
     (FileUtils/forceMkdir (File. ret))
-    ret
-    ))
+    ret))
 
 (defn master-stormdist-root
   ([conf]
-     (str (master-local-dir conf) file-path-separator "stormdist"))
+   (str (master-local-dir conf) file-path-separator "stormdist"))
   ([conf storm-id]
-     (str (master-stormdist-root conf) file-path-separator storm-id)))
+   (str (master-stormdist-root conf) file-path-separator storm-id)))
 
-(defn master-stormjar-path [stormroot]
+(defn master-stormjar-path
+  [stormroot]
   (str stormroot file-path-separator "stormjar.jar"))
 
-(defn master-stormcode-path [stormroot]
+(defn master-stormcode-path
+  [stormroot]
   (str stormroot file-path-separator "stormcode.ser"))
 
-(defn master-stormconf-path [stormroot]
+(defn master-stormconf-path
+  [stormroot]
   (str stormroot file-path-separator "stormconf.ser"))
 
-(defn master-inbox [conf]
+(defn master-inbox
+  [conf]
   (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
     (FileUtils/forceMkdir (File. ret))
     ret ))
 
-(defn master-inimbus-dir [conf]
+(defn master-inimbus-dir
+  [conf]
   (str (master-local-dir conf) file-path-separator "inimbus"))
 
-(defn supervisor-local-dir [conf]
+(defn supervisor-local-dir
+  [conf]
   (let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "supervisor")]
     (FileUtils/forceMkdir (File. ret))
-    ret
-    ))
+    ret))
 
-(defn supervisor-isupervisor-dir [conf]
+(defn supervisor-isupervisor-dir
+  [conf]
   (str (supervisor-local-dir conf) file-path-separator "isupervisor"))
 
 (defn supervisor-stormdist-root
-  ([conf] (str (supervisor-local-dir conf) file-path-separator "stormdist"))
+  ([conf]
+   (str (supervisor-local-dir conf) file-path-separator "stormdist"))
   ([conf storm-id]
-      (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id))))
+   (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id))))
 
-(defn supervisor-stormjar-path [stormroot]
+(defn supervisor-stormjar-path
+  [stormroot]
   (str stormroot file-path-separator "stormjar.jar"))
 
-(defn supervisor-stormcode-path [stormroot]
+(defn supervisor-stormcode-path
+  [stormroot]
   (str stormroot file-path-separator "stormcode.ser"))
 
-(defn supervisor-stormconf-path [stormroot]
+(defn supervisor-stormconf-path
+  [stormroot]
   (str stormroot file-path-separator "stormconf.ser"))
 
-(defn supervisor-tmp-dir [conf]
+(defn supervisor-tmp-dir
+  [conf]
   (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
     (FileUtils/forceMkdir (File. ret))
     ret ))
 
-(defn supervisor-storm-resources-path [stormroot]
+(defn supervisor-storm-resources-path
+  [stormroot]
   (str stormroot file-path-separator RESOURCES-SUBDIR))
 
-(defn ^LocalState supervisor-state [conf]
+(defn ^LocalState supervisor-state
+  [conf]
   (LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate")))
 
-(defn read-supervisor-storm-conf [conf storm-id]
+(defn read-supervisor-storm-conf
+  [conf storm-id]
   (let [stormroot (supervisor-stormdist-root conf storm-id)
         conf-path (supervisor-stormconf-path stormroot)
         topology-path (supervisor-stormcode-path stormroot)]
     (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
     ))
 
-(defn read-supervisor-topology [conf storm-id]
+(defn read-supervisor-topology
+  [conf storm-id]
   (let [stormroot (supervisor-stormdist-root conf storm-id)
         topology-path (supervisor-stormcode-path stormroot)]
     (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
@@ -200,15 +221,16 @@
 
 (defn worker-root
   ([conf]
-     (str (conf STORM-LOCAL-DIR) file-path-separator "workers"))
+   (str (conf STORM-LOCAL-DIR) file-path-separator "workers"))
   ([conf id]
-     (str (worker-root conf) file-path-separator id)))
+   (str (worker-root conf) file-path-separator id)))
 
 (defn worker-pids-root
   [conf id]
   (str (worker-root conf id) file-path-separator "pids"))
 
-(defn worker-pid-path [conf id pid]
+(defn worker-pid-path
+  [conf id pid]
   (str (worker-pids-root conf id) file-path-separator pid))
 
 (defn worker-heartbeats-root
@@ -218,5 +240,6 @@
 ;; workers heartbeat here with pid and timestamp
 ;; if supervisor stops receiving heartbeat, it kills and restarts the process
 ;; in local mode, keep a global map of ids to threads for simulating process management
-(defn ^LocalState worker-state  [conf id]
+(defn ^LocalState worker-state
+  [conf id]
   (LocalState. (worker-heartbeats-root conf id)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 9456d1a..d8c5c91 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -13,89 +13,90 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.disruptor
   (:import [backtype.storm.utils DisruptorQueue])
   (:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy
-              BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
-              BusySpinWaitStrategy])
+            BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
+            BusySpinWaitStrategy])
   (:require [clojure [string :as str]])
   (:require [clojure [set :as set]])
   (:use [clojure walk])
-  (:use [backtype.storm util log])
-  )
+  (:use [backtype.storm util log]))
 
 (def CLAIM-STRATEGY
   {:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size)))
-   :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))
-    })
-    
+   :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))})
+
 (def WAIT-STRATEGY
   {:block (fn [] (BlockingWaitStrategy.))
    :yield (fn [] (YieldingWaitStrategy.))
    :sleep (fn [] (SleepingWaitStrategy.))
-   :spin (fn [] (BusySpinWaitStrategy.))
-    })
-
+   :spin (fn [] (BusySpinWaitStrategy.))})
 
-(defn- mk-wait-strategy [spec]
+(defn- mk-wait-strategy
+  [spec]
   (if (keyword? spec)
     ((WAIT-STRATEGY spec))
-    (-> (str spec) new-instance)
-    ))
+    (-> (str spec) new-instance)))
 
 ;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue.
 ;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message
-;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue, 
+;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
 ;; unblocking the consumer
-(defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+(defnk disruptor-queue
+  [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
   (DisruptorQueue. ((CLAIM-STRATEGY claim-strategy) buffer-size)
-                   (mk-wait-strategy wait-strategy)
-                   ))
+                   (mk-wait-strategy wait-strategy)))
 
-(defn clojure-handler [afn]
+(defn clojure-handler
+  [afn]
   (reify com.lmax.disruptor.EventHandler
-    (onEvent [this o seq-id batchEnd?]
-      (afn o seq-id batchEnd?)
-      )))
+    (onEvent
+      [this o seq-id batchEnd?]
+      (afn o seq-id batchEnd?))))
 
-(defmacro handler [& args]
+(defmacro handler
+  [& args]
   `(clojure-handler (fn ~@args)))
 
 (defn publish
   ([^DisruptorQueue q o block?]
-    (.publish q o block?))
+   (.publish q o block?))
   ([q o]
-    (publish q o true)))
+   (publish q o true)))
 
-(defn try-publish [^DisruptorQueue q o]
+(defn try-publish
+  [^DisruptorQueue q o]
   (.tryPublish q o))
 
-(defn consume-batch [^DisruptorQueue queue handler]
+(defn consume-batch
+  [^DisruptorQueue queue handler]
   (.consumeBatch queue handler))
 
-(defn consume-batch-when-available [^DisruptorQueue queue handler]
+(defn consume-batch-when-available
+  [^DisruptorQueue queue handler]
   (.consumeBatchWhenAvailable queue handler))
 
-(defn consumer-started! [^DisruptorQueue queue]
+(defn consumer-started!
+  [^DisruptorQueue queue]
   (.consumerStarted queue))
 
-(defn halt-with-interrupt! [^DisruptorQueue queue]
+(defn halt-with-interrupt!
+  [^DisruptorQueue queue]
   (.haltWithInterrupt queue))
 
-(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
-                      :thread-name nil]
+(defnk consume-loop*
+  [^DisruptorQueue queue handler
+   :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
+   :thread-name nil]
   (let [ret (async-loop
-              (fn []
-                (consume-batch-when-available queue handler)
-                0 )
+              (fn [] (consume-batch-when-available queue handler) 0)
               :kill-fn kill-fn
-              :thread-name thread-name
-              )]
-     (consumer-started! queue)
-     ret
-     ))
+              :thread-name thread-name)]
+    (consumer-started! queue)
+    ret))
 
 (defmacro consume-loop [queue & handler-args]
   `(let [handler# (handler ~@handler-args)]
-     (consume-loop* ~queue handler#)
-     ))
+     (consume-loop* ~queue handler#)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/event.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/event.clj b/storm-core/src/clj/backtype/storm/event.clj
index 70ba8a6..f92c9bb 100644
--- a/storm-core/src/clj/backtype/storm/event.clj
+++ b/storm-core/src/clj/backtype/storm/event.clj
@@ -13,11 +13,11 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.event
   (:use [backtype.storm log util])
   (:import [backtype.storm.utils Time Utils])
-  (:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
-  )
+  (:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))
 
 (defprotocol EventManager
   (add [this event-fn])
@@ -32,36 +32,37 @@
         ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
         running (atom true)
         runner (Thread.
-                  (fn []
-                    (try-cause
-                      (while @running
-                        (let [r (.take queue)]
-                          (r)
-                          (swap! processed inc)))
-                    (catch InterruptedException t
-                      (log-message "Event manager interrupted"))
-                    (catch Throwable t
-                      (log-error t "Error when processing event")
-                      (halt-process! 20 "Error when processing an event"))
-                      )))]
+                 (fn []
+                   (try-cause
+                     (while @running
+                       (let [r (.take queue)]
+                         (r)
+                         (swap! processed inc)))
+                     (catch InterruptedException t
+                       (log-message "Event manager interrupted"))
+                     (catch Throwable t
+                       (log-error t "Error when processing event")
+                       (halt-process! 20 "Error when processing an event")))))]
     (.setDaemon runner daemon?)
     (.start runner)
     (reify
       EventManager
-      (add [this event-fn]
+
+      (add
+        [this event-fn]
         ;; should keep track of total added and processed to know if this is finished yet
         (when-not @running
           (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
         (swap! added inc)
-        (.put queue event-fn)
-        )
-      (waiting? [this]
+        (.put queue event-fn))
+
+      (waiting?
+        [this]
         (or (Time/isThreadWaiting runner)
-            (= @processed @added)
-            ))
-      (shutdown [this]
+            (= @processed @added)))
+
+      (shutdown
+        [this]
         (reset! running false)
         (.interrupt runner)
-        (.join runner)
-        )
-        )))
+        (.join runner)))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/log.clj b/storm-core/src/clj/backtype/storm/log.clj
index adb2774..0fcf822 100644
--- a/storm-core/src/clj/backtype/storm/log.clj
+++ b/storm-core/src/clj/backtype/storm/log.clj
@@ -13,26 +13,34 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.log
   (:require [clojure.tools [logging :as log]]))
 
-(defmacro log-message [& args]
+(defmacro log-message
+  [& args]
   `(log/info (str ~@args)))
 
-(defmacro log-error [e & args]
+(defmacro log-error
+  [e & args]
   `(log/log :error ~e (str ~@args)))
 
-(defmacro log-debug [& args]
+(defmacro log-debug
+  [& args]
   `(log/debug (str ~@args)))
 
-(defmacro log-warn-error [e & args]
+(defmacro log-warn-error
+  [e & args]
   `(log/warn (str ~@args) ~e))
 
-(defmacro log-warn [& args]
+(defmacro log-warn
+  [& args]
   `(log/warn (str ~@args)))
 
-(defn log-capture! [& args]
+(defn log-capture!
+  [& args]
   (apply log/log-capture! args))
 
-(defn log-stream [& args]
+(defn log-stream
+  [& args]
   (apply log/log-stream args))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/process_simulator.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/process_simulator.clj b/storm-core/src/clj/backtype/storm/process_simulator.clj
index 0446a98..e0cf6ed 100644
--- a/storm-core/src/clj/backtype/storm/process_simulator.clj
+++ b/storm-core/src/clj/backtype/storm/process_simulator.clj
@@ -13,9 +13,9 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.process-simulator
-  (:use [backtype.storm log util])
-  )
+  (:use [backtype.storm log util]))
 
 (def pid-counter (mk-counter))
 
@@ -26,23 +26,26 @@
 (defn register-process [pid shutdownable]
   (swap! process-map assoc pid shutdownable))
 
-(defn process-handle [pid]
+(defn process-handle
+  [pid]
   (@process-map pid))
 
-(defn all-processes []
+(defn all-processes
+  []
   (vals @process-map))
 
-(defn kill-process [pid]
-  (locking kill-lock ; in case cluster shuts down while supervisor is
-                     ; killing a task
+(defn kill-process
+  "Uses `locking` in case cluster shuts down while supervisor is
+  killing a task"
+  [pid]
+  (locking kill-lock
     (log-message "Killing process " pid)
     (let [shutdownable (process-handle pid)]
       (swap! process-map dissoc pid)
       (when shutdownable
-        (.shutdown shutdownable))
-      )))
+        (.shutdown shutdownable)))))
 
-(defn kill-all-processes []
+(defn kill-all-processes
+  []
   (doseq [pid (keys @process-map)]
-    (kill-process pid)
-    ))
+    (kill-process pid)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index 944d2b6..b872c6f 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -13,6 +13,7 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.stats
   (:import [backtype.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent
             NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
@@ -23,43 +24,44 @@
 
 ;;TODO: consider replacing this with some sort of RRD
 
-(defn curr-time-bucket [^Integer time-secs ^Integer bucket-size-secs]
-  (* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs))
-  )
+(defn curr-time-bucket
+  [^Integer time-secs ^Integer bucket-size-secs]
+  (* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs)))
 
-(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
+(defrecord RollingWindow
+  [updater merger extractor bucket-size-secs num-buckets buckets])
 
-(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
+(defn rolling-window
+  [updater merger extractor bucket-size-secs num-buckets]
   (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
 
 (defn update-rolling-window
   ([^RollingWindow rw time-secs & args]
-     ;; this is 2.5x faster than using update-in...
-     (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
-           buckets (:buckets rw)
-           curr (get buckets time-bucket)           
-           curr (apply (:updater rw) curr args)
-           ]
-       (assoc rw :buckets (assoc buckets time-bucket curr))
-       )))
-
-(defn value-rolling-window [^RollingWindow rw]
+   ;; this is 2.5x faster than using update-in...
+   (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
+         buckets (:buckets rw)
+         curr (get buckets time-bucket)
+         curr (apply (:updater rw) curr args)]
+     (assoc rw :buckets (assoc buckets time-bucket curr)))))
+
+(defn value-rolling-window
+  [^RollingWindow rw]
   ((:extractor rw)
    (let [values (vals (:buckets rw))]
-     (apply (:merger rw) values)
-     )))
+     (apply (:merger rw) values))))
 
-(defn cleanup-rolling-window [^RollingWindow rw]
+(defn cleanup-rolling-window
+  [^RollingWindow rw]
   (let [buckets (:buckets rw)
         cutoff (- (current-time-secs)
                   (* (:num-buckets rw)
                      (:bucket-size-secs rw)))
         to-remove (filter #(< % cutoff) (keys buckets))
         buckets (apply dissoc buckets to-remove)]
-    (assoc rw :buckets buckets)
-    ))
+    (assoc rw :buckets buckets)))
 
-(defn rolling-window-size [^RollingWindow rw]
+(defn rolling-window-size
+  [^RollingWindow rw]
   (* (:bucket-size-secs rw) (:num-buckets rw)))
 
 (defrecord RollingWindowSet [updater extractor windows all-time])
@@ -70,49 +72,52 @@
 
 (defn update-rolling-window-set
   ([^RollingWindowSet rws & args]
-     (let [now (current-time-secs)
-           new-windows (dofor [w (:windows rws)]
-                         (apply update-rolling-window w now args))]
-       (assoc rws :windows new-windows :all-time (apply (:updater rws) (:all-time rws) args))
-       )))
+   (let [now (current-time-secs)
+         new-windows (dofor [w (:windows rws)]
+                            (apply update-rolling-window w now args))]
+     (assoc rws
+       :windows new-windows
+       :all-time (apply (:updater rws) (:all-time rws) args)))))
 
 (defn cleanup-rolling-window-set
   ([^RollingWindowSet rws]
-     (let [windows (:windows rws)]
-       (assoc rws :windows (map cleanup-rolling-window windows))
-       )))
+   (let [windows (:windows rws)]
+     (assoc rws :windows (map cleanup-rolling-window windows)))))
 
-(defn value-rolling-window-set [^RollingWindowSet rws]
+(defn value-rolling-window-set
+  [^RollingWindowSet rws]
   (merge
-   (into {}
-         (for [w (:windows rws)]
-           {(rolling-window-size w) (value-rolling-window w)}
-           ))
-   {:all-time ((:extractor rws) (:all-time rws))}))
+    (into {}
+          (for [w (:windows rws)]
+            {(rolling-window-size w) (value-rolling-window w)}
+            ))
+    {:all-time ((:extractor rws) (:all-time rws))}))
 
 (defn- incr-val
   ([amap key]
-     (incr-val amap key 1))
+   (incr-val amap key 1))
   ([amap key amt]
-     (let [val (get amap key (long 0))]
-       (assoc amap key (+ val amt))
-       )))
+   (let [val (get amap key (long 0))]
+     (assoc amap key (+ val amt)))))
 
-(defn- update-avg [curr val]
+(defn- update-avg
+  [curr val]
   (if curr
     [(+ (first curr) val) (inc (second curr))]
-    [val (long 1)]
-    ))
+    [val (long 1)]))
 
-(defn- merge-avg [& avg]
+(defn- merge-avg
+  [& avg]
   [(apply + (map first avg))
    (apply + (map second avg))
    ])
 
-(defn- extract-avg [pair]
+(defn- extract-avg
+  [pair]
   (double (/ (first pair) (second pair))))
 
-(defn- update-keyed-avg [amap key val]
+(defn- update-keyed-avg
+  [amap key val]
   (assoc amap key (update-avg (get amap key) val)))
 
 (defn- merge-keyed-avg [& vals]
@@ -124,14 +129,16 @@
 (defn- counter-extract [v]
   (if v v {}))
 
-(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
+(defn keyed-counter-rolling-window-set
+  [num-buckets & bucket-sizes]
   (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
 
-(defn avg-rolling-window-set [num-buckets & bucket-sizes]
-  (apply rolling-window-set update-avg merge-avg extract-avg num-buckets bucket-sizes)
-  )
+(defn avg-rolling-window-set
+  [num-buckets & bucket-sizes]
+  (apply rolling-window-set update-avg merge-avg extract-avg num-buckets bucket-sizes))
 
-(defn keyed-avg-rolling-window-set [num-buckets & bucket-sizes]
+(defn keyed-avg-rolling-window-set
+  [num-buckets & bucket-sizes]
   (apply rolling-window-set update-keyed-avg merge-keyed-avg extract-keyed-avg num-buckets bucket-sizes))
 
 ;; (defn choose-bucket [val buckets]
@@ -169,160 +176,166 @@
 ;; 10 minutes, 3 hours, 1 day
 (def STAT-BUCKETS [30 540 4320])
 
-(defn- mk-common-stats [rate]
-  (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                rate
-                ))
-
-(defn mk-bolt-stats [rate]
-  (BoltExecutorStats. (mk-common-stats rate)
-                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                  (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                  (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                  ))
-
-(defn mk-spout-stats [rate]
-  (SpoutExecutorStats. (mk-common-stats rate)
-                   (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                   (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                   (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-                   ))
-
-(defmacro update-executor-stat! [stats path & args]
+(defn- mk-common-stats
+  [rate]
+  (CommonStats.
+    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    rate))
+
+(defn mk-bolt-stats
+  [rate]
+  (BoltExecutorStats.
+    (mk-common-stats rate)
+    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
+
+(defn mk-spout-stats
+  [rate]
+  (SpoutExecutorStats.
+    (mk-common-stats rate)
+    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
+
+(defmacro update-executor-stat!
+  [stats path & args]
   (let [path (collectify path)]
-    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)
-    ))
+    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))
 
-(defmacro stats-rate [stats]
+(defmacro stats-rate
+  [stats]
   `(-> ~stats :common :rate))
 
-(defn emitted-tuple! [stats stream]
+(defn emitted-tuple!
+  [stats stream]
   (update-executor-stat! stats [:common :emitted] stream (stats-rate stats)))
 
-(defn transferred-tuples! [stats stream amt]
+(defn transferred-tuples!
+  [stats stream amt]
   (update-executor-stat! stats [:common :transferred] stream (* (stats-rate stats) amt)))
 
-(defn bolt-execute-tuple! [^BoltExecutorStats stats component stream latency-ms]
+(defn bolt-execute-tuple!
+  [^BoltExecutorStats stats component stream latency-ms]
   (let [key [component stream]]
     (update-executor-stat! stats :executed key (stats-rate stats))
-    (update-executor-stat! stats :execute-latencies key latency-ms)
-    ))
+    (update-executor-stat! stats :execute-latencies key latency-ms)))
 
-(defn bolt-acked-tuple! [^BoltExecutorStats stats component stream latency-ms]
+(defn bolt-acked-tuple!
+  [^BoltExecutorStats stats component stream latency-ms]
   (let [key [component stream]]
     (update-executor-stat! stats :acked key (stats-rate stats))
-    (update-executor-stat! stats :process-latencies key latency-ms)
-    ))
+    (update-executor-stat! stats :process-latencies key latency-ms)))
 
-(defn bolt-failed-tuple! [^BoltExecutorStats stats component stream latency-ms]
+(defn bolt-failed-tuple!
+  [^BoltExecutorStats stats component stream latency-ms]
   (let [key [component stream]]
-    (update-executor-stat! stats :failed key (stats-rate stats))
-    ))
+    (update-executor-stat! stats :failed key (stats-rate stats))))
 
-(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
+(defn spout-acked-tuple!
+  [^SpoutExecutorStats stats stream latency-ms]
   (update-executor-stat! stats :acked stream (stats-rate stats))
-  (update-executor-stat! stats :complete-latencies stream latency-ms)
-  )
+  (update-executor-stat! stats :complete-latencies stream latency-ms))
 
-(defn spout-failed-tuple! [^SpoutExecutorStats stats stream latency-ms]
+(defn spout-failed-tuple!
+  [^SpoutExecutorStats stats stream latency-ms]
   (update-executor-stat! stats :failed stream (stats-rate stats))
   )
 
 (defn- cleanup-stat! [stat]
   (swap! stat cleanup-rolling-window-set))
 
-(defn- cleanup-common-stats! [^CommonStats stats]
+(defn- cleanup-common-stats!
+  [^CommonStats stats]
   (doseq [f COMMON-FIELDS]
-    (cleanup-stat! (f stats))
-    ))
+    (cleanup-stat! (f stats))))
 
-(defn cleanup-bolt-stats! [^BoltExecutorStats stats]
+(defn cleanup-bolt-stats!
+  [^BoltExecutorStats stats]
   (cleanup-common-stats! (:common stats))
   (doseq [f BOLT-FIELDS]
-    (cleanup-stat! (f stats))
-    ))
+    (cleanup-stat! (f stats))))
 
-(defn cleanup-spout-stats! [^SpoutExecutorStats stats]
+(defn cleanup-spout-stats!
+  [^SpoutExecutorStats stats]
   (cleanup-common-stats! (:common stats))
   (doseq [f SPOUT-FIELDS]
-    (cleanup-stat! (f stats))
-    ))
+    (cleanup-stat! (f stats))))
 
-(defn- value-stats [stats fields]
-  (into
-   {}
-   (dofor [f fields]
-          [f (value-rolling-window-set @(f stats))]
-          )))
+(defn- value-stats
+  [stats fields]
+  (into {} (dofor [f fields]
+                  [f (value-rolling-window-set @(f stats))])))
 
-(defn- value-common-stats [^CommonStats stats]
+(defn- value-common-stats
+  [^CommonStats stats]
   (merge
-   (value-stats stats COMMON-FIELDS)
-   {:rate (:rate stats)}))
+    (value-stats stats COMMON-FIELDS)
+    {:rate (:rate stats)}))
 
-(defn value-bolt-stats! [^BoltExecutorStats stats]
+(defn value-bolt-stats!
+  [^BoltExecutorStats stats]
   (cleanup-bolt-stats! stats)
   (merge (value-common-stats (:common stats))
          (value-stats stats BOLT-FIELDS)
          {:type :bolt}))
 
-(defn value-spout-stats! [^SpoutExecutorStats stats]
+(defn value-spout-stats!
+  [^SpoutExecutorStats stats]
   (cleanup-spout-stats! stats)
   (merge (value-common-stats (:common stats))
          (value-stats stats SPOUT-FIELDS)
          {:type :spout}))
 
-
 (defmulti render-stats! class-selector)
 
-(defmethod render-stats! SpoutExecutorStats [stats]
+(defmethod render-stats! SpoutExecutorStats
+  [stats]
   (value-spout-stats! stats))
 
-(defmethod render-stats! BoltExecutorStats [stats]
+(defmethod render-stats! BoltExecutorStats
+  [stats]
   (value-bolt-stats! stats))
 
 (defmulti thriftify-specific-stats :type)
 
 (defn window-set-converter
   ([stats key-fn]
-     ;; make the first key a string,
-     (into {}
-           (for [[k v] stats]
-             [(str k)
-              (into {}
-                    (for [[k2 v2] v]
-                      [(key-fn k2) v2]))]
-             )
-           ))
+   ;; make the first key a string,
+   (into {}
+         (for [[k v] stats]
+           [(str k)
+            (into {} (for [[k2 v2] v]
+                       [(key-fn k2) v2]))])))
   ([stats]
-     (window-set-converter stats identity)))
+   (window-set-converter stats identity)))
 
-(defn to-global-stream-id [[component stream]]
-  (GlobalStreamId. component stream)
-  )
+(defn to-global-stream-id
+  [[component stream]]
+  (GlobalStreamId. component stream))
 
 (defmethod thriftify-specific-stats :bolt
   [stats]
   (ExecutorSpecificStats/bolt
-   (BoltStats. (window-set-converter (:acked stats) to-global-stream-id)
-               (window-set-converter (:failed stats) to-global-stream-id)
-               (window-set-converter (:process-latencies stats) to-global-stream-id)
-               (window-set-converter (:executed stats) to-global-stream-id)
-               (window-set-converter (:execute-latencies stats) to-global-stream-id)
-               )))
+    (BoltStats.
+      (window-set-converter (:acked stats) to-global-stream-id)
+      (window-set-converter (:failed stats) to-global-stream-id)
+      (window-set-converter (:process-latencies stats) to-global-stream-id)
+      (window-set-converter (:executed stats) to-global-stream-id)
+      (window-set-converter (:execute-latencies stats) to-global-stream-id))))
 
 (defmethod thriftify-specific-stats :spout
   [stats]
   (ExecutorSpecificStats/spout
-   (SpoutStats. (window-set-converter (:acked stats))
-                (window-set-converter (:failed stats))
-                (window-set-converter (:complete-latencies stats)))
-   ))
+    (SpoutStats. (window-set-converter (:acked stats))
+                 (window-set-converter (:failed stats))
+                 (window-set-converter (:complete-latencies stats)))))
 
-(defn thriftify-executor-stats [stats]
+(defn thriftify-executor-stats
+  [stats]
   (let [specific-stats (thriftify-specific-stats stats)]
     (ExecutorStats. (window-set-converter (:emitted stats))
                     (window-set-converter (:transferred stats))