You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/15 16:15:28 UTC

[4/6] storm git commit: STORM-1179: Create Maven Profiles for Integration Tests - Mark Clojure tests as integration tests

STORM-1179: Create Maven Profiles for Integration Tests
- Mark Clojure tests as integration tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d4fcc0fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d4fcc0fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d4fcc0fd

Branch: refs/heads/master
Commit: d4fcc0fde96c667d1763fe5626ddf1284e4a7f70
Parents: 8f1b4fb
Author: Hugo Louro <hm...@gmail.com>
Authored: Wed Dec 9 16:50:28 2015 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Wed Dec 9 18:58:22 2015 -0800

----------------------------------------------------------------------
 .../clj/backtype/storm/integration_test.clj     | 622 -------------------
 .../test/clj/backtype/storm/testing4j_test.clj  | 212 -------
 .../backtype/storm/integration_test.clj         | 622 +++++++++++++++++++
 .../backtype/storm/testing4j_test.clj           | 212 +++++++
 .../storm/trident/integration_test.clj          | 292 +++++++++
 .../test/clj/storm/trident/integration_test.clj | 292 ---------
 6 files changed, 1126 insertions(+), 1126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/integration_test.clj b/storm-core/test/clj/backtype/storm/integration_test.clj
deleted file mode 100644
index cc0208d..0000000
--- a/storm-core/test/clj/backtype/storm/integration_test.clj
+++ /dev/null
@@ -1,622 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.integration-test
-  (:use [clojure test])
-  (:import [backtype.storm Config])
-  (:import [backtype.storm.topology TopologyBuilder])
-  (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
-  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
-            TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
-  (:import [backtype.storm.tuple Fields])
-  (:use [backtype.storm testing config clojure util])
-  (:use [backtype.storm.daemon common])
-  (:require [backtype.storm [thrift :as thrift]]))
-
-(deftest test-basic-topology
-  (doseq [zmq-on? [true false]]
-    (with-simulated-time-local-cluster [cluster :supervisors 4
-                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
-      (let [topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                      {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
-                       "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
-                       "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
-                       })
-            results (complete-topology cluster
-                                       topology
-                                       :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
-                                       :storm-conf {TOPOLOGY-WORKERS 2})]
-        (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                 (read-tuples results "1")))
-        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
-                 (read-tuples results "2")))
-        (is (= [[1] [2] [3] [4]]
-               (read-tuples results "3")))
-        (is (= [[1] [2] [3] [4]]
-               (read-tuples results "4")))
-        ))))
-
-(defbolt emit-task-id ["tid"] {:prepare true}
-  [conf context collector]
-  (let [tid (.getThisTaskIndex context)]
-    (bolt
-      (execute [tuple]
-        (emit-bolt! collector [tid] :anchor tuple)
-        (ack! collector tuple)
-        ))))
-
-(deftest test-multi-tasks-per-executor
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
-                    {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
-                      :parallelism-hint 3
-                      :conf {TOPOLOGY-TASKS 6})
-                     })
-          results (complete-topology cluster
-                                     topology
-                                     :mock-sources {"1" [["a"]]})]
-      (is (ms= [[0] [1] [2] [3] [4] [5]]
-               (read-tuples results "2")))
-      )))
-
-(defbolt ack-every-other {} {:prepare true}
-  [conf context collector]
-  (let [state (atom -1)]
-    (bolt
-      (execute [tuple]
-        (let [val (swap! state -)]
-          (when (pos? val)
-            (ack! collector tuple)
-            ))))))
-
-(defn assert-loop [afn ids]
-  (while (not (every? afn ids))
-    (Thread/sleep 1)))
-
-(defn assert-acked [tracker & ids]
-  (assert-loop #(.isAcked tracker %) ids))
-
-(defn assert-failed [tracker & ids]
-  (assert-loop #(.isFailed tracker %) ids))
-
-(deftest test-timeout
-  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec feeder)}
-                     {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]      
-      (submit-local-topology (:nimbus cluster)
-                             "timeout-tester"
-                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-                             topology)
-      (advance-cluster-time cluster 11)
-      (.feed feeder ["a"] 1)
-      (.feed feeder ["b"] 2)
-      (.feed feeder ["c"] 3)
-      (advance-cluster-time cluster 9)
-      (assert-acked tracker 1 3)
-      (is (not (.isFailed tracker 2)))
-      (advance-cluster-time cluster 12)
-      (assert-failed tracker 2)
-      )))
-
-(defn mk-validate-topology-1 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-1 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-2 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-3 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn try-complete-wc-topology [cluster topology]
-  (try (do
-         (complete-topology cluster
-                            topology
-                            :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
-                            :storm-conf {TOPOLOGY-WORKERS 2})
-         false)
-       (catch InvalidTopologyException e true)))
-
-(deftest test-validate-topology-structure
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
-          any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
-          any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
-          any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
-      (is (= any-error1? false))
-      (is (= any-error2? true))
-      (is (= any-error3? true))
-      (is (= any-error4? true)))))
-
-(defbolt identity-bolt ["num"]
-  [tuple collector]
-  (emit-bolt! collector (.getValues tuple) :anchor tuple)
-  (ack! collector tuple))
-
-(deftest test-system-stream
-  ;; this test works because mocking a spout splits up the tuples evenly among the tasks
-  (with-simulated-time-local-cluster [cluster]
-      (let [topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
-                      {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
-                       })
-            results (complete-topology cluster
-                                       topology
-                                       :mock-sources {"1" [["a"] ["b"] ["c"]]}
-                                       :storm-conf {TOPOLOGY-WORKERS 2})]
-        (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
-                 (read-tuples results "2")))
-        )))
-
-(defn ack-tracking-feeder [fields]
-  (let [tracker (AckTracker.)]
-    [(doto (feeder-spout fields)
-       (.setAckFailDelegate tracker))
-     (fn [val]
-       (is (= (.getNumAcks tracker) val))
-       (.resetNumAcks tracker)
-       )]
-    ))
-
-(defbolt branching-bolt ["num"]
-  {:params [amt]}
-  [tuple collector]
-  (doseq [i (range amt)]
-    (emit-bolt! collector [i] :anchor tuple))
-  (ack! collector tuple))
-
-(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
-  [conf context collector]
-  (let [seen (atom [])]
-    (bolt
-      (execute [tuple]
-        (swap! seen conj tuple)
-        (when (= (count @seen) amt)
-          (emit-bolt! collector [1] :anchor @seen)
-          (doseq [s @seen]
-            (ack! collector s))
-          (reset! seen [])
-          )))
-      ))
-
-(defbolt ack-bolt {}
-  [tuple collector]
-  (ack! collector tuple))
-
-(deftest test-acking
-  (with-tracked-cluster [cluster]
-    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
-          [feeder2 checker2] (ack-tracking-feeder ["num"])
-          [feeder3 checker3] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
-                   (topology
-                     {"1" (spout-spec feeder1)
-                      "2" (spout-spec feeder2)
-                      "3" (spout-spec feeder3)}
-                     {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
-                      "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
-                      "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
-                      "7" (bolt-spec
-                            {"4" :shuffle
-                            "5" :shuffle
-                            "6" :shuffle}
-                            (agg-bolt 3))
-                      "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
-                      "9" (bolt-spec {"8" :shuffle} ack-bolt)}
-                     ))]
-      (submit-local-topology (:nimbus cluster)
-                             "acking-test1"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
-      (.feed feeder1 [1])
-      (tracked-wait tracked 1)
-      (checker1 0)
-      (.feed feeder2 [1])
-      (tracked-wait tracked 1)
-      (checker1 1)
-      (checker2 1)
-      (.feed feeder1 [1])
-      (tracked-wait tracked 1)
-      (checker1 0)
-      (.feed feeder1 [1])
-      (tracked-wait tracked 1)
-      (checker1 1)
-      (.feed feeder3 [1])
-      (tracked-wait tracked 1)
-      (checker1 0)
-      (checker3 0)
-      (.feed feeder2 [1])
-      (tracked-wait tracked 1)
-      (checker1 1)
-      (checker2 1)
-      (checker3 1)
-      
-      )))
-
-(deftest test-ack-branching
-  (with-tracked-cluster [cluster]
-    (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
-                   (topology
-                     {"1" (spout-spec feeder)}
-                     {"2" (bolt-spec {"1" :shuffle} identity-bolt)
-                      "3" (bolt-spec {"1" :shuffle} identity-bolt)
-                      "4" (bolt-spec
-                            {"2" :shuffle
-                             "3" :shuffle}
-                             (agg-bolt 4))}))]
-      (submit-local-topology (:nimbus cluster)
-                             "test-acking2"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
-      (.feed feeder [1])
-      (tracked-wait tracked 1)
-      (checker 0)
-      (.feed feeder [1])
-      (tracked-wait tracked 1)
-      (checker 2)
-      )))
-
-(defbolt dup-anchor ["num"]
-  [tuple collector]
-  (emit-bolt! collector [1] :anchor [tuple tuple])
-  (ack! collector tuple))
-
-(def bolt-prepared? (atom false))
-(defbolt prepare-tracked-bolt [] {:prepare true}
-  [conf context collector]  
-  (reset! bolt-prepared? true)
-  (bolt
-   (execute [tuple]
-            (ack! collector tuple))))
-
-(def spout-opened? (atom false))
-(defspout open-tracked-spout ["val"]
-  [conf context collector]
-  (reset! spout-opened? true)
-  (spout
-   (nextTuple [])))
-
-(deftest test-submit-inactive-topology
-  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec feeder)
-                     "2" (thrift/mk-spout-spec open-tracked-spout)}
-                    {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
-      (reset! bolt-prepared? false)
-      (reset! spout-opened? false)      
-      
-      (submit-local-topology-with-opts (:nimbus cluster)
-        "test"
-        {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-        topology
-        (SubmitOptions. TopologyInitialStatus/INACTIVE))
-      (advance-cluster-time cluster 11)
-      (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 9)
-      (is (not @bolt-prepared?))
-      (is (not @spout-opened?))        
-      (.activate (:nimbus cluster) "test")              
-      
-      (advance-cluster-time cluster 12)
-      (assert-acked tracker 1)
-      (is @bolt-prepared?)
-      (is @spout-opened?))))
-
-(deftest test-acking-self-anchor
-  (with-tracked-cluster [cluster]
-    (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
-                   (topology
-                     {"1" (spout-spec feeder)}
-                     {"2" (bolt-spec {"1" :shuffle} dup-anchor)
-                      "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
-      (submit-local-topology (:nimbus cluster)
-                             "test"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
-      (.feed feeder [1])
-      (tracked-wait tracked 1)
-      (checker 1)
-      (.feed feeder [1])
-      (.feed feeder [1])
-      (.feed feeder [1])
-      (tracked-wait tracked 3)
-      (checker 3)
-      )))
-
-;; (defspout ConstantSpout ["val"] {:prepare false}
-;;   [collector]
-;;   (Time/sleep 100)
-;;   (emit-spout! collector [1]))
-
-;; (def errored (atom false))
-;; (def restarted (atom false))
-
-;; (defbolt local-error-checker {} [tuple collector]
-;;   (when-not @errored
-;;     (reset! errored true)
-;;     (println "erroring")
-;;     (throw (RuntimeException.)))
-;;   (when-not @restarted (println "restarted"))
-;;   (reset! restarted true))
-
-;; (deftest test-no-halt-local-mode
-;;   (with-simulated-time-local-cluster [cluster]
-;;       (let [topology (topology
-;;                       {1 (spout-spec ConstantSpout)}
-;;                       {2 (bolt-spec {1 :shuffle} local-error-checker)
-;;                        })]
-;;         (submit-local-topology (:nimbus cluster)
-;;                                "test"
-;;                                {}
-;;                                topology)
-;;         (while (not @restarted)
-;;           (advance-time-ms! 100))
-;;         )))
-
-(defspout IncSpout ["word"]
-  [conf context collector]
-  (let [state (atom 0)]
-    (spout
-     (nextTuple []
-       (Thread/sleep 100)
-       (emit-spout! collector [@state] :id 1)         
-       )
-     (ack [id]
-       (swap! state inc))
-     )))
-
-
-(defspout IncSpout2 ["word"] {:params [prefix]}
-  [conf context collector]
-  (let [state (atom 0)]
-    (spout
-     (nextTuple []
-       (Thread/sleep 100)
-       (swap! state inc)
-       (emit-spout! collector [(str prefix "-" @state)])         
-       )
-     )))
-
-;; (deftest test-clojure-spout
-;;   (with-local-cluster [cluster]
-;;     (let [nimbus (:nimbus cluster)
-;;           top (topology
-;;                {1 (spout-spec IncSpout)}
-;;                {}
-;;                )]
-;;       (submit-local-topology nimbus
-;;                              "spout-test"
-;;                              {TOPOLOGY-DEBUG true
-;;                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
-;;                              top)
-;;       (Thread/sleep 10000)
-;;       (.killTopology nimbus "spout-test")
-;;       (Thread/sleep 10000)
-;;       )))
-
-(deftest test-kryo-decorators-config
-  (with-simulated-time-local-cluster [cluster
-                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
-                                                    TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
-    (letlocals
-     (bind builder (TopologyBuilder.))
-     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
-     (-> builder
-         (.setBolt "2"
-                   (TestConfBolt.
-                    {TOPOLOGY-KRYO-DECORATORS ["one" "two"]}))
-         (.shuffleGrouping "1"))
-     
-     (bind results
-           (complete-topology cluster
-                              (.createTopology builder)
-                              :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
-                              :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))
-     (is (= {"topology.kryo.decorators" (list "one" "two" "three")}            
-            (->> (read-tuples results "2")
-                 (apply concat)
-                 (apply hash-map)))))))
-
-(deftest test-component-specific-config
-  (with-simulated-time-local-cluster [cluster
-                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
-    (letlocals
-     (bind builder (TopologyBuilder.))
-     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
-     (-> builder
-         (.setBolt "2"
-                   (TestConfBolt.
-                    {"fake.config" 123
-                     TOPOLOGY-MAX-TASK-PARALLELISM 20
-                     TOPOLOGY-MAX-SPOUT-PENDING 30
-                     TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
-                                             {"fake.type2" "a.serializer"}]
-                     }))
-         (.shuffleGrouping "1")
-         (.setMaxTaskParallelism (int 2))
-         (.addConfiguration "fake.config2" 987)
-         )
-     
-
-     (bind results
-           (complete-topology cluster
-                              (.createTopology builder)
-                              :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
-                              :mock-sources {"1" [["fake.config"]
-                                                  [TOPOLOGY-MAX-TASK-PARALLELISM]
-                                                  [TOPOLOGY-MAX-SPOUT-PENDING]
-                                                  ["fake.config2"]
-                                                  [TOPOLOGY-KRYO-REGISTER]
-                                                  ]}))
-     (is (= {"fake.config" 123
-             "fake.config2" 987
-             TOPOLOGY-MAX-TASK-PARALLELISM 2
-             TOPOLOGY-MAX-SPOUT-PENDING 30
-             TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
-                                     "fake.type2" "a.serializer"
-                                     "fake.type3" "a.serializer3"}}
-            (->> (read-tuples results "2")
-                 (apply concat)
-                 (apply hash-map))
-            ))
-     )))
-
-(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
-  [conf context collector]
-  (let [acked (atom 0)
-        failed (atom 0)
-        executed (atom 0)
-        emitted (atom 0)]
-    (.addTaskHook context
-                  (reify backtype.storm.hooks.ITaskHook
-                    (prepare [this conf context]
-                      )
-                    (cleanup [this]
-                      )
-                    (emit [this info]
-                      (swap! emitted inc))
-                    (boltAck [this info]
-                      (swap! acked inc))
-                    (boltFail [this info]
-                      (swap! failed inc))
-                    (boltExecute [this info]
-                      (swap! executed inc))
-                      ))
-    (bolt
-     (execute [tuple]
-        (emit-bolt! collector [@emitted @acked @failed @executed])
-        (if (= 0 (- @acked @failed))
-          (ack! collector tuple)
-          (fail! collector tuple))
-        ))))
-
-(deftest test-hooks
-  (with-simulated-time-local-cluster [cluster]
-    (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
-                              }
-                             {"2" (bolt-spec {"1" :shuffle}
-                                             hooks-bolt)
-                              })
-          results (complete-topology cluster
-                                     topology
-                                     :mock-sources {"1" [[1]
-                                                         [1]
-                                                         [1]
-                                                         [1]
-                                                         ]})]
-      (is (= [[0 0 0 0]
-              [2 1 0 1]
-              [4 1 1 2]
-              [6 2 1 3]]
-             (read-tuples results "2")
-             )))))
-
-(defbolt report-errors-bolt {}
-  [tuple collector]
-  (doseq [i (range (.getValue tuple 0))]
-    (report-error! collector (RuntimeException.)))
-  (ack! collector tuple))
-
-(deftest test-throttled-errors
-  (with-simulated-time
-    (with-tracked-cluster [cluster]
-      (let [state (:storm-cluster-state cluster)
-            [feeder checker] (ack-tracking-feeder ["num"])
-            tracked (mk-tracked-topology
-                     cluster
-                     (topology
-                       {"1" (spout-spec feeder)}
-                       {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
-            _       (submit-local-topology (:nimbus cluster)
-                                             "test-errors"
-                                             {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
-                                              TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
-                                              TOPOLOGY-DEBUG true
-                                              }
-                                             (:topology tracked))
-            _ (advance-cluster-time cluster 11)
-            storm-id (get-storm-id state "test-errors")
-            errors-count (fn [] (count (.errors state storm-id "2")))]
-
-        (is (nil? (.last-error state storm-id "2")))
-
-        ;; so it launches the topology
-        (advance-cluster-time cluster 2)
-        (.feed feeder [6])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 5)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 6 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [3])
-        (tracked-wait tracked 1)
-        (is (= 8 (errors-count)))
-        (is (.last-error state storm-id "2"))))))
-
-
-(deftest test-acking-branching-complex
-  ;; test acking with branching in the topology
-  )
-
-
-(deftest test-fields-grouping
-  ;; 1. put a shitload of random tuples through it and test that counts are right
-  ;; 2. test that different spouts with different phints group the same way
-  )
-
-(deftest test-all-grouping
-  )
-
-(deftest test-direct-grouping
-  )

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/backtype/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/testing4j_test.clj b/storm-core/test/clj/backtype/storm/testing4j_test.clj
deleted file mode 100644
index b504f28..0000000
--- a/storm-core/test/clj/backtype/storm/testing4j_test.clj
+++ /dev/null
@@ -1,212 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.testing4j-test
-  (:use [clojure.test])
-  (:use [backtype.storm config clojure testing util])
-  (:require [backtype.storm.integration-test :as it])
-  (:require [backtype.storm.thrift :as thrift])
-  (:import [backtype.storm Testing Config ILocalCluster])
-  (:import [backtype.storm.tuple Values Tuple])
-  (:import [backtype.storm.utils Time Utils])
-  (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
-            TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
-            AckFailMapTracker MkTupleParam]))
-
-(deftest test-with-simulated-time
-  (is (= false (Time/isSimulating)))
-  (Testing/withSimulatedTime (fn []
-                               (is (= true (Time/isSimulating)))))
-  (is (= false (Time/isSimulating))))
-
-(deftest test-with-local-cluster
-  (let [mk-cluster-param (doto (MkClusterParam.)
-                           (.setSupervisors (int 2))
-                           (.setPortsPerSupervisor (int 5)))
-        daemon-conf (doto (Config.)
-                      (.put SUPERVISOR-ENABLE false)
-                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
-    (Testing/withLocalCluster mk-cluster-param (reify TestJob
-                                                 (^void run [this ^ILocalCluster cluster]
-                                                   (is (not (nil? cluster)))
-                                                   (is (not (nil? (.getState cluster))))
-                                                   (is (not (nil? (:nimbus (.getState cluster))))))))))
-
-(deftest test-with-simulated-time-local-cluster
-  (let [mk-cluster-param (doto (MkClusterParam.)
-                           (.setSupervisors (int 2)))
-        daemon-conf (doto (Config.)
-                      (.put SUPERVISOR-ENABLE false)
-                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
-    (is (not (Time/isSimulating)))
-    (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
-                                                              (^void run [this ^ILocalCluster cluster]
-                                                                (is (not (nil? cluster)))
-                                                                (is (not (nil? (.getState cluster))))
-                                                                (is (not (nil? (:nimbus (.getState cluster)))))
-                                                                (is (Time/isSimulating)))))
-    (is (not (Time/isSimulating)))))
-
-(deftest test-complete-topology
-  (doseq [zmq-on? [true false]
-          :let [daemon-conf (doto (Config.)
-                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
-                mk-cluster-param (doto (MkClusterParam.)
-                                   (.setSupervisors (int 4))
-                                   (.setDaemonConf daemon-conf))]]
-    (Testing/withSimulatedTimeLocalCluster
-     (reify TestJob
-       (^void run [this ^ILocalCluster cluster]
-         (let [topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                         {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
-                          "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
-                          "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
-                          })
-               mocked-sources (doto (MockedSources.)
-                                (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
-                                                                      (Values. (into-array ["bob"]))
-                                                                      (Values. (into-array ["joey"]))
-                                                                      (Values. (into-array ["nathan"]))])
-                                              ))
-               storm-conf (doto (Config.)
-                            (.setNumWorkers 2))
-               complete-topology-param (doto (CompleteTopologyParam.)
-                                         (.setMockedSources mocked-sources)
-                                         (.setStormConf storm-conf))
-               results (Testing/completeTopology cluster
-                                                 topology
-                                                 complete-topology-param)]
-           (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                           (Testing/readTuples results "1")))
-           (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
-                           (read-tuples results "2")))
-           (is (= [[1] [2] [3] [4]]
-                  (Testing/readTuples results "3")))
-           (is (= [[1] [2] [3] [4]]
-                  (Testing/readTuples results "4")))
-           ))))))
-
-(deftest test-with-tracked-cluster
-  (Testing/withTrackedCluster
-   (reify TestJob
-     (^void run [this ^ILocalCluster cluster]
-       (let [[feeder checker] (it/ack-tracking-feeder ["num"])
-             tracked (Testing/mkTrackedTopology
-                      cluster
-                      (topology
-                       {"1" (spout-spec feeder)}
-                       {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
-                        "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
-                        "4" (bolt-spec
-                             {"2" :shuffle
-                              "3" :shuffle}
-                             (it/agg-bolt 4))}))]
-         (.submitTopology cluster
-                          "test-acking2"
-                          (Config.)
-                          (.getTopology tracked))
-         (advance-cluster-time (.getState cluster) 11)
-         (.feed feeder [1])
-         (Testing/trackedWait tracked (int 1))
-         (checker 0)
-         (.feed feeder [1])
-         (Testing/trackedWait tracked (int 1))
-         (checker 2)
-         )))))
-
-(deftest test-advance-cluster-time
-  (let [daemon-conf (doto (Config.)
-                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
-        mk-cluster-param (doto (MkClusterParam.)
-                           (.setDaemonConf daemon-conf))]
-    (Testing/withSimulatedTimeLocalCluster
-     mk-cluster-param
-     (reify TestJob
-       (^void run [this ^ILocalCluster cluster]
-         (let [feeder (feeder-spout ["field1"])
-               tracker (AckFailMapTracker.)
-               _ (.setAckFailDelegate feeder tracker)
-               topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec feeder)}
-                         {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
-               storm-conf (doto (Config.)
-                            (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
-           (.submitTopology cluster
-                            "timeout-tester"
-                            storm-conf
-                            topology)
-           (.feed feeder ["a"] 1)
-           (.feed feeder ["b"] 2)
-           (.feed feeder ["c"] 3)
-           (Testing/advanceClusterTime cluster (int 9))
-           (it/assert-acked tracker 1 3)
-           (is (not (.isFailed tracker 2)))
-           (Testing/advanceClusterTime cluster (int 12))
-           (it/assert-failed tracker 2)
-           ))))))
-
-(deftest test-disable-tuple-timeout
-  (let [daemon-conf (doto (Config.)
-                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
-        mk-cluster-param (doto (MkClusterParam.)
-                           (.setDaemonConf daemon-conf))]
-    (Testing/withSimulatedTimeLocalCluster
-      mk-cluster-param
-      (reify TestJob
-        (^void run [this ^ILocalCluster cluster]
-          (let [feeder (feeder-spout ["field1"])
-                tracker (AckFailMapTracker.)
-                _ (.setAckFailDelegate feeder tracker)
-                topology (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec feeder)}
-                           {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
-                storm-conf (doto (Config.)
-                             (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
-                             (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
-            (.submitTopology cluster
-              "disable-timeout-tester"
-              storm-conf
-              topology)
-            (.feed feeder ["a"] 1)
-            (.feed feeder ["b"] 2)
-            (.feed feeder ["c"] 3)
-            (Testing/advanceClusterTime cluster (int 9))
-            (it/assert-acked tracker 1 3)
-            (is (not (.isFailed tracker 2)))
-            (Testing/advanceClusterTime cluster (int 12))
-            (is (not (.isFailed tracker 2)))
-            ))))))
-
-(deftest test-test-tuple
-  (letlocals
-   ;; test the one-param signature
-   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"]))
-   (is (= ["james" "bond"] (.getValues tuple)))
-   (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
-   (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
-   (is (= "component" (.getSourceComponent tuple)))
-
-   ;; test the two-params signature
-   (bind mk-tuple-param (MkTupleParam.))
-   (doto mk-tuple-param
-     (.setStream "test-stream")
-     (.setComponent "test-component")
-     (.setFields (into-array String ["fname" "lname"])))
-   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"] mk-tuple-param))
-   (is (= ["james" "bond"] (.getValues tuple)))
-   (is (= "test-stream" (.getSourceStreamId tuple)))
-   (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
-   (is (= "test-component" (.getSourceComponent tuple)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/backtype/storm/integration_test.clj b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
new file mode 100644
index 0000000..f5fa501
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
@@ -0,0 +1,622 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.integration-test
+  (:use [clojure test])
+  (:import [backtype.storm Config])
+  (:import [backtype.storm.topology TopologyBuilder])
+  (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
+  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+            TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
+  (:import [backtype.storm.tuple Fields])
+  (:use [backtype.storm testing config clojure util])
+  (:use [backtype.storm.daemon common])
+  (:require [backtype.storm [thrift :as thrift]]))
+
+(deftest test-basic-topology
+  (doseq [zmq-on? [true false]]
+    (with-simulated-time-local-cluster [cluster :supervisors 4
+                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
+      (let [topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                      {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+                       "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+                       "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+                       })
+            results (complete-topology cluster
+                                       topology
+                                       :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+                                       :storm-conf {TOPOLOGY-WORKERS 2})]
+        (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
+                 (read-tuples results "1")))
+        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+                 (read-tuples results "2")))
+        (is (= [[1] [2] [3] [4]]
+               (read-tuples results "3")))
+        (is (= [[1] [2] [3] [4]]
+               (read-tuples results "4")))
+        ))))
+
+(defbolt emit-task-id ["tid"] {:prepare true}
+  [conf context collector]
+  (let [tid (.getThisTaskIndex context)]
+    (bolt
+      (execute [tuple]
+        (emit-bolt! collector [tid] :anchor tuple)
+        (ack! collector tuple)
+        ))))
+
+(deftest test-multi-tasks-per-executor
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
+                    {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
+                      :parallelism-hint 3
+                      :conf {TOPOLOGY-TASKS 6})
+                     })
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"1" [["a"]]})]
+      (is (ms= [[0] [1] [2] [3] [4] [5]]
+               (read-tuples results "2")))
+      )))
+
+(defbolt ack-every-other {} {:prepare true}
+  [conf context collector]
+  (let [state (atom -1)]
+    (bolt
+      (execute [tuple]
+        (let [val (swap! state -)]
+          (when (pos? val)
+            (ack! collector tuple)
+            ))))))
+
+(defn assert-loop [afn ids]
+  (while (not (every? afn ids))
+    (Thread/sleep 1)))
+
+(defn assert-acked [tracker & ids]
+  (assert-loop #(.isAcked tracker %) ids))
+
+(defn assert-failed [tracker & ids]
+  (assert-loop #(.isFailed tracker %) ids))
+
+(deftest test-timeout
+  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec feeder)}
+                     {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]      
+      (submit-local-topology (:nimbus cluster)
+                             "timeout-tester"
+                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                             topology)
+      (advance-cluster-time cluster 11)
+      (.feed feeder ["a"] 1)
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 9)
+      (assert-acked tracker 1 3)
+      (is (not (.isFailed tracker 2)))
+      (advance-cluster-time cluster 12)
+      (assert-failed tracker 2)
+      )))
+
+(defn mk-validate-topology-1 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-1 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-2 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-3 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn try-complete-wc-topology [cluster topology]
+  (try (do
+         (complete-topology cluster
+                            topology
+                            :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+                            :storm-conf {TOPOLOGY-WORKERS 2})
+         false)
+       (catch InvalidTopologyException e true)))
+
+(deftest test-validate-topology-structure
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
+          any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
+          any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
+          any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
+      (is (= any-error1? false))
+      (is (= any-error2? true))
+      (is (= any-error3? true))
+      (is (= any-error4? true)))))
+
+(defbolt identity-bolt ["num"]
+  [tuple collector]
+  (emit-bolt! collector (.getValues tuple) :anchor tuple)
+  (ack! collector tuple))
+
+(deftest test-system-stream
+  ;; this test works because mocking a spout splits up the tuples evenly among the tasks
+  (with-simulated-time-local-cluster [cluster]
+      (let [topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
+                      {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
+                       })
+            results (complete-topology cluster
+                                       topology
+                                       :mock-sources {"1" [["a"] ["b"] ["c"]]}
+                                       :storm-conf {TOPOLOGY-WORKERS 2})]
+        (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
+                 (read-tuples results "2")))
+        )))
+
+(defn ack-tracking-feeder [fields]
+  (let [tracker (AckTracker.)]
+    [(doto (feeder-spout fields)
+       (.setAckFailDelegate tracker))
+     (fn [val]
+       (is (= (.getNumAcks tracker) val))
+       (.resetNumAcks tracker)
+       )]
+    ))
+
+(defbolt branching-bolt ["num"]
+  {:params [amt]}
+  [tuple collector]
+  (doseq [i (range amt)]
+    (emit-bolt! collector [i] :anchor tuple))
+  (ack! collector tuple))
+
+(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
+  [conf context collector]
+  (let [seen (atom [])]
+    (bolt
+      (execute [tuple]
+        (swap! seen conj tuple)
+        (when (= (count @seen) amt)
+          (emit-bolt! collector [1] :anchor @seen)
+          (doseq [s @seen]
+            (ack! collector s))
+          (reset! seen [])
+          )))
+      ))
+
+(defbolt ack-bolt {}
+  [tuple collector]
+  (ack! collector tuple))
+
+(deftest test-acking
+  (with-tracked-cluster [cluster]
+    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
+          [feeder2 checker2] (ack-tracking-feeder ["num"])
+          [feeder3 checker3] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder1)
+                      "2" (spout-spec feeder2)
+                      "3" (spout-spec feeder3)}
+                     {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
+                      "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
+                      "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
+                      "7" (bolt-spec
+                            {"4" :shuffle
+                            "5" :shuffle
+                            "6" :shuffle}
+                            (agg-bolt 3))
+                      "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
+                      "9" (bolt-spec {"8" :shuffle} ack-bolt)}
+                     ))]
+      (submit-local-topology (:nimbus cluster)
+                             "acking-test1"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      (.feed feeder2 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (checker2 1)
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (.feed feeder3 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      (checker3 0)
+      (.feed feeder2 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (checker2 1)
+      (checker3 1)
+      
+      )))
+
+(deftest test-ack-branching
+  (with-tracked-cluster [cluster]
+    (let [[feeder checker] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder)}
+                     {"2" (bolt-spec {"1" :shuffle} identity-bolt)
+                      "3" (bolt-spec {"1" :shuffle} identity-bolt)
+                      "4" (bolt-spec
+                            {"2" :shuffle
+                             "3" :shuffle}
+                             (agg-bolt 4))}))]
+      (submit-local-topology (:nimbus cluster)
+                             "test-acking2"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 0)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 2)
+      )))
+
+(defbolt dup-anchor ["num"]
+  [tuple collector]
+  (emit-bolt! collector [1] :anchor [tuple tuple])
+  (ack! collector tuple))
+
+(def bolt-prepared? (atom false))
+(defbolt prepare-tracked-bolt [] {:prepare true}
+  [conf context collector]  
+  (reset! bolt-prepared? true)
+  (bolt
+   (execute [tuple]
+            (ack! collector tuple))))
+
+(def spout-opened? (atom false))
+(defspout open-tracked-spout ["val"]
+  [conf context collector]
+  (reset! spout-opened? true)
+  (spout
+   (nextTuple [])))
+
+(deftest test-submit-inactive-topology
+  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec feeder)
+                     "2" (thrift/mk-spout-spec open-tracked-spout)}
+                    {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
+      (reset! bolt-prepared? false)
+      (reset! spout-opened? false)      
+      
+      (submit-local-topology-with-opts (:nimbus cluster)
+        "test"
+        {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+        topology
+        (SubmitOptions. TopologyInitialStatus/INACTIVE))
+      (advance-cluster-time cluster 11)
+      (.feed feeder ["a"] 1)
+      (advance-cluster-time cluster 9)
+      (is (not @bolt-prepared?))
+      (is (not @spout-opened?))        
+      (.activate (:nimbus cluster) "test")              
+      
+      (advance-cluster-time cluster 12)
+      (assert-acked tracker 1)
+      (is @bolt-prepared?)
+      (is @spout-opened?))))
+
+(deftest test-acking-self-anchor
+  (with-tracked-cluster [cluster]
+    (let [[feeder checker] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder)}
+                     {"2" (bolt-spec {"1" :shuffle} dup-anchor)
+                      "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
+      (submit-local-topology (:nimbus cluster)
+                             "test"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 1)
+      (.feed feeder [1])
+      (.feed feeder [1])
+      (.feed feeder [1])
+      (tracked-wait tracked 3)
+      (checker 3)
+      )))
+
+;; (defspout ConstantSpout ["val"] {:prepare false}
+;;   [collector]
+;;   (Time/sleep 100)
+;;   (emit-spout! collector [1]))
+
+;; (def errored (atom false))
+;; (def restarted (atom false))
+
+;; (defbolt local-error-checker {} [tuple collector]
+;;   (when-not @errored
+;;     (reset! errored true)
+;;     (println "erroring")
+;;     (throw (RuntimeException.)))
+;;   (when-not @restarted (println "restarted"))
+;;   (reset! restarted true))
+
+;; (deftest test-no-halt-local-mode
+;;   (with-simulated-time-local-cluster [cluster]
+;;       (let [topology (topology
+;;                       {1 (spout-spec ConstantSpout)}
+;;                       {2 (bolt-spec {1 :shuffle} local-error-checker)
+;;                        })]
+;;         (submit-local-topology (:nimbus cluster)
+;;                                "test"
+;;                                {}
+;;                                topology)
+;;         (while (not @restarted)
+;;           (advance-time-ms! 100))
+;;         )))
+
+(defspout IncSpout ["word"]
+  [conf context collector]
+  (let [state (atom 0)]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (emit-spout! collector [@state] :id 1)         
+       )
+     (ack [id]
+       (swap! state inc))
+     )))
+
+
+(defspout IncSpout2 ["word"] {:params [prefix]}
+  [conf context collector]
+  (let [state (atom 0)]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (swap! state inc)
+       (emit-spout! collector [(str prefix "-" @state)])         
+       )
+     )))
+
+;; (deftest test-clojure-spout
+;;   (with-local-cluster [cluster]
+;;     (let [nimbus (:nimbus cluster)
+;;           top (topology
+;;                {1 (spout-spec IncSpout)}
+;;                {}
+;;                )]
+;;       (submit-local-topology nimbus
+;;                              "spout-test"
+;;                              {TOPOLOGY-DEBUG true
+;;                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
+;;                              top)
+;;       (Thread/sleep 10000)
+;;       (.killTopology nimbus "spout-test")
+;;       (Thread/sleep 10000)
+;;       )))
+
+(deftest test-kryo-decorators-config
+  (with-simulated-time-local-cluster [cluster
+                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+                                                    TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
+    (letlocals
+     (bind builder (TopologyBuilder.))
+     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+     (-> builder
+         (.setBolt "2"
+                   (TestConfBolt.
+                    {TOPOLOGY-KRYO-DECORATORS ["one" "two"]}))
+         (.shuffleGrouping "1"))
+     
+     (bind results
+           (complete-topology cluster
+                              (.createTopology builder)
+                              :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
+                              :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))
+     (is (= {"topology.kryo.decorators" (list "one" "two" "three")}            
+            (->> (read-tuples results "2")
+                 (apply concat)
+                 (apply hash-map)))))))
+
+(deftest test-component-specific-config
+  (with-simulated-time-local-cluster [cluster
+                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
+    (letlocals
+     (bind builder (TopologyBuilder.))
+     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+     (-> builder
+         (.setBolt "2"
+                   (TestConfBolt.
+                    {"fake.config" 123
+                     TOPOLOGY-MAX-TASK-PARALLELISM 20
+                     TOPOLOGY-MAX-SPOUT-PENDING 30
+                     TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
+                                             {"fake.type2" "a.serializer"}]
+                     }))
+         (.shuffleGrouping "1")
+         (.setMaxTaskParallelism (int 2))
+         (.addConfiguration "fake.config2" 987)
+         )
+     
+
+     (bind results
+           (complete-topology cluster
+                              (.createTopology builder)
+                              :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
+                              :mock-sources {"1" [["fake.config"]
+                                                  [TOPOLOGY-MAX-TASK-PARALLELISM]
+                                                  [TOPOLOGY-MAX-SPOUT-PENDING]
+                                                  ["fake.config2"]
+                                                  [TOPOLOGY-KRYO-REGISTER]
+                                                  ]}))
+     (is (= {"fake.config" 123
+             "fake.config2" 987
+             TOPOLOGY-MAX-TASK-PARALLELISM 2
+             TOPOLOGY-MAX-SPOUT-PENDING 30
+             TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
+                                     "fake.type2" "a.serializer"
+                                     "fake.type3" "a.serializer3"}}
+            (->> (read-tuples results "2")
+                 (apply concat)
+                 (apply hash-map))
+            ))
+     )))
+
+(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
+  [conf context collector]
+  (let [acked (atom 0)
+        failed (atom 0)
+        executed (atom 0)
+        emitted (atom 0)]
+    (.addTaskHook context
+                  (reify backtype.storm.hooks.ITaskHook
+                    (prepare [this conf context]
+                      )
+                    (cleanup [this]
+                      )
+                    (emit [this info]
+                      (swap! emitted inc))
+                    (boltAck [this info]
+                      (swap! acked inc))
+                    (boltFail [this info]
+                      (swap! failed inc))
+                    (boltExecute [this info]
+                      (swap! executed inc))
+                      ))
+    (bolt
+     (execute [tuple]
+        (emit-bolt! collector [@emitted @acked @failed @executed])
+        (if (= 0 (- @acked @failed))
+          (ack! collector tuple)
+          (fail! collector tuple))
+        ))))
+
+(deftest test-hooks
+  (with-simulated-time-local-cluster [cluster]
+    (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
+                              }
+                             {"2" (bolt-spec {"1" :shuffle}
+                                             hooks-bolt)
+                              })
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"1" [[1]
+                                                         [1]
+                                                         [1]
+                                                         [1]
+                                                         ]})]
+      (is (= [[0 0 0 0]
+              [2 1 0 1]
+              [4 1 1 2]
+              [6 2 1 3]]
+             (read-tuples results "2")
+             )))))
+
+(defbolt report-errors-bolt {}
+  [tuple collector]
+  (doseq [i (range (.getValue tuple 0))]
+    (report-error! collector (RuntimeException.)))
+  (ack! collector tuple))
+
+(deftest test-throttled-errors
+  (with-simulated-time
+    (with-tracked-cluster [cluster]
+      (let [state (:storm-cluster-state cluster)
+            [feeder checker] (ack-tracking-feeder ["num"])
+            tracked (mk-tracked-topology
+                     cluster
+                     (topology
+                       {"1" (spout-spec feeder)}
+                       {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
+            _       (submit-local-topology (:nimbus cluster)
+                                             "test-errors"
+                                             {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
+                                              TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
+                                              TOPOLOGY-DEBUG true
+                                              }
+                                             (:topology tracked))
+            _ (advance-cluster-time cluster 11)
+            storm-id (get-storm-id state "test-errors")
+            errors-count (fn [] (count (.errors state storm-id "2")))]
+
+        (is (nil? (.last-error state storm-id "2")))
+
+        ;; so it launches the topology
+        (advance-cluster-time cluster 2)
+        (.feed feeder [6])
+        (tracked-wait tracked 1)
+        (is (= 4 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        
+        (advance-time-secs! 5)
+        (.feed feeder [2])
+        (tracked-wait tracked 1)
+        (is (= 4 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        
+        (advance-time-secs! 6)
+        (.feed feeder [2])
+        (tracked-wait tracked 1)
+        (is (= 6 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        
+        (advance-time-secs! 6)
+        (.feed feeder [3])
+        (tracked-wait tracked 1)
+        (is (= 8 (errors-count)))
+        (is (.last-error state storm-id "2"))))))
+
+
+(deftest test-acking-branching-complex
+  ;; test acking with branching in the topology
+  )
+
+
+(deftest test-fields-grouping
+  ;; 1. put a shitload of random tuples through it and test that counts are right
+  ;; 2. test that different spouts with different phints group the same way
+  )
+
+(deftest test-all-grouping
+  )
+
+(deftest test-direct-grouping
+  )

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
new file mode 100644
index 0000000..5cdb182
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
@@ -0,0 +1,212 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.testing4j-test
+  (:use [clojure.test])
+  (:use [backtype.storm config clojure testing util])
+  (:require [integration.backtype.storm.integration-test :as it])
+  (:require [backtype.storm.thrift :as thrift])
+  (:import [backtype.storm Testing Config ILocalCluster])
+  (:import [backtype.storm.tuple Values Tuple])
+  (:import [backtype.storm.utils Time Utils])
+  (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
+            TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
+            AckFailMapTracker MkTupleParam]))
+
+(deftest test-with-simulated-time
+  (is (= false (Time/isSimulating)))
+  (Testing/withSimulatedTime (fn []
+                               (is (= true (Time/isSimulating)))))
+  (is (= false (Time/isSimulating))))
+
+(deftest test-with-local-cluster
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                           (.setSupervisors (int 2))
+                           (.setPortsPerSupervisor (int 5)))
+        daemon-conf (doto (Config.)
+                      (.put SUPERVISOR-ENABLE false)
+                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
+    (Testing/withLocalCluster mk-cluster-param (reify TestJob
+                                                 (^void run [this ^ILocalCluster cluster]
+                                                   (is (not (nil? cluster)))
+                                                   (is (not (nil? (.getState cluster))))
+                                                   (is (not (nil? (:nimbus (.getState cluster))))))))))
+
+(deftest test-with-simulated-time-local-cluster
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                           (.setSupervisors (int 2)))
+        daemon-conf (doto (Config.)
+                      (.put SUPERVISOR-ENABLE false)
+                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
+    (is (not (Time/isSimulating)))
+    (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
+                                                              (^void run [this ^ILocalCluster cluster]
+                                                                (is (not (nil? cluster)))
+                                                                (is (not (nil? (.getState cluster))))
+                                                                (is (not (nil? (:nimbus (.getState cluster)))))
+                                                                (is (Time/isSimulating)))))
+    (is (not (Time/isSimulating)))))
+
+(deftest test-complete-topology
+  (doseq [zmq-on? [true false]
+          :let [daemon-conf (doto (Config.)
+                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
+                mk-cluster-param (doto (MkClusterParam.)
+                                   (.setSupervisors (int 4))
+                                   (.setDaemonConf daemon-conf))]]
+    (Testing/withSimulatedTimeLocalCluster
+     (reify TestJob
+       (^void run [this ^ILocalCluster cluster]
+         (let [topology (thrift/mk-topology
+                         {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                         {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+                          "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+                          "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+                          })
+               mocked-sources (doto (MockedSources.)
+                                (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+                                                                      (Values. (into-array ["bob"]))
+                                                                      (Values. (into-array ["joey"]))
+                                                                      (Values. (into-array ["nathan"]))])
+                                              ))
+               storm-conf (doto (Config.)
+                            (.setNumWorkers 2))
+               complete-topology-param (doto (CompleteTopologyParam.)
+                                         (.setMockedSources mocked-sources)
+                                         (.setStormConf storm-conf))
+               results (Testing/completeTopology cluster
+                                                 topology
+                                                 complete-topology-param)]
+           (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
+                           (Testing/readTuples results "1")))
+           (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+                           (read-tuples results "2")))
+           (is (= [[1] [2] [3] [4]]
+                  (Testing/readTuples results "3")))
+           (is (= [[1] [2] [3] [4]]
+                  (Testing/readTuples results "4")))
+           ))))))
+
+(deftest test-with-tracked-cluster
+  (Testing/withTrackedCluster
+   (reify TestJob
+     (^void run [this ^ILocalCluster cluster]
+       (let [[feeder checker] (it/ack-tracking-feeder ["num"])
+             tracked (Testing/mkTrackedTopology
+                      cluster
+                      (topology
+                       {"1" (spout-spec feeder)}
+                       {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
+                        "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
+                        "4" (bolt-spec
+                             {"2" :shuffle
+                              "3" :shuffle}
+                             (it/agg-bolt 4))}))]
+         (.submitTopology cluster
+                          "test-acking2"
+                          (Config.)
+                          (.getTopology tracked))
+         (advance-cluster-time (.getState cluster) 11)
+         (.feed feeder [1])
+         (Testing/trackedWait tracked (int 1))
+         (checker 0)
+         (.feed feeder [1])
+         (Testing/trackedWait tracked (int 1))
+         (checker 2)
+         )))))
+
+(deftest test-advance-cluster-time
+  (let [daemon-conf (doto (Config.)
+                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
+        mk-cluster-param (doto (MkClusterParam.)
+                           (.setDaemonConf daemon-conf))]
+    (Testing/withSimulatedTimeLocalCluster
+     mk-cluster-param
+     (reify TestJob
+       (^void run [this ^ILocalCluster cluster]
+         (let [feeder (feeder-spout ["field1"])
+               tracker (AckFailMapTracker.)
+               _ (.setAckFailDelegate feeder tracker)
+               topology (thrift/mk-topology
+                         {"1" (thrift/mk-spout-spec feeder)}
+                         {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+               storm-conf (doto (Config.)
+                            (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
+           (.submitTopology cluster
+                            "timeout-tester"
+                            storm-conf
+                            topology)
+           (.feed feeder ["a"] 1)
+           (.feed feeder ["b"] 2)
+           (.feed feeder ["c"] 3)
+           (Testing/advanceClusterTime cluster (int 9))
+           (it/assert-acked tracker 1 3)
+           (is (not (.isFailed tracker 2)))
+           (Testing/advanceClusterTime cluster (int 12))
+           (it/assert-failed tracker 2)
+           ))))))
+
+(deftest test-disable-tuple-timeout
+  (let [daemon-conf (doto (Config.)
+                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
+        mk-cluster-param (doto (MkClusterParam.)
+                           (.setDaemonConf daemon-conf))]
+    (Testing/withSimulatedTimeLocalCluster
+      mk-cluster-param
+      (reify TestJob
+        (^void run [this ^ILocalCluster cluster]
+          (let [feeder (feeder-spout ["field1"])
+                tracker (AckFailMapTracker.)
+                _ (.setAckFailDelegate feeder tracker)
+                topology (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec feeder)}
+                           {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+                storm-conf (doto (Config.)
+                             (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
+                             (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
+            (.submitTopology cluster
+              "disable-timeout-tester"
+              storm-conf
+              topology)
+            (.feed feeder ["a"] 1)
+            (.feed feeder ["b"] 2)
+            (.feed feeder ["c"] 3)
+            (Testing/advanceClusterTime cluster (int 9))
+            (it/assert-acked tracker 1 3)
+            (is (not (.isFailed tracker 2)))
+            (Testing/advanceClusterTime cluster (int 12))
+            (is (not (.isFailed tracker 2)))
+            ))))))
+
+(deftest test-test-tuple
+  (letlocals
+   ;; test the one-param signature
+   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"]))
+   (is (= ["james" "bond"] (.getValues tuple)))
+   (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
+   (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
+   (is (= "component" (.getSourceComponent tuple)))
+
+   ;; test the two-params signature
+   (bind mk-tuple-param (MkTupleParam.))
+   (doto mk-tuple-param
+     (.setStream "test-stream")
+     (.setComponent "test-component")
+     (.setFields (into-array String ["fname" "lname"])))
+   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"] mk-tuple-param))
+   (is (= ["james" "bond"] (.getValues tuple)))
+   (is (= "test-stream" (.getSourceStreamId tuple)))
+   (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
+   (is (= "test-component" (.getSourceComponent tuple)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/storm/trident/integration_test.clj b/storm-core/test/clj/integration/storm/trident/integration_test.clj
new file mode 100644
index 0000000..bcd8173
--- /dev/null
+++ b/storm-core/test/clj/integration/storm/trident/integration_test.clj
@@ -0,0 +1,292 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.storm.trident.integration-test
+  (:use [clojure test])
+  (:require [backtype.storm [testing :as t]])
+  (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
+            MemoryMapState$Factory])
+  (:import [storm.trident.state StateSpec])
+  (:import [storm.trident.operation.impl CombinerAggStateUpdater])
+  (:use [storm.trident testing])
+  (:use [backtype.storm util]))
+  
+(bootstrap-imports)
+
+(deftest test-memory-map-get-tuples
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-spout ["sentence"]))
+        (bind word-counts
+          (-> topo
+              (.newStream "tester" feeder)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)
+              ))       
+        (-> topo
+            (.newDRPCStream "all-tuples" drpc)
+            (.broadcast)
+            (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
+            (.project (fields "word" "count")))
+        (with-topology [cluster topo]
+          (feed feeder [["hello the man said"] ["the"]])
+          (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
+                 (into #{} (exec-drpc drpc "all-tuples" "man"))))
+          (feed feeder [["the foo"]])
+          (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
+                 (into #{} (exec-drpc drpc "all-tuples" "man")))))))))
+
+(deftest test-word-count
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-spout ["sentence"]))
+        (bind word-counts
+          (-> topo
+              (.newStream "tester" feeder)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)
+              ))
+        (-> topo
+            (.newDRPCStream "words" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.groupBy (fields "word"))
+            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+            (.aggregate (fields "count") (Sum.) (fields "sum"))
+            (.project (fields "sum")))
+        (with-topology [cluster topo]
+          (feed feeder [["hello the man said"] ["the"]])
+          (is (= [[2]] (exec-drpc drpc "words" "the")))
+          (is (= [[1]] (exec-drpc drpc "words" "hello")))
+          (feed feeder [["the man on the moon"] ["where are you"]])
+          (is (= [[4]] (exec-drpc drpc "words" "the")))
+          (is (= [[2]] (exec-drpc drpc "words" "man")))
+          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+          )))))
+
+;; this test reproduces a bug where committer spouts freeze processing when 
+;; there's at least one repartitioning after the spout
+(deftest test-word-count-committer-spout
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-committer-spout ["sentence"]))
+        (.setWaitToEmit feeder false) ;;this causes lots of empty batches
+        (bind word-counts
+          (-> topo
+              (.newStream "tester" feeder)
+              (.parallelismHint 2)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)
+              ))
+        (-> topo
+            (.newDRPCStream "words" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.groupBy (fields "word"))
+            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+            (.aggregate (fields "count") (Sum.) (fields "sum"))
+            (.project (fields "sum")))
+        (with-topology [cluster topo]
+          (feed feeder [["hello the man said"] ["the"]])
+          (is (= [[2]] (exec-drpc drpc "words" "the")))
+          (is (= [[1]] (exec-drpc drpc "words" "hello")))
+          (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
+          (feed feeder [["the man on the moon"] ["where are you"]])
+          (is (= [[4]] (exec-drpc drpc "words" "the")))
+          (is (= [[2]] (exec-drpc drpc "words" "man")))
+          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+          (feed feeder [["the the"]])
+          (is (= [[6]] (exec-drpc drpc "words" "the")))
+          (feed feeder [["the"]])
+          (is (= [[7]] (exec-drpc drpc "words" "the")))
+          )))))
+
+
+(deftest test-count-agg
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (-> topo
+            (.newDRPCStream "numwords" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.aggregate (CountAsAggregator.) (fields "count"))
+            (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
+            (.project (fields "count")))
+        (with-topology [cluster topo]
+          (doseq [i (range 100)]
+            (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
+          (is (= [[0]] (exec-drpc drpc "numwords" "")))
+          (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
+          )))))
+          
+(deftest test-split-merge
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+        (bind s1
+          (-> drpc-stream
+              (.each (fields "args") (Split.) (fields "word"))
+              (.project (fields "word"))))
+        (bind s2
+          (-> drpc-stream
+              (.each (fields "args") (StringLength.) (fields "len"))
+              (.project (fields "len"))))
+
+        (.merge topo [s1 s2])
+        (with-topology [cluster topo]
+          (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+          (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+          )))))
+
+(deftest test-multiple-groupings-same-stream
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
+                                   (.each (fields "args") (TrueFilter.))))
+        (bind s1
+          (-> drpc-stream
+              (.groupBy (fields "args"))
+              (.aggregate (CountAsAggregator.) (fields "count"))))
+        (bind s2
+          (-> drpc-stream
+              (.groupBy (fields "args"))
+              (.aggregate (CountAsAggregator.) (fields "count"))))
+
+        (.merge topo [s1 s2])
+        (with-topology [cluster topo]
+          (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
+          (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
+          )))))
+          
+(deftest test-multi-repartition
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
+                                   (.each (fields "args") (Split.) (fields "word"))
+                                   (.localOrShuffle)
+                                   (.shuffle)
+                                   (.aggregate (CountAsAggregator.) (fields "count"))
+                                   ))
+        (with-topology [cluster topo]
+          (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
+          (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
+          )))))
+
+(deftest test-stream-projection-validation
+  (t/with-local-cluster [cluster]
+    (letlocals
+     (bind feeder (feeder-committer-spout ["sentence"]))
+     (bind topo (TridentTopology.))
+     ;; valid projection fields will not throw exceptions
+     (bind word-counts
+           (-> topo
+               (.newStream "tester" feeder)
+               (.each (fields "sentence") (Split.) (fields "word"))
+               (.groupBy (fields "word"))
+               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+               (.parallelismHint 6)
+               ))
+     (bind stream (-> topo
+                      (.newStream "tester" feeder)))
+     ;; test .each
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence1") (Split.) (fields "word")))))
+     ;; test .groupBy
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word1")))))
+     ;; test .aggregate
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.aggregate (fields "word1") (Count.) (fields "count")))))
+     ;; test .project
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.project (fields "sentence1")))))
+     ;; test .partitionBy
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.partitionBy (fields "sentence1")))))
+     ;; test .partitionAggregate
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
+     ;; test .persistentAggregate
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
+     ;; test .partitionPersist
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
+                                         (fields "non-existent")
+                                         (CombinerAggStateUpdater. (Count.))
+                                         (fields "count")))))
+     ;; test .stateQuery
+     (with-drpc [drpc]
+       (is (thrown? IllegalArgumentException
+                    (-> topo
+                        (.newDRPCStream "words" drpc)
+                        (.each (fields "args") (Split.) (fields "word"))
+                        (.groupBy (fields "word"))
+                        (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
+     )))
+
+;; (deftest test-split-merge
+;;   (t/with-local-cluster [cluster]
+;;     (with-drpc [drpc]
+;;       (letlocals
+;;         (bind topo (TridentTopology.))
+;;         (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+;;         (bind s1
+;;           (-> drpc-stream
+;;               (.each (fields "args") (Split.) (fields "word"))
+;;               (.project (fields "word"))))
+;;         (bind s2
+;;           (-> drpc-stream
+;;               (.each (fields "args") (StringLength.) (fields "len"))
+;;               (.project (fields "len"))))
+;; 
+;;         (.merge topo [s1 s2])
+;;         (with-topology [cluster topo]
+;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+;;           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+;;           )))))