Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:10 UTC

[14/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
new file mode 100644
index 0000000..87ca2de
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -0,0 +1,701 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testing
+  (:require [org.apache.storm.daemon
+             [nimbus :as nimbus]
+             [supervisor :as supervisor]
+             [common :as common]
+             [worker :as worker]
+             [executor :as executor]])
+  (:require [org.apache.storm [process-simulator :as psim]])
+  (:import [org.apache.commons.io FileUtils])
+  (:import [java.io File])
+  (:import [java.util HashMap ArrayList])
+  (:import [java.util.concurrent.atomic AtomicInteger])
+  (:import [java.util.concurrent ConcurrentHashMap])
+  (:import [org.apache.storm.utils Time Utils RegisteredGlobalState])
+  (:import [org.apache.storm.tuple Fields Tuple TupleImpl])
+  (:import [org.apache.storm.task TopologyContext])
+  (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
+  (:import [org.apache.storm.testing FeederSpout FixedTupleSpout FixedTuple
+            TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
+            TestWordSpout MemoryTransactionalSpout])
+  (:import [org.apache.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
+  (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
+            ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
+            KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
+            ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
+  (:import [org.apache.storm.transactional TransactionalSpoutCoordinator])
+  (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
+  (:import [org.apache.storm.tuple Tuple])
+  (:import [org.apache.storm.generated StormTopology])
+  (:import [org.apache.storm.task TopologyContext])
+  (:require [org.apache.storm [zookeeper :as zk]])
+  (:require [org.apache.storm.messaging.loader :as msg-loader])
+  (:require [org.apache.storm.daemon.acker :as acker])
+  (:use [org.apache.storm cluster util thrift config log local-state]))
+
+(defn feeder-spout
+  [fields]
+  (FeederSpout. (Fields. fields)))
+
+(defn local-temp-path
+  []
+  (str (System/getProperty "java.io.tmpdir") (if-not on-windows? "/") (uuid)))
+
+(defn delete-all
+  [paths]
+  (dorun
+    (for [t paths]
+      (if (.exists (File. t))
+        (try
+          (FileUtils/forceDelete (File. t))
+          (catch Exception e
+            (log-message (.getMessage e))))))))
+
+(defmacro with-local-tmp
+  [[& tmp-syms] & body]
+  (let [tmp-paths (mapcat (fn [t] [t `(local-temp-path)]) tmp-syms)]
+    `(let [~@tmp-paths]
+       (try
+         ~@body
+         (finally
+           (delete-all ~(vec tmp-syms)))))))
+
+(defn start-simulating-time!
+  []
+  (Time/startSimulating))
+
+(defn stop-simulating-time!
+  []
+  (Time/stopSimulating))
+
+(defmacro with-simulated-time
+  [& body]
+  `(try
+    (start-simulating-time!)
+    ~@body
+    (finally
+      (stop-simulating-time!))))
+
+(defn advance-time-ms! [ms]
+  (Time/advanceTime ms))
+
+(defn advance-time-secs! [secs]
+  (advance-time-ms! (* (long secs) 1000)))
+
+(defnk add-supervisor
+  [cluster-map :ports 2 :conf {} :id nil]
+  (let [tmp-dir (local-temp-path)
+        port-ids (if (sequential? ports)
+                   ports
+                   (doall (repeatedly ports (:port-counter cluster-map))))
+        supervisor-conf (merge (:daemon-conf cluster-map)
+                               conf
+                               {STORM-LOCAL-DIR tmp-dir
+                                SUPERVISOR-SLOTS-PORTS port-ids})
+        id-fn (if id (fn [] id) supervisor/generate-supervisor-id)
+        daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
+    (swap! (:supervisors cluster-map) conj daemon)
+    (swap! (:tmp-dirs cluster-map) conj tmp-dir)
+    daemon))
+
+(defn mk-shared-context [conf]
+  (if-not (conf STORM-LOCAL-MODE-ZMQ)
+    (msg-loader/mk-local-context)))
+
+(defn start-nimbus-daemon [conf nimbus]
+  (let [server (ThriftServer. conf (Nimbus$Processor. nimbus)
+                              ThriftConnectionType/NIMBUS)
+        nimbus-thread (Thread. (fn [] (.serve server)))]
+    (log-message "Starting Nimbus server...")
+    (.start nimbus-thread)
+    server))
+
+
+;; Returns a map containing cluster info.
+;; The local dir is always overridden in the conf maps.
+;; The supervisors can be customized (except for their ports) by passing a sequence of conf maps as the :supervisors parameter.
+;; If more control over the ports is needed, use add-supervisor calls afterwards.
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
+  (let [zk-tmp (local-temp-path)
+        [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
+                              (zk/mk-inprocess-zookeeper zk-tmp))
+        daemon-conf (merge (read-storm-config)
+                           {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+                            ZMQ-LINGER-MILLIS 0
+                            TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
+                            TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
+                            STORM-CLUSTER-MODE "local"
+                            BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
+                           (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
+                             {STORM-ZOOKEEPER-PORT zk-port
+                              STORM-ZOOKEEPER-SERVERS ["localhost"]})
+                           daemon-conf)
+        nimbus-tmp (local-temp-path)
+        port-counter (mk-counter supervisor-slot-port-min)
+        nimbus (nimbus/service-handler
+                (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+                (if inimbus inimbus (nimbus/standalone-nimbus)))
+        context (mk-shared-context daemon-conf)
+        nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)
+        cluster-map {:nimbus nimbus
+                     :port-counter port-counter
+                     :daemon-conf daemon-conf
+                     :supervisors (atom [])
+                     :state (mk-distributed-cluster-state daemon-conf)
+                     :storm-cluster-state (mk-storm-cluster-state daemon-conf)
+                     :tmp-dirs (atom [nimbus-tmp zk-tmp])
+                     :zookeeper (if (not-nil? zk-handle) zk-handle)
+                     :shared-context context
+                     :nimbus-thrift-server nimbus-thrift-server}
+        supervisor-confs (if (sequential? supervisors)
+                           supervisors
+                           (repeat supervisors {}))]
+
+    (doseq [sc supervisor-confs]
+      (add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
+    cluster-map))
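For orientation, a minimal usage sketch of mk-local-storm-cluster and add-supervisor (illustrative only, not part of the diff; the submitted `topology` is assumed to exist):

(comment
  (let [cluster (mk-local-storm-cluster :supervisors 2 :ports-per-supervisor 3)]
    ;; add one more supervisor with 4 ports after the fact
    (add-supervisor cluster :ports 4)
    (submit-local-topology (:nimbus cluster) "demo" {} topology)
    (kill-local-storm-cluster cluster)))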
+
+(defn get-supervisor [cluster-map supervisor-id]
+  (let [finder-fn #(= (.get-id %) supervisor-id)]
+    (find-first finder-fn @(:supervisors cluster-map))))
+
+(defn kill-supervisor [cluster-map supervisor-id]
+  (let [finder-fn #(= (.get-id %) supervisor-id)
+        supervisors @(:supervisors cluster-map)
+        sup (find-first finder-fn
+                        supervisors)]
+    ;; tmp-dir will be taken care of by shutdown
+    (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
+    (.shutdown sup)))
+
+(defn kill-local-storm-cluster [cluster-map]
+  (.shutdown (:nimbus cluster-map))
+  (if (not-nil? (:nimbus-thrift-server cluster-map))
+    (do
+      (log-message "shutting down thrift server")
+      (try
+        (.stop (:nimbus-thrift-server cluster-map))
+        (catch Exception e (log-message "failed to stop thrift")))
+      ))
+  (.close (:state cluster-map))
+  (.disconnect (:storm-cluster-state cluster-map))
+  (doseq [s @(:supervisors cluster-map)]
+    (.shutdown-all-workers s)
+    ;; race condition here? will it launch the workers again?
+    (supervisor/kill-supervisor s))
+  (psim/kill-all-processes)
+  (if (not-nil? (:zookeeper cluster-map))
+    (do
+      (log-message "Shutting down in process zookeeper")
+      (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
+      (log-message "Done shutting down in process zookeeper")))
+  (doseq [t @(:tmp-dirs cluster-map)]
+    (log-message "Deleting temporary path " t)
+    (try
+      (rmr t)
+      ;; on Windows, the host process still holds a lock on the logfile
+      (catch Exception e (log-message (.getMessage e)))) ))
+
+(def TEST-TIMEOUT-MS
+  (let [timeout (System/getenv "STORM_TEST_TIMEOUT_MS")]
+    (parse-int (if timeout timeout "5000"))))
+
+(defmacro while-timeout [timeout-ms condition & body]
+  `(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)]
+     (log-debug "Looping until " '~condition)
+     (while ~condition
+       (when (> (System/currentTimeMillis) end-time#)
+         (let [thread-dump# (Utils/threadDump)]
+           (log-message "Condition " '~condition  " not met in " ~timeout-ms "ms")
+           (log-message thread-dump#)
+           (throw (AssertionError. (str "Test timed out (" ~timeout-ms "ms) " '~condition)))))
+       ~@body)
+     (log-debug "Condition met " '~condition)))
+
+(defn wait-for-condition
+  ([apredicate]
+    (wait-for-condition TEST-TIMEOUT-MS apredicate))
+  ([timeout-ms apredicate]
+    (while-timeout timeout-ms (not (apredicate))
+      (Time/sleep 100))))
+
+(defn wait-until-cluster-waiting
+  "Wait until the cluster is idle. Should be used with time simulation."
+  ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS))
+  ([cluster-map timeout-ms]
+  ;; wait until all workers, supervisors, and nimbus are waiting
+  (let [supervisors @(:supervisors cluster-map)
+        workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
+        daemons (concat
+                  [(:nimbus cluster-map)]
+                  supervisors
+                  ; because a worker may already be dead
+                  workers)]
+    (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
+                   (Thread/sleep (rand-int 20))
+                   ;;      (doseq [d daemons]
+                   ;;        (if-not ((memfn waiting?) d)
+                   ;;          (println d)))
+                   ))))
+
+(defn advance-cluster-time
+  ([cluster-map secs increment-secs]
+   (loop [left secs]
+     (when (> left 0)
+       (let [diff (min left increment-secs)]
+         (advance-time-secs! diff)
+         (wait-until-cluster-waiting cluster-map)
+         (recur (- left diff))))))
+  ([cluster-map secs]
+   (advance-cluster-time cluster-map secs 1)))
+
+(defmacro with-local-cluster
+  [[cluster-sym & args] & body]
+  `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
+     (try
+       ~@body
+       (catch Throwable t#
+         (log-error t# "Error in cluster")
+         (throw t#))
+       (finally
+         (let [keep-waiting?# (atom true)
+               f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
+           (kill-local-storm-cluster ~cluster-sym)
+           (reset! keep-waiting?# false)
+            @f#)))))
+
+(defmacro with-simulated-time-local-cluster
+  [& args]
+  `(with-simulated-time
+     (with-local-cluster ~@args)))
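A typical test wraps setup and teardown in these macros; a hedged sketch (illustrative only, not part of the diff; `topology` is assumed to be built by the test):

(comment
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (submit-local-topology (:nimbus cluster) "demo" {TOPOLOGY-WORKERS 2} topology)
    (advance-cluster-time cluster 60)))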
+
+(defmacro with-inprocess-zookeeper
+  [port-sym & body]
+  `(with-local-tmp [tmp#]
+                   (let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)]
+                     (try
+                       ~@body
+                       (finally
+                         (zk/shutdown-inprocess-zookeeper zks#))))))
+
+(defn submit-local-topology
+  [nimbus storm-name conf topology]
+  (when-not (Utils/isValidConf conf)
+    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+  (.submitTopology nimbus storm-name nil (to-json conf) topology))
+
+(defn submit-local-topology-with-opts
+  [nimbus storm-name conf topology submit-opts]
+  (when-not (Utils/isValidConf conf)
+    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+  (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
+
+(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
+  (fn [existing-assignments]
+    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+          existing-assignments (into {} (for [[tid assignment] existing-assignments]
+                                          {tid (:worker->resources assignment)}))
+          new-assignments (assoc existing-assignments topology-id worker->resources)]
+      new-assignments)))
+
+(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port]
+  (fn [new-scheduler-assignments existing-assignments]
+    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+          existing-assignments (into {} (for [[tid assignment] existing-assignments]
+                                          {tid (:executor->node+port assignment)}))
+          new-assignments (assoc existing-assignments topology-id executor->node+port)]
+      new-assignments)))
+
+(defn mocked-compute-new-scheduler-assignments []
+  (fn [nimbus existing-assignments topologies scratch-topology-id]
+    existing-assignments))
+
+(defn submit-mocked-assignment
+  [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
+  (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
+                   nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
+                   nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
+                                                          storm-cluster-state
+                                                          storm-name
+                                                          worker->resources)
+                   nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
+                                                                      storm-cluster-state
+                                                                      storm-name
+                                                                      executor->node+port)]
+    (submit-local-topology nimbus storm-name conf topology)))
+
+(defn mk-capture-launch-fn [capture-atom]
+  (fn [supervisor storm-id port worker-id mem-onheap]
+    (let [supervisor-id (:supervisor-id supervisor)
+          conf (:conf supervisor)
+          existing (get @capture-atom [supervisor-id port] [])]
+      (set-worker-user! conf worker-id "")
+      (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id)))))
+
+(defn find-worker-id
+  [supervisor-conf port]
+  (let [supervisor-state (supervisor-state supervisor-conf)
+        worker->port (ls-approved-workers supervisor-state)]
+    (first ((reverse-map worker->port) port))))
+
+(defn find-worker-port
+  [supervisor-conf worker-id]
+  (let [supervisor-state (supervisor-state supervisor-conf)
+        worker->port (ls-approved-workers supervisor-state)]
+    (worker->port worker-id)))
+
+(defn mk-capture-shutdown-fn
+  [capture-atom]
+  (let [existing-fn supervisor/shutdown-worker]
+    (fn [supervisor worker-id]
+      (let [conf (:conf supervisor)
+            supervisor-id (:supervisor-id supervisor)
+            port (find-worker-port conf worker-id)
+            existing (get @capture-atom [supervisor-id port] 0)]
+        (swap! capture-atom assoc [supervisor-id port] (inc existing))
+        (existing-fn supervisor worker-id)))))
+
+(defmacro capture-changed-workers
+  [& body]
+  `(let [launch-captured# (atom {})
+         shutdown-captured# (atom {})]
+     (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#)
+                      supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)]
+                     ~@body
+                     {:launched @launch-captured#
+                      :shutdown @shutdown-captured#})))
+
+(defmacro capture-launched-workers
+  [& body]
+  `(:launched (capture-changed-workers ~@body)))
+
+(defmacro capture-shutdown-workers
+  [& body]
+  `(:shutdown (capture-changed-workers ~@body)))
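The captured maps are keyed by [supervisor-id port]; a sketch of how a test might inspect them (illustrative only, not part of the diff; `cluster` and `topology` are assumed):

(comment
  (let [launched (capture-launched-workers
                   (submit-local-topology (:nimbus cluster) "demo" {} topology)
                   (advance-cluster-time cluster 10))]
    ;; launched maps [supervisor-id port] -> vector of storm-ids launched there
    (doseq [[[supervisor-id port] storm-ids] launched]
      (log-message supervisor-id ":" port " launched " storm-ids))))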
+
+(defnk aggregated-stat
+  [cluster-map storm-name stat-key :component-ids nil]
+  (let [state (:storm-cluster-state cluster-map)
+        nimbus (:nimbus cluster-map)
+        storm-id (common/get-storm-id state storm-name)
+        component->tasks (reverse-map
+                           (common/storm-task-info
+                             (.getUserTopology nimbus storm-id)
+                             (from-json (.getTopologyConf nimbus storm-id))))
+        component->tasks (if component-ids
+                           (select-keys component->tasks component-ids)
+                           component->tasks)
+        task-ids (apply concat (vals component->tasks))
+        assignment (.assignment-info state storm-id nil)
+        taskbeats (.taskbeats state storm-id (:task->node+port assignment))
+        heartbeats (dofor [id task-ids] (get taskbeats id))
+        stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
+    (reduce + stats)))
+
+(defn emitted-spout-tuples
+  [cluster-map topology storm-name]
+  (aggregated-stat
+    cluster-map
+    storm-name
+    :emitted
+    :component-ids (keys (.get_spouts topology))))
+
+(defn transferred-tuples
+  [cluster-map storm-name]
+  (aggregated-stat cluster-map storm-name :transferred))
+
+(defn acked-tuples
+  [cluster-map storm-name]
+  (aggregated-stat cluster-map storm-name :acked))
+
+(defn simulate-wait
+  [cluster-map]
+  (if (Time/isSimulating)
+    (advance-cluster-time cluster-map 10)
+    (Thread/sleep 100)))
+
+(defprotocol CompletableSpout
+  (exhausted?
+    [this]
+    "Whether all the tuples for this spout have been completed.")
+  (cleanup
+    [this]
+    "Clean up any global state kept.")
+  (startup
+    [this]
+    "Prepare the spout (globally) before starting the topology."))
+
+(extend-type FixedTupleSpout
+  CompletableSpout
+  (exhausted? [this]
+              (= (-> this .getSourceTuples count)
+                 (.getCompleted this)))
+  (cleanup [this]
+           (.cleanup this))
+  (startup [this]))
+
+(extend-type TransactionalSpoutCoordinator
+  CompletableSpout
+  (exhausted? [this]
+              (exhausted? (.getSpout this)))
+  (cleanup [this]
+           (cleanup (.getSpout this)))
+  (startup [this]
+           (startup (.getSpout this))))
+
+(extend-type PartitionedTransactionalSpoutExecutor
+  CompletableSpout
+  (exhausted? [this]
+              (exhausted? (.getPartitionedSpout this)))
+  (cleanup [this]
+           (cleanup (.getPartitionedSpout this)))
+  (startup [this]
+           (startup (.getPartitionedSpout this))))
+
+(extend-type MemoryTransactionalSpout
+  CompletableSpout
+  (exhausted? [this]
+              (.isExhaustedTuples this))
+  (cleanup [this]
+           (.cleanup this))
+  (startup [this]
+           (.startup this)))
+
+(defn spout-objects [spec-map]
+  (for [[_ spout-spec] spec-map]
+    (-> spout-spec
+        .get_spout_object
+        deserialized-component-object)))
+
+(defn capture-topology
+  [topology]
+  (let [topology (.deepCopy topology)
+        spouts (.get_spouts topology)
+        bolts (.get_bolts topology)
+        all-streams (apply concat
+                           (for [[id spec] (merge (clojurify-structure spouts)
+                                                  (clojurify-structure bolts))]
+                             (for [[stream info] (.. spec get_common get_streams)]
+                               [(GlobalStreamId. id stream) (.is_direct info)])))
+        capturer (TupleCaptureBolt.)]
+    (.set_bolts topology
+                (assoc (clojurify-structure bolts)
+                  (uuid)
+                  (Bolt.
+                    (serialize-component-object capturer)
+                    (mk-plain-component-common (into {} (for [[id direct?] all-streams]
+                                                          [id (if direct?
+                                                                (mk-direct-grouping)
+                                                                (mk-global-grouping))]))
+                                               {}
+                                               nil))))
+    {:topology topology
+     :capturer capturer}))
+
+;; TODO: mock-sources needs to be able to mock out state spouts as well
+(defnk complete-topology
+  [cluster-map topology
+   :mock-sources {}
+   :storm-conf {}
+   :cleanup-state true
+   :topology-name nil
+   :timeout-ms TEST-TIMEOUT-MS]
+  ;; TODO: the idea of mocking for transactional topologies should be done at an
+  ;; abstraction level above... there should be a complete-transactional-topology for this
+  (let [{topology :topology capturer :capturer} (capture-topology topology)
+        storm-name (or topology-name (str "topologytest-" (uuid)))
+        state (:storm-cluster-state cluster-map)
+        spouts (.get_spouts topology)
+        replacements (map-val (fn [v]
+                                (FixedTupleSpout.
+                                  (for [tup v]
+                                    (if (map? tup)
+                                      (FixedTuple. (:stream tup) (:values tup))
+                                      tup))))
+                              mock-sources)]
+    (doseq [[id spout] replacements]
+      (let [spout-spec (get spouts id)]
+        (.set_spout_object spout-spec (serialize-component-object spout))))
+    (doseq [spout (spout-objects spouts)]
+      (when-not (extends? CompletableSpout (.getClass spout))
+        (throw (RuntimeException. (str "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " spout)))))
+
+    (doseq [spout (spout-objects spouts)]
+      (startup spout))
+
+    (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
+    (advance-cluster-time cluster-map 11)
+
+    (let [storm-id (common/get-storm-id state storm-name)]
+      ;; Give the topology time to come up, without using it to wait for the spouts to complete
+      (simulate-wait cluster-map)
+
+      (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
+                     (simulate-wait cluster-map))
+
+      (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
+      (while-timeout timeout-ms (.assignment-info state storm-id nil)
+                     (simulate-wait cluster-map))
+      (when cleanup-state
+        (doseq [spout (spout-objects spouts)]
+          (cleanup spout))))
+
+    (if cleanup-state
+      (.getAndRemoveResults capturer)
+      (.getAndClearResults capturer))))
+
+(defn read-tuples
+  ([results component-id stream-id]
+   (let [fixed-tuples (get results component-id [])]
+     (mapcat
+       (fn [ft]
+         (if (= stream-id (. ft stream))
+           [(vec (. ft values))]))
+       fixed-tuples)
+     ))
+  ([results component-id]
+   (read-tuples results component-id Utils/DEFAULT_STREAM_ID)))
+
+(defn ms=
+  [& args]
+  (apply = (map multi-set args)))
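Putting complete-topology, read-tuples, and ms= together, an end-to-end test usually looks like this sketch (illustrative only, not part of the diff; the word-count `topology` and its component ids are assumed):

(comment
  (with-simulated-time-local-cluster [cluster]
    (let [results (complete-topology cluster
                                     topology
                                     :mock-sources {"spout" [["nathan"] ["bob"] ["nathan"]]}
                                     :storm-conf {TOPOLOGY-DEBUG true})]
      ;; order-insensitive comparison of the tuples emitted by the "count" bolt
      (assert (ms= [["nathan" 2] ["bob" 1]]
                   (read-tuples results "count"))))))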
+
+(def TRACKER-BOLT-ID "+++tracker-bolt")
+
+;; TODO: should override system-topology! and wrap everything there
+(defn mk-tracked-topology
+  ([tracked-cluster topology]
+   (let [track-id (::track-id tracked-cluster)
+         ret (.deepCopy topology)]
+     (dofor [[_ bolt] (.get_bolts ret)
+             :let [obj (deserialized-component-object (.get_bolt_object bolt))]]
+            (.set_bolt_object bolt (serialize-component-object
+                                     (BoltTracker. obj track-id))))
+     (dofor [[_ spout] (.get_spouts ret)
+             :let [obj (deserialized-component-object (.get_spout_object spout))]]
+            (.set_spout_object spout (serialize-component-object
+                                       (SpoutTracker. obj track-id))))
+     {:topology ret
+      :last-spout-emit (atom 0)
+      :cluster tracked-cluster})))
+
+(defn assoc-track-id
+  [cluster track-id]
+  (assoc cluster ::track-id track-id))
+
+(defn increment-global!
+  [id key amt]
+  (-> (RegisteredGlobalState/getState id)
+      (get key)
+      (.addAndGet amt)))
+
+(defn global-amt
+  [id key]
+  (-> (RegisteredGlobalState/getState id)
+      (get key)
+      .get))
+
+(defmacro with-tracked-cluster
+  [[cluster-sym & cluster-args] & body]
+  `(let [id# (uuid)]
+     (RegisteredGlobalState/setState
+       id#
+       (doto (ConcurrentHashMap.)
+         (.put "spout-emitted" (AtomicInteger. 0))
+         (.put "transferred" (AtomicInteger. 0))
+         (.put "processed" (AtomicInteger. 0))))
+     (with-var-roots
+       [acker/mk-acker-bolt
+        (let [old# acker/mk-acker-bolt]
+          (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
+        ;; It is critical that this particular function is overridden here,
+        ;; since the transferred stat needs to be incremented at the moment
+        ;; of tuple emission (and not on a separate thread later) for
+        ;; topologies to be tracked correctly. This is because "transferred" *must*
+        ;; be incremented before "processed".
+        executor/mk-executor-transfer-fn
+        (let [old# executor/mk-executor-transfer-fn]
+          (fn [& args#]
+            (let [transferrer# (apply old# args#)]
+              (fn [& args2#]
+                ;; (log-message "Transferring: " transfer-args#)
+                (increment-global! id# "transferred" 1)
+                (apply transferrer# args2#)))))]
+       (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
+                           (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+                             ~@body)))
+     (RegisteredGlobalState/clearState id#)))
+
+(defn tracked-wait
+  "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
+  ([tracked-topology]
+     (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
+  ([tracked-topology amt]
+     (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+  ([tracked-topology amt timeout-ms]
+    (let [target (+ amt @(:last-spout-emit tracked-topology))
+          track-id (-> tracked-topology :cluster ::track-id)
+          waiting? (fn []
+                     (or (not= target (global-amt track-id "spout-emitted"))
+                         (not= (global-amt track-id "transferred")
+                               (global-amt track-id "processed"))))]
+      (while-timeout timeout-ms (waiting?)
+                     ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
+                     ;; (println "Processed: " (global-amt track-id "processed"))
+                     ;; (println "Transferred: " (global-amt track-id "transferred"))
+                    (Thread/sleep (rand-int 200)))
+      (reset! (:last-spout-emit tracked-topology) target))))
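A hedged sketch of the tracked-cluster workflow (illustrative only, not part of the diff; the feeder spout is assumed to be wired into `topology`):

(comment
  (with-tracked-cluster [cluster]
    (let [feeder (feeder-spout ["word"])
          tracked (mk-tracked-topology cluster topology)]
      (submit-local-topology (:nimbus cluster) "demo" {} (:topology tracked))
      (.feed feeder ["hello"])
      ;; block until the cluster has fully processed one more spout emission
      (tracked-wait tracked 1))))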
+
+(defnk test-tuple
+  [values
+   :stream Utils/DEFAULT_STREAM_ID
+   :component "component"
+   :fields nil]
+  (let [fields (or fields
+                   (->> (iterate inc 1)
+                        (take (count values))
+                        (map #(str "field" %))))
+        spout-spec (mk-spout-spec* (TestWordSpout.)
+                                   {stream fields})
+        topology (StormTopology. {component spout-spec} {} {})
+        context (TopologyContext.
+                  topology
+                  (read-storm-config)
+                  {(int 1) component}
+                  {component [(int 1)]}
+                  {component {stream (Fields. fields)}}
+                  "test-storm-id"
+                  nil
+                  nil
+                  (int 1)
+                  nil
+                  [(int 1)]
+                  {}
+                  {}
+                  (HashMap.)
+                  (HashMap.)
+                  (atom false))]
+    (TupleImpl. context values 1 stream)))
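For example, a bolt unit test might build tuples like this (illustrative only, not part of the diff):

(comment
  ;; a two-field tuple on the default stream with auto-generated field names
  (test-tuple ["nathan" 42])
  ;; or with explicit stream, component, and field names
  (test-tuple ["nathan" 42] :stream "status" :component "word-spout" :fields ["name" "count"]))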
+
+(defmacro with-timeout
+  [millis unit & body]
+  `(let [f# (future ~@body)]
+     (try
+       (.get f# ~millis ~unit)
+       (finally (future-cancel f#)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing4j.clj b/storm-core/src/clj/org/apache/storm/testing4j.clj
new file mode 100644
index 0000000..5850262
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/testing4j.clj
@@ -0,0 +1,184 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.testing4j
+  (:import [java.util Map List Collection ArrayList])
+  (:require [org.apache.storm [LocalCluster :as LocalCluster]])
+  (:import [org.apache.storm Config ILocalCluster LocalCluster])
+  (:import [org.apache.storm.generated StormTopology])
+  (:import [org.apache.storm.daemon nimbus])
+  (:import [org.apache.storm.testing TestJob MockedSources TrackedTopology
+            MkClusterParam CompleteTopologyParam MkTupleParam])
+  (:import [org.apache.storm.utils Utils])
+  (:use [org.apache.storm testing util log])
+  (:gen-class
+   :name org.apache.storm.Testing
+   :methods [^:static [completeTopology
+                       [org.apache.storm.ILocalCluster  org.apache.storm.generated.StormTopology
+                        org.apache.storm.testing.CompleteTopologyParam]
+                       java.util.Map]
+             ^:static [completeTopology
+                       [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology]
+                       java.util.Map]
+             ^:static [withSimulatedTime [Runnable] void]
+             ^:static [withLocalCluster [org.apache.storm.testing.TestJob] void]
+             ^:static [withLocalCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void]
+             ^:static [getLocalCluster [java.util.Map] org.apache.storm.ILocalCluster]
+             ^:static [withSimulatedTimeLocalCluster [org.apache.storm.testing.TestJob] void]
+             ^:static [withSimulatedTimeLocalCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void]
+             ^:static [withTrackedCluster [org.apache.storm.testing.TestJob] void]
+             ^:static [withTrackedCluster [org.apache.storm.testing.MkClusterParam org.apache.storm.testing.TestJob] void]
+             ^:static [readTuples [java.util.Map String String] java.util.List]
+             ^:static [readTuples [java.util.Map String] java.util.List]
+             ^:static [mkTrackedTopology [org.apache.storm.ILocalCluster org.apache.storm.generated.StormTopology] org.apache.storm.testing.TrackedTopology]
+             ^:static [trackedWait [org.apache.storm.testing.TrackedTopology] void]
+             ^:static [trackedWait [org.apache.storm.testing.TrackedTopology Integer] void]
+             ^:static [trackedWait [org.apache.storm.testing.TrackedTopology Integer Integer] void]
+             ^:static [advanceClusterTime [org.apache.storm.ILocalCluster Integer Integer] void]
+             ^:static [advanceClusterTime [org.apache.storm.ILocalCluster Integer] void]
+             ^:static [multiseteq [java.util.Collection java.util.Collection] boolean]
+             ^:static [multiseteq [java.util.Map java.util.Map] boolean]
+             ^:static [testTuple [java.util.List] org.apache.storm.tuple.Tuple]
+             ^:static [testTuple [java.util.List org.apache.storm.testing.MkTupleParam] org.apache.storm.tuple.Tuple]]))
+
+(defn -completeTopology
+  ([^ILocalCluster cluster ^StormTopology topology ^CompleteTopologyParam completeTopologyParam]
+    (let [mocked-sources (or (-> completeTopologyParam .getMockedSources .getData) {})
+          storm-conf (or (.getStormConf completeTopologyParam) {})
+          cleanup-state (or (.getCleanupState completeTopologyParam) true)
+          topology-name (.getTopologyName completeTopologyParam)
+          timeout-ms (or (.getTimeoutMs completeTopologyParam) TEST-TIMEOUT-MS)]
+      (complete-topology (.getState cluster) topology
+        :mock-sources mocked-sources
+        :storm-conf storm-conf
+        :cleanup-state cleanup-state
+        :topology-name topology-name
+        :timeout-ms timeout-ms)))
+  ([^ILocalCluster cluster ^StormTopology topology]
+    (-completeTopology cluster topology (CompleteTopologyParam.))))
+
+
+(defn -withSimulatedTime
+  [^Runnable code]
+  (with-simulated-time
+    (.run code)))
+
+(defmacro with-cluster
+  [cluster-type mkClusterParam code]
+  `(let [supervisors# (or (.getSupervisors ~mkClusterParam) 2)
+         ports-per-supervisor# (or (.getPortsPerSupervisor ~mkClusterParam) 3)
+         daemon-conf# (or (.getDaemonConf ~mkClusterParam) {})]
+     (~cluster-type [cluster# :supervisors supervisors#
+                     :ports-per-supervisor ports-per-supervisor#
+                     :daemon-conf daemon-conf#]
+                    (let [cluster# (LocalCluster. cluster#)]
+                      (.run ~code cluster#)))))
+
+(defn -withLocalCluster
+  ([^MkClusterParam mkClusterParam ^TestJob code]
+     (with-cluster with-local-cluster mkClusterParam code))
+  ([^TestJob code]
+     (-withLocalCluster (MkClusterParam.) code)))
+
+(defn -getLocalCluster
+  ([^Map clusterConf]
+     (let [daemon-conf (get-in clusterConf ["daemon-conf"] {})
+           supervisors (get-in clusterConf ["supervisors"] 2)
+           ports-per-supervisor (get-in clusterConf ["ports-per-supervisor"] 3)
+           inimbus (get-in clusterConf ["inimbus"] nil)
+           supervisor-slot-port-min (get-in clusterConf ["supervisor-slot-port-min"] 1024)
+           nimbus-daemon (get-in clusterConf ["nimbus-daemon"] false)
+           local-cluster-map (mk-local-storm-cluster :supervisors supervisors
+                                                     :ports-per-supervisor ports-per-supervisor
+                                                     :daemon-conf daemon-conf
+                                                     :inimbus inimbus
+                                                     :supervisor-slot-port-min supervisor-slot-port-min
+                                                     :nimbus-daemon nimbus-daemon
+                                                     )]
+       (LocalCluster. local-cluster-map))))
+
+(defn -withSimulatedTimeLocalCluster
+  ([^MkClusterParam mkClusterParam ^TestJob code]
+     (with-cluster with-simulated-time-local-cluster mkClusterParam code))
+  ([^TestJob code]
+     (-withSimulatedTimeLocalCluster (MkClusterParam.) code)))
+
+(defn -withTrackedCluster
+  ([^MkClusterParam mkClusterParam ^TestJob code]
+     (with-cluster with-tracked-cluster mkClusterParam code))
+  ([^TestJob code]
+     (-withTrackedCluster (MkClusterParam.) code)))
+
+(defn- find-tuples
+  [^List fixed-tuples ^String stream]
+  (let [ret (ArrayList.)]
+    (doseq [fixed-tuple fixed-tuples]
+      (if (= (.stream fixed-tuple) stream)
+        (.add ret (.values fixed-tuple))))
+    ret))
+
+(defn -readTuples
+  ([^Map result ^String componentId ^String streamId]
+   (let [stream-result (.get result componentId)
+         ret (if stream-result
+               (find-tuples stream-result streamId)
+               [])]
+     ret))
+  ([^Map result ^String componentId]
+   (-readTuples result componentId Utils/DEFAULT_STREAM_ID)))
+
+(defn -mkTrackedTopology
+  [^ILocalCluster trackedCluster ^StormTopology topology]
+  (-> (mk-tracked-topology (.getState trackedCluster) topology)
+      (TrackedTopology.)))
+
+(defn -trackedWait
+  ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms]
+   (tracked-wait trackedTopology amt timeout-ms))
+  ([^TrackedTopology trackedTopology ^Integer amt]
+   (tracked-wait trackedTopology amt))
+  ([^TrackedTopology trackedTopology]
+   (-trackedWait trackedTopology 1)))
+
+(defn -advanceClusterTime
+  ([^ILocalCluster cluster ^Integer secs ^Integer step]
+   (advance-cluster-time (.getState cluster) secs step))
+  ([^ILocalCluster cluster ^Integer secs]
+   (-advanceClusterTime cluster secs 1)))
+
+(defn- multiseteq
+  [^Object obj1 ^Object obj2]
+  (let [obj1 (clojurify-structure obj1)
+        obj2 (clojurify-structure obj2)]
+    (ms= obj1 obj2)))
+
+(defn -multiseteq
+  [^Collection coll1 ^Collection coll2]
+  (multiseteq coll1 coll2))
+
+(defn -multiseteq
+  [^Map coll1 ^Map coll2]
+  (multiseteq coll1 coll2))
+
+(defn -testTuple
+  ([^List values]
+   (-testTuple values nil))
+  ([^List values ^MkTupleParam param]
+   (if (nil? param)
+     (test-tuple values)
+     (let [stream (or (.getStream param) Utils/DEFAULT_STREAM_ID)
+           component (or (.getComponent param) "component")
+           fields (.getFields param)]
+       (test-tuple values :stream stream :component component :fields fields)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/thrift.clj b/storm-core/src/clj/org/apache/storm/thrift.clj
new file mode 100644
index 0000000..47e233a
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/thrift.clj
@@ -0,0 +1,284 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.thrift
+  (:import [java.util HashMap]
+           [java.io Serializable]
+           [org.apache.storm.generated NodeInfo Assignment])
+  (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology
+            StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
+            ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
+            GlobalStreamId ComponentObject ComponentObject$_Fields
+            ShellComponent SupervisorInfo])
+  (:import [org.apache.storm.utils Utils NimbusClient])
+  (:import [org.apache.storm Constants])
+  (:import [org.apache.storm.security.auth ReqContext])
+  (:import [org.apache.storm.grouping CustomStreamGrouping])
+  (:import [org.apache.storm.topology TopologyBuilder])
+  (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
+  (:import [org.apache.thrift.transport TTransport])
+  (:use [org.apache.storm util config log zookeeper]))
+
+(defn instantiate-java-object
+  [^JavaObject obj]
+  (let [name (symbol (.get_full_class_name obj))
+        args (map (memfn getFieldValue) (.get_args_list obj))]
+    (eval `(new ~name ~@args))))
+
+(def grouping-constants
+  {Grouping$_Fields/FIELDS :fields
+   Grouping$_Fields/SHUFFLE :shuffle
+   Grouping$_Fields/ALL :all
+   Grouping$_Fields/NONE :none
+   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
+   Grouping$_Fields/CUSTOM_OBJECT :custom-object
+   Grouping$_Fields/DIRECT :direct
+   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
+
+(defn grouping-type
+  [^Grouping grouping]
+  (grouping-constants (.getSetField grouping)))
+
+(defn field-grouping
+  [^Grouping grouping]
+  (when-not (= (grouping-type grouping) :fields)
+    (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping")))
+  (.get_fields grouping))
+
+(defn global-grouping?
+  [^Grouping grouping]
+  (and (= :fields (grouping-type grouping))
+       (empty? (field-grouping grouping))))
+
+(defn parallelism-hint
+  [^ComponentCommon component-common]
+  (let [phint (.get_parallelism_hint component-common)]
+    (if-not (.is_set_parallelism_hint component-common) 1 phint)))
+
+(defn nimbus-client-and-conn
+  ([host port]
+    (nimbus-client-and-conn host port nil))
+  ([host port as-user]
+  (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
+  (let [conf (read-storm-config)
+        nimbusClient (NimbusClient. conf host port nil as-user)
+        client (.getClient nimbusClient)
+        transport (.transport nimbusClient)]
+        [client transport] )))
+
+(defmacro with-nimbus-connection
+  [[client-sym host port] & body]
+  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
+    (try
+      ~@body
+    (finally (.close conn#)))))
+
+(defmacro with-configured-nimbus-connection
+  [client-sym & body]
+  `(let [conf# (read-storm-config)
+         context# (ReqContext/context)
+         user# (if (.principal context#) (.getName (.principal context#)))
+         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
+         ~client-sym (.getClient nimbusClient#)
+         conn# (.transport nimbusClient#)
+         ]
+     (try
+       ~@body
+     (finally (.close conn#)))))
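A hedged usage sketch for the connection macros (illustrative only, not part of the diff; the host name is a placeholder):

(comment
  ;; query a running Nimbus using the locally configured storm.yaml
  (with-configured-nimbus-connection nimbus
    (.getClusterInfo nimbus))
  ;; or connect to an explicit host and thrift port
  (with-nimbus-connection [nimbus "nimbus.example.com" 6627]
    (.getClusterInfo nimbus)))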
+
+(defn direct-output-fields
+  [fields]
+  (StreamInfo. fields true))
+
+(defn output-fields
+  [fields]
+  (StreamInfo. fields false))
+
+(defn mk-output-spec
+  [output-spec]
+  (let [output-spec (if (map? output-spec)
+                      output-spec
+                      {Utils/DEFAULT_STREAM_ID output-spec})]
+    (map-val
+      (fn [out]
+        (if (instance? StreamInfo out)
+          out
+          (StreamInfo. out false)))
+      output-spec)))
+
+(defnk mk-plain-component-common
+  [inputs output-spec parallelism-hint :conf nil]
+  (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
+    (when parallelism-hint
+      (.set_parallelism_hint ret parallelism-hint))
+    (when conf
+      (.set_json_conf ret (to-json conf)))
+    ret))
+
+(defnk mk-spout-spec*
+  [spout outputs :p nil :conf nil]
+  (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
+              (mk-plain-component-common {} outputs p :conf conf)))
+
+(defn mk-shuffle-grouping
+  []
+  (Grouping/shuffle (NullStruct.)))
+
+(defn mk-local-or-shuffle-grouping
+  []
+  (Grouping/local_or_shuffle (NullStruct.)))
+
+(defn mk-fields-grouping
+  [fields]
+  (Grouping/fields fields))
+
+(defn mk-global-grouping
+  []
+  (mk-fields-grouping []))
+
+(defn mk-direct-grouping
+  []
+  (Grouping/direct (NullStruct.)))
+
+(defn mk-all-grouping
+  []
+  (Grouping/all (NullStruct.)))
+
+(defn mk-none-grouping
+  []
+  (Grouping/none (NullStruct.)))
+
+(defn deserialized-component-object
+  [^ComponentObject obj]
+  (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
+    (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
+  (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
+
+(defn serialize-component-object
+  [obj]
+  (ComponentObject/serialized_java (Utils/javaSerialize obj)))
+
+(defn- mk-grouping
+  [grouping-spec]
+  (cond (nil? grouping-spec)
+        (mk-none-grouping)
+
+        (instance? Grouping grouping-spec)
+        grouping-spec
+
+        (instance? CustomStreamGrouping grouping-spec)
+        (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
+
+        (instance? JavaObject grouping-spec)
+        (Grouping/custom_object grouping-spec)
+
+        (sequential? grouping-spec)
+        (mk-fields-grouping grouping-spec)
+
+        (= grouping-spec :shuffle)
+        (mk-shuffle-grouping)
+
+        (= grouping-spec :local-or-shuffle)
+        (mk-local-or-shuffle-grouping)
+        (= grouping-spec :none)
+        (mk-none-grouping)
+
+        (= grouping-spec :all)
+        (mk-all-grouping)
+
+        (= grouping-spec :global)
+        (mk-global-grouping)
+
+        (= grouping-spec :direct)
+        (mk-direct-grouping)
+
+        true
+        (throw (IllegalArgumentException.
+                 (str grouping-spec " is not a valid grouping")))))
+
+(defn- mk-inputs
+  [inputs]
+  (into {} (for [[stream-id grouping-spec] inputs]
+             [(if (sequential? stream-id)
+                (GlobalStreamId. (first stream-id) (second stream-id))
+                (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
+              (mk-grouping grouping-spec)])))
+
+(defnk mk-bolt-spec*
+  [inputs bolt outputs :p nil :conf nil]
+  (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
+    (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
+           common)))
+
+(defnk mk-spout-spec
+  [spout :parallelism-hint nil :p nil :conf nil]
+  (let [parallelism-hint (if p p parallelism-hint)]
+    {:obj spout :p parallelism-hint :conf conf}))
+
+(defn- shell-component-params
+  [command script-or-output-spec kwargs]
+  (if (string? script-or-output-spec)
+    [(into-array String [command script-or-output-spec])
+     (first kwargs)
+     (rest kwargs)]
+    [(into-array String command)
+     script-or-output-spec
+     kwargs]))
+
+(defnk mk-bolt-spec
+  [inputs bolt :parallelism-hint nil :p nil :conf nil]
+  (let [parallelism-hint (if p p parallelism-hint)]
+    {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
+
+(defn mk-shell-bolt-spec
+  [inputs command script-or-output-spec & kwargs]
+  (let [[command output-spec kwargs]
+        (shell-component-params command script-or-output-spec kwargs)]
+    (apply mk-bolt-spec inputs
+           (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
+
+(defn mk-shell-spout-spec
+  [command script-or-output-spec & kwargs]
+  (let [[command output-spec kwargs]
+        (shell-component-params command script-or-output-spec kwargs)]
+    (apply mk-spout-spec
+           (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))
+
+(defn- add-inputs
+  [declarer inputs]
+  (doseq [[id grouping] (mk-inputs inputs)]
+    (.grouping declarer id grouping)))
+
+(defn mk-topology
+  ([spout-map bolt-map]
+   (let [builder (TopologyBuilder.)]
+     (doseq [[name {spout :obj p :p conf :conf}] spout-map]
+       (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
+     (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
+       (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
+     (.createTopology builder)))
+  ([spout-map bolt-map state-spout-map]
+   (mk-topology spout-map bolt-map)))
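A sketch of how these helpers compose into a topology (illustrative only, not part of the diff; `word-count-bolt` is assumed to exist):

(comment
  (mk-topology
    {"words" (mk-spout-spec (org.apache.storm.testing.TestWordSpout.) :p 2)}
    {"counter" (mk-bolt-spec {"words" ["word"]} word-count-bolt :p 3)}))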
+
+;; clojurify-structure is needed here, or else every element becomes the same after successive calls.
+;; It is not clear why this happens.
+(def STORM-TOPOLOGY-FIELDS
+  (-> StormTopology/metaDataMap clojurify-structure keys))
+
+(def SPOUT-FIELDS
+  [StormTopology$_Fields/SPOUTS
+   StormTopology$_Fields/STATE_SPOUTS])
+

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
new file mode 100644
index 0000000..0d8839e
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/timer.clj
@@ -0,0 +1,128 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.timer
+  (:import [org.apache.storm.utils Time])
+  (:import [java.util PriorityQueue Comparator Random])
+  (:import [java.util.concurrent Semaphore])
+  (:use [org.apache.storm util log]))
+
+;; The timer defined in this file is very similar to java.util.Timer, except
+;; it integrates with Storm's time simulation capabilities. This lets us test
+;; code that does asynchronous work on the timer thread.
+
+(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
+  (let [queue (PriorityQueue. 10 (reify Comparator
+                                   (compare
+                                     [this o1 o2]
+                                     (- (first o1) (first o2)))
+                                   (equals
+                                     [this obj]
+                                     true)))
+        active (atom true)
+        lock (Object.)
+        notifier (Semaphore. 0)
+        thread-name (if timer-name timer-name "timer")
+        timer-thread (Thread.
+                       (fn []
+                         (while @active
+                           (try
+                             (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
+                               (if (and elem (>= (current-time-millis) time-millis))
+                                 ;; It is imperative to not run the function
+                                 ;; inside the timer lock. Otherwise, it is
+                                 ;; possible to deadlock if the fn deals with
+                                 ;; other locks, like the submit lock.
+                                 (let [afn (locking lock (second (.poll queue)))]
+                                   (afn))
+                                 (if time-millis
+                                   ;; If any events are scheduled, sleep until
+                                   ;; event generation. If any recurring events
+                                   ;; are scheduled then we will always go
+                                   ;; through this branch, sleeping only the
+                                   ;; exact necessary amount of time. We give
+                                   ;; an upper bound, e.g. 1000 millis, to the
+                                   ;; sleeping time, to limit the response time
+                                   ;; for detecting any new event within 1 secs.
+                                   (Time/sleep (min 1000 (- time-millis (current-time-millis))))
+                                   ;; Otherwise poll to see if any new event
+                                   ;; was scheduled. This is, in essence, the
+                                   ;; response time for detecting any new event
+                                   ;; schedulings when there are no scheduled
+                                   ;; events.
+                                   (Time/sleep 1000))))
+                             (catch Throwable t
+                               ;; Because the interrupted exception can be
+                               ;; wrapped in a RuntimeException.
+                               (when-not (exception-cause? InterruptedException t)
+                                 (kill-fn t)
+                                 (reset! active false)
+                                 (throw t)))))
+                         (.release notifier)) thread-name)]
+    (.setDaemon timer-thread true)
+    (.setPriority timer-thread Thread/MAX_PRIORITY)
+    (.start timer-thread)
+    {:timer-thread timer-thread
+     :queue queue
+     :active active
+     :lock lock
+     :random (Random.)
+     :cancel-notifier notifier}))
+
+(defn- check-active!
+  [timer]
+  (when-not @(:active timer)
+    (throw (IllegalStateException. "Timer is not active"))))
+
+(defnk schedule
+  [timer delay-secs afn :check-active true :jitter-ms 0]
+  (when check-active (check-active! timer))
+  (let [id (uuid)
+        ^PriorityQueue queue (:queue timer)
+        end-time-ms (+ (current-time-millis) (secs-to-millis-long delay-secs))
+        end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
+    (locking (:lock timer)
+      (.add queue [end-time-ms afn id]))))
+
+(defn schedule-recurring
+  [timer delay-secs recur-secs afn]
+  (schedule timer
+            delay-secs
+            (fn this []
+              (afn)
+              ; This avoids a race condition with cancel-timer.
+              (schedule timer recur-secs this :check-active false))))
+
+(defn schedule-recurring-with-jitter
+  [timer delay-secs recur-secs jitter-ms afn]
+  (schedule timer
+            delay-secs
+            (fn this []
+              (afn)
+              ; This avoids a race condition with cancel-timer.
+              (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
+
+(defn cancel-timer
+  [timer]
+  (check-active! timer)
+  (locking (:lock timer)
+    (reset! (:active timer) false)
+    (.interrupt (:timer-thread timer)))
+  (.acquire (:cancel-notifier timer)))
+
+(defn timer-waiting?
+  [timer]
+  (Time/isThreadWaiting (:timer-thread timer)))
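A hedged usage sketch of the timer (illustrative only, not part of the diff):

(comment
  (let [timer (mk-timer :timer-name "heartbeat-timer")]
    ;; run the callback 5 seconds from now, then every 10 seconds
    (schedule-recurring timer 5 10 (fn [] (log-message "heartbeat")))
    ;; ... and later, stop the timer thread and wait for it to exit
    (cancel-timer timer)))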

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj
new file mode 100644
index 0000000..44e5ca9
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj
@@ -0,0 +1,79 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.trident.testing
+  (:require [org.apache.storm.LocalDRPC :as LocalDRPC])
+  (:import [org.apache.storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
+  (:require [org.apache.storm [LocalDRPC]])
+  (:import [org.apache.storm LocalDRPC])
+  (:import [org.apache.storm.tuple Fields])
+  (:import [org.apache.storm.generated KillOptions])
+  (:require [org.apache.storm [testing :as t]])
+  (:use [org.apache.storm util])
+  )
+
+(defn local-drpc []
+  (LocalDRPC.))
+
+(defn exec-drpc [^LocalDRPC drpc function-name args]
+  (let [res (.execute drpc function-name args)]
+    (from-json res)))
+
+(defn exec-drpc-tuples [^LocalDRPC drpc function-name tuples]
+  (exec-drpc drpc function-name (to-json tuples)))
+
+(defn feeder-spout [fields]
+  (FeederBatchSpout. fields))
+
+(defn feeder-committer-spout [fields]
+  (FeederCommitterBatchSpout. fields))
+
+(defn feed [feeder tuples]
+  (.feed feeder tuples))
+
+(defn fields [& fields]
+  (Fields. fields))
+
+(defn memory-map-state []
+  (MemoryMapState$Factory.))
+
+(defmacro with-drpc [[drpc] & body]
+  `(let [~drpc (org.apache.storm.LocalDRPC.)]
+     ~@body
+     (.shutdown ~drpc)
+     ))
+
+(defn with-topology* [cluster topo body-fn]
+  (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
+  (body-fn)
+  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
+  )
+
+(defmacro with-topology [[cluster topo] & body]
+  `(with-topology* ~cluster ~topo (fn [] ~@body)))
+
+(defn bootstrap-imports []
+  (import 'org.apache.storm.LocalDRPC)
+  (import 'org.apache.storm.trident.TridentTopology)
+  (import '[org.apache.storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN TupleCollectionGet])
+  )
+
+(defn drpc-tuples-input [topology function-name drpc outfields]
+  (-> topology
+      (.newDRPCStream function-name drpc)
+      (.each (fields "args") (TuplifyArgs.) outfields)
+      ))
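A hedged end-to-end sketch with these helpers (illustrative only, not part of the diff; the TridentTopology wiring between the feeder and the DRPC stream is elided behind a hypothetical builder):

(comment
  (t/with-local-cluster [cluster]
    (with-drpc [drpc]
      (let [feeder (feeder-spout ["sentence"])
            topo (build-word-count-topology feeder drpc)] ; hypothetical builder
        (with-topology [cluster topo]
          (feed feeder [["hello world"] ["hello storm"]])
          (exec-drpc drpc "words" "hello"))))))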
+
+