You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:10 UTC
[14/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
new file mode 100644
index 0000000..87ca2de
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -0,0 +1,701 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testing
+ (:require [org.apache.storm.daemon
+ [nimbus :as nimbus]
+ [supervisor :as supervisor]
+ [common :as common]
+ [worker :as worker]
+ [executor :as executor]])
+ (:require [org.apache.storm [process-simulator :as psim]])
+ (:import [org.apache.commons.io FileUtils])
+ (:import [java.io File])
+ (:import [java.util HashMap ArrayList])
+ (:import [java.util.concurrent.atomic AtomicInteger])
+ (:import [java.util.concurrent ConcurrentHashMap])
+ (:import [org.apache.storm.utils Time Utils RegisteredGlobalState])
+ (:import [org.apache.storm.tuple Fields Tuple TupleImpl])
+ (:import [org.apache.storm.task TopologyContext])
+ (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
+ (:import [org.apache.storm.testing FeederSpout FixedTupleSpout FixedTuple
+ TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
+ TestWordSpout MemoryTransactionalSpout])
+ (:import [org.apache.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
+ (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
+ ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
+ KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
+ ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
+ (:import [org.apache.storm.transactional TransactionalSpoutCoordinator])
+ (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
+ (:import [org.apache.storm.tuple Tuple])
+ (:import [org.apache.storm.generated StormTopology])
+ (:import [org.apache.storm.task TopologyContext])
+ (:require [org.apache.storm [zookeeper :as zk]])
+ (:require [org.apache.storm.messaging.loader :as msg-loader])
+ (:require [org.apache.storm.daemon.acker :as acker])
+ (:use [org.apache.storm cluster util thrift config log local-state]))
+
+;; Builds a FeederSpout declaring the given output fields; tests use it to
+;; push tuples into a running topology on demand.
+(defn feeder-spout
+ [fields]
+ (FeederSpout. (Fields. fields)))
+
+;; Returns a fresh unique path under java.io.tmpdir. On Windows the "/" is
+;; omitted ((if-not on-windows? "/") yields nil, which str drops) --
+;; presumably because tmpdir already ends in a separator there; TODO confirm.
+(defn local-temp-path
+ []
+ (str (System/getProperty "java.io.tmpdir") (if-not on-windows? "/") (uuid)))
+
+;; Force-deletes every existing path in `paths`. Exceptions are logged and
+;; swallowed so teardown continues -- best-effort cleanup (e.g. a file may
+;; still be locked by another process; see the rmr catch later in this file).
+(defn delete-all
+ [paths]
+ (dorun
+ (for [t paths]
+ (if (.exists (File. t))
+ (try
+ (FileUtils/forceDelete (File. t))
+ (catch Exception e
+ (log-message (.getMessage e))))))))
+
+;; Binds each symbol in tmp-syms to a fresh local-temp-path, runs body, and
+;; deletes all of those paths in a finally (even when body throws).
+(defmacro with-local-tmp
+ [[& tmp-syms] & body]
+ (let [tmp-paths (mapcat (fn [t] [t `(local-temp-path)]) tmp-syms)]
+ `(let [~@tmp-paths]
+ (try
+ ~@body
+ (finally
+ (delete-all ~(vec tmp-syms)))))))
+
+;; Puts org.apache.storm.utils.Time into simulated mode: the clock only
+;; advances via the advance-time-*! helpers below.
+(defn start-simulating-time!
+ []
+ (Time/startSimulating))
+
+;; Restores Time to wall-clock mode.
+(defn stop-simulating-time!
+ []
+ (Time/stopSimulating))
+
+;; Runs body under simulated time, always restoring real time in a finally.
+ (defmacro with-simulated-time
+ [& body]
+ `(try
+ (start-simulating-time!)
+ ~@body
+ (finally
+ (stop-simulating-time!))))
+
+;; Advances the simulated clock by ms milliseconds.
+(defn advance-time-ms! [ms]
+ (Time/advanceTime ms))
+
+;; Advances the simulated clock by secs seconds; secs is coerced to long
+;; before the multiply so the product is computed in long arithmetic.
+(defn advance-time-secs! [secs]
+ (advance-time-ms! (* (long secs) 1000)))
+
+;; Launches an extra in-process supervisor for cluster-map and registers it
+;; (plus its temp dir) in the cluster's atoms. :ports may be a count (slots
+;; are then drawn from the cluster's :port-counter) or an explicit sequence
+;; of port numbers. :id, when given, pins the supervisor id by rebinding
+;; supervisor/generate-supervisor-id for the duration of startup.
+(defnk add-supervisor
+ [cluster-map :ports 2 :conf {} :id nil]
+ (let [tmp-dir (local-temp-path)
+ port-ids (if (sequential? ports)
+ ports
+ (doall (repeatedly ports (:port-counter cluster-map))))
+ supervisor-conf (merge (:daemon-conf cluster-map)
+ conf
+ {STORM-LOCAL-DIR tmp-dir
+ SUPERVISOR-SLOTS-PORTS port-ids})
+ id-fn (if id (fn [] id) supervisor/generate-supervisor-id)
+ daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
+ (swap! (:supervisors cluster-map) conj daemon)
+ (swap! (:tmp-dirs cluster-map) conj tmp-dir)
+ daemon))
+
+;; Returns an in-JVM local messaging context unless the conf asks for real
+;; ZMQ transport (STORM-LOCAL-MODE-ZMQ truthy), in which case nil.
+(defn mk-shared-context [conf]
+ (if-not (conf STORM-LOCAL-MODE-ZMQ)
+ (msg-loader/mk-local-context)))
+
+;; Starts a Thrift server wrapping the given nimbus handler on a background
+;; thread and returns the server. The serving thread is not a daemon thread
+;; and is not joined here -- callers stop it via .stop on the returned server.
+(defn start-nimbus-daemon [conf nimbus]
+ (let [server (ThriftServer. conf (Nimbus$Processor. nimbus)
+ ThriftConnectionType/NIMBUS)
+ nimbus-thread (Thread. (fn [] (.serve server)))]
+ (log-message "Starting Nimbus server...")
+ (.start nimbus-thread)
+ server))
+
+
+;; returns map containing cluster info
+;; local dir is always overridden in maps
+;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
+;; if need to customize amt of ports more, can use add-supervisor calls afterwards
+;; Boots a complete in-process Storm cluster and returns its cluster-map:
+;; nimbus, the requested supervisors, cluster state handles, shared messaging
+;; context, optional nimbus thrift server, and the temp dirs to clean up.
+;; An in-process zookeeper is started only when the caller did not supply
+;; STORM-ZOOKEEPER-SERVERS in :daemon-conf. :supervisors may be a count or a
+;; sequence of per-supervisor conf maps. Tear down with
+;; kill-local-storm-cluster.
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
+ (let [zk-tmp (local-temp-path)
+ [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
+ (zk/mk-inprocess-zookeeper zk-tmp))
+ daemon-conf (merge (read-storm-config)
+ {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+ ZMQ-LINGER-MILLIS 0
+ TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
+ TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
+ STORM-CLUSTER-MODE "local"
+ BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
+ (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
+ {STORM-ZOOKEEPER-PORT zk-port
+ STORM-ZOOKEEPER-SERVERS ["localhost"]})
+ daemon-conf)
+ nimbus-tmp (local-temp-path)
+ port-counter (mk-counter supervisor-slot-port-min)
+ nimbus (nimbus/service-handler
+ (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+ (if inimbus inimbus (nimbus/standalone-nimbus)))
+ context (mk-shared-context daemon-conf)
+ nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)
+ cluster-map {:nimbus nimbus
+ :port-counter port-counter
+ :daemon-conf daemon-conf
+ :supervisors (atom [])
+ :state (mk-distributed-cluster-state daemon-conf)
+ :storm-cluster-state (mk-storm-cluster-state daemon-conf)
+ :tmp-dirs (atom [nimbus-tmp zk-tmp])
+ :zookeeper (if (not-nil? zk-handle) zk-handle)
+ :shared-context context
+ :nimbus-thrift-server nimbus-thrift-server}
+ supervisor-confs (if (sequential? supervisors)
+ supervisors
+ (repeat supervisors {}))]
+
+ (doseq [sc supervisor-confs]
+ (add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
+ cluster-map))
+
+;; Looks up a running supervisor daemon in cluster-map by its id.
+(defn get-supervisor [cluster-map supervisor-id]
+ (let [finder-fn #(= (.get-id %) supervisor-id)]
+ (find-first finder-fn @(:supervisors cluster-map))))
+
+;; Removes the supervisor with the given id from the cluster's registry and
+;; shuts it down. NOTE(review): throws NPE via .shutdown if no supervisor
+;; matches the id -- callers are expected to pass a live id.
+(defn kill-supervisor [cluster-map supervisor-id]
+ (let [finder-fn #(= (.get-id %) supervisor-id)
+ supervisors @(:supervisors cluster-map)
+ sup (find-first finder-fn
+ supervisors)]
+ ;; tmp-dir will be taken care of by shutdown
+ (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
+ (.shutdown sup)))
+
+;; Tears down everything mk-local-storm-cluster started, in order: nimbus,
+;; the optional thrift server, cluster-state handles, each supervisor and its
+;; workers, any simulated processes, the in-process zookeeper (if owned), and
+;; finally all registered temp dirs. Deletion failures are logged, not thrown.
+(defn kill-local-storm-cluster [cluster-map]
+ (.shutdown (:nimbus cluster-map))
+ (if (not-nil? (:nimbus-thrift-server cluster-map))
+ (do
+ (log-message "shutting down thrift server")
+ (try
+ (.stop (:nimbus-thrift-server cluster-map))
+ (catch Exception e (log-message "failed to stop thrift")))
+ ))
+ (.close (:state cluster-map))
+ (.disconnect (:storm-cluster-state cluster-map))
+ (doseq [s @(:supervisors cluster-map)]
+ (.shutdown-all-workers s)
+ ;; race condition here? will it launch the workers again?
+ (supervisor/kill-supervisor s))
+ (psim/kill-all-processes)
+ (if (not-nil? (:zookeeper cluster-map))
+ (do
+ (log-message "Shutting down in process zookeeper")
+ (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
+ (log-message "Done shutting down in process zookeeper")))
+ (doseq [t @(:tmp-dirs cluster-map)]
+ (log-message "Deleting temporary path " t)
+ (try
+ (rmr t)
+ ;; on windows, the host process still holds lock on the logfile
+ (catch Exception e (log-message (.getMessage e)))) ))
+
+;; Default test timeout in milliseconds; override with the
+;; STORM_TEST_TIMEOUT_MS environment variable (defaults to 5000).
+(def TEST-TIMEOUT-MS
+ (let [timeout (System/getenv "STORM_TEST_TIMEOUT_MS")]
+ (parse-int (if timeout timeout "5000"))))
+
+;; Loops body while condition holds, but throws AssertionError (after logging
+;; a full thread dump) once timeout-ms of wall-clock time has elapsed. The
+;; unquoted '~condition form is logged so failures name the condition.
+(defmacro while-timeout [timeout-ms condition & body]
+ `(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)]
+ (log-debug "Looping until " '~condition)
+ (while ~condition
+ (when (> (System/currentTimeMillis) end-time#)
+ (let [thread-dump# (Utils/threadDump)]
+ (log-message "Condition " '~condition " not met in " ~timeout-ms "ms")
+ (log-message thread-dump#)
+ (throw (AssertionError. (str "Test timed out (" ~timeout-ms "ms) " '~condition)))))
+ ~@body)
+ (log-debug "Condition met " '~condition)))
+
+;; Polls apredicate every 100ms (via Time/sleep, so simulated-time aware)
+;; until it returns truthy or timeout-ms elapses (then AssertionError from
+;; while-timeout).
+(defn wait-for-condition
+ ([apredicate]
+ (wait-for-condition TEST-TIMEOUT-MS apredicate))
+ ([timeout-ms apredicate]
+ (while-timeout timeout-ms (not (apredicate))
+ (Time/sleep 100))))
+
+(defn wait-until-cluster-waiting
+ "Wait until the cluster is idle. Should be used with time simulation."
+ ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS))
+ ([cluster-map timeout-ms]
+ ;; wait until all workers, supervisors, and nimbus is waiting
+ (let [supervisors @(:supervisors cluster-map)
+ workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
+ daemons (concat
+ [(:nimbus cluster-map)]
+ supervisors
+ ; because a worker may already be dead
+ workers)]
+ (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
+ (Thread/sleep (rand-int 20))
+ ;; (doseq [d daemons]
+ ;; (if-not ((memfn waiting?) d)
+ ;; (println d)))
+ ))))
+
+;; Advances simulated time by secs total, in steps of increment-secs
+;; (default 1), waiting for the whole cluster to go idle after each step so
+;; daemons actually observe every tick.
+(defn advance-cluster-time
+ ([cluster-map secs increment-secs]
+ (loop [left secs]
+ (when (> left 0)
+ (let [diff (min left increment-secs)]
+ (advance-time-secs! diff)
+ (wait-until-cluster-waiting cluster-map)
+ (recur (- left diff))))))
+ ([cluster-map secs]
+ (advance-cluster-time cluster-map secs 1)))
+
+;; Boots a local cluster bound to cluster-sym, runs body, and always kills
+;; the cluster. During shutdown a future keeps calling simulate-wait so that
+;; under simulated time the daemons can make progress while being killed.
+(defmacro with-local-cluster
+ [[cluster-sym & args] & body]
+ `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (log-error t# "Error in cluster")
+ (throw t#))
+ (finally
+ (let [keep-waiting?# (atom true)
+ f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
+ (kill-local-storm-cluster ~cluster-sym)
+ (reset! keep-waiting?# false)
+ @f#)))))
+
+;; Same as with-local-cluster but wrapped in simulated time.
+(defmacro with-simulated-time-local-cluster
+ [& args]
+ `(with-simulated-time
+ (with-local-cluster ~@args)))
+
+;; Starts an in-process zookeeper in a temp dir, binds its port to port-sym,
+;; runs body, and always shuts the zookeeper down (temp dir cleanup comes
+;; from the enclosing with-local-tmp).
+(defmacro with-inprocess-zookeeper
+ [port-sym & body]
+ `(with-local-tmp [tmp#]
+ (let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)]
+ (try
+ ~@body
+ (finally
+ (zk/shutdown-inprocess-zookeeper zks#))))))
+
+;; Submits a topology to an in-process nimbus, first validating that the conf
+;; is JSON-serializable (rejecting it with IllegalArgumentException otherwise,
+;; matching what a real cluster submission would require).
+(defn submit-local-topology
+ [nimbus storm-name conf topology]
+ (when-not (Utils/isValidConf conf)
+ (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+ (.submitTopology nimbus storm-name nil (to-json conf) topology))
+
+;; As submit-local-topology, but forwards SubmitOptions (e.g. initial status).
+(defn submit-local-topology-with-opts
+ [nimbus storm-name conf topology submit-opts]
+ (when-not (Utils/isValidConf conf)
+ (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+ (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
+
+;; Returns a replacement for nimbus/convert-assignments-to-worker->resources
+;; that forces the named storm's worker->resources map to the supplied value
+;; while passing through every other topology's existing assignment.
+(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
+ (fn [existing-assignments]
+ (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+ existing-assignments (into {} (for [[tid assignment] existing-assignments]
+ {tid (:worker->resources assignment)}))
+ new-assignments (assoc existing-assignments topology-id worker->resources)]
+ new-assignments)))
+
+;; Same idea for executor->node+port: pins the named storm's placement to the
+;; given map, leaving other topologies' placements untouched.
+(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port]
+ (fn [new-scheduler-assignments existing-assignments]
+ (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+ existing-assignments (into {} (for [[tid assignment] existing-assignments]
+ {tid (:executor->node+port assignment)}))
+ new-assignments (assoc existing-assignments topology-id executor->node+port)]
+ new-assignments)))
+
+;; Scheduler stub: ignores topologies entirely and returns the existing
+;; assignments unchanged, so nothing is rescheduled during the mocked submit.
+(defn mocked-compute-new-scheduler-assignments []
+ (fn [nimbus existing-assignments topologies scratch-topology-id]
+ existing-assignments))
+
+;; Submits a topology while the nimbus scheduling pipeline is rebound (via
+;; with-var-roots) to the fixed task->component, executor->node+port and
+;; worker->resources supplied by the test -- i.e. a fully mocked assignment.
+(defn submit-mocked-assignment
+ [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
+ (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
+ nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
+ nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
+ storm-cluster-state
+ storm-name
+ worker->resources)
+ nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
+ storm-cluster-state
+ storm-name
+ executor->node+port)]
+ (submit-local-topology nimbus storm-name conf topology)))
+
+;; Returns a stand-in for supervisor/launch-worker that, instead of launching
+;; anything, records the storm-id under [supervisor-id port] in capture-atom
+;; (and registers an empty worker user so later cleanup paths work).
+(defn mk-capture-launch-fn [capture-atom]
+ (fn [supervisor storm-id port worker-id mem-onheap]
+ (let [supervisor-id (:supervisor-id supervisor)
+ conf (:conf supervisor)
+ existing (get @capture-atom [supervisor-id port] [])]
+ (set-worker-user! conf worker-id "")
+ (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id)))))
+
+;; Reverse-looks-up the approved-worker id serving `port` from the
+;; supervisor's local state.
+(defn find-worker-id
+ [supervisor-conf port]
+ (let [supervisor-state (supervisor-state supervisor-conf)
+ worker->port (ls-approved-workers supervisor-state)]
+ (first ((reverse-map worker->port) port))))
+
+;; Looks up the port assigned to worker-id in the supervisor's local state.
+(defn find-worker-port
+ [supervisor-conf worker-id]
+ (let [supervisor-state (supervisor-state supervisor-conf)
+ worker->port (ls-approved-workers supervisor-state)]
+ (worker->port worker-id)))
+
+;; Returns a wrapper for supervisor/shutdown-worker that counts shutdowns per
+;; [supervisor-id port] in capture-atom, then delegates to the real shutdown
+;; (captured as existing-fn BEFORE any with-var-roots rebinding takes effect).
+(defn mk-capture-shutdown-fn
+ [capture-atom]
+ (let [existing-fn supervisor/shutdown-worker]
+ (fn [supervisor worker-id]
+ (let [conf (:conf supervisor)
+ supervisor-id (:supervisor-id supervisor)
+ port (find-worker-port conf worker-id)
+ existing (get @capture-atom [supervisor-id port] 0)]
+ (swap! capture-atom assoc [supervisor-id port] (inc existing))
+ (existing-fn supervisor worker-id)))))
+
+;; Runs body with worker launch/shutdown instrumented, returning
+;; {:launched {[supervisor-id port] [storm-ids...]},
+;;  :shutdown {[supervisor-id port] count}}. Note launches are swallowed
+;; (mk-capture-launch-fn does not actually start workers).
+(defmacro capture-changed-workers
+ [& body]
+ `(let [launch-captured# (atom {})
+ shutdown-captured# (atom {})]
+ (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#)
+ supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)]
+ ~@body
+ {:launched @launch-captured#
+ :shutdown @shutdown-captured#})))
+
+;; Convenience: just the :launched half of capture-changed-workers.
+(defmacro capture-launched-workers
+ [& body]
+ `(:launched (capture-changed-workers ~@body)))
+
+;; Convenience: just the :shutdown half of capture-changed-workers.
+(defmacro capture-shutdown-workers
+ [& body]
+ `(:shutdown (capture-changed-workers ~@body)))
+
+;; Sums the value of stat-key across the task heartbeats of a topology,
+;; optionally restricted to :component-ids. Tasks with no heartbeat yet
+;; contribute 0.
+(defnk aggregated-stat
+ [cluster-map storm-name stat-key :component-ids nil]
+ (let [state (:storm-cluster-state cluster-map)
+ nimbus (:nimbus cluster-map)
+ storm-id (common/get-storm-id state storm-name)
+ component->tasks (reverse-map
+ (common/storm-task-info
+ (.getUserTopology nimbus storm-id)
+ (from-json (.getTopologyConf nimbus storm-id))))
+ component->tasks (if component-ids
+ (select-keys component->tasks component-ids)
+ component->tasks)
+ task-ids (apply concat (vals component->tasks))
+ assignment (.assignment-info state storm-id nil)
+ taskbeats (.taskbeats state storm-id (:task->node+port assignment))
+ heartbeats (dofor [id task-ids] (get taskbeats id))
+ stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
+ (reduce + stats)))
+
+;; Total :emitted stat over the topology's spout components only.
+(defn emitted-spout-tuples
+ [cluster-map topology storm-name]
+ (aggregated-stat
+ cluster-map
+ storm-name
+ :emitted
+ :component-ids (keys (.get_spouts topology))))
+
+;; Total :transferred stat over all components.
+(defn transferred-tuples
+ [cluster-map storm-name]
+ (aggregated-stat cluster-map storm-name :transferred))
+
+;; Total :acked stat over all components.
+(defn acked-tuples
+ [cluster-map storm-name]
+ (aggregated-stat cluster-map storm-name :acked))
+
+;; Lets a test "wait a bit" in either clock mode: under simulated time,
+;; advance the cluster 10s; otherwise sleep 100ms of real time.
+(defn simulate-wait
+ [cluster-map]
+ (if (Time/isSimulating)
+ (advance-cluster-time cluster-map 10)
+ (Thread/sleep 100)))
+
+;; Protocol a spout must satisfy for complete-topology to know when all of
+;; its tuples have been fully processed; extended below for the stock test
+;; and transactional spout types.
+(defprotocol CompletableSpout
+ (exhausted?
+ [this]
+ "Whether all the tuples for this spout have been completed.")
+ (cleanup
+ [this]
+ "Cleanup any global state kept")
+ (startup
+ [this]
+ "Prepare the spout (globally) before starting the topology"))
+
+;; FixedTupleSpout is exhausted once its completed count matches its source
+;; tuple count; startup is a no-op.
+(extend-type FixedTupleSpout
+ CompletableSpout
+ (exhausted? [this]
+ (= (-> this .getSourceTuples count)
+ (.getCompleted this)))
+ (cleanup [this]
+ (.cleanup this))
+ (startup [this]))
+
+;; The transactional coordinator delegates all three operations to its
+;; wrapped spout.
+(extend-type TransactionalSpoutCoordinator
+ CompletableSpout
+ (exhausted? [this]
+ (exhausted? (.getSpout this)))
+ (cleanup [this]
+ (cleanup (.getSpout this)))
+ (startup [this]
+ (startup (.getSpout this))))
+
+;; The partitioned executor likewise delegates to its partitioned spout.
+(extend-type PartitionedTransactionalSpoutExecutor
+ CompletableSpout
+ (exhausted? [this]
+ (exhausted? (.getPartitionedSpout this)))
+ (cleanup [this]
+ (cleanup (.getPartitionedSpout this)))
+ (startup [this]
+ (startup (.getPartitionedSpout this))))
+
+;; MemoryTransactionalSpout implements the operations natively.
+(extend-type MemoryTransactionalSpout
+ CompletableSpout
+ (exhausted? [this]
+ (.isExhaustedTuples this))
+ (cleanup [this]
+ (.cleanup this))
+ (startup [this]
+ (.startup this)))
+
+;; Deserializes the spout object out of every SpoutSpec in a spec map.
+(defn spout-objects [spec-map]
+ (for [[_ spout-spec] spec-map]
+ (-> spout-spec
+ .get_spout_object
+ deserialized-component-object)))
+
+;; Deep-copies a topology and adds a TupleCaptureBolt subscribed to every
+;; stream of every component (direct grouping for direct streams, global
+;; otherwise). Returns {:topology modified-copy :capturer the-bolt} -- the
+;; capturer collects all tuples for complete-topology's result.
+(defn capture-topology
+ [topology]
+ (let [topology (.deepCopy topology)
+ spouts (.get_spouts topology)
+ bolts (.get_bolts topology)
+ all-streams (apply concat
+ (for [[id spec] (merge (clojurify-structure spouts)
+ (clojurify-structure bolts))]
+ (for [[stream info] (.. spec get_common get_streams)]
+ [(GlobalStreamId. id stream) (.is_direct info)])))
+ capturer (TupleCaptureBolt.)]
+ (.set_bolts topology
+ (assoc (clojurify-structure bolts)
+ (uuid)
+ (Bolt.
+ (serialize-component-object capturer)
+ (mk-plain-component-common (into {} (for [[id direct?] all-streams]
+ [id (if direct?
+ (mk-direct-grouping)
+ (mk-global-grouping))]))
+ {}
+ nil))))
+ {:topology topology
+ :capturer capturer}))
+
+;; TODO: mock-sources needs to be able to mock out state spouts as well
+;; Runs a topology to completion on the given local cluster and returns the
+;; captured tuples (a map of component-id -> FixedTuples, via the capturer).
+;; :mock-sources replaces named spouts with FixedTupleSpouts fed the given
+;; tuples; every remaining spout must satisfy CompletableSpout so exhaustion
+;; can be detected. The topology is then killed (wait_secs 0) and, when
+;; :cleanup-state, spout state is cleaned and results removed from the
+;; capturer's global registry.
+(defnk complete-topology
+ [cluster-map topology
+ :mock-sources {}
+ :storm-conf {}
+ :cleanup-state true
+ :topology-name nil
+ :timeout-ms TEST-TIMEOUT-MS]
+ ;; TODO: the idea of mocking for transactional topologies should be done an
+ ;; abstraction level above... should have a complete-transactional-topology for this
+ (let [{topology :topology capturer :capturer} (capture-topology topology)
+ storm-name (or topology-name (str "topologytest-" (uuid)))
+ state (:storm-cluster-state cluster-map)
+ spouts (.get_spouts topology)
+ replacements (map-val (fn [v]
+ (FixedTupleSpout.
+ (for [tup v]
+ (if (map? tup)
+ (FixedTuple. (:stream tup) (:values tup))
+ tup))))
+ mock-sources)]
+ (doseq [[id spout] replacements]
+ (let [spout-spec (get spouts id)]
+ (.set_spout_object spout-spec (serialize-component-object spout))))
+ (doseq [spout (spout-objects spouts)]
+ (when-not (extends? CompletableSpout (.getClass spout))
+ (throw (RuntimeException. (str "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " spout)))))
+
+ (doseq [spout (spout-objects spouts)]
+ (startup spout))
+
+ (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
+ (advance-cluster-time cluster-map 11)
+
+ (let [storm-id (common/get-storm-id state storm-name)]
+ ;;Give the topology time to come up without using it to wait for the spouts to complete
+ (simulate-wait cluster-map)
+
+ (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
+ (simulate-wait cluster-map))
+
+ (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
+ (while-timeout timeout-ms (.assignment-info state storm-id nil)
+ (simulate-wait cluster-map))
+ (when cleanup-state
+ (doseq [spout (spout-objects spouts)]
+ (cleanup spout))))
+
+ (if cleanup-state
+ (.getAndRemoveResults capturer)
+ (.getAndClearResults capturer))))
+
+;; Extracts, from a complete-topology result map, the tuple value vectors
+;; that component-id emitted on stream-id (default stream when omitted).
+;; Non-matching FixedTuples yield nil, which mapcat drops.
+(defn read-tuples
+ ([results component-id stream-id]
+ (let [fixed-tuples (get results component-id [])]
+ (mapcat
+ (fn [ft]
+ (if (= stream-id (. ft stream))
+ [(vec (. ft values))]))
+ fixed-tuples)
+ ))
+ ([results component-id]
+ (read-tuples results component-id Utils/DEFAULT_STREAM_ID)))
+
+;; Multiset equality: true when all args contain the same elements with the
+;; same multiplicities, ignoring order.
+(defn ms=
+ [& args]
+ (apply = (map multi-set args)))
+
+(def TRACKER-BOLT-ID "+++tracker-bolt")
+
+;; TODO: should override system-topology! and wrap everything there
+;; Deep-copies a topology and wraps every bolt in a BoltTracker and every
+;; spout in a SpoutTracker keyed by the cluster's ::track-id, so tracked-wait
+;; can observe emitted/transferred/processed counts. Returns
+;; {:topology wrapped :last-spout-emit (atom 0) :cluster tracked-cluster}.
+(defn mk-tracked-topology
+ ([tracked-cluster topology]
+ (let [track-id (::track-id tracked-cluster)
+ ret (.deepCopy topology)]
+ (dofor [[_ bolt] (.get_bolts ret)
+ :let [obj (deserialized-component-object (.get_bolt_object bolt))]]
+ (.set_bolt_object bolt (serialize-component-object
+ (BoltTracker. obj track-id))))
+ (dofor [[_ spout] (.get_spouts ret)
+ :let [obj (deserialized-component-object (.get_spout_object spout))]]
+ (.set_spout_object spout (serialize-component-object
+ (SpoutTracker. obj track-id))))
+ {:topology ret
+ :last-spout-emit (atom 0)
+ :cluster tracked-cluster})))
+
+;; Tags a cluster map with the namespaced ::track-id key used by the
+;; tracking machinery above.
+(defn assoc-track-id
+ [cluster track-id]
+ (assoc cluster ::track-id track-id))
+
+;; Adds amt to the AtomicInteger stored under `key` in the
+;; RegisteredGlobalState map registered under `id`.
+(defn increment-global!
+ [id key amt]
+ (-> (RegisteredGlobalState/getState id)
+ (get key)
+ (.addAndGet amt)))
+
+;; Reads the current value of that AtomicInteger.
+(defn global-amt
+ [id key]
+ (-> (RegisteredGlobalState/getState id)
+ (get key)
+ .get))
+
+;; Runs body in a simulated-time local cluster wired for tuple tracking:
+;; registers spout-emitted/transferred/processed AtomicIntegers under a fresh
+;; uuid, wraps the acker bolt in a NonRichBoltTracker, and instruments the
+;; executor transfer fn so "transferred" is bumped at emission time. The
+;; global state is cleared afterwards.
+(defmacro with-tracked-cluster
+ [[cluster-sym & cluster-args] & body]
+ `(let [id# (uuid)]
+ (RegisteredGlobalState/setState
+ id#
+ (doto (ConcurrentHashMap.)
+ (.put "spout-emitted" (AtomicInteger. 0))
+ (.put "transferred" (AtomicInteger. 0))
+ (.put "processed" (AtomicInteger. 0))))
+ (with-var-roots
+ [acker/mk-acker-bolt
+ (let [old# acker/mk-acker-bolt]
+ (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
+ ;; critical that this particular function is overridden here,
+ ;; since the transferred stat needs to be incremented at the moment
+ ;; of tuple emission (and not on a separate thread later) for
+ ;; topologies to be tracked correctly. This is because "transferred" *must*
+ ;; be incremented before "processing".
+ executor/mk-executor-transfer-fn
+ (let [old# executor/mk-executor-transfer-fn]
+ (fn [& args#]
+ (let [transferrer# (apply old# args#)]
+ (fn [& args2#]
+ ;; (log-message "Transferring: " transfer-args#)
+ (increment-global! id# "transferred" 1)
+ (apply transferrer# args2#)))))]
+ (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
+ (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+ ~@body)))
+ (RegisteredGlobalState/clearState id#)))
+
+(defn tracked-wait
+ "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
+ ([tracked-topology]
+ (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
+ ([tracked-topology amt]
+ (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+ ([tracked-topology amt timeout-ms]
+ ;; Waits for the global counters to show `target` total spout emissions
+ ;; AND transferred == processed (i.e. the topology has drained), then
+ ;; records the new emission watermark in :last-spout-emit.
+ (let [target (+ amt @(:last-spout-emit tracked-topology))
+ track-id (-> tracked-topology :cluster ::track-id)
+ waiting? (fn []
+ (or (not= target (global-amt track-id "spout-emitted"))
+ (not= (global-amt track-id "transferred")
+ (global-amt track-id "processed"))))]
+ (while-timeout timeout-ms (waiting?)
+ ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
+ ;; (println "Processed: " (global-amt track-id "processed"))
+ ;; (println "Transferred: " (global-amt track-id "transferred"))
+ (Thread/sleep (rand-int 200)))
+ (reset! (:last-spout-emit tracked-topology) target))))
+
+;; Fabricates a TupleImpl carrying `values` for unit-testing bolts, backed by
+;; a minimal single-spout StormTopology/TopologyContext. Field names default
+;; to "field1", "field2", ... matching the value count; :stream, :component
+;; and :fields override the defaults. Task id is hard-wired to 1.
+(defnk test-tuple
+ [values
+ :stream Utils/DEFAULT_STREAM_ID
+ :component "component"
+ :fields nil]
+ (let [fields (or fields
+ (->> (iterate inc 1)
+ (take (count values))
+ (map #(str "field" %))))
+ spout-spec (mk-spout-spec* (TestWordSpout.)
+ {stream fields})
+ topology (StormTopology. {component spout-spec} {} {})
+ context (TopologyContext.
+ topology
+ (read-storm-config)
+ {(int 1) component}
+ {component [(int 1)]}
+ {component {stream (Fields. fields)}}
+ "test-storm-id"
+ nil
+ nil
+ (int 1)
+ nil
+ [(int 1)]
+ {}
+ {}
+ (HashMap.)
+ (HashMap.)
+ (atom false))]
+ (TupleImpl. context values 1 stream)))
+
+;; Runs body on a future and blocks for at most `millis` of the given
+;; TimeUnit; .get throws TimeoutException on expiry and the future is always
+;; cancelled in the finally.
+(defmacro with-timeout
+ [millis unit & body]
+ `(let [f# (future ~@body)]
+ (try
+ (.get f# ~millis ~unit)
+ (finally (future-cancel f#)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing4j.clj b/storm-core/src/clj/org/apache/storm/testing4j.clj
new file mode 100644
index 0000000..5850262
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/testing4j.clj
@@ -0,0 +1,184 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.testing4j
+ (:import [java.util Map List Collection ArrayList])
+ (:require [org.apache.storm [LocalCluster :as LocalCluster]])
+ (:import [org.apache.storm Config ILocalCluster LocalCluster])
+ (:import [org.apache.storm.generated StormTopology])
+ (:import [org.apache.storm.daemon nimbus])
+ (:import [org.apache.storm.testing TestJob MockedSources TrackedTopology
+ MkClusterParam CompleteTopologyParam MkTupleParam])
+ (:import [org.apache.storm.utils Utils])
+ (:use [org.apache.storm testing util log])
+ (:gen-class
+ :name org.apache.storm.Testing
+ :methods [^:static [completeTopology
+ [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology
+ org.apache.storm.testing.CompleteTopologyParam]
+ java.util.Map]
+ ^:static [completeTopology
+ [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology]
+ java.util.Map]
+ ^:static [withSimulatedTime [Runnable] void]
+ ^:static [withLocalCluster [org.apache.storm.testing.TestJob] void]
+ ^:static [withLocalCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void]
+ ^:static [getLocalCluster [java.util.Map] org.apache.storm.ILocalCluster]
+ ^:static [withSimulatedTimeLocalCluster [org.apache.storm.testing.TestJob] void]
+ ^:static [withSimulatedTimeLocalCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void]
+ ^:static [withTrackedCluster [org.apache.storm.testing.TestJob] void]
+ ^:static [withTrackedCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void]
+ ^:static [readTuples [java.util.Map String String] java.util.List]
+ ^:static [readTuples [java.util.Map String] java.util.List]
+ ^:static [mkTrackedTopology [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology] org.apache.storm.testing.TrackedTopology]
+ ^:static [trackedWait [org.apache.storm.testing.TrackedTopology] void]
+ ^:static [trackedWait [org.apache.storm.testing.TrackedTopology Integer] void]
+ ^:static [trackedWait [org.apache.storm.testing.TrackedTopology Integer Integer] void]
+ ^:static [advanceClusterTime [org.apache.storm.ILocalCluster Integer Integer] void]
+ ^:static [advanceClusterTime [org.apache.storm.ILocalCluster Integer] void]
+ ^:static [multiseteq [java.util.Collection java.util.Collection] boolean]
+ ^:static [multiseteq [java.util.Map java.util.Map] boolean]
+ ^:static [testTuple [java.util.List] org.apache.storm.tuple.Tuple]
+ ^:static [testTuple [java.util.List org.apache.storm.testing.MkTupleParam] org.apache.storm.tuple.Tuple]]))
+
+;; Java-facing static Testing/completeTopology: unpacks a
+;; CompleteTopologyParam (nil fields fall back to complete-topology's
+;; defaults) and delegates to the Clojure complete-topology on the wrapped
+;; cluster state. NOTE(review): (or (.getCleanupState ...) true) can never
+;; yield false -- an explicit Boolean/FALSE is overridden to true; verify
+;; against CompleteTopologyParam's default before relying on cleanup-state=false.
+(defn -completeTopology
+ ([^ILocalCluster cluster ^StormTopology topology ^CompleteTopologyParam completeTopologyParam]
+ (let [mocked-sources (or (-> completeTopologyParam .getMockedSources .getData) {})
+ storm-conf (or (.getStormConf completeTopologyParam) {})
+ cleanup-state (or (.getCleanupState completeTopologyParam) true)
+ topology-name (.getTopologyName completeTopologyParam)
+ timeout-ms (or (.getTimeoutMs completeTopologyParam) TEST-TIMEOUT-MS)]
+ (complete-topology (.getState cluster) topology
+ :mock-sources mocked-sources
+ :storm-conf storm-conf
+ :cleanup-state cleanup-state
+ :topology-name topology-name
+ :timeout-ms timeout-ms)))
+ ([^ILocalCluster cluster ^StormTopology topology]
+ (-completeTopology cluster topology (CompleteTopologyParam.))))
+
+
+;; Static Testing/withSimulatedTime: runs a Runnable under simulated time.
+(defn -withSimulatedTime
+ [^Runnable code]
+ (with-simulated-time
+ (.run code)))
+
+;; Shared expansion for the with*Cluster entry points: reads supervisor
+;; count, ports and daemon conf from MkClusterParam (with the same defaults
+;; as mk-local-storm-cluster), boots via the given cluster macro, wraps the
+;; cluster map in a java-facing LocalCluster, and runs the TestJob.
+(defmacro with-cluster
+ [cluster-type mkClusterParam code]
+ `(let [supervisors# (or (.getSupervisors ~mkClusterParam) 2)
+ ports-per-supervisor# (or (.getPortsPerSupervisor ~mkClusterParam) 3)
+ daemon-conf# (or (.getDaemonConf ~mkClusterParam) {})]
+ (~cluster-type [cluster# :supervisors supervisors#
+ :ports-per-supervisor ports-per-supervisor#
+ :daemon-conf daemon-conf#]
+ (let [cluster# (LocalCluster. cluster#)]
+ (.run ~code cluster#)))))
+
+;; Static Testing/withLocalCluster: real-time local cluster around a TestJob.
+(defn -withLocalCluster
+ ([^MkClusterParam mkClusterParam ^TestJob code]
+ (with-cluster with-local-cluster mkClusterParam code))
+ ([^TestJob code]
+ (-withLocalCluster (MkClusterParam.) code)))
+
+;; Static Testing/getLocalCluster: builds a LocalCluster from a plain Map of
+;; string keys ("supervisors", "daemon-conf", ...) with the same defaults as
+;; mk-local-storm-cluster. The caller owns shutdown of the returned cluster.
+(defn -getLocalCluster
+ ([^Map clusterConf]
+ (let [daemon-conf (get-in clusterConf ["daemon-conf"] {})
+ supervisors (get-in clusterConf ["supervisors"] 2)
+ ports-per-supervisor (get-in clusterConf ["ports-per-supervisor"] 3)
+ inimbus (get-in clusterConf ["inimbus"] nil)
+ supervisor-slot-port-min (get-in clusterConf ["supervisor-slot-port-min"] 1024)
+ nimbus-daemon (get-in clusterConf ["nimbus-daemon"] false)
+ local-cluster-map (mk-local-storm-cluster :supervisors supervisors
+ :ports-per-supervisor ports-per-supervisor
+ :daemon-conf daemon-conf
+ :inimbus inimbus
+ :supervisor-slot-port-min supervisor-slot-port-min
+ :nimbus-daemon nimbus-daemon
+ )]
+ (LocalCluster. local-cluster-map))))
+
+;; Static Testing/withSimulatedTimeLocalCluster: simulated-time variant.
+(defn -withSimulatedTimeLocalCluster
+ ([^MkClusterParam mkClusterParam ^TestJob code]
+ (with-cluster with-simulated-time-local-cluster mkClusterParam code))
+ ([^TestJob code]
+ (-withSimulatedTimeLocalCluster (MkClusterParam.) code)))
+
+;; Static Testing/withTrackedCluster: tuple-tracking variant.
+(defn -withTrackedCluster
+ ([^MkClusterParam mkClusterParam ^TestJob code]
+ (with-cluster with-tracked-cluster mkClusterParam code))
+ ([^TestJob code]
+ (-withTrackedCluster (MkClusterParam.) code)))
+
+;; Collects the .values of every fixed tuple whose .stream equals `stream`
+;; into a java.util.ArrayList (java-friendly return type for the API below).
+(defn- find-tuples
+ [^List fixed-tuples ^String stream]
+ (let [ret (ArrayList.)]
+ (doseq [fixed-tuple fixed-tuples]
+ (if (= (.stream fixed-tuple) stream)
+ (.add ret (.values fixed-tuple))))
+ ret))
+
+;; Static Testing/readTuples: java-facing analogue of read-tuples; returns an
+;; empty collection when the component has no captured results.
+(defn -readTuples
+ ([^Map result ^String componentId ^String streamId]
+ (let [stream-result (.get result componentId)
+ ret (if stream-result
+ (find-tuples stream-result streamId)
+ [])]
+ ret))
+ ([^Map result ^String componentId]
+ (-readTuples result componentId Utils/DEFAULT_STREAM_ID)))
+
+;; Static Testing/mkTrackedTopology: wraps the Clojure tracked-topology map
+;; in the java-facing TrackedTopology type.
+(defn -mkTrackedTopology
+ [^ILocalCluster trackedCluster ^StormTopology topology]
+ (-> (mk-tracked-topology (.getState trackedCluster) topology)
+ (TrackedTopology.)))
+
+;; Static Testing/trackedWait: delegates to tracked-wait; amt defaults to 1.
+(defn -trackedWait
+ ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms]
+ (tracked-wait trackedTopology amt timeout-ms))
+ ([^TrackedTopology trackedTopology ^Integer amt]
+ (tracked-wait trackedTopology amt))
+ ([^TrackedTopology trackedTopology]
+ (-trackedWait trackedTopology 1)))
+
+;; Static Testing/advanceClusterTime: step defaults to 1 second.
+(defn -advanceClusterTime
+ ([^ILocalCluster cluster ^Integer secs ^Integer step]
+ (advance-cluster-time (.getState cluster) secs step))
+ ([^ILocalCluster cluster ^Integer secs]
+ (-advanceClusterTime cluster secs 1)))
+
+;; Multiset equality after converting java structures to Clojure ones, so
+;; java.util collections/maps compare correctly via ms=.
+(defn- multiseteq
+ [^Object obj1 ^Object obj2]
+ (let [obj1 (clojurify-structure obj1)
+ obj2 (clojurify-structure obj2)]
+ (ms= obj1 obj2)))
+
+;; NOTE(review): -multiseteq is defn'd twice below; the second defn replaces
+;; the first at load time, so BOTH gen-class overloads (Collection,Collection
+;; and Map,Map declared above) dispatch to the same var. This happens to work
+;; because multiseteq handles either shape via clojurify-structure, but the
+;; first defn is dead code.
+(defn -multiseteq
+ [^Collection coll1 ^Collection coll2]
+ (multiseteq coll1 coll2))
+
+(defn -multiseteq
+ [^Map coll1 ^Map coll2]
+ (multiseteq coll1 coll2))
+
+;; Static Testing/testTuple: builds a test Tuple from values, taking stream,
+;; component and fields overrides from MkTupleParam when supplied (nil param
+;; means all test-tuple defaults).
+(defn -testTuple
+ ([^List values]
+ (-testTuple values nil))
+ ([^List values ^MkTupleParam param]
+ (if (nil? param)
+ (test-tuple values)
+ (let [stream (or (.getStream param) Utils/DEFAULT_STREAM_ID)
+ component (or (.getComponent param) "component")
+ fields (.getFields param)]
+ (test-tuple values :stream stream :component component :fields fields)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/thrift.clj b/storm-core/src/clj/org/apache/storm/thrift.clj
new file mode 100644
index 0000000..47e233a
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/thrift.clj
@@ -0,0 +1,284 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.thrift
+ (:import [java.util HashMap]
+ [java.io Serializable]
+ [org.apache.storm.generated NodeInfo Assignment])
+ (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology
+ StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
+ ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
+ GlobalStreamId ComponentObject ComponentObject$_Fields
+ ShellComponent SupervisorInfo])
+ (:import [org.apache.storm.utils Utils NimbusClient])
+ (:import [org.apache.storm Constants])
+ (:import [org.apache.storm.security.auth ReqContext])
+ (:import [org.apache.storm.grouping CustomStreamGrouping])
+ (:import [org.apache.storm.topology TopologyBuilder])
+ (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
+ (:import [org.apache.thrift.transport TTransport])
+ (:use [org.apache.storm util config log zookeeper]))
+
+(defn instantiate-java-object
+  "Reflectively constructs an instance of the class named by the Thrift
+   JavaObject, passing its declared constructor arguments.
+   NOTE(review): invokes the constructor via eval; the class name and
+   args come from the thrift struct, so callers must trust the source."
+  [^JavaObject obj]
+  (let [clazz (symbol (.get_full_class_name obj))
+        ctor-args (map (memfn getFieldValue) (.get_args_list obj))]
+    (eval `(new ~clazz ~@ctor-args))))
+
+;; Maps each Thrift Grouping union field to the keyword used throughout
+;; the Clojure DSL to identify that grouping type.
+(def grouping-constants
+  {Grouping$_Fields/FIELDS :fields
+   Grouping$_Fields/SHUFFLE :shuffle
+   Grouping$_Fields/ALL :all
+   Grouping$_Fields/NONE :none
+   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
+   Grouping$_Fields/CUSTOM_OBJECT :custom-object
+   Grouping$_Fields/DIRECT :direct
+   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
+
+(defn grouping-type
+  "Returns the DSL keyword (:fields, :shuffle, ...) for a Thrift
+   Grouping, or nil for an unrecognized union field."
+  [^Grouping grouping]
+  (get grouping-constants (.getSetField grouping)))
+
+(defn field-grouping
+  "Returns the field list of a :fields grouping; throws
+   IllegalArgumentException for any other grouping type."
+  [^Grouping grouping]
+  (if (= :fields (grouping-type grouping))
+    (.get_fields grouping)
+    (throw (IllegalArgumentException.
+             "Tried to get grouping fields from non fields grouping"))))
+
+(defn global-grouping?
+  "A global grouping is encoded as a fields grouping with an empty field
+   list; returns true exactly in that case."
+  [^Grouping grouping]
+  (and (= (grouping-type grouping) :fields)
+       (-> grouping field-grouping empty?)))
+
+(defn parallelism-hint
+  "Returns the component's declared parallelism hint, defaulting to 1
+   when the thrift field is unset."
+  [^ComponentCommon component-common]
+  (if (.is_set_parallelism_hint component-common)
+    (.get_parallelism_hint component-common)
+    1))
+
+(defn nimbus-client-and-conn
+  "Opens a NimbusClient to host:port (optionally impersonating as-user)
+   and returns a [thrift-client transport] pair. The caller is
+   responsible for closing the transport."
+  ([host port]
+   (nimbus-client-and-conn host port nil))
+  ([host port as-user]
+   (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
+   (let [nimbus-client (NimbusClient. (read-storm-config) host port nil as-user)]
+     [(.getClient nimbus-client) (.transport nimbus-client)])))
+
+(defmacro with-nimbus-connection
+  "Evaluates body with client-sym bound to a Nimbus$Client connected to
+   host:port; always closes the thrift transport afterwards, even when
+   body throws."
+  [[client-sym host port] & body]
+  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
+     (try
+       ~@body
+       (finally (.close conn#)))))
+
+(defmacro with-configured-nimbus-connection
+  "Like with-nimbus-connection, but locates Nimbus via the local storm
+   config and connects as the current request context's principal (nil
+   when unauthenticated). Always closes the transport afterwards."
+  [client-sym & body]
+  `(let [conf# (read-storm-config)
+         context# (ReqContext/context)
+         user# (if (.principal context#) (.getName (.principal context#)))
+         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
+         ~client-sym (.getClient nimbusClient#)
+         conn# (.transport nimbusClient#)]
+     (try
+       ~@body
+       (finally (.close conn#)))))
+
+(defn direct-output-fields
+  "Declares a stream whose tuples must be emitted via direct grouping."
+  [fields]
+  (StreamInfo. fields true))
+
+(defn output-fields
+  "Declares a regular (non-direct) output stream with the given fields."
+  [fields]
+  (StreamInfo. fields false))
+
+(defn mk-output-spec
+  "Normalizes an output spec into a map of stream-id -> StreamInfo. A
+   bare field list is treated as the default stream; plain field lists
+   are wrapped as non-direct StreamInfos, existing StreamInfos pass
+   through untouched."
+  [output-spec]
+  (let [by-stream (if (map? output-spec)
+                    output-spec
+                    {Utils/DEFAULT_STREAM_ID output-spec})]
+    (map-val #(if (instance? StreamInfo %) % (StreamInfo. % false))
+             by-stream)))
+
+;; Builds the Thrift ComponentCommon shared by spouts and bolts: input
+;; streams (GlobalStreamId -> Grouping), declared output streams, and an
+;; optional parallelism hint / component-specific JSON config.
+(defnk mk-plain-component-common
+  [inputs output-spec parallelism-hint :conf nil]
+  (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
+    (when parallelism-hint
+      (.set_parallelism_hint ret parallelism-hint))
+    (when conf
+      (.set_json_conf ret (to-json conf)))
+    ret))
+
+;; Builds a Thrift SpoutSpec from a java-serializable spout object, its
+;; declared outputs, and optional parallelism (:p) and config (:conf).
+(defnk mk-spout-spec*
+  [spout outputs :p nil :conf nil]
+  (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
+              (mk-plain-component-common {} outputs p :conf conf)))
+
+(defn mk-shuffle-grouping
+  "Thrift Grouping that distributes tuples randomly across tasks."
+  []
+  (Grouping/shuffle (NullStruct.)))
+
+(defn mk-local-or-shuffle-grouping
+  "Thrift Grouping that prefers tasks in the same worker, falling back
+   to shuffle across workers."
+  []
+  (Grouping/local_or_shuffle (NullStruct.)))
+
+(defn mk-fields-grouping
+  "Thrift Grouping that partitions tuples by the given field names."
+  [fields]
+  (Grouping/fields fields))
+
+(defn mk-global-grouping
+  "Global grouping is encoded as a fields grouping over zero fields;
+   all tuples go to a single task."
+  []
+  (mk-fields-grouping []))
+
+(defn mk-direct-grouping
+  "Thrift Grouping where the emitter chooses the consuming task
+   explicitly (direct emit)."
+  []
+  (Grouping/direct (NullStruct.)))
+
+(defn mk-all-grouping
+  "Thrift Grouping that replicates every tuple to all tasks."
+  []
+  (Grouping/all (NullStruct.)))
+
+(defn mk-none-grouping
+  "Thrift Grouping declaring no preference for how tuples are routed."
+  []
+  (Grouping/none (NullStruct.)))
+
+(defn deserialized-component-object
+  "Deserializes a ComponentObject's java-serialized payload. Throws
+   RuntimeException when the union holds anything other than
+   SERIALIZED_JAVA."
+  [^ComponentObject obj]
+  (if (= ComponentObject$_Fields/SERIALIZED_JAVA (.getSetField obj))
+    (Utils/javaDeserialize (.get_serialized_java obj) Serializable)
+    (throw (RuntimeException. "Cannot deserialize non-java-serialized object"))))
+
+(defn serialize-component-object
+  "Wraps any java-serializable object as a Thrift ComponentObject."
+  [obj]
+  (ComponentObject/serialized_java (Utils/javaSerialize obj)))
+
+(defn- mk-grouping
+  "Coerces a grouping spec from the Clojure DSL into a Thrift Grouping:
+   nil -> none grouping; an existing Grouping passes through; a
+   CustomStreamGrouping is java-serialized; a JavaObject becomes a
+   custom-object grouping; a sequence of field names becomes a fields
+   grouping; and the keywords :shuffle, :local-or-shuffle, :none, :all,
+   :global, :direct map to the corresponding constructors. Anything
+   else raises IllegalArgumentException."
+  [grouping-spec]
+  (cond
+    (nil? grouping-spec) (mk-none-grouping)
+    (instance? Grouping grouping-spec) grouping-spec
+    (instance? CustomStreamGrouping grouping-spec)
+      (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
+    (instance? JavaObject grouping-spec) (Grouping/custom_object grouping-spec)
+    (sequential? grouping-spec) (mk-fields-grouping grouping-spec)
+    (= grouping-spec :shuffle) (mk-shuffle-grouping)
+    (= grouping-spec :local-or-shuffle) (mk-local-or-shuffle-grouping)
+    (= grouping-spec :none) (mk-none-grouping)
+    (= grouping-spec :all) (mk-all-grouping)
+    (= grouping-spec :global) (mk-global-grouping)
+    (= grouping-spec :direct) (mk-direct-grouping)
+    :else (throw (IllegalArgumentException.
+                   (str grouping-spec " is not a valid grouping")))))
+
+(defn- mk-inputs
+  "Converts an input map of stream-id -> grouping-spec into a map of
+   GlobalStreamId -> Grouping. A sequential stream-id is read as
+   [component-id stream-id]; a bare id targets the default stream."
+  [inputs]
+  (into {}
+        (map (fn [[stream-id grouping-spec]]
+               [(if (sequential? stream-id)
+                  (GlobalStreamId. (first stream-id) (second stream-id))
+                  (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
+                (mk-grouping grouping-spec)]))
+        inputs))
+
+;; Builds a Thrift Bolt from a java-serializable bolt object, its input
+;; map (stream -> grouping spec), declared outputs, and optional
+;; parallelism (:p) and config (:conf).
+(defnk mk-bolt-spec*
+  [inputs bolt outputs :p nil :conf nil]
+  (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
+    (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
+           common)))
+
+;; DSL entry used by mk-topology: wraps a spout object with optional
+;; parallelism and config. :p takes precedence over :parallelism-hint
+;; when both are given.
+(defnk mk-spout-spec
+  [spout :parallelism-hint nil :p nil :conf nil]
+  (let [parallelism-hint (if p p parallelism-hint)]
+    {:obj spout :p parallelism-hint :conf conf}))
+
+(defn- shell-component-params
+  "Normalizes the arguments of the shell spout/bolt constructors into
+   [command-array output-spec kwargs]. When given a script string, the
+   command becomes [command script] and the output spec is taken from
+   the first kwarg; otherwise command is already a full command vector."
+  [command script-or-output-spec kwargs]
+  (if (string? script-or-output-spec)
+    [(into-array String [command script-or-output-spec])
+     (first kwargs)
+     (rest kwargs)]
+    [(into-array String command) script-or-output-spec kwargs]))
+
+;; DSL entry used by mk-topology: wraps a bolt object with its inputs
+;; and optional parallelism/config. :p takes precedence over
+;; :parallelism-hint when both are given.
+(defnk mk-bolt-spec
+  [inputs bolt :parallelism-hint nil :p nil :conf nil]
+  (let [parallelism-hint (if p p parallelism-hint)]
+    {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
+
+(defn mk-shell-bolt-spec
+  "Builds a bolt spec whose implementation runs as an external shell
+   process (RichShellBolt). Accepts either a command string plus a
+   script name, or a full command vector plus an output spec; remaining
+   kwargs are forwarded to mk-bolt-spec."
+  [inputs command script-or-output-spec & kwargs]
+  (let [[cmd output-spec opts] (shell-component-params command script-or-output-spec kwargs)
+        bolt (RichShellBolt. cmd (mk-output-spec output-spec))]
+    (apply mk-bolt-spec inputs bolt opts)))
+
+(defn mk-shell-spout-spec
+  "Builds a spout spec whose implementation runs as an external shell
+   process (RichShellSpout). Argument handling mirrors
+   mk-shell-bolt-spec; remaining kwargs go to mk-spout-spec."
+  [command script-or-output-spec & kwargs]
+  (let [[cmd output-spec opts] (shell-component-params command script-or-output-spec kwargs)
+        spout (RichShellSpout. cmd (mk-output-spec output-spec))]
+    (apply mk-spout-spec spout opts)))
+
+(defn- add-inputs
+  "Registers every input (GlobalStreamId -> Grouping) of the given input
+   map on a TopologyBuilder bolt declarer."
+  [declarer inputs]
+  (doseq [[stream-id grouping] (mk-inputs inputs)]
+    (.grouping declarer stream-id grouping)))
+
+(defn mk-topology
+  "Assembles a StormTopology from maps of component-name -> spec, as
+   produced by mk-spout-spec / mk-bolt-spec. NOTE: the 3-arity form
+   accepts state-spout-map for API compatibility but ignores it."
+  ([spout-map bolt-map]
+   (let [builder (TopologyBuilder.)]
+     (doseq [[name {spout :obj p :p conf :conf}] spout-map]
+       (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
+     (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
+       (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
+     (.createTopology builder)))
+  ([spout-map bolt-map state-spout-map]
+   (mk-topology spout-map bolt-map)))
+
+;; All StormTopology thrift union fields. NOTE: clojurify-structure is
+;; required here; without it every element becomes the same after
+;; successive calls (root cause unknown, per the original author).
+(def STORM-TOPOLOGY-FIELDS
+  (-> StormTopology/metaDataMap clojurify-structure keys))
+
+;; The StormTopology union fields that hold spout definitions.
+(def SPOUT-FIELDS
+  [StormTopology$_Fields/SPOUTS
+   StormTopology$_Fields/STATE_SPOUTS])
+
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
new file mode 100644
index 0000000..0d8839e
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/timer.clj
@@ -0,0 +1,128 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.timer
+ (:import [org.apache.storm.utils Time])
+ (:import [java.util PriorityQueue Comparator Random])
+ (:import [java.util.concurrent Semaphore])
+ (:use [org.apache.storm util log]))
+
+;; The timer defined in this file is very similar to java.util.Timer, except
+;; it integrates with Storm's time simulation capabilities. This lets us test
+;; code that does asynchronous work on the timer thread
+
+;; Creates a timer backed by a dedicated daemon thread and a priority
+;; queue of [end-time-ms afn id] triples ordered by firing time.
+;; Returns a map with :timer-thread, :queue, :active, :lock, :random and
+;; :cancel-notifier (a semaphore released when the thread exits, which
+;; cancel-timer uses to wait for shutdown). Sleeps go through Storm's
+;; Time class so simulated time works in tests.
+(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
+  (let [queue (PriorityQueue. 10 (reify Comparator
+                                   (compare
+                                     [this o1 o2]
+                                     ;; Orders entries by end-time-ms.
+                                     ;; NOTE(review): the long difference is
+                                     ;; coerced to the comparator's int
+                                     ;; result; confirm entries can never be
+                                     ;; far enough apart to overflow int.
+                                     (- (first o1) (first o2)))
+                                   (equals
+                                     [this obj]
+                                     true)))
+        active (atom true)
+        lock (Object.)
+        notifier (Semaphore. 0)
+        thread-name (if timer-name timer-name "timer")
+        timer-thread (Thread.
+                       (fn []
+                         (while @active
+                           (try
+                             (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
+                               (if (and elem (>= (current-time-millis) time-millis))
+                                 ;; It is imperative to not run the function
+                                 ;; inside the timer lock. Otherwise, it is
+                                 ;; possible to deadlock if the fn deals with
+                                 ;; other locks, like the submit lock.
+                                 (let [afn (locking lock (second (.poll queue)))]
+                                   (afn))
+                                 (if time-millis
+                                   ;; If any events are scheduled, sleep until
+                                   ;; event generation. If any recurring events
+                                   ;; are scheduled then we will always go
+                                   ;; through this branch, sleeping only the
+                                   ;; exact necessary amount of time. We give
+                                   ;; an upper bound, e.g. 1000 millis, to the
+                                   ;; sleeping time, to limit the response time
+                                   ;; for detecting any new event within 1 secs.
+                                   (Time/sleep (min 1000 (- time-millis (current-time-millis))))
+                                   ;; Otherwise poll to see if any new event
+                                   ;; was scheduled. This is, in essence, the
+                                   ;; response time for detecting any new event
+                                   ;; schedulings when there are no scheduled
+                                   ;; events.
+                                   (Time/sleep 1000))))
+                             (catch Throwable t
+                               ;; Because the interrupted exception can be
+                               ;; wrapped in a RuntimeException.
+                               (when-not (exception-cause? InterruptedException t)
+                                 (kill-fn t)
+                                 (reset! active false)
+                                 (throw t)))))
+                         ;; Signals cancel-timer that the thread has exited.
+                         (.release notifier)) thread-name)]
+    (.setDaemon timer-thread true)
+    (.setPriority timer-thread Thread/MAX_PRIORITY)
+    (.start timer-thread)
+    {:timer-thread timer-thread
+     :queue queue
+     :active active
+     :lock lock
+     :random (Random.)
+     :cancel-notifier notifier}))
+
+(defn- check-active!
+  "Throws IllegalStateException unless the timer is still running."
+  [timer]
+  (if-not @(:active timer)
+    (throw (IllegalStateException. "Timer is not active"))))
+
+;; Schedules afn to run once, delay-secs from now, with an optional
+;; uniform random jitter of up to :jitter-ms milliseconds. Queue entries
+;; are [end-time-ms afn id] triples guarded by the timer lock.
+;; :check-active false is used internally by recurring schedules so a
+;; callback can re-schedule itself without racing cancel-timer.
+(defnk schedule
+  [timer delay-secs afn :check-active true :jitter-ms 0]
+  (when check-active (check-active! timer))
+  (let [id (uuid)
+        ^PriorityQueue queue (:queue timer)
+        end-time-ms (+ (current-time-millis) (secs-to-millis-long delay-secs))
+        ;; Apply jitter only when a positive jitter window was requested.
+        end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
+    (locking (:lock timer)
+      (.add queue [end-time-ms afn id]))))
+
+(defn schedule-recurring
+  "Runs afn after delay-secs, then re-runs it every recur-secs until the
+   timer is cancelled."
+  [timer delay-secs recur-secs afn]
+  (schedule timer delay-secs
+            (fn this []
+              (afn)
+              ;; Re-scheduling from inside the callback with check-active
+              ;; disabled avoids a race condition with cancel-timer.
+              (schedule timer recur-secs this :check-active false))))
+
+(defn schedule-recurring-with-jitter
+  "Like schedule-recurring, but each firing is delayed by a uniform
+   random jitter of up to jitter-ms milliseconds."
+  [timer delay-secs recur-secs jitter-ms afn]
+  (schedule timer delay-secs
+            (fn this []
+              (afn)
+              ;; Re-scheduling from inside the callback with check-active
+              ;; disabled avoids a race condition with cancel-timer.
+              (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
+
+(defn cancel-timer
+  "Deactivates the timer and interrupts its thread, then blocks until
+   the timer thread has exited (it releases :cancel-notifier on exit).
+   Throws IllegalStateException if the timer was already cancelled."
+  [timer]
+  (check-active! timer)
+  (locking (:lock timer)
+    (reset! (:active timer) false)
+    (.interrupt (:timer-thread timer)))
+  (.acquire (:cancel-notifier timer)))
+
+(defn timer-waiting?
+  "True when the timer thread is currently waiting, per
+   Time/isThreadWaiting (covers simulated-time sleeps in tests)."
+  [timer]
+  (Time/isThreadWaiting (:timer-thread timer)))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj
new file mode 100644
index 0000000..44e5ca9
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj
@@ -0,0 +1,79 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.trident.testing
+ (:require [org.apache.storm.LocalDRPC :as LocalDRPC])
+ (:import [org.apache.storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
+ (:require [org.apache.storm [LocalDRPC]])
+ (:import [org.apache.storm LocalDRPC])
+ (:import [org.apache.storm.tuple Fields])
+ (:import [org.apache.storm.generated KillOptions])
+ (:require [org.apache.storm [testing :as t]])
+ (:use [org.apache.storm util])
+ )
+
+(defn local-drpc
+  "Creates a fresh in-process LocalDRPC server."
+  []
+  (LocalDRPC.))
+
+(defn exec-drpc
+  "Executes a DRPC function with the given (string) args and parses its
+   JSON result into Clojure data."
+  [^LocalDRPC drpc function-name args]
+  (from-json (.execute drpc function-name args)))
+
+(defn exec-drpc-tuples
+  "Executes a DRPC function, passing the given tuples JSON-encoded."
+  [^LocalDRPC drpc function-name tuples]
+  (exec-drpc drpc function-name (to-json tuples)))
+
+(defn feeder-spout
+  "Creates a FeederBatchSpout emitting tuples with the given fields."
+  [fields]
+  (FeederBatchSpout. fields))
+
+(defn feeder-committer-spout
+  "Creates a FeederCommitterBatchSpout emitting tuples with the given
+   fields."
+  [fields]
+  (FeederCommitterBatchSpout. fields))
+
+(defn feed
+  "Feeds a batch of tuples into a feeder spout."
+  [feeder tuples]
+  (.feed feeder tuples))
+
+(defn fields
+  "Builds a storm Fields object from the given field names."
+  [& field-names]
+  (Fields. field-names))
+
+(defn memory-map-state
+  "Creates an in-memory MemoryMapState factory for trident tests."
+  []
+  (MemoryMapState$Factory.))
+
+(defmacro with-drpc
+  "Evaluates body with drpc bound to a fresh LocalDRPC, guaranteeing the
+   server is shut down afterwards even when body throws (the previous
+   version leaked the server on exception)."
+  [[drpc] & body]
+  `(let [~drpc (org.apache.storm.LocalDRPC.)]
+     (try
+       ~@body
+       (finally (.shutdown ~drpc)))))
+
+(defn with-topology*
+  "Submits topo to the local cluster under the name \"tester\", runs
+   body-fn, and kills the topology afterwards — even when body-fn throws
+   (the previous version left it running on exception)."
+  [cluster topo body-fn]
+  (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
+  (try
+    (body-fn)
+    (finally
+      (.killTopologyWithOpts (:nimbus cluster) "tester"
+                             (doto (KillOptions.) (.set_wait_secs 0))))))
+
+(defmacro with-topology
+  "Macro sugar over with-topology*: evaluates body with topo submitted
+   to the local cluster, killing it afterwards."
+  [[cluster topo] & body]
+  `(with-topology* ~cluster ~topo (fn [] ~@body)))
+
+(defn bootstrap-imports
+  "Imports LocalDRPC, TridentTopology and the built-in trident
+   operations into the calling namespace; intended for REPL use."
+  []
+  (import 'org.apache.storm.LocalDRPC)
+  (import 'org.apache.storm.trident.TridentTopology)
+  (import '[org.apache.storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN TupleCollectionGet])
+  )
+
+(defn drpc-tuples-input
+  "Adds a DRPC stream named function-name to the trident topology whose
+   \"args\" field is tuplified (via TuplifyArgs) into outfields."
+  [topology function-name drpc outfields]
+  (-> topology
+      (.newDRPCStream function-name drpc)
+      (.each (fields "args") (TuplifyArgs.) outfields)
+      ))
+
+