You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:28 UTC

[32/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/config_value.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/config_value.clj b/storm-core/src/clj/backtype/storm/command/config_value.clj
deleted file mode 100644
index 1d193a2..0000000
--- a/storm-core/src/clj/backtype/storm/command/config_value.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.config-value
-  (:use [backtype.storm config log])
-  (:gen-class))
-
-
-(defn -main
-  "Look up the config key `name` in the map returned by `read-storm-config`
-  and print it on one line after the literal VALUE: prefix."
-  [^String name]
-  (let [storm-conf (read-storm-config)
-        value (storm-conf name)]
-    (println "VALUE:" value)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/deactivate.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/deactivate.clj b/storm-core/src/clj/backtype/storm/command/deactivate.clj
deleted file mode 100644
index 1a614de..0000000
--- a/storm-core/src/clj/backtype/storm/command/deactivate.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.deactivate
-  (:use [backtype.storm thrift log])
-  (:gen-class))
-
-(defn -main
-  "Deactivate the topology called `name` over a configured Nimbus
-  connection, then log a confirmation message."
-  [name]
-  (with-configured-nimbus-connection nimbus
-    (.deactivate nimbus name)
-    (log-message "Deactivated topology: " name)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj b/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
deleted file mode 100644
index d90e72a..0000000
--- a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
+++ /dev/null
@@ -1,26 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.dev-zookeeper
-  (:use [backtype.storm zookeeper util config])
-  (:gen-class))
-
-(defn -main
-  "Wipe the path stored under DEV-ZOOKEEPER-PATH in the storm config and start
-  an in-process ZooKeeper on it, using the port stored under
-  STORM-ZOOKEEPER-PORT. Command-line `args` are ignored."
-  [& args]
-  (let [{zk-port STORM-ZOOKEEPER-PORT
-         zk-dir DEV-ZOOKEEPER-PATH} (read-storm-config)]
-    (rmr zk-dir)
-    (mk-inprocess-zookeeper zk-dir :port zk-port)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/get_errors.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/get_errors.clj b/storm-core/src/clj/backtype/storm/command/get_errors.clj
deleted file mode 100644
index 60707b2..0000000
--- a/storm-core/src/clj/backtype/storm/command/get_errors.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.get-errors
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [backtype.storm thrift log])
-  (:use [backtype.storm util])
-  (:require [backtype.storm.daemon
-             [nimbus :as nimbus]
-             [common :as common]])
-  (:import [backtype.storm.generated GetInfoOptions NumErrorsChoice
-            TopologySummary ErrorInfo])
-  (:gen-class))
-
-(defn get-topology-id
-  "Return the id of the first summary in `topologies` whose name equals
-  `name`, or nil when no summary matches."
-  [name topologies]
-  (when-let [match (first (filter (fn [summary] (= (.get_name summary) name))
-                                  topologies))]
-    (.get_id match)))
-
-(defn get-component-errors
-  "Build a map from component name to the error string of the first entry in
-  that component's error list. Components whose error list is empty are left
-  out of the result."
-  [topology-errors]
-  (->> (for [[comp-name comp-errors] topology-errors]
-         (when-let [newest (when (seq comp-errors) (first comp-errors))]
-           [comp-name (.get_error ^ErrorInfo newest)]))
-       flatten
-       (remove nil?)
-       (apply hash-map)))
-
-(defn -main
-  "Print, as JSON, the first error of every component of the topology called
-  `name`. When no topology with that name is found (or its info cannot be
-  fetched) a JSON object with a single \"Failure\" entry is printed instead."
-  [name]
-  (with-configured-nimbus-connection nimbus
-    (let [one-error-opts (doto (GetInfoOptions.)
-                           (.set_num_err_choice NumErrorsChoice/ONE))
-          topo-id (->> (.getClusterInfo nimbus)
-                       (.get_topologies)
-                       (get-topology-id name))
-          topo-info (when (not-nil? topo-id)
-                      (.getTopologyInfoWithOpts nimbus topo-id one-error-opts))
-          report (if (or (nil? topo-id) (nil? topo-info))
-                   {"Failure" (str "No topologies running with name " name)}
-                   (let [topology-name (.get_name topo-info)
-                         topology-errors (.get_errors topo-info)]
-                     (hash-map
-                       "Topology Name" topology-name
-                       "Comp-Errors" (get-component-errors topology-errors))))]
-      (println (to-json report)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/healthcheck.clj b/storm-core/src/clj/backtype/storm/command/healthcheck.clj
deleted file mode 100644
index 14af223..0000000
--- a/storm-core/src/clj/backtype/storm/command/healthcheck.clj
+++ /dev/null
@@ -1,88 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.healthcheck
-  (:require [backtype.storm
-             [config :refer :all]
-             [log :refer :all]]
-            [clojure.java [io :as io]]
-            [clojure [string :refer [split]]])
-  (:gen-class))
-
-(defn interrupter
-  "Start and return a thread that sleeps for `ms` milliseconds and then
-  interrupts `thread`. Interrupting the returned thread while it is still
-  sleeping cancels the pending interrupt."
-  [thread ms]
-  (let [sleep-then-interrupt (fn []
-                               (try
-                                 (Thread/sleep ms)
-                                 (.interrupt thread)
-                                 (catch InterruptedException e)))]
-    (doto (Thread. sleep-then-interrupt)
-      (.start))))
-
-(defn check-output
-  "Return :failed when any string in `lines` starts with ERROR, and :success
-  otherwise (including when `lines` is empty)."
-  [lines]
-  (let [error-line? (fn [line] (.startsWith line "ERROR"))]
-    (if-not (some error-line? lines)
-      :success
-      :failed)))
-
-(defn process-script
-  "Run the executable at path `script` and classify the outcome.
-
-  Returns one of:
-    :success / :failed      exit code 0; the result of `check-output` applied
-                            to the script's stdout split into lines
-    :failed_with_exit_code  the script exited with a non-zero code
-    :timeout                the script was still running after
-                            (conf STORM-HEALTH-CHECK-TIMEOUT-MS) milliseconds;
-                            the script process is destroyed in this case
-    :failed_with_exception  any other exception raised while waiting for or
-                            reading from the script"
-  [conf script]
-  (let [script-proc (. (Runtime/getRuntime) (exec script))
-        curthread (Thread/currentThread)
-        ;; Watchdog: interrupts this thread if the script outlives the timeout.
-        interrupter-thread (interrupter curthread
-                                        (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
-    (try
-      (.waitFor script-proc)
-      ;; The script finished in time: cancel the watchdog before reading output.
-      (.interrupt interrupter-thread)
-      (if (not (= (.exitValue script-proc) 0))
-        :failed_with_exit_code
-        (check-output (split
-                       (slurp (.getInputStream script-proc))
-                       #"\n+")))
-      (catch InterruptedException e
-        (println "Script" script "timed out.")
-        ;; Do not leave the hung script running once we have given up on it.
-        (.destroy script-proc)
-        :timeout)
-      (catch Exception e
-        (println "Script failed with exception: " e)
-        :failed_with_exception)
-      ;; Always stop the watchdog so it cannot interrupt this thread later.
-      (finally (.interrupt interrupter-thread)))))
-
-(defn health-check
-  "Run every executable, non-directory file found under the directory given by
-  `absolute-healthcheck-dir` through `process-script`, log each script path
-  together with its result, and return 0 when every result is acceptable or 1
-  otherwise."
-  [conf]
-  (let [runnable-script? (fn [f] (and (.canExecute f)
-                                      (not (.isDirectory f))))
-        script-paths (->> (absolute-healthcheck-dir conf)
-                          io/file
-                          file-seq
-                          (filter runnable-script?)
-                          (map (fn [f] (.getAbsolutePath f))))
-        results (map (fn [path] (process-script conf path)) script-paths)
-        ; failed_with_exit_code is OK. We're mimicking Hadoop's health checks.
-        ; We treat non-zero exit codes as indicators that the scripts failed
-        ; to execute properly, not that the system is unhealthy, in which case
-        ; we don't want to start killing things.
-        acceptable? #{:failed_with_exit_code :success}]
-    (log-message
-     (pr-str (map vector script-paths results)))
-    (if (every? acceptable? results)
-      0
-      1)))
-
-(defn -main
-  "Run `health-check` against the storm config and exit the JVM using its
-  return value as the process status code. Command-line `args` are ignored."
-  [& args]
-  (let [status (health-check (read-storm-config))]
-    (System/exit status)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/heartbeats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/heartbeats.clj b/storm-core/src/clj/backtype/storm/command/heartbeats.clj
deleted file mode 100644
index 99790aa..0000000
--- a/storm-core/src/clj/backtype/storm/command/heartbeats.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.heartbeats
-  (:require [backtype.storm
-             [config :refer :all]
-             [log :refer :all]
-             [cluster :refer :all]
-             [converter :refer :all]]
-        [clojure.string :refer :all])
-  (:import [backtype.storm.generated ClusterWorkerHeartbeat]
-           [backtype.storm.utils Utils])
-  (:gen-class))
-
-(defn -main
-  "Inspect worker heartbeats held in the distributed cluster state.
-
-  `command` selects the action applied to `path`:
-    list  log the worker heartbeat children of `path`
-    get   log the deserialized heartbeat stored at `path`, or Nothing when
-          there is none
-  Any other command logs a usage line. Afterwards the cluster state is closed
-  (an exception from close is logged, not rethrown) and the JVM exits with
-  status 0."
-  [command path & args]
-  (let [conf (read-storm-config)
-        cluster (mk-distributed-cluster-state conf :auth-conf conf)]
-    (println "Command: [" command "]")
-    (case command
-      "list"
-      (let [children (.get_worker_hb_children cluster path false)
-            message (join " \n" children)]
-        (log-message "list " path ":\n"
-                     message "\n"))
-
-      "get"
-      (let [hb (.get_worker_hb cluster path false)]
-        (log-message
-         (if hb
-           (clojurify-zk-worker-hb
-            (Utils/deserialize hb ClusterWorkerHeartbeat))
-           "Nothing")))
-
-      (log-message "Usage: heartbeats [list|get] path"))
-
-    (try
-      (.close cluster)
-      (catch Exception e
-        (log-message "Caught exception: " e " on close."))))
-  (System/exit 0))
-         

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/kill_topology.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/kill_topology.clj b/storm-core/src/clj/backtype/storm/command/kill_topology.clj
deleted file mode 100644
index 94b4585..0000000
--- a/storm-core/src/clj/backtype/storm/command/kill_topology.clj
+++ /dev/null
@@ -1,29 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.kill-topology
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [backtype.storm thrift config log])
-  (:import [backtype.storm.generated KillOptions])
-  (:gen-class))
-
-(defn -main
-  "Kill the topology named by the first positional argument. The optional
-  -w/--wait value is parsed as an integer and, when given, set as the wait
-  seconds on the kill options."
-  [& args]
-  (let [parse-int (fn [s] (Integer/parseInt s))
-        [{:keys [wait]} [name] _] (cli args
-                                       ["-w" "--wait" :default nil :parse-fn parse-int])
-        opts (KillOptions.)]
-    (when wait
-      (.set_wait_secs opts wait))
-    (with-configured-nimbus-connection nimbus
-      (.killTopologyWithOpts nimbus name opts)
-      (log-message "Killed topology: " name))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/kill_workers.clj b/storm-core/src/clj/backtype/storm/command/kill_workers.clj
deleted file mode 100644
index 3866cc7..0000000
--- a/storm-core/src/clj/backtype/storm/command/kill_workers.clj
+++ /dev/null
@@ -1,33 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.kill-workers
-  (:import [java.io File])
-  (:use [backtype.storm.daemon common])
-  (:use [backtype.storm util config])
-  (:require [backtype.storm.daemon
-             [supervisor :as supervisor]])
-  (:gen-class))
-
-(defn -main
-  "Build supervisor-data from the storm config (with STORM-LOCAL-DIR replaced
-  by its canonical path) and shut down every worker id reported by
-  `supervisor/my-worker-ids`. Command-line `args` are ignored."
-  [& args]
-  (let [raw-conf (read-storm-config)
-        canonical-local-dir (.getCanonicalPath (File. (raw-conf STORM-LOCAL-DIR)))
-        conf (assoc raw-conf STORM-LOCAL-DIR canonical-local-dir)
-        isupervisor (supervisor/standalone-supervisor)
-        supervisor-data (supervisor/supervisor-data conf nil isupervisor)]
-    (doseq [worker-id (supervisor/my-worker-ids conf)]
-      (supervisor/shutdown-worker supervisor-data worker-id))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/list.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/list.clj b/storm-core/src/clj/backtype/storm/command/list.clj
deleted file mode 100644
index 79cfcf7..0000000
--- a/storm-core/src/clj/backtype/storm/command/list.clj
+++ /dev/null
@@ -1,38 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.list
-  (:use [backtype.storm thrift log])
-  (:import [backtype.storm.generated TopologySummary])
-  (:gen-class))
-
-(defn -main
-  "Print a table of the topologies reported by Nimbus (name, status, number
-  of tasks, number of workers, uptime in seconds), or a single line saying
-  that no topologies are running."
-  []
-  (with-configured-nimbus-connection nimbus
-    (let [topologies (.get_topologies (.getClusterInfo nimbus))
-          row-format "%-20s %-10s %-10s %-12s %-10s"
-          print-row (fn [& cells]
-                      (println (apply format row-format cells)))]
-      (if (empty? topologies)
-        (println "No topologies running.")
-        (do
-          (print-row "Topology_name" "Status" "Num_tasks" "Num_workers" "Uptime_secs")
-          (println "-------------------------------------------------------------------")
-          (doseq [^TopologySummary summary topologies]
-            (print-row (.get_name summary)
-                       (.get_status summary)
-                       (.get_num_tasks summary)
-                       (.get_num_workers summary)
-                       (.get_uptime_secs summary))))))))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj b/storm-core/src/clj/backtype/storm/command/monitor.clj
deleted file mode 100644
index 36ccbc9..0000000
--- a/storm-core/src/clj/backtype/storm/command/monitor.clj
+++ /dev/null
@@ -1,37 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.monitor
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [backtype.storm.thrift :only [with-configured-nimbus-connection]])
-  (:import [backtype.storm.utils Monitor])
-  (:gen-class)
- )
-
-(defn -main
-  "Configure a Monitor from the command line and run its metrics loop against
-  a configured Nimbus connection.
-
-  Options: -i/--interval (integer, default 4), -m/--component,
-  -s/--stream (default \"default\"), -w/--watch (default \"emitted\").
-  The first positional argument is the topology name."
-  [& args]
-  (let [parse-int (fn [s] (Integer/parseInt s))
-        [{:keys [interval component stream watch]} [name] _]
-        (cli args
-             ["-i" "--interval" :default 4 :parse-fn parse-int]
-             ["-m" "--component" :default nil]
-             ["-s" "--stream" :default "default"]
-             ["-w" "--watch" :default "emitted"])
-        mon (Monitor.)]
-    (when interval (.set_interval mon interval))
-    (when name (.set_topology mon name))
-    (when component (.set_component mon component))
-    (when stream (.set_stream mon stream))
-    (when watch (.set_watch mon watch))
-    (with-configured-nimbus-connection nimbus
-      (.metrics mon nimbus))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/rebalance.clj b/storm-core/src/clj/backtype/storm/command/rebalance.clj
deleted file mode 100644
index e3a032b..0000000
--- a/storm-core/src/clj/backtype/storm/command/rebalance.clj
+++ /dev/null
@@ -1,46 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.rebalance
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [backtype.storm thrift config log])
-  (:import [backtype.storm.generated RebalanceOptions])
-  (:gen-class))
-
-(defn- parse-executor
-  "Parse a `component=count` string into a single-entry map
-  {component count}, with count parsed as an integer.
-
-  The split happens at the LAST `=`, so component names may themselves
-  contain `=`. Throws IllegalArgumentException when `s` contains no `=`
-  (previously this surfaced as a StringIndexOutOfBoundsException), and
-  NumberFormatException when the count is not an integer."
-  [^String s]
-  (let [eq-pos (.lastIndexOf s "=")]
-    (when (neg? eq-pos)
-      (throw (IllegalArgumentException.
-               (str "Invalid executor setting '" s "', expected component=count"))))
-    (let [name (.substring s 0 eq-pos)
-          amt (.substring s (inc eq-pos))]
-      {name (Integer/parseInt amt)})))
-
-(defn -main
-  "Ask Nimbus to rebalance the topology named by the first positional
-  argument.
-
-  Options: -w/--wait (integer seconds), -n/--num-workers (integer) and
-  -e/--executor component=count, which may be repeated; repeated values are
-  merged into one map. Only the options that were supplied are set on the
-  RebalanceOptions."
-  [& args]
-  (let [parse-int (fn [s] (Integer/parseInt s))
-        ;; Accumulate repeated -e values into a single map under the same key.
-        merge-executors (fn [previous key val]
-                          (let [oldval (get previous key)]
-                            (assoc previous key
-                                   (if oldval (merge oldval val) val))))
-        [{:keys [wait executor num-workers]} [name] _]
-        (cli args
-             ["-w" "--wait" :default nil :parse-fn parse-int]
-             ["-n" "--num-workers" :default nil :parse-fn parse-int]
-             ["-e" "--executor" :parse-fn parse-executor
-              :assoc-fn merge-executors])
-        opts (RebalanceOptions.)]
-    (when wait (.set_wait_secs opts wait))
-    (when executor (.set_num_executors opts executor))
-    (when num-workers (.set_num_workers opts num-workers))
-    (with-configured-nimbus-connection nimbus
-      (.rebalance nimbus name opts)
-      (log-message "Topology " name " is rebalancing"))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/set_log_level.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/set_log_level.clj b/storm-core/src/clj/backtype/storm/command/set_log_level.clj
deleted file mode 100644
index 88b297d..0000000
--- a/storm-core/src/clj/backtype/storm/command/set_log_level.clj
+++ /dev/null
@@ -1,75 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.set-log-level
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [backtype.storm thrift log])
-  (:import [org.apache.logging.log4j Level])
-  (:import [backtype.storm.generated LogConfig LogLevel LogLevelAction])
-  (:gen-class))
-
-(defn- get-storm-id
-  "Get topology id for a running topology from the topology name.
-
-  Fetches the cluster info from `nimbus` and returns the id of the first
-  topology summary whose name equals `name`. Throws IllegalArgumentException
-  when no such topology is found."
-  [nimbus name]
-  (let [info (.getClusterInfo nimbus)
-        topologies (.get_topologies info)
-        topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))]
-    (if topology
-      (.get_id topology)
-      ;; `(IllegalArgumentException. msg)` constructs the exception; the former
-      ;; `(.IllegalArgumentException msg)` was a method call on the string and
-      ;; failed before the intended error could be thrown.
-      (throw (IllegalArgumentException. (str name " is not a running topology"))))))
-
-(defn- parse-named-log-levels
-  "Returns a function that parses one log setting string for `action` into a
-  single-entry map {logger-name LogLevel}.
-
-  Parses [logger name]=[level string]:[optional timeout],[logger name2]...
-
-   e.g. ROOT=DEBUG:30
-        root logger, debug for 30 seconds
-
-        org.apache.foo=WARN
-        org.apache.foo set to WARN indefinitely
-
-  When `action` is LogLevelAction/REMOVE the whole string is used as the
-  logger name and only the action is set on the LogLevel. For any other
-  action a string that does not match the format above raises
-  IllegalArgumentException."
-  [action]
-  (fn [^String s]
-    (let [removing? (= action LogLevelAction/REMOVE)
-          log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s)
-          log-level (LogLevel.)]
-      ;; Fail early with a readable message instead of producing a nil logger
-      ;; name and a cryptic error from the timeout conversion below.
-      (when (and (not removing?) (nil? log-args))
-        (throw (IllegalArgumentException.
-                 (str "Invalid log setting '" s
-                      "', expected [logger name]=[LEVEL]:[optional timeout]"))))
-      (.set_action log-level action)
-      (when-not removing?
-        (let [level (Level/toLevel (nth log-args 2))
-              timeout-str (nth log-args 3)]
-          (.set_target_log_level log-level (.toString level))
-          ;; An absent timeout is sent as 0.
-          (.set_reset_log_level_timeout_secs log-level
-            (Integer. (if (= timeout-str "") "0" timeout-str)))))
-      {(if removing? s (nth log-args 1)) log-level})))
-
-(defn- merge-together
-  "Assoc-fn for repeated command-line options: store `val` under `key` in
-  `previous`, merging it into the value already stored there when one
-  exists."
-  [previous key val]
-  (let [existing (get previous key)
-        combined (if existing
-                   (merge existing val)
-                   val)]
-    (assoc previous key combined)))
-
-(defn -main
-  "Build a LogConfig from the -l/--log-setting and -r/--remove-log-setting
-  options (both may be repeated) and send it to Nimbus for the topology named
-  by the first positional argument."
-  [& args]
-  (let [[{:keys [log-setting remove-log-setting]} [name] _]
-        (cli args
-             ["-l" "--log-setting"
-              :parse-fn (parse-named-log-levels LogLevelAction/UPDATE)
-              :assoc-fn merge-together]
-             ["-r" "--remove-log-setting"
-              :parse-fn (parse-named-log-levels LogLevelAction/REMOVE)
-              :assoc-fn merge-together])
-        log-config (LogConfig.)
-        named-levels (merge log-setting remove-log-setting)]
-    (doseq [[logger-name level] named-levels]
-      (.put_to_named_logger_level log-config logger-name level))
-    (log-message "Sent log config " log-config " for topology " name)
-    (with-configured-nimbus-connection nimbus
-      (let [storm-id (get-storm-id nimbus name)]
-        (.setLogConfig nimbus storm-id log-config)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/shell_submission.clj b/storm-core/src/clj/backtype/storm/command/shell_submission.clj
deleted file mode 100644
index 9bb8efe..0000000
--- a/storm-core/src/clj/backtype/storm/command/shell_submission.clj
+++ /dev/null
@@ -1,33 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.shell-submission
-  (:import [backtype.storm StormSubmitter])
-  (:use [backtype.storm thrift util config log zookeeper])
-  (:require [clojure.string :as str])
-  (:gen-class))
-
-
-(defn -main
-  "Submit the jar at `tmpjarpath` and then run the shell command formed by
-  joining `args` with the leader Nimbus host, its port and the submitted jar
-  path, separated by single spaces."
-  [^String tmpjarpath & args]
-  (let [conf (read-storm-config)
-        elector (zk-leader-elector conf)
-        leader (.getLeader elector)
-        host (.getHost leader)
-        port (.getPort leader)
-        ;; The elector is only needed to find the leader; close it before submitting.
-        _ (.close elector)
-        jarpath (StormSubmitter/submitJar conf tmpjarpath)
-        command-parts (concat args [host port jarpath])]
-    (exec-command! (str/join " " command-parts))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj b/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
deleted file mode 100644
index 05a82cb..0000000
--- a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
+++ /dev/null
@@ -1,35 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.upload-credentials
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [backtype.storm log util])
-  (:import [backtype.storm StormSubmitter])
-  (:import [java.util Properties])
-  (:import [java.io FileReader])
-  (:gen-class))
-
-(defn read-map [file-name]
-  (let [props (Properties. )
-        _ (.load props (FileReader. file-name))]
-    (clojurify-structure props)))
-
-(defn -main [& args]
-  (let [[{cred-file :file} [name & rawCreds]] (cli args ["-f" "--file" :default nil])
-        _ (when (and rawCreds (not (even? (.size rawCreds)))) (throw (RuntimeException.  "Need an even number of arguments to make a map")))
-        mapping (if rawCreds (apply assoc {} rawCreds) {})
-        file-mapping (if (nil? cred-file) {} (read-map cred-file))]
-      (StormSubmitter/pushCredentials name {} (merge file-mapping mapping))
-      (log-message "Uploaded new creds to topology: " name)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
deleted file mode 100644
index 4d24f97..0000000
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ /dev/null
@@ -1,331 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.config
-  (:import [java.io FileReader File IOException]
-           [backtype.storm.generated StormTopology])
-  (:import [backtype.storm Config])
-  (:import [backtype.storm.utils Utils LocalState])
-  (:import [backtype.storm.validation ConfigValidation])
-  (:import [org.apache.commons.io FileUtils])
-  (:require [clojure [string :as str]])
-  (:use [backtype.storm log util]))
-
-(def RESOURCES-SUBDIR "resources")
-(def NIMBUS-DO-NOT-REASSIGN "NIMBUS-DO-NOT-REASSIGN")
-
-(defn- clojure-config-name [name]
-  (.replace (.toUpperCase name) "_" "-"))
-
-; define clojure constants for every configuration parameter
-(doseq [f (seq (.getFields Config))]
-  (let [name (.getName f)
-        new-name (clojure-config-name name)]
-    (eval
-      `(def ~(symbol new-name) (. Config ~(symbol name))))))
-
-(def ALL-CONFIGS
-  (dofor [f (seq (.getFields Config))]
-         (.get f nil)))
-
-
-(defn cluster-mode
-  [conf & args]
-  (keyword (conf STORM-CLUSTER-MODE)))
-
-(defn local-mode?
-  [conf]
-  (let [mode (conf STORM-CLUSTER-MODE)]
-    (condp = mode
-      "local" true
-      "distributed" false
-      (throw (IllegalArgumentException.
-               (str "Illegal cluster mode in conf: " mode))))))
-
-(defn sampling-rate
-  [conf]
-  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
-       (/ 1)
-       int))
-
-(defn mk-stats-sampler
-  [conf]
-  (even-sampler (sampling-rate conf)))
-
-(defn read-default-config
-  []
-  (clojurify-structure (Utils/readDefaultConfig)))
-
-(defn validate-configs-with-schemas
-  [conf]
-  (ConfigValidation/validateFields conf))
-
-(defn read-storm-config
-  []
-  (let [conf (clojurify-structure (Utils/readStormConfig))]
-    (validate-configs-with-schemas conf)
-    conf))
-
-(defn read-yaml-config
-  ([name must-exist]
-     (let [conf (clojurify-structure (Utils/findAndReadConfigFile name must-exist))]
-       (validate-configs-with-schemas conf)
-       conf))
-  ([name]
-     (read-yaml-config true)))
-
-(defn absolute-storm-local-dir [conf]
-  (let [storm-home (System/getProperty "storm.home")
-        path (conf STORM-LOCAL-DIR)]
-    (if path
-      (if (is-absolute-path? path) path (str storm-home file-path-separator path))
-      (str storm-home file-path-separator "storm-local"))))
-
-(def LOG-DIR
-  (.getCanonicalPath
-    (clojure.java.io/file (or (System/getProperty "storm.log.dir")
-                              (get (read-storm-config) "storm.log.dir")
-                              (str (System/getProperty "storm.home") file-path-separator "logs")))))
-
-(defn absolute-healthcheck-dir [conf]
-  (let [storm-home (System/getProperty "storm.home")
-        path (conf STORM-HEALTH-CHECK-DIR)]
-    (if path
-      (if (is-absolute-path? path) path (str storm-home file-path-separator path))
-      (str storm-home file-path-separator "healthchecks"))))
-
-(defn master-local-dir
-  [conf]
-  (let [ret (str (absolute-storm-local-dir conf) file-path-separator "nimbus")]
-    (FileUtils/forceMkdir (File. ret))
-    ret))
-
-(defn master-stormjar-key
-  [topology-id]
-  (str topology-id "-stormjar.jar"))
-
-(defn master-stormcode-key
-  [topology-id]
-  (str topology-id "-stormcode.ser"))
-
-(defn master-stormconf-key
-  [topology-id]
-  (str topology-id "-stormconf.ser"))
-
-(defn master-stormdist-root
-  ([conf]
-   (str (master-local-dir conf) file-path-separator "stormdist"))
-  ([conf storm-id]
-   (str (master-stormdist-root conf) file-path-separator storm-id)))
-
-(defn master-tmp-dir
-  [conf]
-  (let [ret (str (master-local-dir conf) file-path-separator "tmp")]
-    (FileUtils/forceMkdir (File. ret))
-    ret ))
-
-(defn read-supervisor-storm-conf-given-path
-  [conf stormconf-path]
-  (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))))
-
-(defn master-storm-metafile-path [stormroot ]
-  (str stormroot file-path-separator "storm-code-distributor.meta"))
-
-(defn master-stormjar-path
-  [stormroot]
-  (str stormroot file-path-separator "stormjar.jar"))
-
-(defn master-stormcode-path
-  [stormroot]
-  (str stormroot file-path-separator "stormcode.ser"))
-
-(defn master-stormconf-path
-  [stormroot]
-  (str stormroot file-path-separator "stormconf.ser"))
-
-(defn master-inbox
-  [conf]
-  (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
-    (FileUtils/forceMkdir (File. ret))
-    ret ))
-
-(defn master-inimbus-dir
-  [conf]
-  (str (master-local-dir conf) file-path-separator "inimbus"))
-
-(defn supervisor-local-dir
-  [conf]
-  (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")]
-    (FileUtils/forceMkdir (File. ret))
-    ret))
-
-(defn supervisor-isupervisor-dir
-  [conf]
-  (str (supervisor-local-dir conf) file-path-separator "isupervisor"))
-
-(defn supervisor-stormdist-root
-  ([conf]
-   (str (supervisor-local-dir conf) file-path-separator "stormdist"))
-  ([conf storm-id]
-   (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id))))
-
-(defn supervisor-stormjar-path [stormroot]
-  (str stormroot file-path-separator "stormjar.jar"))
-
-(defn supervisor-storm-metafile-path [stormroot]
-  (str stormroot file-path-separator "storm-code-distributor.meta"))
-
-(defn supervisor-stormcode-path
-  [stormroot]
-  (str stormroot file-path-separator "stormcode.ser"))
-
-(defn supervisor-stormconf-path
-  [stormroot]
-  (str stormroot file-path-separator "stormconf.ser"))
-
-(defn supervisor-tmp-dir
-  [conf]
-  (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
-    (FileUtils/forceMkdir (File. ret))
-    ret ))
-
-(defn supervisor-storm-resources-path
-  [stormroot]
-  (str stormroot file-path-separator RESOURCES-SUBDIR))
-
-(defn ^LocalState supervisor-state
-  [conf]
-  (LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate")))
-
-(defn ^LocalState nimbus-topo-history-state
-  [conf]
-  (LocalState. (str (master-local-dir conf) file-path-separator "history")))
-
-(defn read-supervisor-storm-conf
-  [conf storm-id]
-  (let [stormroot (supervisor-stormdist-root conf storm-id)
-        conf-path (supervisor-stormconf-path stormroot)]
-    (read-supervisor-storm-conf-given-path conf conf-path)))
-
-(defn read-supervisor-topology
-  [conf storm-id]
-  (let [stormroot (supervisor-stormdist-root conf storm-id)
-        topology-path (supervisor-stormcode-path stormroot)]
-    (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)) StormTopology)
-    ))
-
-(defn worker-user-root [conf]
-  (str (absolute-storm-local-dir conf) "/workers-users"))
-
-(defn worker-user-file [conf worker-id]
-  (str (worker-user-root conf) "/" worker-id))
-
-(defn get-worker-user [conf worker-id]
-  (log-message "GET worker-user " worker-id)
-  (try
-    (str/trim (slurp (worker-user-file conf worker-id)))
-  (catch IOException e
-    (log-warn-error e "Failed to get worker user for " worker-id ".")
-    nil
-    )))
-
-(defn get-id-from-blob-key
-  [key]
-  (if-let [groups (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)]
-    (nth groups 1)))
-
-(defn set-worker-user! [conf worker-id user]
-  (log-message "SET worker-user " worker-id " " user)
-  (let [file (worker-user-file conf worker-id)]
-    (.mkdirs (.getParentFile (File. file)))
-    (spit (worker-user-file conf worker-id) user)))
-
-(defn remove-worker-user! [conf worker-id]
-  (log-message "REMOVE worker-user " worker-id)
-  (.delete (File. (worker-user-file conf worker-id))))
-
-(defn worker-artifacts-root
-  ([conf]
-   (let [workers-artifacts-dir (conf STORM-WORKERS-ARTIFACTS-DIR)]
-     (if workers-artifacts-dir
-       (if (is-absolute-path? workers-artifacts-dir)
-         workers-artifacts-dir
-         (str LOG-DIR file-path-separator workers-artifacts-dir))
-       (str LOG-DIR file-path-separator "workers-artifacts"))))
-  ([conf id]
-   (str (worker-artifacts-root conf) file-path-separator id))
-  ([conf id port]
-   (str (worker-artifacts-root conf id) file-path-separator port)))
-
-(defn worker-artifacts-pid-path
-  [conf id port]
-  (str (worker-artifacts-root conf id port) file-path-separator "worker.pid"))
-
-(defn get-log-metadata-file
-  ([fname]
-    (let [[id port & _] (str/split fname (re-pattern file-path-separator))]
-      (get-log-metadata-file (read-storm-config) id port)))
-  ([conf id port]
-    (clojure.java.io/file (str (worker-artifacts-root conf id) file-path-separator port file-path-separator) "worker.yaml")))
-
-(defn get-worker-dir-from-root
-  [log-root id port]
-  (clojure.java.io/file (str log-root file-path-separator id file-path-separator port)))
-
-(defn worker-root
-  ([conf]
-   (str (absolute-storm-local-dir conf) file-path-separator "workers"))
-  ([conf id]
-   (str (worker-root conf) file-path-separator id)))
-
-(defn worker-pids-root
-  [conf id]
-  (str (worker-root conf id) file-path-separator "pids"))
-
-(defn worker-pid-path
-  [conf id pid]
-  (str (worker-pids-root conf id) file-path-separator pid))
-
-(defn worker-heartbeats-root
-  [conf id]
-  (str (worker-root conf id) file-path-separator "heartbeats"))
-
-;; workers heartbeat here with pid and timestamp
-;; if supervisor stops receiving heartbeat, it kills and restarts the process
-;; in local mode, keep a global map of ids to threads for simulating process management
-(defn ^LocalState worker-state
-  [conf id]
-  (LocalState. (worker-heartbeats-root conf id)))
-
-(defn override-login-config-with-system-property [conf]
-  (if-let [login_conf_file (System/getProperty "java.security.auth.login.config")]
-    (assoc conf "java.security.auth.login.config" login_conf_file)
-    conf))
-
-(defn get-topo-logs-users
-  [topology-conf]
-  (sort (distinct (remove nil?
-                    (concat
-                      (topology-conf LOGS-USERS)
-                      (topology-conf TOPOLOGY-USERS))))))
-
-(defn get-topo-logs-groups
-  [topology-conf]
-  (sort (distinct (remove nil?
-                    (concat
-                      (topology-conf LOGS-GROUPS)
-                      (topology-conf TOPOLOGY-GROUPS))))))
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
deleted file mode 100644
index 52a1817..0000000
--- a/storm-core/src/clj/backtype/storm/converter.clj
+++ /dev/null
@@ -1,277 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.converter
-  (:import [backtype.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
-            StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
-            TopologyActionOptions DebugOptions ProfileRequest])
-  (:use [backtype.storm util stats log])
-  (:require [backtype.storm.daemon [common :as common]]))
-
-(defn thriftify-supervisor-info [supervisor-info]
-  (doto (SupervisorInfo.)
-    (.set_time_secs (long (:time-secs supervisor-info)))
-    (.set_hostname (:hostname supervisor-info))
-    (.set_assignment_id (:assignment-id supervisor-info))
-    (.set_used_ports (map long (:used-ports supervisor-info)))
-    (.set_meta (map long (:meta supervisor-info)))
-    (.set_scheduler_meta (:scheduler-meta supervisor-info))
-    (.set_uptime_secs (long (:uptime-secs supervisor-info)))
-    (.set_version (:version supervisor-info))
-    (.set_resources_map (:resources-map supervisor-info))
-    ))
-
-(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
-  (if supervisor-info
-    (backtype.storm.daemon.common.SupervisorInfo.
-      (.get_time_secs supervisor-info)
-      (.get_hostname supervisor-info)
-      (.get_assignment_id supervisor-info)
-      (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info)))
-      (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
-      (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
-      (.get_uptime_secs supervisor-info)
-      (.get_version supervisor-info)
-      (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map)))))
-
-(defn thriftify-assignment [assignment]
-  (let [thrift-assignment (doto (Assignment.)
-                            (.set_master_code_dir (:master-code-dir assignment))
-                            (.set_node_host (:node->host assignment))
-                            (.set_executor_node_port (into {}
-                                                           (map (fn [[k v]]
-                                                                  [(map long k)
-                                                                   (NodeInfo. (first v) (set (map long (rest v))))])
-                                                                (:executor->node+port assignment))))
-                            (.set_executor_start_time_secs
-                              (into {}
-                                    (map (fn [[k v]]
-                                           [(map long k) (long v)])
-                                         (:executor->start-time-secs assignment)))))]
-    (if (:worker->resources assignment)
-      (.set_worker_resources thrift-assignment (into {} (map
-                                                          (fn [[node+port resources]]
-                                                            [(NodeInfo. (first node+port) (set (map long (rest node+port))))
-                                                             (doto (WorkerResources.)
-                                                               (.set_mem_on_heap (first resources))
-                                                               (.set_mem_off_heap (second resources))
-                                                               (.set_cpu (last resources)))])
-                                                          (:worker->resources assignment)))))
-    thrift-assignment))
-
-(defn clojurify-executor->node_port [executor->node_port]
-  (into {}
-    (map-val
-      (fn [nodeInfo]
-        (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
-      (map-key
-        (fn [list-of-executors]
-          (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable.
-        executor->node_port))))
-
-(defn clojurify-worker->resources [worker->resources]
-  "convert worker info to be [node, port]
-   convert resources to be [mem_on_heap mem_off_heap cpu]"
-  (into {} (map
-             (fn [[nodeInfo resources]]
-               [(concat [(.get_node nodeInfo)] (.get_port nodeInfo))
-                [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
-             worker->resources)))
-
-(defn clojurify-assignment [^Assignment assignment]
-  (if assignment
-    (backtype.storm.daemon.common.Assignment.
-      (.get_master_code_dir assignment)
-      (into {} (.get_node_host assignment))
-      (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
-      (map-key (fn [executor] (into [] executor))
-        (into {} (.get_executor_start_time_secs assignment)))
-      (clojurify-worker->resources (into {} (.get_worker_resources assignment))))))
-
-(defn convert-to-symbol-from-status [status]
-  (condp = status
-    TopologyStatus/ACTIVE {:type :active}
-    TopologyStatus/INACTIVE {:type :inactive}
-    TopologyStatus/REBALANCING {:type :rebalancing}
-    TopologyStatus/KILLED {:type :killed}
-    nil))
-
-(defn- convert-to-status-from-symbol [status]
-  (if status
-    (condp = (:type status)
-      :active TopologyStatus/ACTIVE
-      :inactive TopologyStatus/INACTIVE
-      :rebalancing TopologyStatus/REBALANCING
-      :killed TopologyStatus/KILLED
-      nil)))
-
-(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options]
-  (-> {:action :rebalance}
-    (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
-    (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
-    (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
-
-(defn thriftify-rebalance-options [rebalance-options]
-  (if rebalance-options
-    (let [thrift-rebalance-options (RebalanceOptions.)]
-      (if (:delay-secs rebalance-options)
-        (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options))))
-      (if (:num-workers rebalance-options)
-        (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options))))
-      (if (:component->executors rebalance-options)
-        (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options))))
-      thrift-rebalance-options)))
-
-(defn clojurify-kill-options [^KillOptions kill-options]
-  (-> {:action :kill}
-    (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options)))))
-
-(defn thriftify-kill-options [kill-options]
-  (if kill-options
-    (let [thrift-kill-options (KillOptions.)]
-      (if (:delay-secs kill-options)
-        (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options))))
-      thrift-kill-options)))
-
-(defn thriftify-topology-action-options [storm-base]
-  (if (:topology-action-options storm-base)
-    (let [ topology-action-options (:topology-action-options storm-base)
-           action (:action topology-action-options)
-           thrift-topology-action-options (TopologyActionOptions.)]
-      (if (= action :kill)
-        (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options)))
-      (if (= action :rebalance)
-        (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options)))
-      thrift-topology-action-options)))
-
-(defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options]
-  (if topology-action-options
-    (or (and (.is_set_kill_options topology-action-options)
-             (clojurify-kill-options
-               (.get_kill_options topology-action-options)))
-        (and (.is_set_rebalance_options topology-action-options)
-             (clojurify-rebalance-options
-               (.get_rebalance_options topology-action-options))))))
-
-(defn clojurify-debugoptions [^DebugOptions options]
-  (if options
-    {
-      :enable (.is_enable options)
-      :samplingpct (.get_samplingpct options)
-      }
-    ))
-
-(defn thriftify-debugoptions [options]
-  (doto (DebugOptions.)
-    (.set_enable (get options :enable false))
-    (.set_samplingpct (get options :samplingpct 10))))
-
-(defn thriftify-storm-base [storm-base]
-  (doto (StormBase.)
-    (.set_name (:storm-name storm-base))
-    (.set_launch_time_secs (int (:launch-time-secs storm-base)))
-    (.set_status (convert-to-status-from-symbol (:status storm-base)))
-    (.set_num_workers (int (:num-workers storm-base)))
-    (.set_component_executors (map-val int (:component->executors storm-base)))
-    (.set_owner (:owner storm-base))
-    (.set_topology_action_options (thriftify-topology-action-options storm-base))
-    (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
-    (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
-
-(defn clojurify-storm-base [^StormBase storm-base]
-  (if storm-base
-    (backtype.storm.daemon.common.StormBase.
-      (.get_name storm-base)
-      (.get_launch_time_secs storm-base)
-      (convert-to-symbol-from-status (.get_status storm-base))
-      (.get_num_workers storm-base)
-      (into {} (.get_component_executors storm-base))
-      (.get_owner storm-base)
-      (clojurify-topology-action-options (.get_topology_action_options storm-base))
-      (convert-to-symbol-from-status (.get_prev_status storm-base))
-      (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
-
-(defn thriftify-stats [stats]
-  (if stats
-    (map-val thriftify-executor-stats
-      (map-key #(ExecutorInfo. (int (first %1)) (int (last %1)))
-        stats))
-    {}))
-
-(defn clojurify-stats [stats]
-  (if stats
-    (map-val clojurify-executor-stats
-      (map-key (fn [x] (list (.get_task_start x) (.get_task_end x)))
-        stats))
-    {}))
-
-(defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb]
-  (if worker-hb
-    {:storm-id (.get_storm_id worker-hb)
-     :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
-     :uptime (.get_uptime_secs worker-hb)
-     :time-secs (.get_time_secs worker-hb)
-     }
-    {}))
-
-(defn thriftify-zk-worker-hb [worker-hb]
-  (if (not-empty (filter second (:executor-stats worker-hb)))
-    (doto (ClusterWorkerHeartbeat.)
-      (.set_uptime_secs (:uptime worker-hb))
-      (.set_storm_id (:storm-id worker-hb))
-      (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
-      (.set_time_secs (:time-secs worker-hb)))))
-
-(defn clojurify-error [^ErrorInfo error]
-  (if error
-    {
-      :error (.get_error error)
-      :time-secs (.get_error_time_secs error)
-      :host (.get_host error)
-      :port (.get_port error)
-      }
-    ))
-
-(defn thriftify-error [error]
-  (doto (ErrorInfo. (:error error) (:time-secs error))
-    (.set_host (:host error))
-    (.set_port (:port error))))
-
-(defn clojurify-profile-request
-  [^ProfileRequest request]
-  (when request
-    {:host (.get_node (.get_nodeInfo request))
-     :port (first (.get_port (.get_nodeInfo request)))
-     :action     (.get_action request)
-     :timestamp  (.get_time_stamp request)}))
-
-(defn thriftify-profile-request
-  [profile-request]
-  (let [nodeinfo (doto (NodeInfo.)
-                   (.set_node (:host profile-request))
-                   (.set_port (set [(:port profile-request)])))
-        request (ProfileRequest. nodeinfo (:action profile-request))]
-    (.set_time_stamp request (:timestamp profile-request))
-    request))
-
-(defn thriftify-credentials [credentials]
-    (doto (Credentials.)
-      (.set_creds (if credentials credentials {}))))
-
-(defn clojurify-crdentials [^Credentials credentials]
-  (if credentials
-    (into {} (.get_creds credentials))
-    nil
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/acker.clj b/storm-core/src/clj/backtype/storm/daemon/acker.clj
deleted file mode 100644
index ce88d11..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/acker.clj
+++ /dev/null
@@ -1,107 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.acker
-  (:import [backtype.storm.task OutputCollector TopologyContext IBolt])
-  (:import [backtype.storm.tuple Tuple Fields])
-  (:import [backtype.storm.utils RotatingMap MutableObject])
-  (:import [java.util List Map])
-  (:import [backtype.storm Constants])
-  (:use [backtype.storm config util log])
-  (:gen-class
-   :init init
-   :implements [backtype.storm.task.IBolt]
-   :constructors {[] []}
-   :state state ))
-
-(def ACKER-COMPONENT-ID "__acker")
-(def ACKER-INIT-STREAM-ID "__ack_init")
-(def ACKER-ACK-STREAM-ID "__ack_ack")
-(def ACKER-FAIL-STREAM-ID "__ack_fail")
-
-(defn- update-ack [curr-entry val]
-  (let [old (get curr-entry :val 0)]
-    (assoc curr-entry :val (bit-xor old val))
-    ))
-
-(defn- acker-emit-direct [^OutputCollector collector ^Integer task ^String stream ^List values]
-  (.emitDirect collector task stream values)
-  )
-
-(defn mk-acker-bolt []
-  (let [output-collector (MutableObject.)
-        pending (MutableObject.)]
-    (reify IBolt
-      (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
-               (.setObject output-collector collector)
-               (.setObject pending (RotatingMap. 2))
-               )
-      (^void execute [this ^Tuple tuple]
-             (let [^RotatingMap pending (.getObject pending)
-                   stream-id (.getSourceStreamId tuple)]
-               (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
-                 (.rotate pending)
-                 (let [id (.getValue tuple 0)
-                       ^OutputCollector output-collector (.getObject output-collector)
-                       curr (.get pending id)
-                       curr (condp = stream-id
-                                ACKER-INIT-STREAM-ID (-> curr
-                                                         (update-ack (.getValue tuple 1))
-                                                         (assoc :spout-task (.getValue tuple 2)))
-                                ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
-                                ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
-                   (.put pending id curr)
-                   (when (and curr (:spout-task curr))
-                     (cond (= 0 (:val curr))
-                           (do
-                             (.remove pending id)
-                             (acker-emit-direct output-collector
-                                                (:spout-task curr)
-                                                ACKER-ACK-STREAM-ID
-                                                [id]
-                                                ))
-                           (:failed curr)
-                           (do
-                             (.remove pending id)
-                             (acker-emit-direct output-collector
-                                                (:spout-task curr)
-                                                ACKER-FAIL-STREAM-ID
-                                                [id]
-                                                ))
-                           ))
-                   (.ack output-collector tuple)
-                   ))))
-      (^void cleanup [this]
-        )
-      )))
-
-(defn -init []
-  [[] (container)])
-
-(defn -prepare [this conf context collector]
-  (let [^IBolt ret (mk-acker-bolt)]
-    (container-set! (.state ^backtype.storm.daemon.acker this) ret)
-    (.prepare ret conf context collector)
-    ))
-
-(defn -execute [this tuple]
-  (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))]
-    (.execute delegate tuple)
-    ))
-
-(defn -cleanup [this]
-  (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))]
-    (.cleanup delegate)
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
deleted file mode 100644
index 0caa0b9..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
+++ /dev/null
@@ -1,98 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.builtin-metrics
-  (:import [backtype.storm.metric.api CountMetric StateMetric IMetric IStatefulObject])
-  (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
-  (:import [backtype.storm Config])
-  (:use [backtype.storm.stats]))
-
-;; Built-in metrics of a spout executor: ack, fail, emit and transfer counts
-;; (hinted as MultiCountStatAndMetric) plus the complete latency
-;; (hinted as MultiLatencyStatAndMetric). Field order matters: make-data
-;; constructs this record positionally.
-(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count
-                                ^MultiLatencyStatAndMetric complete-latency
-                                ^MultiCountStatAndMetric fail-count
-                                ^MultiCountStatAndMetric emit-count
-                                ^MultiCountStatAndMetric transfer-count])
-;; Built-in metrics of a bolt executor: ack, fail, execute, emit and transfer counts
-;; (hinted as MultiCountStatAndMetric) plus process and execute latencies
-;; (hinted as MultiLatencyStatAndMetric). Field order matters: make-data
-;; constructs this record positionally.
-(defrecord BuiltinBoltMetrics [^MultiCountStatAndMetric ack-count
-                               ^MultiLatencyStatAndMetric process-latency
-                               ^MultiCountStatAndMetric fail-count
-                               ^MultiCountStatAndMetric execute-count
-                               ^MultiLatencyStatAndMetric execute-latency
-                               ^MultiCountStatAndMetric emit-count
-                               ^MultiCountStatAndMetric transfer-count])
-;; Three CountMetric counters that the skipped-max-spout!, skipped-throttle! and
-;; skipped-inactive! functions in this file increment.
-(defrecord SpoutThrottlingMetrics [^CountMetric skipped-max-spout
-                                   ^CountMetric skipped-throttle
-                                   ^CountMetric skipped-inactive])
-
-
-(defn make-data
-  "Wraps the metrics read from `stats` in the record matching `executor-type`:
-  BuiltinSpoutMetrics for :spout, BuiltinBoltMetrics for :bolt. Any other
-  executor type throws IllegalArgumentException (no matching clause)."
-  [executor-type stats]
-  (case executor-type
-    :spout (->BuiltinSpoutMetrics (stats-acked stats)
-                                  (stats-complete-latencies stats)
-                                  (stats-failed stats)
-                                  (stats-emitted stats)
-                                  (stats-transferred stats))
-    :bolt (->BuiltinBoltMetrics (stats-acked stats)
-                                (stats-process-latencies stats)
-                                (stats-failed stats)
-                                (stats-executed stats)
-                                (stats-execute-latencies stats)
-                                (stats-emitted stats)
-                                (stats-transferred stats))))
-
-(defn make-spout-throttling-data
-  "Creates a SpoutThrottlingMetrics record whose three fields are each a new CountMetric."
-  []
-  (let [skipped-max-spout (CountMetric.)
-        skipped-throttle (CountMetric.)
-        skipped-inactive (CountMetric.)]
-    (SpoutThrottlingMetrics. skipped-max-spout skipped-throttle skipped-inactive)))
-
-(defn register-spout-throttling-metrics
-  "Registers every [key metric] entry of `throttling-metrics` with `topology-context`.
-  Each metric is named \"__\" followed by the key's name; the bucket size is read from
-  Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS in `storm-conf`."
-  [throttling-metrics storm-conf topology-context]
-  (doseq [[metric-key metric] throttling-metrics
-          :let [metric-name (->> metric-key name (str "__"))]]
-    (.registerMetric topology-context
-                     metric-name
-                     metric
-                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn register-all
-  "Registers every [key metric] entry of `builtin-metrics` with `topology-context`
-  under the name \"__<key name>\", using the bucket size stored in `storm-conf` under
-  Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS."
-  [builtin-metrics storm-conf topology-context]
-  (doseq [[field-key field-metric] builtin-metrics
-          :let [registered-name (->> field-key name (str "__"))]]
-    (.registerMetric topology-context
-                     registered-name
-                     field-metric
-                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn register-iconnection-server-metric
-  "When `server` is an IStatefulObject, registers a StateMetric wrapping it as
-  \"__recv-iconnection\" on `topology-context`; otherwise does nothing and returns nil."
-  [server storm-conf topology-context]
-  (when (instance? IStatefulObject server)
-    (.registerMetric topology-context
-                     "__recv-iconnection"
-                     (StateMetric. server)
-                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn register-iconnection-client-metrics
-  "Registers an IMetric named \"__send-iconnection\" on `topology-context`. Each time
-  the metric is read it dereferences `node+port->socket-ref` and returns a map from
-  node+port to (.getState connection) for every connection that is an IStatefulObject."
-  [node+port->socket-ref storm-conf topology-context]
-  (let [connection-states (reify IMetric
-                            (^Object getValueAndReset [this]
-                              (into {}
-                                    (for [[node+port ^IStatefulObject connection] @node+port->socket-ref
-                                          :when (instance? IStatefulObject connection)]
-                                      [node+port (.getState connection)]))))]
-    (.registerMetric topology-context
-                     "__send-iconnection"
-                     connection-states
-                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
- 
-(defn register-queue-metrics
-  "For every [name queue] entry of `queues`, registers a StateMetric wrapping the queue
-  on `topology-context` under \"__<name>\", with the bucket size taken from `storm-conf`."
-  [queues storm-conf topology-context]
-  (doseq [[queue-key queue] queues
-          :let [metric-name (->> queue-key name (str "__"))]]
-    (.registerMetric topology-context
-                     metric-name
-                     (StateMetric. queue)
-                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn skipped-max-spout!
-  "Increments the skipped-max-spout counter of `m` by (stats-rate stats)."
-  [^SpoutThrottlingMetrics m stats]
-  (let [^CountMetric counter (.skipped-max-spout m)]
-    (.incrBy counter (stats-rate stats))))
-
-(defn skipped-throttle!
-  "Increments the skipped-throttle counter of `m` by (stats-rate stats)."
-  [^SpoutThrottlingMetrics m stats]
-  (let [^CountMetric counter (.skipped-throttle m)]
-    (.incrBy counter (stats-rate stats))))
-
-(defn skipped-inactive!
-  "Increments the skipped-inactive counter of `m` by (stats-rate stats)."
-  [^SpoutThrottlingMetrics m stats]
-  (let [^CountMetric counter (.skipped-inactive m)]
-    (.incrBy counter (stats-rate stats))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
deleted file mode 100644
index 9b3aab3..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ /dev/null
@@ -1,402 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.common
-  (:use [backtype.storm log config util])
-  (:import [backtype.storm.generated StormTopology
-            InvalidTopologyException GlobalStreamId]
-           [backtype.storm.utils ThriftTopologyUtils])
-  (:import [backtype.storm.utils Utils])
-  (:import [backtype.storm.task WorkerTopologyContext])
-  (:import [backtype.storm Constants])
-  (:import [backtype.storm.metric SystemBolt])
-  (:import [backtype.storm.metric EventLoggerBolt])
-  (:import [backtype.storm.security.auth IAuthorizer]) 
-  (:import [java.io InterruptedIOException])
-  (:require [clojure.set :as set])  
-  (:require [backtype.storm.daemon.acker :as acker])
-  (:require [backtype.storm.thrift :as thrift])
-  (:require [metrics.reporters.jmx :as jmx]))
-
-(defn start-metrics-reporters
-  "Creates a JMX reporter from an empty options map and starts it."
-  []
-  (-> {}
-      jmx/reporter
-      jmx/start))
-
-;; Acker component and stream ids, re-exported from the acker namespace.
-(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
-(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
-(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
-(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
-
-;; Stream id that add-system-streams! declares on every component.
-(def SYSTEM-STREAM-ID "__system")
-
-;; Component id of the event logger bolt and the stream id that add-eventlogger!
-;; declares on every component.
-(def EVENTLOGGER-COMPONENT-ID "__eventlogger")
-(def EVENTLOGGER-STREAM-ID "__eventlog")
-
-;; Aliases for ids defined on backtype.storm.Constants.
-(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
-(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
-(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
-(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
-(def CREDENTIALS-CHANGED-STREAM-ID Constants/CREDENTIALS_CHANGED_STREAM_ID)
-
-;; The task id is the virtual port.
-;; node->host is included so that tasks know who to talk to from the assignment alone;
-;; this avoids the situation where a node goes down and a task lacks the information
-;; it needs to decide what to do.
-(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources])
-
-
-;; component->executors is a map from spout/bolt id to the number of executors for that component.
-;; get-storm-id in this file reads the :storm-name field of the value returned by .storm-base.
-(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
-
-;; Plain data record for a supervisor; only the fields are declared here
-;; (no protocols or methods are attached in this file).
-(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
-
-;; Protocol with a single method, waiting?, that takes only the implementing object.
-;; NOTE(review): implementations live outside this file -- confirm the intended semantics there.
-(defprotocol DaemonCommon
-  (waiting? [this]))
-
-;; Five per-executor counters, each hinted as a primitive long.
-;; new-executor-stats creates an instance with every counter set to 0.
-(defrecord ExecutorStats [^long processed
-                          ^long acked
-                          ^long emitted
-                          ^long transferred
-                          ^long failed])
-
-(defn new-executor-stats
-  "Returns an ExecutorStats record with all five counters set to 0."
-  []
-  (->ExecutorStats 0 0 0 0 0))
-
-(defn get-storm-id
-  "Returns the first active storm id whose storm base has :storm-name equal to
-  `storm-name`, as selected by find-first over (.active-storms storm-cluster-state)."
-  [storm-cluster-state storm-name]
-  (let [named-like-target? (fn [storm-id]
-                             (= storm-name
-                                (:storm-name (.storm-base storm-cluster-state storm-id nil))))]
-    (find-first named-like-target? (.active-storms storm-cluster-state))))
-
-(defn topology-bases
-  "Returns a map from every active storm id to the value of .storm-base for that id
-  (called with nil as its last argument)."
-  [storm-cluster-state]
-  (reduce (fn [bases storm-id]
-            (assoc bases storm-id (.storm-base storm-cluster-state storm-id nil)))
-          {}
-          (.active-storms storm-cluster-state)))
-
-(defn validate-distributed-mode!
-  "Throws IllegalArgumentException when (local-mode? conf) is truthy; otherwise returns nil."
-  [conf]
-  (when (local-mode? conf)
-    (throw (IllegalArgumentException. "Cannot start server in local mode!"))))
-
-;; Defines `name` as a variadic function that applies (fn ~@body) to its arguments
-;; inside try-cause. InterruptedIOException and InterruptedException are rethrown
-;; unchanged; any other Throwable is logged with log-error and then exit-process!
-;; is called with code 13.
-;; NOTE(review): try-cause is defined elsewhere; presumably it matches the catch
-;; clauses against the exception's cause chain -- confirm.
-(defmacro defserverfn [name & body]
-  `(let [exec-fn# (fn ~@body)]
-    (defn ~name [& args#]
-      (try-cause
-        (apply exec-fn# args#)
-      (catch InterruptedIOException e#
-        (throw e#))
-      (catch InterruptedException e#
-        (throw e#))
-      (catch Throwable t#
-        (log-error t# "Error on initialization of server " ~(str name))
-        (exit-process! 13 "Error on initialization")
-        )))))
-
-;; Validates the ids used by `topology`:
-;;  * throws InvalidTopologyException ("Duplicate component ids: ...") when
-;;    any-intersection over the values of all thrift/STORM-TOPOLOGY-FIELDS is non-empty;
-;;  * for every field for which isWorkerHook is false, throws InvalidTopologyException
-;;    when a component id, or a stream id declared by a component, satisfies Utils/isSystemId.
-;; NOTE(review): any-intersection is defined elsewhere; presumably it returns the ids
-;; shared by two or more of the given collections -- confirm.
-(defn- validate-ids! [^StormTopology topology]
-  (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
-        offending (apply any-intersection sets)]
-    (if-not (empty? offending)
-      (throw (InvalidTopologyException.
-              (str "Duplicate component ids: " offending))))
-    (doseq [f thrift/STORM-TOPOLOGY-FIELDS
-            :let [obj-map (.getFieldValue topology f)]]
-      ;; fields for which isWorkerHook is true are not checked
-      (if-not (ThriftTopologyUtils/isWorkerHook f)
-        (do
-          ;; component ids
-          (doseq [id (keys obj-map)]
-            (if (Utils/isSystemId id)
-              (throw (InvalidTopologyException.
-                       (str id " is not a valid component id")))))
-          ;; stream ids declared in each component's common section
-          (doseq [obj (vals obj-map)
-                  id (-> obj .get_common .get_streams keys)]
-            (if (Utils/isSystemId id)
-              (throw (InvalidTopologyException.
-                       (str id " is not a valid stream id"))))))))))
-
-(defn all-components
-  "Merges the values of every thrift/STORM-TOPOLOGY-FIELDS field of `topology`, except
-  fields for which ThriftTopologyUtils/isWorkerHook is true, into one map."
-  [^StormTopology topology]
-  (->> thrift/STORM-TOPOLOGY-FIELDS
-       (remove #(ThriftTopologyUtils/isWorkerHook %))
-       (map #(.getFieldValue topology %))
-       (apply merge {})))
-
-(defn component-conf
-  "Parses the JSON conf stored in the component's common section with from-json."
-  [component]
-  (-> component
-      .get_common
-      .get_json_conf
-      from-json))
-
-(defn validate-basic!
-  "Runs validate-ids!, then throws InvalidTopologyException when a spout declares
-  inputs, or when a component's TOPOLOGY-TASKS is greater than 0 while its
-  parallelism hint is present and not greater than 0."
-  [^StormTopology topology]
-  (validate-ids! topology)
-  (doseq [spout-field thrift/SPOUT-FIELDS
-          spout (vals (.getFieldValue topology spout-field))]
-    (when (seq (-> spout .get_common .get_inputs))
-      (throw (InvalidTopologyException. "May not declare inputs for a spout"))))
-  (doseq [[_ component] (all-components topology)
-          :let [conf (component-conf component)
-                parallelism (-> component .get_common thrift/parallelism-hint)]]
-    (when (and (> (conf TOPOLOGY-TASKS) 0)
-               parallelism
-               (<= parallelism 0))
-      (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0")))))
-
-;; Throws InvalidTopologyException when any input of any component refers to a
-;; component or stream that is not part of `topology`, or when a fields grouping
-;; names fields the source stream does not output. Returns nil otherwise.
-(defn validate-structure! [^StormTopology topology]
-  ;; Validate that every component subscribes only to component+stream pairs that actually exist in the topology
-  ;; and, for a fields grouping, that every grouped field exists among the source stream's output fields.
-  (let [all-components (all-components topology)]
-    (doseq [[id comp] all-components
-            :let [inputs (.. comp get_common get_inputs)]]
-      (doseq [[global-stream-id grouping] inputs
-              :let [source-component-id (.get_componentId global-stream-id)
-                    source-stream-id    (.get_streamId global-stream-id)]]
-        ;; 1. the source component must exist
-        (if-not (contains? all-components source-component-id)
-          (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent component [" source-component-id "]")))
-          (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)]
-            ;; 2. the source component must declare the subscribed stream
-            (if-not (contains? source-streams source-stream-id)
-              (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]")))
-              ;; 3. for a fields grouping, every grouped field must be an output field of that stream
-              (if (= :fields (thrift/grouping-type grouping))
-                (let [grouping-fields (set (.get_fields grouping))
-                      source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set)
-                      diff-fields (set/difference grouping-fields source-stream-fields)]
-                  (when-not (empty? diff-fields)
-                    (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
-
-(defn acker-inputs
-  "Builds the acker's input map: [spout-id ACKER-INIT-STREAM-ID] for every spout and
-  [bolt-id ACKER-ACK-STREAM-ID] / [bolt-id ACKER-FAIL-STREAM-ID] for every bolt,
-  each mapped to [\"id\"]."
-  [^StormTopology topology]
-  (let [ack+fail-subscriptions (fn [bolt-id]
-                                 {[bolt-id ACKER-ACK-STREAM-ID] ["id"]
-                                  [bolt-id ACKER-FAIL-STREAM-ID] ["id"]})
-        init-subscription (fn [spout-id]
-                            {[spout-id ACKER-INIT-STREAM-ID] ["id"]})
-        from-bolts (apply merge (map ack+fail-subscriptions (-> topology .get_bolts .keySet)))
-        from-spouts (apply merge (map init-subscription (-> topology .get_spouts .keySet)))]
-    (merge from-spouts from-bolts)))
-
-;; The event logger receives inputs from all the spouts and bolts, with a fields
-;; grouping on component id, so that all tuples from one component go to the same
-;; executor and can be viewed via the logviewer.
-(defn eventlogger-inputs
-  "Maps [component-id EVENTLOGGER-STREAM-ID] to [\"component-id\"] for every spout
-  and every bolt of `topology`."
-  [^StormTopology topology]
-  (let [subscription (fn [component-id]
-                       {[component-id EVENTLOGGER-STREAM-ID] ["component-id"]})
-        from-bolts (apply merge (map subscription (-> topology .get_bolts .keySet)))
-        from-spouts (apply merge (map subscription (-> topology .get_spouts .keySet)))]
-    (merge from-spouts from-bolts)))
-
-(defn add-acker!
-  "Mutates the topology `ret` in place to wire in the acker bolt:
-   * builds the acker bolt spec, with TOPOLOGY-ACKER-EXECUTORS executors (falling back
-     to TOPOLOGY-WORKERS when that setting is nil) and the same number of tasks;
-   * declares the ack and fail streams on every bolt;
-   * on every spout, sets the tick tuple frequency to the message timeout, declares
-     the init stream and subscribes the spout to the acker's ack and fail streams
-     with direct groupings;
-   * registers the acker bolt under ACKER-COMPONENT-ID."
-  [storm-conf ^StormTopology ret]
-  (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
-        acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
-                                         (new backtype.storm.daemon.acker)
-                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
-                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
-                                          }
-                                         :p num-executors
-                                         :conf {TOPOLOGY-TASKS num-executors
-                                                TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
-    ;; doseq instead of dofor: these loops run only for their side effects, so there
-    ;; is no need to build and discard a result sequence.
-    (doseq [[_ bolt] (.get_bolts ret)
-            :let [common (.get_common bolt)]]
-      (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
-      (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"])))
-    (doseq [[_ spout] (.get_spouts ret)
-            :let [common (.get_common spout)
-                  spout-conf (merge
-                               (component-conf spout)
-                               {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
-      ;; this sets up tick tuples to cause timeouts to be triggered
-      (.set_json_conf common (to-json spout-conf))
-      (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
-      (.put_to_inputs common
-                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
-                      (thrift/mk-direct-grouping))
-      (.put_to_inputs common
-                      (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
-                      (thrift/mk-direct-grouping)))
-    ;; Register under the same constant the spout subscriptions above refer to, rather
-    ;; than a separate "__acker" literal, so the two can never drift apart.
-    (.put_to_bolts ret ACKER-COMPONENT-ID acker-bolt)))
-
-(defn add-metric-streams!
-  "Declares METRICS-STREAM-ID, with output fields \"task-info\" and \"data-points\",
-  on every component returned by all-components."
-  [^StormTopology topology]
-  (doseq [component (vals (all-components topology))]
-    (.put_to_streams (.get_common component)
-                     METRICS-STREAM-ID
-                     (thrift/output-fields ["task-info" "data-points"]))))
-
-(defn add-system-streams!
-  "Declares SYSTEM-STREAM-ID, with the single output field \"event\", on every
-  component returned by all-components."
-  [^StormTopology topology]
-  (doseq [component (vals (all-components topology))]
-    (.put_to_streams (.get_common component)
-                     SYSTEM-STREAM-ID
-                     (thrift/output-fields ["event"]))))
-
-
-(defn map-occurrences
-  "Calls (afn x n) for each element x of `coll`, in order, where n says how many times
-  x has been seen so far (1 for its first occurrence). Returns the results as a
-  sequence in the original order."
-  [afn coll]
-  (loop [remaining (seq coll)
-         seen-counts {}
-         results ()]
-    (if remaining
-      (let [x (first remaining)
-            n (inc (get seen-counts x 0))]
-        (recur (next remaining)
-               (assoc seen-counts x n)
-               (cons (afn x n) results)))
-      ;; results were consed onto the front, so restore the input order
-      (reverse results))))
-
-(defn number-duplicates
-  "Appends \"#n\" to the n-th occurrence (n >= 2) of each repeated element and leaves
-  first occurrences untouched, e.g. [\"a\" \"b\" \"a\"] => (\"a\" \"b\" \"a#2\")."
-  [coll]
-  (letfn [(tag [item seen-count]
-            (if (< seen-count 2)
-              item
-              (str item "#" seen-count)))]
-    (map-occurrences tag coll)))
-
-(defn metrics-consumer-register-ids
-  "Generates one component id per registered metrics consumer: the consumer's
-  \"class\" value, made unique with number-duplicates and prefixed with
-  Constants/METRICS_COMPONENT_ID_PREFIX."
-  [storm-conf]
-  (let [registrations (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
-        class-names (map #(get % "class") registrations)]
-    (map (fn [unique-name]
-           (str Constants/METRICS_COMPONENT_ID_PREFIX unique-name))
-         (number-duplicates class-names))))
-
-(defn metrics-consumer-bolt-specs
-  "Returns a sequence of [component-id bolt-spec] pairs, one per entry of
-  TOPOLOGY-METRICS-CONSUMER-REGISTER. Every bolt shuffle-subscribes to
-  METRICS-STREAM-ID of the system component and of all topology components; its
-  parallelism and task count come from \"parallelism.hint\" (default 1)."
-  [storm-conf topology]
-  (let [emitter-ids (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
-        inputs (into {}
-                     (map (fn [emitter-id] [[emitter-id METRICS-STREAM-ID] :shuffle])
-                          emitter-ids))
-        register->spec (fn [component-id register]
-                         (let [parallelism (or (get register "parallelism.hint") 1)]
-                           [component-id
-                            (thrift/mk-bolt-spec*
-                             inputs
-                             (backtype.storm.metric.MetricsConsumerBolt. (get register "class")
-                                                                         (get register "argument"))
-                             {} :p parallelism :conf {TOPOLOGY-TASKS parallelism})]))]
-    (map register->spec
-         (metrics-consumer-register-ids storm-conf)
-         (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
-
-(defn eventlogger-bolt-fields
-  "Returns the fields the event logger bolt expects, in order: EventLoggerBolt's
-  FIELD_COMPONENT_ID, FIELD_MESSAGE_ID, FIELD_TS and FIELD_VALUES."
-  []
-  (vector (EventLoggerBolt/FIELD_COMPONENT_ID)
-          (EventLoggerBolt/FIELD_MESSAGE_ID)
-          (EventLoggerBolt/FIELD_TS)
-          (EventLoggerBolt/FIELD_VALUES)))
-
-(defn add-eventlogger!
-  "Mutates `ret` in place: builds the event logger bolt spec (executor and task count
-  from TOPOLOGY-EVENTLOGGER-EXECUTORS, falling back to TOPOLOGY-WORKERS when nil),
-  declares EVENTLOGGER-STREAM-ID on every existing component, then registers the
-  bolt under EVENTLOGGER-COMPONENT-ID."
-  [storm-conf ^StormTopology ret]
-  (let [configured-executors (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)
-        num-executors (if (nil? configured-executors)
-                        (storm-conf TOPOLOGY-WORKERS)
-                        configured-executors)
-        bolt-conf {TOPOLOGY-TASKS num-executors
-                   TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)}
-        eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret)
-                                               (EventLoggerBolt.)
-                                               {}
-                                               :p num-executors
-                                               :conf bolt-conf)]
-    ;; a fresh output-fields value is created for every component
-    (doseq [component (vals (all-components ret))]
-      (.put_to_streams (.get_common component)
-                       EVENTLOGGER-STREAM-ID
-                       (thrift/output-fields (eventlogger-bolt-fields))))
-    (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)))
-
-(defn add-metric-components!
-  "Adds every [component-id bolt-spec] pair produced by metrics-consumer-bolt-specs
-  to `topology` as a bolt."
-  [storm-conf ^StormTopology topology]
-  (doseq [consumer-spec (metrics-consumer-bolt-specs storm-conf topology)
-          :let [[consumer-id consumer-bolt] consumer-spec]]
-    (.put_to_bolts topology consumer-id consumer-bolt)))
-
-(defn add-system-components!
-  "Registers a SystemBolt under SYSTEM-COMPONENT-ID with no inputs, parallelism 0 and
-  0 tasks, declaring the system tick, metrics tick and credentials-changed streams.
-  `conf` is accepted but not used."
-  [conf ^StormTopology topology]
-  (let [system-bolt (SystemBolt.)
-        declared-streams {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
-                          METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])
-                          CREDENTIALS-CHANGED-STREAM-ID (thrift/output-fields ["creds"])}
-        system-bolt-spec (thrift/mk-bolt-spec* {}
-                                               system-bolt
-                                               declared-streams
-                                               :p 0
-                                               :conf {TOPOLOGY-TASKS 0})]
-    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
-
-(defn system-topology!
-  "Validates `topology` with validate-basic!, then returns a deep copy extended, in
-  this order, with the acker, the event logger, the metrics consumers, the system
-  bolt, the metric streams and the system streams. The copy is checked with
-  validate-structure! before it is returned; `topology` itself is not modified here."
-  [storm-conf ^StormTopology topology]
-  (validate-basic! topology)
-  (let [sys-topology (.deepCopy topology)]
-    (doseq [add-components! [add-acker!
-                             add-eventlogger!
-                             add-metric-components!
-                             add-system-components!]]
-      (add-components! storm-conf sys-topology))
-    (doseq [add-streams! [add-metric-streams! add-system-streams!]]
-      (add-streams! sys-topology))
-    (validate-structure! sys-topology)
-    sys-topology))
-
-(defn has-ackers?
-  "True when TOPOLOGY-ACKER-EXECUTORS is nil in `storm-conf` or greater than 0."
-  [storm-conf]
-  (let [acker-executors (storm-conf TOPOLOGY-ACKER-EXECUTORS)]
-    (if (nil? acker-executors)
-      true
-      (> acker-executors 0))))
-
-(defn has-eventloggers?
-  "True when TOPOLOGY-EVENTLOGGER-EXECUTORS is nil in `storm-conf` or greater than 0."
-  [storm-conf]
-  (let [eventlogger-executors (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)]
-    (if (nil? eventlogger-executors)
-      true
-      (> eventlogger-executors 0))))
-
-(defn num-start-executors
-  "Returns thrift/parallelism-hint of the component's common section."
-  [component]
-  (-> component
-      .get_common
-      thrift/parallelism-hint))
-
-(defn storm-task-info
-  "Returns a map from task id to component id. Task ids are consecutive ints starting
-  at 1, handed out to the components of the system topology sorted by component id;
-  each component receives as many ids as its TOPOLOGY-TASKS conf value."
-  [^StormTopology user-topology storm-conf]
-  (let [component->num-tasks (->> (system-topology! storm-conf user-topology)
-                                  all-components
-                                  (map-val (comp #(get % TOPOLOGY-TASKS) component-conf)))
-        task-components (mapcat (fn [[component-id num-tasks]]
-                                  (repeat num-tasks component-id))
-                                (sort-by first component->num-tasks))
-        task-ids (iterate (comp int inc) (int 1))]
-    (into {} (map vector task-ids task-components))))
-
-(defn executor-id->tasks
-  "Expands an executor id [first-task-id last-task-id] into the sequence of every
-  task id in that inclusive range, each coerced with int."
-  [[first-task-id last-task-id]]
-  (for [task-id (range first-task-id (inc last-task-id))]
-    (int task-id)))
-
-(defn worker-context
-  "Builds a WorkerTopologyContext from the entries of the `worker` map, plus two values
-  derived from it: the storm resources path under the supervisor's stormdist root
-  and the worker's pids root."
-  [worker]
-  (let [conf (:conf worker)
-        storm-id (:storm-id worker)
-        resources-path (supervisor-storm-resources-path
-                         (supervisor-stormdist-root conf storm-id))
-        pids-root (worker-pids-root conf (:worker-id worker))]
-    (WorkerTopologyContext. (:system-topology worker)
-                            (:storm-conf worker)
-                            (:task->component worker)
-                            (:component->sorted-tasks worker)
-                            (:component->stream->fields worker)
-                            storm-id
-                            resources-path
-                            pids-root
-                            (:port worker)
-                            (:task-ids worker)
-                            (:default-shared-resources worker)
-                            (:user-shared-resources worker))))
-
-
-(defn to-task->node+port
-  "Expands an executor -> node+port map into a task id -> node+port map: every task id
-  produced by executor-id->tasks for an executor maps to that executor's node+port."
-  [executor->node+port]
-  (into {}
-        (for [[executor node+port] executor->node+port
-              task-id (executor-id->tasks executor)]
-          [task-id node+port])))
-
-(defn mk-authorization-handler
-  "Loads the class named `klassname`, instantiates it, calls its IAuthorizer prepare
-  with `conf` and returns the instance. When `klassname` is nil nothing is loaded and
-  nil is returned. The class name, class and handler are logged at debug level
-  either way."
-  [klassname conf]
-  (let [azn-class (when klassname (Class/forName klassname))
-        azn-handler (when azn-class (.newInstance azn-class))]
-    (when azn-handler
-      (.prepare ^IAuthorizer azn-handler conf))
-    (log-debug "authorization class name:" klassname
-               " class:" azn-class
-               " handler:" azn-handler)
-    azn-handler))
-