You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:21 UTC
[25/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
deleted file mode 100644
index 0cb2f52..0000000
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ /dev/null
@@ -1,701 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.testing
- (:require [backtype.storm.daemon
- [nimbus :as nimbus]
- [supervisor :as supervisor]
- [common :as common]
- [worker :as worker]
- [executor :as executor]])
- (:require [backtype.storm [process-simulator :as psim]])
- (:import [org.apache.commons.io FileUtils])
- (:import [java.io File])
- (:import [java.util HashMap ArrayList])
- (:import [java.util.concurrent.atomic AtomicInteger])
- (:import [java.util.concurrent ConcurrentHashMap])
- (:import [backtype.storm.utils Time Utils RegisteredGlobalState])
- (:import [backtype.storm.tuple Fields Tuple TupleImpl])
- (:import [backtype.storm.task TopologyContext])
- (:import [backtype.storm.generated GlobalStreamId Bolt KillOptions])
- (:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple
- TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
- TestWordSpout MemoryTransactionalSpout])
- (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
- (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
- ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
- KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
- ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
- (:import [backtype.storm.transactional TransactionalSpoutCoordinator])
- (:import [backtype.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
- (:import [backtype.storm.tuple Tuple])
- (:import [backtype.storm.generated StormTopology])
- (:import [backtype.storm.task TopologyContext])
- (:require [backtype.storm [zookeeper :as zk]])
- (:require [backtype.storm.messaging.loader :as msg-loader])
- (:require [backtype.storm.daemon.acker :as acker])
- (:use [backtype.storm cluster util thrift config log local-state]))
-
-(defn feeder-spout
-  "Builds a FeederSpout whose output Fields are constructed from `fields`."
-  [fields]
-  (let [output-fields (Fields. fields)]
-    (FeederSpout. output-fields)))
-
-(defn local-temp-path
-  "Returns a new path string: the java.io.tmpdir property, a \"/\" unless
-  on-windows? is truthy, then a fresh (uuid). Only the string is built here."
-  []
-  (let [base (System/getProperty "java.io.tmpdir")
-        separator (when-not on-windows? "/")]
-    (str base separator (uuid))))
-
-(defn delete-all
-  "Force-deletes every entry of `paths` that exists. A failing deletion only has
-  its exception message logged, and the remaining paths are still processed.
-  Returns nil."
-  [paths]
-  (doseq [path paths]
-    (when (.exists (File. path))
-      (try
-        (FileUtils/forceDelete (File. path))
-        (catch Exception ex
-          (log-message (.getMessage ex)))))))
-
-(defmacro with-local-tmp
-  "Binds each symbol in the leading vector to its own (local-temp-path) string,
-  evaluates body, and finally hands all bound paths to delete-all."
-  [[& tmp-syms] & body]
-  (let [bindings (interleave tmp-syms (repeat `(local-temp-path)))]
-    `(let [~@bindings]
-       (try
-         ~@body
-         (finally
-           (delete-all ~(vec tmp-syms)))))))
-
-(defn start-simulating-time!
-  "Turns on simulated time by calling Time/startSimulating."
-  []
-  (Time/startSimulating))
-
-(defn stop-simulating-time!
-  "Turns off simulated time by calling Time/stopSimulating."
-  []
-  (Time/stopSimulating))
-
-(defmacro with-simulated-time
-  "Evaluates body with simulated time switched on. The simulation is started
-  inside the try, so stop-simulating-time! runs in the finally even when
-  starting or the body throws."
-  [& body]
-  `(try
-     (start-simulating-time!)
-     ~@body
-     (finally
-       (stop-simulating-time!))))
-
-(defn advance-time-ms!
-  "Passes `ms` to Time/advanceTime."
-  [ms]
-  (Time/advanceTime ms))
-
-(defn advance-time-secs!
-  "Advances time by `secs` seconds: coerces secs to long, multiplies by 1000
-  and delegates to advance-time-ms!."
-  [secs]
-  (let [millis (* 1000 (long secs))]
-    (advance-time-ms! millis)))
-
-;; Starts one more supervisor inside the local cluster described by cluster-map
-;; and returns the supervisor daemon.
-;;   :ports - a sequential collection of port ids, or a number of ports to draw
-;;            from the cluster's :port-counter (default 2)
-;;   :conf  - extra conf merged over the cluster's :daemon-conf (default {})
-;;   :id    - when non-nil, used as the supervisor id instead of a generated one
-;; Side effects: conj's the daemon onto (:supervisors cluster-map) and its fresh
-;; temp dir onto (:tmp-dirs cluster-map).
-(defnk add-supervisor
- [cluster-map :ports 2 :conf {} :id nil]
- (let [tmp-dir (local-temp-path)
- port-ids (if (sequential? ports)
- ports
- (doall (repeatedly ports (:port-counter cluster-map))))
- ;; the last map wins: local dir and slot ports always override conf
- supervisor-conf (merge (:daemon-conf cluster-map)
- conf
- {STORM-LOCAL-DIR tmp-dir
- SUPERVISOR-SLOTS-PORTS port-ids})
- id-fn (if id (fn [] id) supervisor/generate-supervisor-id)
- ;; the id generator var is rebound only while the supervisor is being built
- daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
- (swap! (:supervisors cluster-map) conj daemon)
- (swap! (:tmp-dirs cluster-map) conj tmp-dir)
- daemon))
-
-(defn mk-shared-context
-  "Returns (msg-loader/mk-local-context) unless conf has a truthy
-  STORM-LOCAL-MODE-ZMQ entry, in which case nil is returned."
-  [conf]
-  (when-not (conf STORM-LOCAL-MODE-ZMQ)
-    (msg-loader/mk-local-context)))
-
-(defn start-nimbus-daemon
-  "Wraps `nimbus` in a Nimbus$Processor, builds a ThriftServer for it with the
-  NIMBUS connection type, starts a thread that calls .serve on the server, and
-  returns the server."
-  [conf nimbus]
-  (let [processor (Nimbus$Processor. nimbus)
-        server (ThriftServer. conf processor ThriftConnectionType/NIMBUS)
-        serve-thread (Thread. (fn [] (.serve server)))]
-    (log-message "Starting Nimbus server...")
-    (.start serve-thread)
-    server))
-
-
-;; Returns a map containing the cluster info.
-;; The local dir is always overridden in the conf maps.
-;; Supervisors can be customized (except for their ports) by passing a sequence of conf maps as the :supervisors parameter.
-;; To customize the number of ports further, call add-supervisor afterwards.
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
- (let [zk-tmp (local-temp-path)
- ;; an in-process zookeeper is started only when the caller did not supply
- ;; STORM-ZOOKEEPER-SERVERS; otherwise zk-port and zk-handle stay nil
- [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
- (zk/mk-inprocess-zookeeper zk-tmp))
- ;; merge precedence (later wins): storm config < local-mode defaults
- ;; < in-process zookeeper address < caller's daemon-conf
- daemon-conf (merge (read-storm-config)
- {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
- ZMQ-LINGER-MILLIS 0
- TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
- TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
- STORM-CLUSTER-MODE "local"
- BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
- (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
- {STORM-ZOOKEEPER-PORT zk-port
- STORM-ZOOKEEPER-SERVERS ["localhost"]})
- daemon-conf)
- nimbus-tmp (local-temp-path)
- port-counter (mk-counter supervisor-slot-port-min)
- ;; nimbus gets its own STORM-LOCAL-DIR; inimbus defaults to a standalone one
- nimbus (nimbus/service-handler
- (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
- (if inimbus inimbus (nimbus/standalone-nimbus)))
- context (mk-shared-context daemon-conf)
- ;; the thrift front-end is started only when :nimbus-daemon is truthy
- nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)
- cluster-map {:nimbus nimbus
- :port-counter port-counter
- :daemon-conf daemon-conf
- :supervisors (atom [])
- :state (mk-distributed-cluster-state daemon-conf)
- :storm-cluster-state (mk-storm-cluster-state daemon-conf)
- :tmp-dirs (atom [nimbus-tmp zk-tmp])
- :zookeeper (if (not-nil? zk-handle) zk-handle)
- :shared-context context
- :nimbus-thrift-server nimbus-thrift-server}
- ;; a non-sequential :supervisors value N means N supervisors with an empty conf
- supervisor-confs (if (sequential? supervisors)
- supervisors
- (repeat supervisors {}))]
-
- (doseq [sc supervisor-confs]
- (add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
- cluster-map))
-
-(defn get-supervisor
-  "Returns the result of find-first over the cluster's current supervisors for
-  the one whose .get-id equals supervisor-id."
-  [cluster-map supervisor-id]
-  (let [running @(:supervisors cluster-map)
-        matches-id? (fn [sup] (= (.get-id sup) supervisor-id))]
-    (find-first matches-id? running)))
-
-(defn kill-supervisor
-  "Drops the first supervisor whose .get-id equals supervisor-id from the
-  cluster's :supervisors atom, then calls .shutdown on it."
-  [cluster-map supervisor-id]
-  (let [registry (:supervisors cluster-map)
-        matches-id? (fn [s] (= (.get-id s) supervisor-id))
-        running @registry
-        target (find-first matches-id? running)]
-    ;; tmp-dir will be taken care of by shutdown
-    (reset! registry (remove-first matches-id? running))
-    (.shutdown target)))
-
-(defn kill-local-storm-cluster
-  "Tears down a cluster built by mk-local-storm-cluster, in this order: nimbus,
-  the optional nimbus thrift server, the two cluster-state handles, every
-  supervisor (after shutting down its workers), all simulated processes, the
-  in-process zookeeper if one was started, and finally the temp dirs.
-  Failures while stopping thrift or deleting a temp dir are logged and do not
-  abort the rest of the teardown."
-  [cluster-map]
-  (.shutdown (:nimbus cluster-map))
-  (when (not-nil? (:nimbus-thrift-server cluster-map))
-    (log-message "shutting down thrift server")
-    (try
-      (.stop (:nimbus-thrift-server cluster-map))
-      ;; best effort, but keep the cause: the exception used to be dropped silently
-      (catch Exception e (log-error e "failed to stop thrift"))))
-  (.close (:state cluster-map))
-  (.disconnect (:storm-cluster-state cluster-map))
-  (doseq [s @(:supervisors cluster-map)]
-    (.shutdown-all-workers s)
-    ;; race condition here? will it launch the workers again?
-    (supervisor/kill-supervisor s))
-  (psim/kill-all-processes)
-  (when (not-nil? (:zookeeper cluster-map))
-    (log-message "Shutting down in process zookeeper")
-    (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
-    (log-message "Done shutting down in process zookeeper"))
-  (doseq [t @(:tmp-dirs cluster-map)]
-    (log-message "Deleting temporary path " t)
-    (try
-      (rmr t)
-      ;; on windows, the host process still holds lock on the logfile
-      (catch Exception e (log-message (.getMessage e))))))
-
-;; Default timeout in milliseconds for the waiting helpers in this namespace:
-;; the STORM_TEST_TIMEOUT_MS environment variable when set, otherwise 5000.
-(def TEST-TIMEOUT-MS
-  (parse-int (or (System/getenv "STORM_TEST_TIMEOUT_MS") "5000")))
-
-(defmacro while-timeout
-  "Evaluates body repeatedly while `condition` is truthy. Once more than
-  timeout-ms milliseconds of wall-clock time have passed and the condition still
-  holds, logs the condition and a thread dump and throws an AssertionError.
-  The timeout-ms form is evaluated exactly once, up front."
-  [timeout-ms condition & body]
-  `(let [timeout-ms# ~timeout-ms
-         end-time# (+ (System/currentTimeMillis) timeout-ms#)]
-     (log-debug "Looping until " '~condition)
-     (while ~condition
-       (when (> (System/currentTimeMillis) end-time#)
-         (let [thread-dump# (Utils/threadDump)]
-           (log-message "Condition " '~condition " not met in " timeout-ms# "ms")
-           (log-message thread-dump#)
-           (throw (AssertionError. (str "Test timed out (" timeout-ms# "ms) " '~condition)))))
-       ~@body)
-     (log-debug "Condition met " '~condition)))
-
-(defn wait-for-condition
-  "Calls apredicate until it returns truthy, sleeping 100 ms through Time/sleep
-  between calls. Gives up through while-timeout after timeout-ms (default
-  TEST-TIMEOUT-MS)."
-  ([apredicate]
-   (wait-for-condition TEST-TIMEOUT-MS apredicate))
-  ([timeout-ms apredicate]
-   ;; the condition form is logged verbatim by while-timeout, so it stays as is
-   (while-timeout timeout-ms (not (apredicate))
-     (Time/sleep 100))))
-
-(defn wait-until-cluster-waiting
- "Wait until the cluster is idle. Should be used with time simulation."
- ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS))
- ([cluster-map timeout-ms]
- ;; wait until all workers, supervisors, and nimbus is waiting
- (let [supervisors @(:supervisors cluster-map)
- ;; only simulated processes that satisfy common/DaemonCommon are polled
- workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
- daemons (concat
- [(:nimbus cluster-map)]
- supervisors
- ; because a worker may already be dead
- workers)]
- ;; the daemon list is a snapshot taken once, before polling starts;
- ;; each poll sleeps a random real-time interval below 20 ms
- (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
- (Thread/sleep (rand-int 20))
- ;; (doseq [d daemons]
- ;; (if-not ((memfn waiting?) d)
- ;; (println d)))
- ))))
-
-(defn advance-cluster-time
-  "Advances time by `secs` seconds in steps of at most increment-secs
-  (default 1), calling wait-until-cluster-waiting after every step."
-  ([cluster-map secs]
-   (advance-cluster-time cluster-map secs 1))
-  ([cluster-map secs increment-secs]
-   (loop [remaining secs]
-     (when (pos? remaining)
-       (let [step (min remaining increment-secs)]
-         (advance-time-secs! step)
-         (wait-until-cluster-waiting cluster-map)
-         (recur (- remaining step)))))))
-
-(defmacro with-local-cluster
-  "Binds cluster-sym to (mk-local-storm-cluster args...), evaluates body, and
-  always kills the cluster afterwards. Any Throwable from body is logged and
-  rethrown. While the cluster is being killed a helper future keeps calling
-  simulate-wait on it; the helper is told to stop even when the kill itself
-  throws, so it cannot spin forever."
-  [[cluster-sym & args] & body]
-  `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
-     (try
-       ~@body
-       (catch Throwable t#
-         (log-error t# "Error in cluster")
-         (throw t#))
-       (finally
-         (let [keep-waiting?# (atom true)
-               f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
-           (try
-             (kill-local-storm-cluster ~cluster-sym)
-             (finally
-               ;; must run on the failure path too, or the future never ends
-               (reset! keep-waiting?# false)))
-           @f#)))))
-
-(defmacro with-simulated-time-local-cluster
-  "Expands to with-local-cluster (given the same arguments) nested inside
-  with-simulated-time."
-  [& args]
-  `(with-simulated-time
-     (with-local-cluster ~@args)))
-
-(defmacro with-inprocess-zookeeper
-  "Starts an in-process zookeeper on a temp path from with-local-tmp, binds
-  port-sym to the first element of what zk/mk-inprocess-zookeeper returns,
-  evaluates body, and always shuts the zookeeper down afterwards."
-  [port-sym & body]
-  `(with-local-tmp [zk-dir#]
-     (let [[~port-sym handle#] (zk/mk-inprocess-zookeeper zk-dir#)]
-       (try
-         ~@body
-         (finally
-           (zk/shutdown-inprocess-zookeeper handle#))))))
-
-(defn submit-local-topology
-  "Submits `topology` to `nimbus` as storm-name, passing nil and the conf as
-  JSON. Throws IllegalArgumentException when Utils/isValidConf rejects conf."
-  [nimbus storm-name conf topology]
-  (if (Utils/isValidConf conf)
-    (.submitTopology nimbus storm-name nil (to-json conf) topology)
-    (throw (IllegalArgumentException. "Topology conf is not json-serializable"))))
-
-(defn submit-local-topology-with-opts
-  "Same as submit-local-topology but calls .submitTopologyWithOpts with the
-  extra submit-opts. Throws IllegalArgumentException when Utils/isValidConf
-  rejects conf."
-  [nimbus storm-name conf topology submit-opts]
-  (if (Utils/isValidConf conf)
-    (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts)
-    (throw (IllegalArgumentException. "Topology conf is not json-serializable"))))
-
-(defn mocked-convert-assignments-to-worker->resources
-  "Returns a 1-arg fn over existing assignments (topology id -> assignment).
-  The fn maps each topology id to that assignment's :worker->resources, then
-  sets the entry for the topology named storm-name to the given
-  worker->resources."
-  [storm-cluster-state storm-name worker->resources]
-  (fn [existing-assignments]
-    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
-          current (into {}
-                        (map (fn [[tid assignment]]
-                               [tid (:worker->resources assignment)])
-                             existing-assignments))]
-      (assoc current topology-id worker->resources))))
-
-(defn mocked-compute-new-topology->executor->node+port
-  "Returns a 2-arg fn (new-scheduler-assignments existing-assignments) that
-  ignores its first argument. It maps each existing topology id to that
-  assignment's :executor->node+port, then sets the entry for the topology
-  named storm-name to the given executor->node+port."
-  [storm-cluster-state storm-name executor->node+port]
-  (fn [new-scheduler-assignments existing-assignments]
-    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
-          current (into {}
-                        (map (fn [[tid assignment]]
-                               [tid (:executor->node+port assignment)])
-                             existing-assignments))]
-      (assoc current topology-id executor->node+port))))
-
-(defn mocked-compute-new-scheduler-assignments
-  "Returns a 4-arg fn (nimbus existing-assignments topologies
-  scratch-topology-id) that returns existing-assignments unchanged."
-  []
-  (fn [_nimbus existing-assignments _topologies _scratch-topology-id]
-    existing-assignments))
-
-(defn submit-mocked-assignment
- "Submits topology through submit-local-topology while four vars are rebound:
- common/storm-task-info returns task->component, and the three nimbus
- assignment functions are replaced by the mocked-* versions built from
- executor->node+port and worker->resources."
- [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
- (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
- nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
- nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
- storm-cluster-state
- storm-name
- worker->resources)
- nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
- storm-cluster-state
- storm-name
- executor->node+port)]
- (submit-local-topology nimbus storm-name conf topology)))
-
-(defn mk-capture-launch-fn
-  "Returns a stand-in for the supervisor's worker-launch function. Instead of
-  launching anything it calls set-worker-user! with an empty user and appends
-  storm-id to the vector stored in capture-atom under [supervisor-id port].
-  Returns the updated capture map."
-  [capture-atom]
-  (fn [supervisor storm-id port worker-id mem-onheap]
-    (let [supervisor-id (:supervisor-id supervisor)
-          conf (:conf supervisor)]
-      (set-worker-user! conf worker-id "")
-      ;; the read-modify-write runs inside swap!, so overlapping launches
-      ;; cannot overwrite each other's entries
-      (swap! capture-atom update-in [[supervisor-id port]] (fnil conj []) storm-id))))
-
-(defn find-worker-id
-  "Looks up the approved workers in the supervisor's local state, reverses that
-  map with reverse-map, and returns the first worker id listed for `port`."
-  [supervisor-conf port]
-  (let [local-state (supervisor-state supervisor-conf)
-        port->workers (reverse-map (ls-approved-workers local-state))]
-    (first (port->workers port))))
-
-(defn find-worker-port
-  "Looks up worker-id in the approved-workers map read from the supervisor's
-  local state and returns the value stored for it."
-  [supervisor-conf worker-id]
-  (let [approved (ls-approved-workers (supervisor-state supervisor-conf))]
-    (approved worker-id)))
-
-(defn mk-capture-shutdown-fn
-  "Returns a wrapper around the current supervisor/shutdown-worker (captured
-  when this function is called). The wrapper increments the counter stored in
-  capture-atom under [supervisor-id port] and then delegates to the original
-  function."
-  [capture-atom]
-  (let [existing-fn supervisor/shutdown-worker]
-    (fn [supervisor worker-id]
-      (let [conf (:conf supervisor)
-            supervisor-id (:supervisor-id supervisor)
-            port (find-worker-port conf worker-id)]
-        ;; the increment runs inside swap!, so overlapping shutdowns are all counted
-        (swap! capture-atom update-in [[supervisor-id port]] (fnil inc 0))
-        (existing-fn supervisor worker-id)))))
-
-(defmacro capture-changed-workers
- "Evaluates body with supervisor/launch-worker and supervisor/shutdown-worker
- rebound to capturing versions, then returns a map with what was recorded:
- :launched (from mk-capture-launch-fn) and :shutdown (from
- mk-capture-shutdown-fn). The value of body itself is discarded."
- [& body]
- `(let [launch-captured# (atom {})
- shutdown-captured# (atom {})]
- (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#)
- supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)]
- ~@body
- {:launched @launch-captured#
- :shutdown @shutdown-captured#})))
-
-(defmacro capture-launched-workers
-  "Runs body under capture-changed-workers and returns only the :launched part."
-  [& body]
-  `(get (capture-changed-workers ~@body) :launched))
-
-(defmacro capture-shutdown-workers
-  "Runs body under capture-changed-workers and returns only the :shutdown part."
-  [& body]
-  `(get (capture-changed-workers ~@body) :shutdown))
-
-;; Sums one statistic over the tasks of the topology named storm-name.
-;;   stat-key       - called as a function on each heartbeat's :stats value
-;;   :component-ids - when non-nil, only tasks of these components are counted
-;; A task with no heartbeat contributes 0.
-(defnk aggregated-stat
- [cluster-map storm-name stat-key :component-ids nil]
- (let [state (:storm-cluster-state cluster-map)
- nimbus (:nimbus cluster-map)
- storm-id (common/get-storm-id state storm-name)
- ;; storm-task-info is reversed so that components map to their task ids
- component->tasks (reverse-map
- (common/storm-task-info
- (.getUserTopology nimbus storm-id)
- (from-json (.getTopologyConf nimbus storm-id))))
- component->tasks (if component-ids
- (select-keys component->tasks component-ids)
- component->tasks)
- task-ids (apply concat (vals component->tasks))
- assignment (.assignment-info state storm-id nil)
- taskbeats (.taskbeats state storm-id (:task->node+port assignment))
- heartbeats (dofor [id task-ids] (get taskbeats id))
- stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
- (reduce + stats)))
-
-(defn emitted-spout-tuples
-  "Returns the :emitted aggregated-stat restricted to the component ids of the
-  spouts declared in `topology`."
-  [cluster-map topology storm-name]
-  (let [spout-ids (keys (.get_spouts topology))]
-    (aggregated-stat cluster-map storm-name :emitted :component-ids spout-ids)))
-
-(defn transferred-tuples
-  "Returns the :transferred aggregated-stat over all components of storm-name."
-  [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :transferred))
-
-(defn acked-tuples
-  "Returns the :acked aggregated-stat over all components of storm-name."
-  [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :acked))
-
-(defn simulate-wait
-  "Lets some time pass: sleeps 100 ms of real time when time is not being
-  simulated, otherwise advances the cluster's simulated time by 10 seconds."
-  [cluster-map]
-  (if-not (Time/isSimulating)
-    (Thread/sleep 100)
-    (advance-cluster-time cluster-map 10)))
-
-;; Protocol a spout must extend for complete-topology to accept it:
-;; complete-topology calls startup before submitting, polls exhausted? while
-;; the topology runs, and calls cleanup afterwards when :cleanup-state is truthy.
-(defprotocol CompletableSpout
- (exhausted?
- [this]
- "Whether all the tuples for this spout have been completed.")
- (cleanup
- [this]
- "Cleanup any global state kept")
- (startup
- [this]
- "Prepare the spout (globally) before starting the topology"))
-
-(extend-type FixedTupleSpout
-  CompletableSpout
-  ;; exhausted once the completed count equals the number of source tuples
-  (exhausted? [spout]
-    (let [total (count (.getSourceTuples spout))]
-      (= total (.getCompleted spout))))
-  (cleanup [spout]
-    (.cleanup spout))
-  ;; nothing to prepare; returns nil
-  (startup [spout]))
-
-;; Every protocol call is forwarded to the spout returned by .getSpout.
-(extend-type TransactionalSpoutCoordinator
-  CompletableSpout
-  (exhausted? [coordinator]
-    (exhausted? (.getSpout coordinator)))
-  (cleanup [coordinator]
-    (cleanup (.getSpout coordinator)))
-  (startup [coordinator]
-    (startup (.getSpout coordinator))))
-
-;; Every protocol call is forwarded to the spout returned by .getPartitionedSpout.
-(extend-type PartitionedTransactionalSpoutExecutor
-  CompletableSpout
-  (exhausted? [executor]
-    (exhausted? (.getPartitionedSpout executor)))
-  (cleanup [executor]
-    (cleanup (.getPartitionedSpout executor)))
-  (startup [executor]
-    (startup (.getPartitionedSpout executor))))
-
-;; Each protocol function maps onto the spout's own Java method.
-(extend-type MemoryTransactionalSpout
-  CompletableSpout
-  (exhausted? [spout]
-    (.isExhaustedTuples spout))
-  (cleanup [spout]
-    (.cleanup spout))
-  (startup [spout]
-    (.startup spout)))
-
-(defn spout-objects
-  "Returns a lazy seq with the deserialized spout object of every spout spec in
-  spec-map (the keys are ignored)."
-  [spec-map]
-  (for [[_ spec] spec-map]
-    (deserialized-component-object (.get_spout_object spec))))
-
-(defn capture-topology
- "Deep-copies topology and adds a TupleCaptureBolt to the copy under a fresh
- uuid id. The bolt is subscribed to every stream declared by every spout and
- bolt: with a direct grouping for streams flagged direct, a global grouping
- otherwise. Returns {:topology <the copy> :capturer <the bolt>}; the argument
- itself is not modified."
- [topology]
- (let [topology (.deepCopy topology)
- spouts (.get_spouts topology)
- bolts (.get_bolts topology)
- ;; seq of [GlobalStreamId direct?] pairs, one per declared stream
- all-streams (apply concat
- (for [[id spec] (merge (clojurify-structure spouts)
- (clojurify-structure bolts))]
- (for [[stream info] (.. spec get_common get_streams)]
- [(GlobalStreamId. id stream) (.is_direct info)])))
- capturer (TupleCaptureBolt.)]
- (.set_bolts topology
- (assoc (clojurify-structure bolts)
- (uuid)
- (Bolt.
- (serialize-component-object capturer)
- (mk-plain-component-common (into {} (for [[id direct?] all-streams]
- [id (if direct?
- (mk-direct-grouping)
- (mk-global-grouping))]))
- {}
- nil))))
- {:topology topology
- :capturer capturer}))
-
-;; TODO: mock-sources needs to be able to mock out state spouts as well
-;; Runs topology on the local cluster until every spout reports exhausted?,
-;; kills it, and returns the tuples captured by the TupleCaptureBolt that
-;; capture-topology adds.
-;;   :mock-sources  - map of spout id -> tuples; each listed spout is replaced by
-;;                    a FixedTupleSpout. A tuple given as a map is read through
-;;                    its :stream and :values keys, anything else is passed as is.
-;;   :storm-conf    - topology conf used for the submission
-;;   :cleanup-state - when truthy, calls cleanup on every spout and removes the
-;;                    captured results; otherwise the results are only cleared
-;;   :topology-name - name to submit under (default "topologytest-" + uuid)
-;;   :timeout-ms    - limit for each of the two waiting loops
-;; Throws RuntimeException when a spout does not extend CompletableSpout.
-(defnk complete-topology
- [cluster-map topology
- :mock-sources {}
- :storm-conf {}
- :cleanup-state true
- :topology-name nil
- :timeout-ms TEST-TIMEOUT-MS]
- ;; TODO: the idea of mocking for transactional topologies should be done an
- ;; abstraction level above... should have a complete-transactional-topology for this
- (let [{topology :topology capturer :capturer} (capture-topology topology)
- storm-name (or topology-name (str "topologytest-" (uuid)))
- state (:storm-cluster-state cluster-map)
- spouts (.get_spouts topology)
- replacements (map-val (fn [v]
- (FixedTupleSpout.
- (for [tup v]
- (if (map? tup)
- (FixedTuple. (:stream tup) (:values tup))
- tup))))
- mock-sources)]
- ;; swap the mocked spouts into the (already copied) topology
- (doseq [[id spout] replacements]
- (let [spout-spec (get spouts id)]
- (.set_spout_object spout-spec (serialize-component-object spout))))
- ;; extends? is checked against the spout's concrete class
- (doseq [spout (spout-objects spouts)]
- (when-not (extends? CompletableSpout (.getClass spout))
- (throw (RuntimeException. (str "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " spout)))))
-
- (doseq [spout (spout-objects spouts)]
- (startup spout))
-
- (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
- (advance-cluster-time cluster-map 11)
-
- (let [storm-id (common/get-storm-id state storm-name)]
- ;;Give the topology time to come up without using it to wait for the spouts to complete
- (simulate-wait cluster-map)
-
- (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
- (simulate-wait cluster-map))
-
- ;; kill with no wait, then wait until the assignment is gone
- (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
- (while-timeout timeout-ms (.assignment-info state storm-id nil)
- (simulate-wait cluster-map))
- (when cleanup-state
- (doseq [spout (spout-objects spouts)]
- (cleanup spout))))
-
- (if cleanup-state
- (.getAndRemoveResults capturer)
- (.getAndClearResults capturer))))
-
-(defn read-tuples
-  "Returns, as vectors, the values of the fixed tuples stored in `results`
-  under component-id whose stream equals stream-id (default
-  Utils/DEFAULT_STREAM_ID). A missing component yields an empty seq."
-  ([results component-id]
-   (read-tuples results component-id Utils/DEFAULT_STREAM_ID))
-  ([results component-id stream-id]
-   (for [ft (get results component-id [])
-         :when (= stream-id (. ft stream))]
-     (vec (. ft values)))))
-
-(defn ms=
-  "True when all arguments are equal after each is converted with multi-set."
-  [& args]
-  (let [as-multisets (map multi-set args)]
-    (apply = as-multisets)))
-
-;; String constant naming a tracker bolt. NOTE(review): nothing in the visible
-;; part of this file references it; confirm external users before removing.
-(def TRACKER-BOLT-ID "+++tracker-bolt")
-
-;; TODO: should override system-topology! and wrap everything there
-(defn mk-tracked-topology
- "Deep-copies topology and, in the copy, wraps every bolt object in a
- BoltTracker and every spout object in a SpoutTracker, both keyed by the
- ::track-id of tracked-cluster. Returns a map with :topology (the wrapped
- copy), :last-spout-emit (an atom starting at 0) and :cluster."
- ([tracked-cluster topology]
- (let [track-id (::track-id tracked-cluster)
- ret (.deepCopy topology)]
- ;; dofor is used only for its side effects on the copied specs
- (dofor [[_ bolt] (.get_bolts ret)
- :let [obj (deserialized-component-object (.get_bolt_object bolt))]]
- (.set_bolt_object bolt (serialize-component-object
- (BoltTracker. obj track-id))))
- (dofor [[_ spout] (.get_spouts ret)
- :let [obj (deserialized-component-object (.get_spout_object spout))]]
- (.set_spout_object spout (serialize-component-object
- (SpoutTracker. obj track-id))))
- {:topology ret
- :last-spout-emit (atom 0)
- :cluster tracked-cluster})))
-
-(defn assoc-track-id
-  "Returns `cluster` with track-id stored under this namespace's ::track-id key."
-  [cluster track-id]
-  (assoc cluster ::track-id track-id))
-
-(defn increment-global!
-  "Looks up `key` in the global state registered under `id` and calls
-  .addAndGet on the value found with amt; returns what .addAndGet returns."
-  [id key amt]
-  (let [counter (get (RegisteredGlobalState/getState id) key)]
-    (.addAndGet counter amt)))
-
-(defn global-amt
-  "Looks up `key` in the global state registered under `id` and returns the
-  result of calling .get on the value found."
-  [id key]
-  (let [counter (get (RegisteredGlobalState/getState id) key)]
-    (.get counter)))
-
-(defmacro with-tracked-cluster
-  "Runs body against a simulated-time local cluster whose tuple flow is counted.
-  A fresh id is registered in RegisteredGlobalState with three AtomicInteger
-  counters (\"spout-emitted\", \"transferred\", \"processed\"). The acker bolt
-  factory and the executor transfer-fn factory are rebound for the duration,
-  and cluster-sym is bound to the cluster with that id attached through
-  assoc-track-id. The registered state is cleared in a finally, so it does not
-  leak when body throws. Returns nil."
-  [[cluster-sym & cluster-args] & body]
-  `(let [id# (uuid)]
-     (RegisteredGlobalState/setState
-       id#
-       (doto (ConcurrentHashMap.)
-         (.put "spout-emitted" (AtomicInteger. 0))
-         (.put "transferred" (AtomicInteger. 0))
-         (.put "processed" (AtomicInteger. 0))))
-     (try
-       (with-var-roots
-         [acker/mk-acker-bolt
-          (let [old# acker/mk-acker-bolt]
-            (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
-          ;; critical that this particular function is overridden here,
-          ;; since the transferred stat needs to be incremented at the moment
-          ;; of tuple emission (and not on a separate thread later) for
-          ;; topologies to be tracked correctly. This is because "transferred" *must*
-          ;; be incremented before "processing".
-          executor/mk-executor-transfer-fn
-          (let [old# executor/mk-executor-transfer-fn]
-            (fn [& args#]
-              (let [transferrer# (apply old# args#)]
-                (fn [& args2#]
-                  ;; (log-message "Transferring: " transfer-args#)
-                  (increment-global! id# "transferred" 1)
-                  (apply transferrer# args2#)))))]
-         (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
-           (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
-             ~@body)))
-       (finally
-         (RegisteredGlobalState/clearState id#)))
-     ;; the form used to end with the clearState call; keep returning nil
-     nil))
-
-(defn tracked-wait
- "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
- ([tracked-topology]
- (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
- ([tracked-topology amt]
- (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
- ([tracked-topology amt timeout-ms]
- ;; target is the total "spout-emitted" count expected once this wait finishes
- (let [target (+ amt @(:last-spout-emit tracked-topology))
- track-id (-> tracked-topology :cluster ::track-id)
- ;; still waiting while the emitted count differs from the target or
- ;; the "transferred" and "processed" counters disagree
- waiting? (fn []
- (or (not= target (global-amt track-id "spout-emitted"))
- (not= (global-amt track-id "transferred")
- (global-amt track-id "processed"))))]
- (while-timeout timeout-ms (waiting?)
- ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
- ;; (println "Processed: " (global-amt track-id "processed"))
- ;; (println "Transferred: " (global-amt track-id "transferred"))
- (Thread/sleep (rand-int 200)))
- ;; remember the new baseline for the next tracked-wait call
- (reset! (:last-spout-emit tracked-topology) target))))
-
-;; Builds a TupleImpl carrying `values`, for use in tests.
-;;   :stream    - stream id (default Utils/DEFAULT_STREAM_ID)
-;;   :component - component id (default "component")
-;;   :fields    - field names; defaults to "field1".."fieldN", one per value
-;; The tuple is created against a TopologyContext built here around a
-;; one-spout topology (a TestWordSpout registered under `component`).
-(defnk test-tuple
- [values
- :stream Utils/DEFAULT_STREAM_ID
- :component "component"
- :fields nil]
- (let [fields (or fields
- (->> (iterate inc 1)
- (take (count values))
- (map #(str "field" %))))
- spout-spec (mk-spout-spec* (TestWordSpout.)
- {stream fields})
- topology (StormTopology. {component spout-spec} {} {})
- ;; NOTE(review): positional constructor arguments; their meaning is not
- ;; visible here -- check the TopologyContext constructor before changing
- context (TopologyContext.
- topology
- (read-storm-config)
- {(int 1) component}
- {component [(int 1)]}
- {component {stream (Fields. fields)}}
- "test-storm-id"
- nil
- nil
- (int 1)
- nil
- [(int 1)]
- {}
- {}
- (HashMap.)
- (HashMap.)
- (atom false))]
- ;; presumably the literal 1 is the task id registered above -- TODO confirm
- (TupleImpl. context values 1 stream)))
-
-(defmacro with-timeout
-  "Runs body in a future and blocks on it with (.get future millis unit), so
-  `unit` is expected to be a java.util.concurrent.TimeUnit. Returns the body's
-  value; whatever .get throws (including on timeout) propagates. The future is
-  always cancelled afterwards."
-  [millis unit & body]
-  `(let [task# (future ~@body)]
-     (try
-       (.get task# ~millis ~unit)
-       (finally (future-cancel task#)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing4j.clj b/storm-core/src/clj/backtype/storm/testing4j.clj
deleted file mode 100644
index bc5dc57..0000000
--- a/storm-core/src/clj/backtype/storm/testing4j.clj
+++ /dev/null
@@ -1,184 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.testing4j
- (:import [java.util Map List Collection ArrayList])
- (:require [backtype.storm [LocalCluster :as LocalCluster]])
- (:import [backtype.storm Config ILocalCluster LocalCluster])
- (:import [backtype.storm.generated StormTopology])
- (:import [backtype.storm.daemon nimbus])
- (:import [backtype.storm.testing TestJob MockedSources TrackedTopology
- MkClusterParam CompleteTopologyParam MkTupleParam])
- (:import [backtype.storm.utils Utils])
- (:use [backtype.storm testing util log])
- (:gen-class
- :name backtype.storm.Testing
- :methods [^:static [completeTopology
- [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology
- backtype.storm.testing.CompleteTopologyParam]
- java.util.Map]
- ^:static [completeTopology
- [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology]
- java.util.Map]
- ^:static [withSimulatedTime [Runnable] void]
- ^:static [withLocalCluster [backtype.storm.testing.TestJob] void]
- ^:static [withLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
- ^:static [getLocalCluster [java.util.Map] backtype.storm.ILocalCluster]
- ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.TestJob] void]
- ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
- ^:static [withTrackedCluster [backtype.storm.testing.TestJob] void]
- ^:static [withTrackedCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
- ^:static [readTuples [java.util.Map String String] java.util.List]
- ^:static [readTuples [java.util.Map String] java.util.List]
- ^:static [mkTrackedTopology [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology] backtype.storm.testing.TrackedTopology]
- ^:static [trackedWait [backtype.storm.testing.TrackedTopology] void]
- ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer] void]
- ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer Integer] void]
- ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer Integer] void]
- ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer] void]
- ^:static [multiseteq [java.util.Collection java.util.Collection] boolean]
- ^:static [multiseteq [java.util.Map java.util.Map] boolean]
- ^:static [testTuple [java.util.List] backtype.storm.tuple.Tuple]
- ^:static [testTuple [java.util.List backtype.storm.testing.MkTupleParam] backtype.storm.tuple.Tuple]]))
-
-(defn -completeTopology
-  "Java entry point for complete-topology. Reads the options from
-  completeTopologyParam and falls back to defaults for those that are nil:
-  no mocked sources, empty storm conf, cleanup-state true, timeout
-  TEST-TIMEOUT-MS. An explicit cleanup-state of false is honoured. The
-  two-argument arity uses a default CompleteTopologyParam."
-  ([^ILocalCluster cluster ^StormTopology topology ^CompleteTopologyParam completeTopologyParam]
-   (let [mocked (.getMockedSources completeTopologyParam)
-         ;; no MockedSources object at all means no mocked data
-         mocked-sources (or (when mocked (.getData mocked)) {})
-         storm-conf (or (.getStormConf completeTopologyParam) {})
-         requested-cleanup (.getCleanupState completeTopologyParam)
-         ;; `(or x true)` turned an explicit false into true; default only on nil
-         cleanup-state (if (nil? requested-cleanup) true (boolean requested-cleanup))
-         topology-name (.getTopologyName completeTopologyParam)
-         timeout-ms (or (.getTimeoutMs completeTopologyParam) TEST-TIMEOUT-MS)]
-     (complete-topology (.getState cluster) topology
-                        :mock-sources mocked-sources
-                        :storm-conf storm-conf
-                        :cleanup-state cleanup-state
-                        :topology-name topology-name
-                        :timeout-ms timeout-ms)))
-  ([^ILocalCluster cluster ^StormTopology topology]
-   (-completeTopology cluster topology (CompleteTopologyParam.))))
-
-
-(defn -withSimulatedTime
-  "Java entry point: runs the Runnable `code` inside with-simulated-time."
-  [^Runnable code]
-  (with-simulated-time
-    (.run code)))
-
-(defmacro with-cluster
- "Expands to a use of the cluster macro named by cluster-type (for example
- with-local-cluster), configured from mkClusterParam: supervisors (default 2),
- ports per supervisor (default 3) and daemon conf (default {}). Inside, the
- cluster map is wrapped in a LocalCluster and passed to (.run code ...)."
- [cluster-type mkClusterParam code]
- `(let [supervisors# (or (.getSupervisors ~mkClusterParam) 2)
- ports-per-supervisor# (or (.getPortsPerSupervisor ~mkClusterParam) 3)
- daemon-conf# (or (.getDaemonConf ~mkClusterParam) {})]
- (~cluster-type [cluster# :supervisors supervisors#
- :ports-per-supervisor ports-per-supervisor#
- :daemon-conf daemon-conf#]
- ;; rebinds the same symbol: from here on cluster# is the LocalCluster wrapper
- (let [cluster# (LocalCluster. cluster#)]
- (.run ~code cluster#)))))
-
-(defn -withLocalCluster
-  "Java entry point: runs the TestJob against a cluster from with-local-cluster,
-  configured by mkClusterParam (a default MkClusterParam when omitted)."
-  ([^TestJob code]
-   (-withLocalCluster (MkClusterParam.) code))
-  ([^MkClusterParam mkClusterParam ^TestJob code]
-   (with-cluster with-local-cluster mkClusterParam code)))
-
-(defn -getLocalCluster
-  "Java entry point: builds a local cluster from the string-keyed clusterConf
-  and returns it wrapped in a LocalCluster. Recognised keys and their defaults:
-  \"daemon-conf\" {}, \"supervisors\" 2, \"ports-per-supervisor\" 3,
-  \"inimbus\" nil, \"supervisor-slot-port-min\" 1024, \"nimbus-daemon\" false."
-  ([^Map clusterConf]
-   (let [option (fn [k default] (get clusterConf k default))
-         local-cluster-map (mk-local-storm-cluster
-                             :supervisors (option "supervisors" 2)
-                             :ports-per-supervisor (option "ports-per-supervisor" 3)
-                             :daemon-conf (option "daemon-conf" {})
-                             :inimbus (option "inimbus" nil)
-                             :supervisor-slot-port-min (option "supervisor-slot-port-min" 1024)
-                             :nimbus-daemon (option "nimbus-daemon" false))]
-     (LocalCluster. local-cluster-map))))
-
-(defn -withSimulatedTimeLocalCluster
-  "Java entry point: runs the TestJob against a cluster from
-  with-simulated-time-local-cluster, configured by mkClusterParam (a default
-  MkClusterParam when omitted)."
-  ([^TestJob code]
-   (-withSimulatedTimeLocalCluster (MkClusterParam.) code))
-  ([^MkClusterParam mkClusterParam ^TestJob code]
-   (with-cluster with-simulated-time-local-cluster mkClusterParam code)))
-
-(defn -withTrackedCluster
-  "Java entry point: runs the TestJob against a cluster from
-  with-tracked-cluster, configured by mkClusterParam (a default MkClusterParam
-  when omitted)."
-  ([^TestJob code]
-   (-withTrackedCluster (MkClusterParam.) code))
-  ([^MkClusterParam mkClusterParam ^TestJob code]
-   (with-cluster with-tracked-cluster mkClusterParam code)))
-
-(defn- find-tuples
-  "Returns a new ArrayList holding the .values of every element of fixed-tuples
-  whose .stream equals `stream`."
-  [^List fixed-tuples ^String stream]
-  (let [matching (ArrayList.)]
-    (doseq [ft fixed-tuples
-            :when (= (.stream ft) stream)]
-      (.add matching (.values ft)))
-    matching))
-
-(defn -readTuples
-  "Java entry point: returns the values of the tuples recorded in `result` for
-  componentId on streamId (default Utils/DEFAULT_STREAM_ID), or an empty vector
-  when the component has no entry."
-  ([^Map result ^String componentId]
-   (-readTuples result componentId Utils/DEFAULT_STREAM_ID))
-  ([^Map result ^String componentId ^String streamId]
-   (if-let [component-tuples (.get result componentId)]
-     (find-tuples component-tuples streamId)
-     [])))
-
-(defn -mkTrackedTopology
-  "Java entry point: calls mk-tracked-topology on the cluster's state and the
-  topology, and wraps the resulting map in a TrackedTopology."
-  [^ILocalCluster trackedCluster ^StormTopology topology]
-  (let [tracked (mk-tracked-topology (.getState trackedCluster) topology)]
-    (TrackedTopology. tracked)))
-
-(defn -trackedWait
-  "Java entry point for tracked-wait. amt defaults to 1; when timeout-ms is
-  omitted, tracked-wait's own default applies."
-  ([^TrackedTopology trackedTopology]
-   (-trackedWait trackedTopology 1))
-  ([^TrackedTopology trackedTopology ^Integer amt]
-   (tracked-wait trackedTopology amt))
-  ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms]
-   (tracked-wait trackedTopology amt timeout-ms)))
-
-(defn -advanceClusterTime
-  "Java entry point for advance-cluster-time on the cluster's state; `step`
-  defaults to 1 second."
-  ([^ILocalCluster cluster ^Integer secs]
-   (-advanceClusterTime cluster secs 1))
-  ([^ILocalCluster cluster ^Integer secs ^Integer step]
-   (advance-cluster-time (.getState cluster) secs step)))
-
-(defn- multiseteq
-  "Passes both arguments through clojurify-structure and compares the results
-  with ms=."
-  [^Object obj1 ^Object obj2]
-  (ms= (clojurify-structure obj1)
-       (clojurify-structure obj2)))
-
-;; The gen-class declaration lists two multiseteq overloads
-;; (Collection/Collection and Map/Map). This var was defined twice with
-;; identical bodies, so the second defn simply replaced the first; it is now
-;; defined once, without parameter hints that would suggest otherwise.
-(defn -multiseteq
-  "Java entry point: true when coll1 and coll2 are equal as multisets, as
-  decided by the private multiseteq helper."
-  [coll1 coll2]
-  (multiseteq coll1 coll2))
-
-(defn -testTuple
-  "Java entry point for test-tuple. With a nil or omitted param all test-tuple
-  defaults apply. Otherwise the stream and component come from param (falling
-  back to Utils/DEFAULT_STREAM_ID and \"component\" when nil) and its fields
-  are passed through as they are."
-  ([^List values]
-   (-testTuple values nil))
-  ([^List values ^MkTupleParam param]
-   (if-not (nil? param)
-     (test-tuple values
-                 :stream (or (.getStream param) Utils/DEFAULT_STREAM_ID)
-                 :component (or (.getComponent param) "component")
-                 :fields (.getFields param))
-     (test-tuple values))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj
deleted file mode 100644
index 8f4c659..0000000
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ /dev/null
@@ -1,284 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.thrift
- (:import [java.util HashMap]
- [java.io Serializable]
- [backtype.storm.generated NodeInfo Assignment])
- (:import [backtype.storm.generated JavaObject Grouping Nimbus StormTopology
- StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
- ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
- GlobalStreamId ComponentObject ComponentObject$_Fields
- ShellComponent SupervisorInfo])
- (:import [backtype.storm.utils Utils NimbusClient])
- (:import [backtype.storm Constants])
- (:import [backtype.storm.security.auth ReqContext])
- (:import [backtype.storm.grouping CustomStreamGrouping])
- (:import [backtype.storm.topology TopologyBuilder])
- (:import [backtype.storm.clojure RichShellBolt RichShellSpout])
- (:import [org.apache.thrift.transport TTransport])
- (:use [backtype.storm util config log zookeeper]))
-
-(defn instantiate-java-object
- [^JavaObject obj]
- (let [name (symbol (.get_full_class_name obj))
- args (map (memfn getFieldValue) (.get_args_list obj))]
- (eval `(new ~name ~@args))))
-
-(def grouping-constants
- {Grouping$_Fields/FIELDS :fields
- Grouping$_Fields/SHUFFLE :shuffle
- Grouping$_Fields/ALL :all
- Grouping$_Fields/NONE :none
- Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
- Grouping$_Fields/CUSTOM_OBJECT :custom-object
- Grouping$_Fields/DIRECT :direct
- Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
-
-(defn grouping-type
- [^Grouping grouping]
- (grouping-constants (.getSetField grouping)))
-
-(defn field-grouping
- [^Grouping grouping]
- (when-not (= (grouping-type grouping) :fields)
- (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping")))
- (.get_fields grouping))
-
-(defn global-grouping?
- [^Grouping grouping]
- (and (= :fields (grouping-type grouping))
- (empty? (field-grouping grouping))))
-
-(defn parallelism-hint
- [^ComponentCommon component-common]
- (let [phint (.get_parallelism_hint component-common)]
- (if-not (.is_set_parallelism_hint component-common) 1 phint)))
-
-(defn nimbus-client-and-conn
- ([host port]
- (nimbus-client-and-conn host port nil))
- ([host port as-user]
- (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
- (let [conf (read-storm-config)
- nimbusClient (NimbusClient. conf host port nil as-user)
- client (.getClient nimbusClient)
- transport (.transport nimbusClient)]
- [client transport] )))
-
-(defmacro with-nimbus-connection
- [[client-sym host port] & body]
- `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
- (try
- ~@body
- (finally (.close conn#)))))
-
-(defmacro with-configured-nimbus-connection
- [client-sym & body]
- `(let [conf# (read-storm-config)
- context# (ReqContext/context)
- user# (if (.principal context#) (.getName (.principal context#)))
- nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
- ~client-sym (.getClient nimbusClient#)
- conn# (.transport nimbusClient#)
- ]
- (try
- ~@body
- (finally (.close conn#)))))
-
-(defn direct-output-fields
- [fields]
- (StreamInfo. fields true))
-
-(defn output-fields
- [fields]
- (StreamInfo. fields false))
-
-(defn mk-output-spec
- [output-spec]
- (let [output-spec (if (map? output-spec)
- output-spec
- {Utils/DEFAULT_STREAM_ID output-spec})]
- (map-val
- (fn [out]
- (if (instance? StreamInfo out)
- out
- (StreamInfo. out false)))
- output-spec)))
-
-(defnk mk-plain-component-common
- [inputs output-spec parallelism-hint :conf nil]
- (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
- (when parallelism-hint
- (.set_parallelism_hint ret parallelism-hint))
- (when conf
- (.set_json_conf ret (to-json conf)))
- ret))
-
-(defnk mk-spout-spec*
- [spout outputs :p nil :conf nil]
- (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
- (mk-plain-component-common {} outputs p :conf conf)))
-
-(defn mk-shuffle-grouping
- []
- (Grouping/shuffle (NullStruct.)))
-
-(defn mk-local-or-shuffle-grouping
- []
- (Grouping/local_or_shuffle (NullStruct.)))
-
-(defn mk-fields-grouping
- [fields]
- (Grouping/fields fields))
-
-(defn mk-global-grouping
- []
- (mk-fields-grouping []))
-
-(defn mk-direct-grouping
- []
- (Grouping/direct (NullStruct.)))
-
-(defn mk-all-grouping
- []
- (Grouping/all (NullStruct.)))
-
-(defn mk-none-grouping
- []
- (Grouping/none (NullStruct.)))
-
-(defn deserialized-component-object
- [^ComponentObject obj]
- (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
- (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
- (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
-
-(defn serialize-component-object
- [obj]
- (ComponentObject/serialized_java (Utils/javaSerialize obj)))
-
-(defn- mk-grouping
- [grouping-spec]
- (cond (nil? grouping-spec)
- (mk-none-grouping)
-
- (instance? Grouping grouping-spec)
- grouping-spec
-
- (instance? CustomStreamGrouping grouping-spec)
- (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
-
- (instance? JavaObject grouping-spec)
- (Grouping/custom_object grouping-spec)
-
- (sequential? grouping-spec)
- (mk-fields-grouping grouping-spec)
-
- (= grouping-spec :shuffle)
- (mk-shuffle-grouping)
-
- (= grouping-spec :local-or-shuffle)
- (mk-local-or-shuffle-grouping)
- (= grouping-spec :none)
- (mk-none-grouping)
-
- (= grouping-spec :all)
- (mk-all-grouping)
-
- (= grouping-spec :global)
- (mk-global-grouping)
-
- (= grouping-spec :direct)
- (mk-direct-grouping)
-
- true
- (throw (IllegalArgumentException.
- (str grouping-spec " is not a valid grouping")))))
-
-(defn- mk-inputs
- [inputs]
- (into {} (for [[stream-id grouping-spec] inputs]
- [(if (sequential? stream-id)
- (GlobalStreamId. (first stream-id) (second stream-id))
- (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
- (mk-grouping grouping-spec)])))
-
-(defnk mk-bolt-spec*
- [inputs bolt outputs :p nil :conf nil]
- (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
- (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
- common)))
-
-(defnk mk-spout-spec
- [spout :parallelism-hint nil :p nil :conf nil]
- (let [parallelism-hint (if p p parallelism-hint)]
- {:obj spout :p parallelism-hint :conf conf}))
-
-(defn- shell-component-params
- [command script-or-output-spec kwargs]
- (if (string? script-or-output-spec)
- [(into-array String [command script-or-output-spec])
- (first kwargs)
- (rest kwargs)]
- [(into-array String command)
- script-or-output-spec
- kwargs]))
-
-(defnk mk-bolt-spec
- [inputs bolt :parallelism-hint nil :p nil :conf nil]
- (let [parallelism-hint (if p p parallelism-hint)]
- {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
-
-(defn mk-shell-bolt-spec
- [inputs command script-or-output-spec & kwargs]
- (let [[command output-spec kwargs]
- (shell-component-params command script-or-output-spec kwargs)]
- (apply mk-bolt-spec inputs
- (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
-
-(defn mk-shell-spout-spec
- [command script-or-output-spec & kwargs]
- (let [[command output-spec kwargs]
- (shell-component-params command script-or-output-spec kwargs)]
- (apply mk-spout-spec
- (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))
-
-(defn- add-inputs
- [declarer inputs]
- (doseq [[id grouping] (mk-inputs inputs)]
- (.grouping declarer id grouping)))
-
-(defn mk-topology
- ([spout-map bolt-map]
- (let [builder (TopologyBuilder.)]
- (doseq [[name {spout :obj p :p conf :conf}] spout-map]
- (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
- (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
- (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
- (.createTopology builder)))
- ([spout-map bolt-map state-spout-map]
- (mk-topology spout-map bolt-map)))
-
-;; clojurify-structure is needed or else every element becomes the same after successive calls
-;; don't know why this happens
-(def STORM-TOPOLOGY-FIELDS
- (-> StormTopology/metaDataMap clojurify-structure keys))
-
-(def SPOUT-FIELDS
- [StormTopology$_Fields/SPOUTS
- StormTopology$_Fields/STATE_SPOUTS])
-
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/timer.clj b/storm-core/src/clj/backtype/storm/timer.clj
deleted file mode 100644
index b5f73f7..0000000
--- a/storm-core/src/clj/backtype/storm/timer.clj
+++ /dev/null
@@ -1,128 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.timer
- (:import [backtype.storm.utils Time])
- (:import [java.util PriorityQueue Comparator Random])
- (:import [java.util.concurrent Semaphore])
- (:use [backtype.storm util log]))
-
-;; The timer defined in this file is very similar to java.util.Timer, except
-;; it integrates with Storm's time simulation capabilities. This lets us test
-;; code that does asynchronous work on the timer thread
-
-(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
- (let [queue (PriorityQueue. 10 (reify Comparator
- (compare
- [this o1 o2]
- (- (first o1) (first o2)))
- (equals
- [this obj]
- true)))
- active (atom true)
- lock (Object.)
- notifier (Semaphore. 0)
- thread-name (if timer-name timer-name "timer")
- timer-thread (Thread.
- (fn []
- (while @active
- (try
- (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
- (if (and elem (>= (current-time-millis) time-millis))
- ;; It is imperative to not run the function
- ;; inside the timer lock. Otherwise, it is
- ;; possible to deadlock if the fn deals with
- ;; other locks, like the submit lock.
- (let [afn (locking lock (second (.poll queue)))]
- (afn))
- (if time-millis
- ;; If any events are scheduled, sleep until
- ;; event generation. If any recurring events
- ;; are scheduled then we will always go
- ;; through this branch, sleeping only the
- ;; exact necessary amount of time. We give
- ;; an upper bound, e.g. 1000 millis, to the
- ;; sleeping time, to limit the response time
- ;; for detecting any new event within 1 secs.
- (Time/sleep (min 1000 (- time-millis (current-time-millis))))
- ;; Otherwise poll to see if any new event
- ;; was scheduled. This is, in essence, the
- ;; response time for detecting any new event
- ;; schedulings when there are no scheduled
- ;; events.
- (Time/sleep 1000))))
- (catch Throwable t
- ;; Because the interrupted exception can be
- ;; wrapped in a RuntimeException.
- (when-not (exception-cause? InterruptedException t)
- (kill-fn t)
- (reset! active false)
- (throw t)))))
- (.release notifier)) thread-name)]
- (.setDaemon timer-thread true)
- (.setPriority timer-thread Thread/MAX_PRIORITY)
- (.start timer-thread)
- {:timer-thread timer-thread
- :queue queue
- :active active
- :lock lock
- :random (Random.)
- :cancel-notifier notifier}))
-
-(defn- check-active!
- [timer]
- (when-not @(:active timer)
- (throw (IllegalStateException. "Timer is not active"))))
-
-(defnk schedule
- [timer delay-secs afn :check-active true :jitter-ms 0]
- (when check-active (check-active! timer))
- (let [id (uuid)
- ^PriorityQueue queue (:queue timer)
- end-time-ms (+ (current-time-millis) (secs-to-millis-long delay-secs))
- end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
- (locking (:lock timer)
- (.add queue [end-time-ms afn id]))))
-
-(defn schedule-recurring
- [timer delay-secs recur-secs afn]
- (schedule timer
- delay-secs
- (fn this []
- (afn)
- ; This avoids a race condition with cancel-timer.
- (schedule timer recur-secs this :check-active false))))
-
-(defn schedule-recurring-with-jitter
- [timer delay-secs recur-secs jitter-ms afn]
- (schedule timer
- delay-secs
- (fn this []
- (afn)
- ; This avoids a race condition with cancel-timer.
- (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
-
-(defn cancel-timer
- [timer]
- (check-active! timer)
- (locking (:lock timer)
- (reset! (:active timer) false)
- (.interrupt (:timer-thread timer)))
- (.acquire (:cancel-notifier timer)))
-
-(defn timer-waiting?
- [timer]
- (Time/isThreadWaiting (:timer-thread timer)))