You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:21 UTC

[25/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatability

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
deleted file mode 100644
index 0cb2f52..0000000
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ /dev/null
@@ -1,701 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.testing
-  (:require [backtype.storm.daemon
-             [nimbus :as nimbus]
-             [supervisor :as supervisor]
-             [common :as common]
-             [worker :as worker]
-             [executor :as executor]])
-  (:require [backtype.storm [process-simulator :as psim]])
-  (:import [org.apache.commons.io FileUtils])
-  (:import [java.io File])
-  (:import [java.util HashMap ArrayList])
-  (:import [java.util.concurrent.atomic AtomicInteger])
-  (:import [java.util.concurrent ConcurrentHashMap])
-  (:import [backtype.storm.utils Time Utils RegisteredGlobalState])
-  (:import [backtype.storm.tuple Fields Tuple TupleImpl])
-  (:import [backtype.storm.task TopologyContext])
-  (:import [backtype.storm.generated GlobalStreamId Bolt KillOptions])
-  (:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple
-            TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
-            TestWordSpout MemoryTransactionalSpout])
-  (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
-  (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
-            ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
-            KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
-            ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
-  (:import [backtype.storm.transactional TransactionalSpoutCoordinator])
-  (:import [backtype.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
-  (:import [backtype.storm.tuple Tuple])
-  (:import [backtype.storm.generated StormTopology])
-  (:import [backtype.storm.task TopologyContext])
-  (:require [backtype.storm [zookeeper :as zk]])
-  (:require [backtype.storm.messaging.loader :as msg-loader])
-  (:require [backtype.storm.daemon.acker :as acker])
-  (:use [backtype.storm cluster util thrift config log local-state]))
-
-(defn feeder-spout
-  [fields]
-  (FeederSpout. (Fields. fields)))
-
-(defn local-temp-path
-  []
-  (str (System/getProperty "java.io.tmpdir") (if-not on-windows? "/") (uuid)))
-
-(defn delete-all
-  [paths]
-  (dorun
-    (for [t paths]
-      (if (.exists (File. t))
-        (try
-          (FileUtils/forceDelete (File. t))
-          (catch Exception e
-            (log-message (.getMessage e))))))))
-
-(defmacro with-local-tmp
-  [[& tmp-syms] & body]
-  (let [tmp-paths (mapcat (fn [t] [t `(local-temp-path)]) tmp-syms)]
-    `(let [~@tmp-paths]
-       (try
-         ~@body
-         (finally
-           (delete-all ~(vec tmp-syms)))))))
-
-(defn start-simulating-time!
-  []
-  (Time/startSimulating))
-
-(defn stop-simulating-time!
-  []
-  (Time/stopSimulating))
-
- (defmacro with-simulated-time
-   [& body]
-   `(try
-     (start-simulating-time!)
-     ~@body
-     (finally
-       (stop-simulating-time!))))
-
-(defn advance-time-ms! [ms]
-  (Time/advanceTime ms))
-
-(defn advance-time-secs! [secs]
-  (advance-time-ms! (* (long secs) 1000)))
-
-(defnk add-supervisor
-  [cluster-map :ports 2 :conf {} :id nil]
-  (let [tmp-dir (local-temp-path)
-        port-ids (if (sequential? ports)
-                   ports
-                   (doall (repeatedly ports (:port-counter cluster-map))))
-        supervisor-conf (merge (:daemon-conf cluster-map)
-                               conf
-                               {STORM-LOCAL-DIR tmp-dir
-                                SUPERVISOR-SLOTS-PORTS port-ids})
-        id-fn (if id (fn [] id) supervisor/generate-supervisor-id)
-        daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
-    (swap! (:supervisors cluster-map) conj daemon)
-    (swap! (:tmp-dirs cluster-map) conj tmp-dir)
-    daemon))
-
-(defn mk-shared-context [conf]
-  (if-not (conf STORM-LOCAL-MODE-ZMQ)
-    (msg-loader/mk-local-context)))
-
-(defn start-nimbus-daemon [conf nimbus]
-  (let [server (ThriftServer. conf (Nimbus$Processor. nimbus)
-                              ThriftConnectionType/NIMBUS)
-        nimbus-thread (Thread. (fn [] (.serve server)))]
-    (log-message "Starting Nimbus server...")
-    (.start nimbus-thread)
-    server))
-
-
-;; returns map containing cluster info
-;; local dir is always overridden in maps
-;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
-;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
-  (let [zk-tmp (local-temp-path)
-        [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
-                              (zk/mk-inprocess-zookeeper zk-tmp))
-        daemon-conf (merge (read-storm-config)
-                           {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
-                            ZMQ-LINGER-MILLIS 0
-                            TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
-                            TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
-                            STORM-CLUSTER-MODE "local"
-                            BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
-                           (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
-                             {STORM-ZOOKEEPER-PORT zk-port
-                              STORM-ZOOKEEPER-SERVERS ["localhost"]})
-                           daemon-conf)
-        nimbus-tmp (local-temp-path)
-        port-counter (mk-counter supervisor-slot-port-min)
-        nimbus (nimbus/service-handler
-                (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
-                (if inimbus inimbus (nimbus/standalone-nimbus)))
-        context (mk-shared-context daemon-conf)
-        nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)
-        cluster-map {:nimbus nimbus
-                     :port-counter port-counter
-                     :daemon-conf daemon-conf
-                     :supervisors (atom [])
-                     :state (mk-distributed-cluster-state daemon-conf)
-                     :storm-cluster-state (mk-storm-cluster-state daemon-conf)
-                     :tmp-dirs (atom [nimbus-tmp zk-tmp])
-                     :zookeeper (if (not-nil? zk-handle) zk-handle)
-                     :shared-context context
-                     :nimbus-thrift-server nimbus-thrift-server}
-        supervisor-confs (if (sequential? supervisors)
-                           supervisors
-                           (repeat supervisors {}))]
-
-    (doseq [sc supervisor-confs]
-      (add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
-    cluster-map))
-
-(defn get-supervisor [cluster-map supervisor-id]
-  (let [finder-fn #(= (.get-id %) supervisor-id)]
-    (find-first finder-fn @(:supervisors cluster-map))))
-
-(defn kill-supervisor [cluster-map supervisor-id]
-  (let [finder-fn #(= (.get-id %) supervisor-id)
-        supervisors @(:supervisors cluster-map)
-        sup (find-first finder-fn
-                        supervisors)]
-    ;; tmp-dir will be taken care of by shutdown
-    (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
-    (.shutdown sup)))
-
-(defn kill-local-storm-cluster [cluster-map]
-  (.shutdown (:nimbus cluster-map))
-  (if (not-nil? (:nimbus-thrift-server cluster-map))
-    (do
-      (log-message "shutting down thrift server")
-      (try
-        (.stop (:nimbus-thrift-server cluster-map))
-        (catch Exception e (log-message "failed to stop thrift")))
-      ))
-  (.close (:state cluster-map))
-  (.disconnect (:storm-cluster-state cluster-map))
-  (doseq [s @(:supervisors cluster-map)]
-    (.shutdown-all-workers s)
-    ;; race condition here? will it launch the workers again?
-    (supervisor/kill-supervisor s))
-  (psim/kill-all-processes)
-  (if (not-nil? (:zookeeper cluster-map))
-    (do
-      (log-message "Shutting down in process zookeeper")
-      (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
-      (log-message "Done shutting down in process zookeeper")))
-  (doseq [t @(:tmp-dirs cluster-map)]
-    (log-message "Deleting temporary path " t)
-    (try
-      (rmr t)
-      ;; on windows, the host process still holds lock on the logfile
-      (catch Exception e (log-message (.getMessage e)))) ))
-
-(def TEST-TIMEOUT-MS
-  (let [timeout (System/getenv "STORM_TEST_TIMEOUT_MS")]
-    (parse-int (if timeout timeout "5000"))))
-
-(defmacro while-timeout [timeout-ms condition & body]
-  `(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)]
-     (log-debug "Looping until " '~condition)
-     (while ~condition
-       (when (> (System/currentTimeMillis) end-time#)
-         (let [thread-dump# (Utils/threadDump)]
-           (log-message "Condition " '~condition  " not met in " ~timeout-ms "ms")
-           (log-message thread-dump#)
-           (throw (AssertionError. (str "Test timed out (" ~timeout-ms "ms) " '~condition)))))
-       ~@body)
-     (log-debug "Condition met " '~condition)))
-
-(defn wait-for-condition
-  ([apredicate]
-    (wait-for-condition TEST-TIMEOUT-MS apredicate))
-  ([timeout-ms apredicate]
-    (while-timeout timeout-ms (not (apredicate))
-      (Time/sleep 100))))
-
-(defn wait-until-cluster-waiting
-  "Wait until the cluster is idle. Should be used with time simulation."
-  ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS))
-  ([cluster-map timeout-ms]
-  ;; wait until all workers, supervisors, and nimbus is waiting
-  (let [supervisors @(:supervisors cluster-map)
-        workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
-        daemons (concat
-                  [(:nimbus cluster-map)]
-                  supervisors
-                  ; because a worker may already be dead
-                  workers)]
-    (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
-                   (Thread/sleep (rand-int 20))
-                   ;;      (doseq [d daemons]
-                   ;;        (if-not ((memfn waiting?) d)
-                   ;;          (println d)))
-                   ))))
-
-(defn advance-cluster-time
-  ([cluster-map secs increment-secs]
-   (loop [left secs]
-     (when (> left 0)
-       (let [diff (min left increment-secs)]
-         (advance-time-secs! diff)
-         (wait-until-cluster-waiting cluster-map)
-         (recur (- left diff))))))
-  ([cluster-map secs]
-   (advance-cluster-time cluster-map secs 1)))
-
-(defmacro with-local-cluster
-  [[cluster-sym & args] & body]
-  `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
-     (try
-       ~@body
-       (catch Throwable t#
-         (log-error t# "Error in cluster")
-         (throw t#))
-       (finally
-         (let [keep-waiting?# (atom true)
-               f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
-           (kill-local-storm-cluster ~cluster-sym)
-           (reset! keep-waiting?# false)
-            @f#)))))
-
-(defmacro with-simulated-time-local-cluster
-  [& args]
-  `(with-simulated-time
-     (with-local-cluster ~@args)))
-
-(defmacro with-inprocess-zookeeper
-  [port-sym & body]
-  `(with-local-tmp [tmp#]
-                   (let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)]
-                     (try
-                       ~@body
-                       (finally
-                         (zk/shutdown-inprocess-zookeeper zks#))))))
-
-(defn submit-local-topology
-  [nimbus storm-name conf topology]
-  (when-not (Utils/isValidConf conf)
-    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopology nimbus storm-name nil (to-json conf) topology))
-
-(defn submit-local-topology-with-opts
-  [nimbus storm-name conf topology submit-opts]
-  (when-not (Utils/isValidConf conf)
-    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
-
-(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
-  (fn [existing-assignments]
-    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
-          existing-assignments (into {} (for [[tid assignment] existing-assignments]
-                                          {tid (:worker->resources assignment)}))
-          new-assignments (assoc existing-assignments topology-id worker->resources)]
-      new-assignments)))
-
-(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port]
-  (fn [new-scheduler-assignments existing-assignments]
-    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
-          existing-assignments (into {} (for [[tid assignment] existing-assignments]
-                                          {tid (:executor->node+port assignment)}))
-          new-assignments (assoc existing-assignments topology-id executor->node+port)]
-      new-assignments)))
-
-(defn mocked-compute-new-scheduler-assignments []
-  (fn [nimbus existing-assignments topologies scratch-topology-id]
-    existing-assignments))
-
-(defn submit-mocked-assignment
-  [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
-  (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
-                   nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
-                   nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
-                                                          storm-cluster-state
-                                                          storm-name
-                                                          worker->resources)
-                   nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
-                                                                      storm-cluster-state
-                                                                      storm-name
-                                                                      executor->node+port)]
-    (submit-local-topology nimbus storm-name conf topology)))
-
-(defn mk-capture-launch-fn [capture-atom]
-  (fn [supervisor storm-id port worker-id mem-onheap]
-    (let [supervisor-id (:supervisor-id supervisor)
-          conf (:conf supervisor)
-          existing (get @capture-atom [supervisor-id port] [])]
-      (set-worker-user! conf worker-id "")
-      (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id)))))
-
-(defn find-worker-id
-  [supervisor-conf port]
-  (let [supervisor-state (supervisor-state supervisor-conf)
-        worker->port (ls-approved-workers supervisor-state)]
-    (first ((reverse-map worker->port) port))))
-
-(defn find-worker-port
-  [supervisor-conf worker-id]
-  (let [supervisor-state (supervisor-state supervisor-conf)
-        worker->port (ls-approved-workers supervisor-state)]
-    (worker->port worker-id)))
-
-(defn mk-capture-shutdown-fn
-  [capture-atom]
-  (let [existing-fn supervisor/shutdown-worker]
-    (fn [supervisor worker-id]
-      (let [conf (:conf supervisor)
-            supervisor-id (:supervisor-id supervisor)
-            port (find-worker-port conf worker-id)
-            existing (get @capture-atom [supervisor-id port] 0)]
-        (swap! capture-atom assoc [supervisor-id port] (inc existing))
-        (existing-fn supervisor worker-id)))))
-
-(defmacro capture-changed-workers
-  [& body]
-  `(let [launch-captured# (atom {})
-         shutdown-captured# (atom {})]
-     (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#)
-                      supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)]
-                     ~@body
-                     {:launched @launch-captured#
-                      :shutdown @shutdown-captured#})))
-
-(defmacro capture-launched-workers
-  [& body]
-  `(:launched (capture-changed-workers ~@body)))
-
-(defmacro capture-shutdown-workers
-  [& body]
-  `(:shutdown (capture-changed-workers ~@body)))
-
-(defnk aggregated-stat
-  [cluster-map storm-name stat-key :component-ids nil]
-  (let [state (:storm-cluster-state cluster-map)
-        nimbus (:nimbus cluster-map)
-        storm-id (common/get-storm-id state storm-name)
-        component->tasks (reverse-map
-                           (common/storm-task-info
-                             (.getUserTopology nimbus storm-id)
-                             (from-json (.getTopologyConf nimbus storm-id))))
-        component->tasks (if component-ids
-                           (select-keys component->tasks component-ids)
-                           component->tasks)
-        task-ids (apply concat (vals component->tasks))
-        assignment (.assignment-info state storm-id nil)
-        taskbeats (.taskbeats state storm-id (:task->node+port assignment))
-        heartbeats (dofor [id task-ids] (get taskbeats id))
-        stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
-    (reduce + stats)))
-
-(defn emitted-spout-tuples
-  [cluster-map topology storm-name]
-  (aggregated-stat
-    cluster-map
-    storm-name
-    :emitted
-    :component-ids (keys (.get_spouts topology))))
-
-(defn transferred-tuples
-  [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :transferred))
-
-(defn acked-tuples
-  [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :acked))
-
-(defn simulate-wait
-  [cluster-map]
-  (if (Time/isSimulating)
-    (advance-cluster-time cluster-map 10)
-    (Thread/sleep 100)))
-
-(defprotocol CompletableSpout
-  (exhausted?
-    [this]
-    "Whether all the tuples for this spout have been completed.")
-  (cleanup
-    [this]
-    "Cleanup any global state kept")
-  (startup
-    [this]
-    "Prepare the spout (globally) before starting the topology"))
-
-(extend-type FixedTupleSpout
-  CompletableSpout
-  (exhausted? [this]
-              (= (-> this .getSourceTuples count)
-                 (.getCompleted this)))
-  (cleanup [this]
-           (.cleanup this))
-  (startup [this]))
-
-(extend-type TransactionalSpoutCoordinator
-  CompletableSpout
-  (exhausted? [this]
-              (exhausted? (.getSpout this)))
-  (cleanup [this]
-           (cleanup (.getSpout this)))
-  (startup [this]
-           (startup (.getSpout this))))
-
-(extend-type PartitionedTransactionalSpoutExecutor
-  CompletableSpout
-  (exhausted? [this]
-              (exhausted? (.getPartitionedSpout this)))
-  (cleanup [this]
-           (cleanup (.getPartitionedSpout this)))
-  (startup [this]
-           (startup (.getPartitionedSpout this))))
-
-(extend-type MemoryTransactionalSpout
-  CompletableSpout
-  (exhausted? [this]
-              (.isExhaustedTuples this))
-  (cleanup [this]
-           (.cleanup this))
-  (startup [this]
-           (.startup this)))
-
-(defn spout-objects [spec-map]
-  (for [[_ spout-spec] spec-map]
-    (-> spout-spec
-        .get_spout_object
-        deserialized-component-object)))
-
-(defn capture-topology
-  [topology]
-  (let [topology (.deepCopy topology)
-        spouts (.get_spouts topology)
-        bolts (.get_bolts topology)
-        all-streams (apply concat
-                           (for [[id spec] (merge (clojurify-structure spouts)
-                                                  (clojurify-structure bolts))]
-                             (for [[stream info] (.. spec get_common get_streams)]
-                               [(GlobalStreamId. id stream) (.is_direct info)])))
-        capturer (TupleCaptureBolt.)]
-    (.set_bolts topology
-                (assoc (clojurify-structure bolts)
-                  (uuid)
-                  (Bolt.
-                    (serialize-component-object capturer)
-                    (mk-plain-component-common (into {} (for [[id direct?] all-streams]
-                                                          [id (if direct?
-                                                                (mk-direct-grouping)
-                                                                (mk-global-grouping))]))
-                                               {}
-                                               nil))))
-    {:topology topology
-     :capturer capturer}))
-
-;; TODO: mock-sources needs to be able to mock out state spouts as well
-(defnk complete-topology
-  [cluster-map topology
-   :mock-sources {}
-   :storm-conf {}
-   :cleanup-state true
-   :topology-name nil
-   :timeout-ms TEST-TIMEOUT-MS]
-  ;; TODO: the idea of mocking for transactional topologies should be done an
-  ;; abstraction level above... should have a complete-transactional-topology for this
-  (let [{topology :topology capturer :capturer} (capture-topology topology)
-        storm-name (or topology-name (str "topologytest-" (uuid)))
-        state (:storm-cluster-state cluster-map)
-        spouts (.get_spouts topology)
-        replacements (map-val (fn [v]
-                                (FixedTupleSpout.
-                                  (for [tup v]
-                                    (if (map? tup)
-                                      (FixedTuple. (:stream tup) (:values tup))
-                                      tup))))
-                              mock-sources)]
-    (doseq [[id spout] replacements]
-      (let [spout-spec (get spouts id)]
-        (.set_spout_object spout-spec (serialize-component-object spout))))
-    (doseq [spout (spout-objects spouts)]
-      (when-not (extends? CompletableSpout (.getClass spout))
-        (throw (RuntimeException. (str "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " spout)))))
-
-    (doseq [spout (spout-objects spouts)]
-      (startup spout))
-
-    (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
-    (advance-cluster-time cluster-map 11)
-
-    (let [storm-id (common/get-storm-id state storm-name)]
-      ;;Give the topology time to come up without using it to wait for the spouts to complete
-      (simulate-wait cluster-map)
-
-      (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
-                     (simulate-wait cluster-map))
-
-      (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
-      (while-timeout timeout-ms (.assignment-info state storm-id nil)
-                     (simulate-wait cluster-map))
-      (when cleanup-state
-        (doseq [spout (spout-objects spouts)]
-          (cleanup spout))))
-
-    (if cleanup-state
-      (.getAndRemoveResults capturer)
-      (.getAndClearResults capturer))))
-
-(defn read-tuples
-  ([results component-id stream-id]
-   (let [fixed-tuples (get results component-id [])]
-     (mapcat
-       (fn [ft]
-         (if (= stream-id (. ft stream))
-           [(vec (. ft values))]))
-       fixed-tuples)
-     ))
-  ([results component-id]
-   (read-tuples results component-id Utils/DEFAULT_STREAM_ID)))
-
-(defn ms=
-  [& args]
-  (apply = (map multi-set args)))
-
-(def TRACKER-BOLT-ID "+++tracker-bolt")
-
-;; TODO: should override system-topology! and wrap everything there
-(defn mk-tracked-topology
-  ([tracked-cluster topology]
-   (let [track-id (::track-id tracked-cluster)
-         ret (.deepCopy topology)]
-     (dofor [[_ bolt] (.get_bolts ret)
-             :let [obj (deserialized-component-object (.get_bolt_object bolt))]]
-            (.set_bolt_object bolt (serialize-component-object
-                                     (BoltTracker. obj track-id))))
-     (dofor [[_ spout] (.get_spouts ret)
-             :let [obj (deserialized-component-object (.get_spout_object spout))]]
-            (.set_spout_object spout (serialize-component-object
-                                       (SpoutTracker. obj track-id))))
-     {:topology ret
-      :last-spout-emit (atom 0)
-      :cluster tracked-cluster})))
-
-(defn assoc-track-id
-  [cluster track-id]
-  (assoc cluster ::track-id track-id))
-
-(defn increment-global!
-  [id key amt]
-  (-> (RegisteredGlobalState/getState id)
-      (get key)
-      (.addAndGet amt)))
-
-(defn global-amt
-  [id key]
-  (-> (RegisteredGlobalState/getState id)
-      (get key)
-      .get))
-
-(defmacro with-tracked-cluster
-  [[cluster-sym & cluster-args] & body]
-  `(let [id# (uuid)]
-     (RegisteredGlobalState/setState
-       id#
-       (doto (ConcurrentHashMap.)
-         (.put "spout-emitted" (AtomicInteger. 0))
-         (.put "transferred" (AtomicInteger. 0))
-         (.put "processed" (AtomicInteger. 0))))
-     (with-var-roots
-       [acker/mk-acker-bolt
-        (let [old# acker/mk-acker-bolt]
-          (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
-        ;; critical that this particular function is overridden here,
-        ;; since the transferred stat needs to be incremented at the moment
-        ;; of tuple emission (and not on a separate thread later) for
-        ;; topologies to be tracked correctly. This is because "transferred" *must*
-        ;; be incremented before "processing".
-        executor/mk-executor-transfer-fn
-        (let [old# executor/mk-executor-transfer-fn]
-          (fn [& args#]
-            (let [transferrer# (apply old# args#)]
-              (fn [& args2#]
-                ;; (log-message "Transferring: " transfer-args#)
-                (increment-global! id# "transferred" 1)
-                (apply transferrer# args2#)))))]
-       (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
-                           (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
-                             ~@body)))
-     (RegisteredGlobalState/clearState id#)))
-
-(defn tracked-wait
-  "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
-  ([tracked-topology]
-     (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
-  ([tracked-topology amt]
-     (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
-  ([tracked-topology amt timeout-ms]
-    (let [target (+ amt @(:last-spout-emit tracked-topology))
-          track-id (-> tracked-topology :cluster ::track-id)
-          waiting? (fn []
-                     (or (not= target (global-amt track-id "spout-emitted"))
-                         (not= (global-amt track-id "transferred")
-                               (global-amt track-id "processed"))))]
-      (while-timeout timeout-ms (waiting?)
-                     ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
-                     ;; (println "Processed: " (global-amt track-id "processed"))
-                     ;; (println "Transferred: " (global-amt track-id "transferred"))
-                    (Thread/sleep (rand-int 200)))
-      (reset! (:last-spout-emit tracked-topology) target))))
-
-(defnk test-tuple
-  [values
-   :stream Utils/DEFAULT_STREAM_ID
-   :component "component"
-   :fields nil]
-  (let [fields (or fields
-                   (->> (iterate inc 1)
-                        (take (count values))
-                        (map #(str "field" %))))
-        spout-spec (mk-spout-spec* (TestWordSpout.)
-                                   {stream fields})
-        topology (StormTopology. {component spout-spec} {} {})
-        context (TopologyContext.
-                  topology
-                  (read-storm-config)
-                  {(int 1) component}
-                  {component [(int 1)]}
-                  {component {stream (Fields. fields)}}
-                  "test-storm-id"
-                  nil
-                  nil
-                  (int 1)
-                  nil
-                  [(int 1)]
-                  {}
-                  {}
-                  (HashMap.)
-                  (HashMap.)
-                  (atom false))]
-    (TupleImpl. context values 1 stream)))
-
-(defmacro with-timeout
-  [millis unit & body]
-  `(let [f# (future ~@body)]
-     (try
-       (.get f# ~millis ~unit)
-       (finally (future-cancel f#)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing4j.clj b/storm-core/src/clj/backtype/storm/testing4j.clj
deleted file mode 100644
index bc5dc57..0000000
--- a/storm-core/src/clj/backtype/storm/testing4j.clj
+++ /dev/null
@@ -1,184 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.testing4j
-  (:import [java.util Map List Collection ArrayList])
-  (:require [backtype.storm [LocalCluster :as LocalCluster]])
-  (:import [backtype.storm Config ILocalCluster LocalCluster])
-  (:import [backtype.storm.generated StormTopology])
-  (:import [backtype.storm.daemon nimbus])
-  (:import [backtype.storm.testing TestJob MockedSources TrackedTopology
-            MkClusterParam CompleteTopologyParam MkTupleParam])
-  (:import [backtype.storm.utils Utils])
-  (:use [backtype.storm testing util log])
-  (:gen-class
-   :name backtype.storm.Testing
-   :methods [^:static [completeTopology
-                       [backtype.storm.ILocalCluster  backtype.storm.generated.StormTopology
-                        backtype.storm.testing.CompleteTopologyParam]
-                       java.util.Map]
-             ^:static [completeTopology
-                       [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology]
-                       java.util.Map]
-             ^:static [withSimulatedTime [Runnable] void]
-             ^:static [withLocalCluster [backtype.storm.testing.TestJob] void]
-             ^:static [withLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
-             ^:static [getLocalCluster [java.util.Map] backtype.storm.ILocalCluster]
-             ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.TestJob] void]
-             ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
-             ^:static [withTrackedCluster [backtype.storm.testing.TestJob] void]
-             ^:static [withTrackedCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
-             ^:static [readTuples [java.util.Map String String] java.util.List]
-             ^:static [readTuples [java.util.Map String] java.util.List]
-             ^:static [mkTrackedTopology [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology] backtype.storm.testing.TrackedTopology]
-             ^:static [trackedWait [backtype.storm.testing.TrackedTopology] void]
-             ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer] void]
-             ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer Integer] void]
-             ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer Integer] void]
-             ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer] void]
-             ^:static [multiseteq [java.util.Collection java.util.Collection] boolean]
-             ^:static [multiseteq [java.util.Map java.util.Map] boolean]
-             ^:static [testTuple [java.util.List] backtype.storm.tuple.Tuple]
-             ^:static [testTuple [java.util.List backtype.storm.testing.MkTupleParam] backtype.storm.tuple.Tuple]]))
-
-(defn -completeTopology
-  ([^ILocalCluster cluster ^StormTopology topology ^CompleteTopologyParam completeTopologyParam]
-    (let [mocked-sources (or (-> completeTopologyParam .getMockedSources .getData) {})
-          storm-conf (or (.getStormConf completeTopologyParam) {})
-          cleanup-state (or (.getCleanupState completeTopologyParam) true)
-          topology-name (.getTopologyName completeTopologyParam)
-          timeout-ms (or (.getTimeoutMs completeTopologyParam) TEST-TIMEOUT-MS)]
-      (complete-topology (.getState cluster) topology
-        :mock-sources mocked-sources
-        :storm-conf storm-conf
-        :cleanup-state cleanup-state
-        :topology-name topology-name
-        :timeout-ms timeout-ms)))
-  ([^ILocalCluster cluster ^StormTopology topology]
-    (-completeTopology cluster topology (CompleteTopologyParam.))))
-
-
-(defn -withSimulatedTime
-  [^Runnable code]
-  (with-simulated-time
-    (.run code)))
-
-(defmacro with-cluster
-  [cluster-type mkClusterParam code]
-  `(let [supervisors# (or (.getSupervisors ~mkClusterParam) 2)
-         ports-per-supervisor# (or (.getPortsPerSupervisor ~mkClusterParam) 3)
-         daemon-conf# (or (.getDaemonConf ~mkClusterParam) {})]
-     (~cluster-type [cluster# :supervisors supervisors#
-                     :ports-per-supervisor ports-per-supervisor#
-                     :daemon-conf daemon-conf#]
-                    (let [cluster# (LocalCluster. cluster#)]
-                      (.run ~code cluster#)))))
-
-(defn -withLocalCluster
-  ([^MkClusterParam mkClusterParam ^TestJob code]
-     (with-cluster with-local-cluster mkClusterParam code))
-  ([^TestJob code]
-     (-withLocalCluster (MkClusterParam.) code)))
-
-(defn -getLocalCluster
-  ([^Map clusterConf]
-     (let [daemon-conf (get-in clusterConf ["daemon-conf"] {})
-           supervisors (get-in clusterConf ["supervisors"] 2)
-           ports-per-supervisor (get-in clusterConf ["ports-per-supervisor"] 3)
-           inimbus (get-in clusterConf ["inimbus"] nil)
-           supervisor-slot-port-min (get-in clusterConf ["supervisor-slot-port-min"] 1024)
-           nimbus-daemon (get-in clusterConf ["nimbus-daemon"] false)
-           local-cluster-map (mk-local-storm-cluster :supervisors supervisors
-                                                     :ports-per-supervisor ports-per-supervisor
-                                                     :daemon-conf daemon-conf
-                                                     :inimbus inimbus
-                                                     :supervisor-slot-port-min supervisor-slot-port-min
-                                                     :nimbus-daemon nimbus-daemon
-                                                     )]
-       (LocalCluster. local-cluster-map))))
-
-(defn -withSimulatedTimeLocalCluster
-  ([^MkClusterParam mkClusterParam ^TestJob code]
-     (with-cluster with-simulated-time-local-cluster mkClusterParam code))
-  ([^TestJob code]
-     (-withSimulatedTimeLocalCluster (MkClusterParam.) code)))
-
-(defn -withTrackedCluster
-  ([^MkClusterParam mkClusterParam ^TestJob code]
-     (with-cluster with-tracked-cluster mkClusterParam code))
-  ([^TestJob code]
-     (-withTrackedCluster (MkClusterParam.) code)))
-
-(defn- find-tuples
-  [^List fixed-tuples ^String stream]
-  (let [ret (ArrayList.)]
-    (doseq [fixed-tuple fixed-tuples]
-      (if (= (.stream fixed-tuple) stream)
-        (.add ret (.values fixed-tuple))))
-    ret))
-
-(defn -readTuples
-  ([^Map result ^String componentId ^String streamId]
-   (let [stream-result (.get result componentId)
-         ret (if stream-result
-               (find-tuples stream-result streamId)
-               [])]
-     ret))
-  ([^Map result ^String componentId]
-   (-readTuples result componentId Utils/DEFAULT_STREAM_ID)))
-
-(defn -mkTrackedTopology
-  [^ILocalCluster trackedCluster ^StormTopology topology]
-  (-> (mk-tracked-topology (.getState trackedCluster) topology)
-      (TrackedTopology.)))
-
-(defn -trackedWait
-  ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms]
-   (tracked-wait trackedTopology amt timeout-ms))
-  ([^TrackedTopology trackedTopology ^Integer amt]
-   (tracked-wait trackedTopology amt))
-  ([^TrackedTopology trackedTopology]
-   (-trackedWait trackedTopology 1)))
-
-(defn -advanceClusterTime
-  ([^ILocalCluster cluster ^Integer secs ^Integer step]
-   (advance-cluster-time (.getState cluster) secs step))
-  ([^ILocalCluster cluster ^Integer secs]
-   (-advanceClusterTime cluster secs 1)))
-
-(defn- multiseteq
-  [^Object obj1 ^Object obj2]
-  (let [obj1 (clojurify-structure obj1)
-        obj2 (clojurify-structure obj2)]
-    (ms= obj1 obj2)))
-
-(defn -multiseteq
-  [^Collection coll1 ^Collection coll2]
-  (multiseteq coll1 coll2))
-
-(defn -multiseteq
-  [^Map coll1 ^Map coll2]
-  (multiseteq coll1 coll2))
-
-(defn -testTuple
-  ([^List values]
-   (-testTuple values nil))
-  ([^List values ^MkTupleParam param]
-   (if (nil? param)
-     (test-tuple values)
-     (let [stream (or (.getStream param) Utils/DEFAULT_STREAM_ID)
-           component (or (.getComponent param) "component")
-           fields (.getFields param)]
-       (test-tuple values :stream stream :component component :fields fields)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj
deleted file mode 100644
index 8f4c659..0000000
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ /dev/null
@@ -1,284 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.thrift
-  (:import [java.util HashMap]
-           [java.io Serializable]
-           [backtype.storm.generated NodeInfo Assignment])
-  (:import [backtype.storm.generated JavaObject Grouping Nimbus StormTopology
-            StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
-            ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
-            GlobalStreamId ComponentObject ComponentObject$_Fields
-            ShellComponent SupervisorInfo])
-  (:import [backtype.storm.utils Utils NimbusClient])
-  (:import [backtype.storm Constants])
-  (:import [backtype.storm.security.auth ReqContext])
-  (:import [backtype.storm.grouping CustomStreamGrouping])
-  (:import [backtype.storm.topology TopologyBuilder])
-  (:import [backtype.storm.clojure RichShellBolt RichShellSpout])
-  (:import [org.apache.thrift.transport TTransport])
-  (:use [backtype.storm util config log zookeeper]))
-
-(defn instantiate-java-object
-  [^JavaObject obj]
-  (let [name (symbol (.get_full_class_name obj))
-        args (map (memfn getFieldValue) (.get_args_list obj))]
-    (eval `(new ~name ~@args))))
-
-(def grouping-constants
-  {Grouping$_Fields/FIELDS :fields
-   Grouping$_Fields/SHUFFLE :shuffle
-   Grouping$_Fields/ALL :all
-   Grouping$_Fields/NONE :none
-   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
-   Grouping$_Fields/CUSTOM_OBJECT :custom-object
-   Grouping$_Fields/DIRECT :direct
-   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
-
-(defn grouping-type
-  [^Grouping grouping]
-  (grouping-constants (.getSetField grouping)))
-
-(defn field-grouping
-  [^Grouping grouping]
-  (when-not (= (grouping-type grouping) :fields)
-    (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping")))
-  (.get_fields grouping))
-
-(defn global-grouping?
-  [^Grouping grouping]
-  (and (= :fields (grouping-type grouping))
-       (empty? (field-grouping grouping))))
-
-(defn parallelism-hint
-  [^ComponentCommon component-common]
-  (let [phint (.get_parallelism_hint component-common)]
-    (if-not (.is_set_parallelism_hint component-common) 1 phint)))
-
-(defn nimbus-client-and-conn
-  ([host port]
-    (nimbus-client-and-conn host port nil))
-  ([host port as-user]
-  (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
-  (let [conf (read-storm-config)
-        nimbusClient (NimbusClient. conf host port nil as-user)
-        client (.getClient nimbusClient)
-        transport (.transport nimbusClient)]
-        [client transport] )))
-
-(defmacro with-nimbus-connection
-  [[client-sym host port] & body]
-  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
-    (try
-      ~@body
-    (finally (.close conn#)))))
-
-(defmacro with-configured-nimbus-connection
-  [client-sym & body]
-  `(let [conf# (read-storm-config)
-         context# (ReqContext/context)
-         user# (if (.principal context#) (.getName (.principal context#)))
-         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
-         ~client-sym (.getClient nimbusClient#)
-         conn# (.transport nimbusClient#)
-         ]
-     (try
-       ~@body
-     (finally (.close conn#)))))
-
-(defn direct-output-fields
-  [fields]
-  (StreamInfo. fields true))
-
-(defn output-fields
-  [fields]
-  (StreamInfo. fields false))
-
-(defn mk-output-spec
-  [output-spec]
-  (let [output-spec (if (map? output-spec)
-                      output-spec
-                      {Utils/DEFAULT_STREAM_ID output-spec})]
-    (map-val
-      (fn [out]
-        (if (instance? StreamInfo out)
-          out
-          (StreamInfo. out false)))
-      output-spec)))
-
-(defnk mk-plain-component-common
-  [inputs output-spec parallelism-hint :conf nil]
-  (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
-    (when parallelism-hint
-      (.set_parallelism_hint ret parallelism-hint))
-    (when conf
-      (.set_json_conf ret (to-json conf)))
-    ret))
-
-(defnk mk-spout-spec*
-  [spout outputs :p nil :conf nil]
-  (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
-              (mk-plain-component-common {} outputs p :conf conf)))
-
-(defn mk-shuffle-grouping
-  []
-  (Grouping/shuffle (NullStruct.)))
-
-(defn mk-local-or-shuffle-grouping
-  []
-  (Grouping/local_or_shuffle (NullStruct.)))
-
-(defn mk-fields-grouping
-  [fields]
-  (Grouping/fields fields))
-
-(defn mk-global-grouping
-  []
-  (mk-fields-grouping []))
-
-(defn mk-direct-grouping
-  []
-  (Grouping/direct (NullStruct.)))
-
-(defn mk-all-grouping
-  []
-  (Grouping/all (NullStruct.)))
-
-(defn mk-none-grouping
-  []
-  (Grouping/none (NullStruct.)))
-
-(defn deserialized-component-object
-  [^ComponentObject obj]
-  (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
-    (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
-  (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
-
-(defn serialize-component-object
-  [obj]
-  (ComponentObject/serialized_java (Utils/javaSerialize obj)))
-
-(defn- mk-grouping
-  [grouping-spec]
-  (cond (nil? grouping-spec)
-        (mk-none-grouping)
-
-        (instance? Grouping grouping-spec)
-        grouping-spec
-
-        (instance? CustomStreamGrouping grouping-spec)
-        (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
-
-        (instance? JavaObject grouping-spec)
-        (Grouping/custom_object grouping-spec)
-
-        (sequential? grouping-spec)
-        (mk-fields-grouping grouping-spec)
-
-        (= grouping-spec :shuffle)
-        (mk-shuffle-grouping)
-
-        (= grouping-spec :local-or-shuffle)
-        (mk-local-or-shuffle-grouping)
-        (= grouping-spec :none)
-        (mk-none-grouping)
-
-        (= grouping-spec :all)
-        (mk-all-grouping)
-
-        (= grouping-spec :global)
-        (mk-global-grouping)
-
-        (= grouping-spec :direct)
-        (mk-direct-grouping)
-
-        true
-        (throw (IllegalArgumentException.
-                 (str grouping-spec " is not a valid grouping")))))
-
-(defn- mk-inputs
-  [inputs]
-  (into {} (for [[stream-id grouping-spec] inputs]
-             [(if (sequential? stream-id)
-                (GlobalStreamId. (first stream-id) (second stream-id))
-                (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
-              (mk-grouping grouping-spec)])))
-
-(defnk mk-bolt-spec*
-  [inputs bolt outputs :p nil :conf nil]
-  (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
-    (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
-           common)))
-
-(defnk mk-spout-spec
-  [spout :parallelism-hint nil :p nil :conf nil]
-  (let [parallelism-hint (if p p parallelism-hint)]
-    {:obj spout :p parallelism-hint :conf conf}))
-
-(defn- shell-component-params
-  [command script-or-output-spec kwargs]
-  (if (string? script-or-output-spec)
-    [(into-array String [command script-or-output-spec])
-     (first kwargs)
-     (rest kwargs)]
-    [(into-array String command)
-     script-or-output-spec
-     kwargs]))
-
-(defnk mk-bolt-spec
-  [inputs bolt :parallelism-hint nil :p nil :conf nil]
-  (let [parallelism-hint (if p p parallelism-hint)]
-    {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
-
-(defn mk-shell-bolt-spec
-  [inputs command script-or-output-spec & kwargs]
-  (let [[command output-spec kwargs]
-        (shell-component-params command script-or-output-spec kwargs)]
-    (apply mk-bolt-spec inputs
-           (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
-
-(defn mk-shell-spout-spec
-  [command script-or-output-spec & kwargs]
-  (let [[command output-spec kwargs]
-        (shell-component-params command script-or-output-spec kwargs)]
-    (apply mk-spout-spec
-           (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))
-
-(defn- add-inputs
-  [declarer inputs]
-  (doseq [[id grouping] (mk-inputs inputs)]
-    (.grouping declarer id grouping)))
-
-(defn mk-topology
-  ([spout-map bolt-map]
-   (let [builder (TopologyBuilder.)]
-     (doseq [[name {spout :obj p :p conf :conf}] spout-map]
-       (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
-     (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
-       (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
-     (.createTopology builder)))
-  ([spout-map bolt-map state-spout-map]
-   (mk-topology spout-map bolt-map)))
-
-;; clojurify-structure is needed or else every element becomes the same after successive calls
-;; don't know why this happens
-(def STORM-TOPOLOGY-FIELDS
-  (-> StormTopology/metaDataMap clojurify-structure keys))
-
-(def SPOUT-FIELDS
-  [StormTopology$_Fields/SPOUTS
-   StormTopology$_Fields/STATE_SPOUTS])
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/timer.clj b/storm-core/src/clj/backtype/storm/timer.clj
deleted file mode 100644
index b5f73f7..0000000
--- a/storm-core/src/clj/backtype/storm/timer.clj
+++ /dev/null
@@ -1,128 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.timer
-  (:import [backtype.storm.utils Time])
-  (:import [java.util PriorityQueue Comparator Random])
-  (:import [java.util.concurrent Semaphore])
-  (:use [backtype.storm util log]))
-
-;; The timer defined in this file is very similar to java.util.Timer, except
-;; it integrates with Storm's time simulation capabilities. This lets us test
-;; code that does asynchronous work on the timer thread
-
-(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
-  (let [queue (PriorityQueue. 10 (reify Comparator
-                                   (compare
-                                     [this o1 o2]
-                                     (- (first o1) (first o2)))
-                                   (equals
-                                     [this obj]
-                                     true)))
-        active (atom true)
-        lock (Object.)
-        notifier (Semaphore. 0)
-        thread-name (if timer-name timer-name "timer")
-        timer-thread (Thread.
-                       (fn []
-                         (while @active
-                           (try
-                             (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
-                               (if (and elem (>= (current-time-millis) time-millis))
-                                 ;; It is imperative to not run the function
-                                 ;; inside the timer lock. Otherwise, it is
-                                 ;; possible to deadlock if the fn deals with
-                                 ;; other locks, like the submit lock.
-                                 (let [afn (locking lock (second (.poll queue)))]
-                                   (afn))
-                                 (if time-millis
-                                   ;; If any events are scheduled, sleep until
-                                   ;; event generation. If any recurring events
-                                   ;; are scheduled then we will always go
-                                   ;; through this branch, sleeping only the
-                                   ;; exact necessary amount of time. We give
-                                   ;; an upper bound, e.g. 1000 millis, to the
-                                   ;; sleeping time, to limit the response time
-                                   ;; for detecting any new event within 1 secs.
-                                   (Time/sleep (min 1000 (- time-millis (current-time-millis))))
-                                   ;; Otherwise poll to see if any new event
-                                   ;; was scheduled. This is, in essence, the
-                                   ;; response time for detecting any new event
-                                   ;; schedulings when there are no scheduled
-                                   ;; events.
-                                   (Time/sleep 1000))))
-                             (catch Throwable t
-                               ;; Because the interrupted exception can be
-                               ;; wrapped in a RuntimeException.
-                               (when-not (exception-cause? InterruptedException t)
-                                 (kill-fn t)
-                                 (reset! active false)
-                                 (throw t)))))
-                         (.release notifier)) thread-name)]
-    (.setDaemon timer-thread true)
-    (.setPriority timer-thread Thread/MAX_PRIORITY)
-    (.start timer-thread)
-    {:timer-thread timer-thread
-     :queue queue
-     :active active
-     :lock lock
-     :random (Random.)
-     :cancel-notifier notifier}))
-
-(defn- check-active!
-  [timer]
-  (when-not @(:active timer)
-    (throw (IllegalStateException. "Timer is not active"))))
-
-(defnk schedule
-  [timer delay-secs afn :check-active true :jitter-ms 0]
-  (when check-active (check-active! timer))
-  (let [id (uuid)
-        ^PriorityQueue queue (:queue timer)
-        end-time-ms (+ (current-time-millis) (secs-to-millis-long delay-secs))
-        end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
-    (locking (:lock timer)
-      (.add queue [end-time-ms afn id]))))
-
-(defn schedule-recurring
-  [timer delay-secs recur-secs afn]
-  (schedule timer
-            delay-secs
-            (fn this []
-              (afn)
-              ; This avoids a race condition with cancel-timer.
-              (schedule timer recur-secs this :check-active false))))
-
-(defn schedule-recurring-with-jitter
-  [timer delay-secs recur-secs jitter-ms afn]
-  (schedule timer
-            delay-secs
-            (fn this []
-              (afn)
-              ; This avoids a race condition with cancel-timer.
-              (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
-
-(defn cancel-timer
-  [timer]
-  (check-active! timer)
-  (locking (:lock timer)
-    (reset! (:active timer) false)
-    (.interrupt (:timer-thread timer)))
-  (.acquire (:cancel-notifier timer)))
-
-(defn timer-waiting?
-  [timer]
-  (Time/isThreadWaiting (:timer-thread timer)))