You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/15 16:15:28 UTC
[4/6] storm git commit: STORM-1179: Create Maven Profiles for
Integration Tests - Mark Clojure tests as integration tests
STORM-1179: Create Maven Profiles for Integration Tests
- Mark Clojure tests as integration tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d4fcc0fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d4fcc0fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d4fcc0fd
Branch: refs/heads/master
Commit: d4fcc0fde96c667d1763fe5626ddf1284e4a7f70
Parents: 8f1b4fb
Author: Hugo Louro <hm...@gmail.com>
Authored: Wed Dec 9 16:50:28 2015 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Wed Dec 9 18:58:22 2015 -0800
----------------------------------------------------------------------
.../clj/backtype/storm/integration_test.clj | 622 -------------------
.../test/clj/backtype/storm/testing4j_test.clj | 212 -------
.../backtype/storm/integration_test.clj | 622 +++++++++++++++++++
.../backtype/storm/testing4j_test.clj | 212 +++++++
.../storm/trident/integration_test.clj | 292 +++++++++
.../test/clj/storm/trident/integration_test.clj | 292 ---------
6 files changed, 1126 insertions(+), 1126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/integration_test.clj b/storm-core/test/clj/backtype/storm/integration_test.clj
deleted file mode 100644
index cc0208d..0000000
--- a/storm-core/test/clj/backtype/storm/integration_test.clj
+++ /dev/null
@@ -1,622 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.integration-test
- (:use [clojure test])
- (:import [backtype.storm Config])
- (:import [backtype.storm.topology TopologyBuilder])
- (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
- (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
- TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
- (:import [backtype.storm.tuple Fields])
- (:use [backtype.storm testing config clojure util])
- (:use [backtype.storm.daemon common])
- (:require [backtype.storm [thrift :as thrift]]))
-
-(deftest test-basic-topology
- (doseq [zmq-on? [true false]]
- (with-simulated-time-local-cluster [cluster :supervisors 4
- :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
- "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
- "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
- })
- results (complete-topology cluster
- topology
- :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
- :storm-conf {TOPOLOGY-WORKERS 2})]
- (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
- (read-tuples results "1")))
- (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
- (read-tuples results "2")))
- (is (= [[1] [2] [3] [4]]
- (read-tuples results "3")))
- (is (= [[1] [2] [3] [4]]
- (read-tuples results "4")))
- ))))
-
-(defbolt emit-task-id ["tid"] {:prepare true}
- [conf context collector]
- (let [tid (.getThisTaskIndex context)]
- (bolt
- (execute [tuple]
- (emit-bolt! collector [tid] :anchor tuple)
- (ack! collector tuple)
- ))))
-
-(deftest test-multi-tasks-per-executor
- (with-simulated-time-local-cluster [cluster :supervisors 4]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
- {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
- :parallelism-hint 3
- :conf {TOPOLOGY-TASKS 6})
- })
- results (complete-topology cluster
- topology
- :mock-sources {"1" [["a"]]})]
- (is (ms= [[0] [1] [2] [3] [4] [5]]
- (read-tuples results "2")))
- )))
-
-(defbolt ack-every-other {} {:prepare true}
- [conf context collector]
- (let [state (atom -1)]
- (bolt
- (execute [tuple]
- (let [val (swap! state -)]
- (when (pos? val)
- (ack! collector tuple)
- ))))))
-
-(defn assert-loop [afn ids]
- (while (not (every? afn ids))
- (Thread/sleep 1)))
-
-(defn assert-acked [tracker & ids]
- (assert-loop #(.isAcked tracker %) ids))
-
-(defn assert-failed [tracker & ids]
- (assert-loop #(.isFailed tracker %) ids))
-
-(deftest test-timeout
- (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
- (let [feeder (feeder-spout ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]
- (submit-local-topology (:nimbus cluster)
- "timeout-tester"
- {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
- topology)
- (advance-cluster-time cluster 11)
- (.feed feeder ["a"] 1)
- (.feed feeder ["b"] 2)
- (.feed feeder ["c"] 3)
- (advance-cluster-time cluster 9)
- (assert-acked tracker 1 3)
- (is (not (.isFailed tracker 2)))
- (advance-cluster-time cluster 12)
- (assert-failed tracker 2)
- )))
-
-(defn mk-validate-topology-1 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-1 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-2 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-3 []
- (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn try-complete-wc-topology [cluster topology]
- (try (do
- (complete-topology cluster
- topology
- :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
- :storm-conf {TOPOLOGY-WORKERS 2})
- false)
- (catch InvalidTopologyException e true)))
-
-(deftest test-validate-topology-structure
- (with-simulated-time-local-cluster [cluster :supervisors 4]
- (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
- any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
- any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
- any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
- (is (= any-error1? false))
- (is (= any-error2? true))
- (is (= any-error3? true))
- (is (= any-error4? true)))))
-
-(defbolt identity-bolt ["num"]
- [tuple collector]
- (emit-bolt! collector (.getValues tuple) :anchor tuple)
- (ack! collector tuple))
-
-(deftest test-system-stream
- ;; this test works because mocking a spout splits up the tuples evenly among the tasks
- (with-simulated-time-local-cluster [cluster]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
- })
- results (complete-topology cluster
- topology
- :mock-sources {"1" [["a"] ["b"] ["c"]]}
- :storm-conf {TOPOLOGY-WORKERS 2})]
- (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
- (read-tuples results "2")))
- )))
-
-(defn ack-tracking-feeder [fields]
- (let [tracker (AckTracker.)]
- [(doto (feeder-spout fields)
- (.setAckFailDelegate tracker))
- (fn [val]
- (is (= (.getNumAcks tracker) val))
- (.resetNumAcks tracker)
- )]
- ))
-
-(defbolt branching-bolt ["num"]
- {:params [amt]}
- [tuple collector]
- (doseq [i (range amt)]
- (emit-bolt! collector [i] :anchor tuple))
- (ack! collector tuple))
-
-(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
- [conf context collector]
- (let [seen (atom [])]
- (bolt
- (execute [tuple]
- (swap! seen conj tuple)
- (when (= (count @seen) amt)
- (emit-bolt! collector [1] :anchor @seen)
- (doseq [s @seen]
- (ack! collector s))
- (reset! seen [])
- )))
- ))
-
-(defbolt ack-bolt {}
- [tuple collector]
- (ack! collector tuple))
-
-(deftest test-acking
- (with-tracked-cluster [cluster]
- (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
- [feeder2 checker2] (ack-tracking-feeder ["num"])
- [feeder3 checker3] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
- (topology
- {"1" (spout-spec feeder1)
- "2" (spout-spec feeder2)
- "3" (spout-spec feeder3)}
- {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
- "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
- "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
- "7" (bolt-spec
- {"4" :shuffle
- "5" :shuffle
- "6" :shuffle}
- (agg-bolt 3))
- "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
- "9" (bolt-spec {"8" :shuffle} ack-bolt)}
- ))]
- (submit-local-topology (:nimbus cluster)
- "acking-test1"
- {}
- (:topology tracked))
- (advance-cluster-time cluster 11)
- (.feed feeder1 [1])
- (tracked-wait tracked 1)
- (checker1 0)
- (.feed feeder2 [1])
- (tracked-wait tracked 1)
- (checker1 1)
- (checker2 1)
- (.feed feeder1 [1])
- (tracked-wait tracked 1)
- (checker1 0)
- (.feed feeder1 [1])
- (tracked-wait tracked 1)
- (checker1 1)
- (.feed feeder3 [1])
- (tracked-wait tracked 1)
- (checker1 0)
- (checker3 0)
- (.feed feeder2 [1])
- (tracked-wait tracked 1)
- (checker1 1)
- (checker2 1)
- (checker3 1)
-
- )))
-
-(deftest test-ack-branching
- (with-tracked-cluster [cluster]
- (let [[feeder checker] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} identity-bolt)
- "3" (bolt-spec {"1" :shuffle} identity-bolt)
- "4" (bolt-spec
- {"2" :shuffle
- "3" :shuffle}
- (agg-bolt 4))}))]
- (submit-local-topology (:nimbus cluster)
- "test-acking2"
- {}
- (:topology tracked))
- (advance-cluster-time cluster 11)
- (.feed feeder [1])
- (tracked-wait tracked 1)
- (checker 0)
- (.feed feeder [1])
- (tracked-wait tracked 1)
- (checker 2)
- )))
-
-(defbolt dup-anchor ["num"]
- [tuple collector]
- (emit-bolt! collector [1] :anchor [tuple tuple])
- (ack! collector tuple))
-
-(def bolt-prepared? (atom false))
-(defbolt prepare-tracked-bolt [] {:prepare true}
- [conf context collector]
- (reset! bolt-prepared? true)
- (bolt
- (execute [tuple]
- (ack! collector tuple))))
-
-(def spout-opened? (atom false))
-(defspout open-tracked-spout ["val"]
- [conf context collector]
- (reset! spout-opened? true)
- (spout
- (nextTuple [])))
-
-(deftest test-submit-inactive-topology
- (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
- (let [feeder (feeder-spout ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)
- "2" (thrift/mk-spout-spec open-tracked-spout)}
- {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
- (reset! bolt-prepared? false)
- (reset! spout-opened? false)
-
- (submit-local-topology-with-opts (:nimbus cluster)
- "test"
- {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
- topology
- (SubmitOptions. TopologyInitialStatus/INACTIVE))
- (advance-cluster-time cluster 11)
- (.feed feeder ["a"] 1)
- (advance-cluster-time cluster 9)
- (is (not @bolt-prepared?))
- (is (not @spout-opened?))
- (.activate (:nimbus cluster) "test")
-
- (advance-cluster-time cluster 12)
- (assert-acked tracker 1)
- (is @bolt-prepared?)
- (is @spout-opened?))))
-
-(deftest test-acking-self-anchor
- (with-tracked-cluster [cluster]
- (let [[feeder checker] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} dup-anchor)
- "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
- (submit-local-topology (:nimbus cluster)
- "test"
- {}
- (:topology tracked))
- (advance-cluster-time cluster 11)
- (.feed feeder [1])
- (tracked-wait tracked 1)
- (checker 1)
- (.feed feeder [1])
- (.feed feeder [1])
- (.feed feeder [1])
- (tracked-wait tracked 3)
- (checker 3)
- )))
-
-;; (defspout ConstantSpout ["val"] {:prepare false}
-;; [collector]
-;; (Time/sleep 100)
-;; (emit-spout! collector [1]))
-
-;; (def errored (atom false))
-;; (def restarted (atom false))
-
-;; (defbolt local-error-checker {} [tuple collector]
-;; (when-not @errored
-;; (reset! errored true)
-;; (println "erroring")
-;; (throw (RuntimeException.)))
-;; (when-not @restarted (println "restarted"))
-;; (reset! restarted true))
-
-;; (deftest test-no-halt-local-mode
-;; (with-simulated-time-local-cluster [cluster]
-;; (let [topology (topology
-;; {1 (spout-spec ConstantSpout)}
-;; {2 (bolt-spec {1 :shuffle} local-error-checker)
-;; })]
-;; (submit-local-topology (:nimbus cluster)
-;; "test"
-;; {}
-;; topology)
-;; (while (not @restarted)
-;; (advance-time-ms! 100))
-;; )))
-
-(defspout IncSpout ["word"]
- [conf context collector]
- (let [state (atom 0)]
- (spout
- (nextTuple []
- (Thread/sleep 100)
- (emit-spout! collector [@state] :id 1)
- )
- (ack [id]
- (swap! state inc))
- )))
-
-
-(defspout IncSpout2 ["word"] {:params [prefix]}
- [conf context collector]
- (let [state (atom 0)]
- (spout
- (nextTuple []
- (Thread/sleep 100)
- (swap! state inc)
- (emit-spout! collector [(str prefix "-" @state)])
- )
- )))
-
-;; (deftest test-clojure-spout
-;; (with-local-cluster [cluster]
-;; (let [nimbus (:nimbus cluster)
-;; top (topology
-;; {1 (spout-spec IncSpout)}
-;; {}
-;; )]
-;; (submit-local-topology nimbus
-;; "spout-test"
-;; {TOPOLOGY-DEBUG true
-;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
-;; top)
-;; (Thread/sleep 10000)
-;; (.killTopology nimbus "spout-test")
-;; (Thread/sleep 10000)
-;; )))
-
-(deftest test-kryo-decorators-config
- (with-simulated-time-local-cluster [cluster
- :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
- TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
- (letlocals
- (bind builder (TopologyBuilder.))
- (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
- (-> builder
- (.setBolt "2"
- (TestConfBolt.
- {TOPOLOGY-KRYO-DECORATORS ["one" "two"]}))
- (.shuffleGrouping "1"))
-
- (bind results
- (complete-topology cluster
- (.createTopology builder)
- :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
- :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))
- (is (= {"topology.kryo.decorators" (list "one" "two" "three")}
- (->> (read-tuples results "2")
- (apply concat)
- (apply hash-map)))))))
-
-(deftest test-component-specific-config
- (with-simulated-time-local-cluster [cluster
- :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
- (letlocals
- (bind builder (TopologyBuilder.))
- (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
- (-> builder
- (.setBolt "2"
- (TestConfBolt.
- {"fake.config" 123
- TOPOLOGY-MAX-TASK-PARALLELISM 20
- TOPOLOGY-MAX-SPOUT-PENDING 30
- TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
- {"fake.type2" "a.serializer"}]
- }))
- (.shuffleGrouping "1")
- (.setMaxTaskParallelism (int 2))
- (.addConfiguration "fake.config2" 987)
- )
-
-
- (bind results
- (complete-topology cluster
- (.createTopology builder)
- :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
- :mock-sources {"1" [["fake.config"]
- [TOPOLOGY-MAX-TASK-PARALLELISM]
- [TOPOLOGY-MAX-SPOUT-PENDING]
- ["fake.config2"]
- [TOPOLOGY-KRYO-REGISTER]
- ]}))
- (is (= {"fake.config" 123
- "fake.config2" 987
- TOPOLOGY-MAX-TASK-PARALLELISM 2
- TOPOLOGY-MAX-SPOUT-PENDING 30
- TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
- "fake.type2" "a.serializer"
- "fake.type3" "a.serializer3"}}
- (->> (read-tuples results "2")
- (apply concat)
- (apply hash-map))
- ))
- )))
-
-(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
- [conf context collector]
- (let [acked (atom 0)
- failed (atom 0)
- executed (atom 0)
- emitted (atom 0)]
- (.addTaskHook context
- (reify backtype.storm.hooks.ITaskHook
- (prepare [this conf context]
- )
- (cleanup [this]
- )
- (emit [this info]
- (swap! emitted inc))
- (boltAck [this info]
- (swap! acked inc))
- (boltFail [this info]
- (swap! failed inc))
- (boltExecute [this info]
- (swap! executed inc))
- ))
- (bolt
- (execute [tuple]
- (emit-bolt! collector [@emitted @acked @failed @executed])
- (if (= 0 (- @acked @failed))
- (ack! collector tuple)
- (fail! collector tuple))
- ))))
-
-(deftest test-hooks
- (with-simulated-time-local-cluster [cluster]
- (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
- }
- {"2" (bolt-spec {"1" :shuffle}
- hooks-bolt)
- })
- results (complete-topology cluster
- topology
- :mock-sources {"1" [[1]
- [1]
- [1]
- [1]
- ]})]
- (is (= [[0 0 0 0]
- [2 1 0 1]
- [4 1 1 2]
- [6 2 1 3]]
- (read-tuples results "2")
- )))))
-
-(defbolt report-errors-bolt {}
- [tuple collector]
- (doseq [i (range (.getValue tuple 0))]
- (report-error! collector (RuntimeException.)))
- (ack! collector tuple))
-
-(deftest test-throttled-errors
- (with-simulated-time
- (with-tracked-cluster [cluster]
- (let [state (:storm-cluster-state cluster)
- [feeder checker] (ack-tracking-feeder ["num"])
- tracked (mk-tracked-topology
- cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
- _ (submit-local-topology (:nimbus cluster)
- "test-errors"
- {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
- TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
- TOPOLOGY-DEBUG true
- }
- (:topology tracked))
- _ (advance-cluster-time cluster 11)
- storm-id (get-storm-id state "test-errors")
- errors-count (fn [] (count (.errors state storm-id "2")))]
-
- (is (nil? (.last-error state storm-id "2")))
-
- ;; so it launches the topology
- (advance-cluster-time cluster 2)
- (.feed feeder [6])
- (tracked-wait tracked 1)
- (is (= 4 (errors-count)))
- (is (.last-error state storm-id "2"))
-
- (advance-time-secs! 5)
- (.feed feeder [2])
- (tracked-wait tracked 1)
- (is (= 4 (errors-count)))
- (is (.last-error state storm-id "2"))
-
- (advance-time-secs! 6)
- (.feed feeder [2])
- (tracked-wait tracked 1)
- (is (= 6 (errors-count)))
- (is (.last-error state storm-id "2"))
-
- (advance-time-secs! 6)
- (.feed feeder [3])
- (tracked-wait tracked 1)
- (is (= 8 (errors-count)))
- (is (.last-error state storm-id "2"))))))
-
-
-(deftest test-acking-branching-complex
- ;; test acking with branching in the topology
- )
-
-
-(deftest test-fields-grouping
- ;; 1. put a shitload of random tuples through it and test that counts are right
- ;; 2. test that different spouts with different phints group the same way
- )
-
-(deftest test-all-grouping
- )
-
-(deftest test-direct-grouping
- )
http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/backtype/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/testing4j_test.clj b/storm-core/test/clj/backtype/storm/testing4j_test.clj
deleted file mode 100644
index b504f28..0000000
--- a/storm-core/test/clj/backtype/storm/testing4j_test.clj
+++ /dev/null
@@ -1,212 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.testing4j-test
- (:use [clojure.test])
- (:use [backtype.storm config clojure testing util])
- (:require [backtype.storm.integration-test :as it])
- (:require [backtype.storm.thrift :as thrift])
- (:import [backtype.storm Testing Config ILocalCluster])
- (:import [backtype.storm.tuple Values Tuple])
- (:import [backtype.storm.utils Time Utils])
- (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
- TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
- AckFailMapTracker MkTupleParam]))
-
-(deftest test-with-simulated-time
- (is (= false (Time/isSimulating)))
- (Testing/withSimulatedTime (fn []
- (is (= true (Time/isSimulating)))))
- (is (= false (Time/isSimulating))))
-
-(deftest test-with-local-cluster
- (let [mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 2))
- (.setPortsPerSupervisor (int 5)))
- daemon-conf (doto (Config.)
- (.put SUPERVISOR-ENABLE false)
- (.put TOPOLOGY-ACKER-EXECUTORS 0))]
- (Testing/withLocalCluster mk-cluster-param (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (is (not (nil? cluster)))
- (is (not (nil? (.getState cluster))))
- (is (not (nil? (:nimbus (.getState cluster))))))))))
-
-(deftest test-with-simulated-time-local-cluster
- (let [mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 2)))
- daemon-conf (doto (Config.)
- (.put SUPERVISOR-ENABLE false)
- (.put TOPOLOGY-ACKER-EXECUTORS 0))]
- (is (not (Time/isSimulating)))
- (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (is (not (nil? cluster)))
- (is (not (nil? (.getState cluster))))
- (is (not (nil? (:nimbus (.getState cluster)))))
- (is (Time/isSimulating)))))
- (is (not (Time/isSimulating)))))
-
-(deftest test-complete-topology
- (doseq [zmq-on? [true false]
- :let [daemon-conf (doto (Config.)
- (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
- mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 4))
- (.setDaemonConf daemon-conf))]]
- (Testing/withSimulatedTimeLocalCluster
- (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
- {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
- "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
- "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
- })
- mocked-sources (doto (MockedSources.)
- (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
- (Values. (into-array ["bob"]))
- (Values. (into-array ["joey"]))
- (Values. (into-array ["nathan"]))])
- ))
- storm-conf (doto (Config.)
- (.setNumWorkers 2))
- complete-topology-param (doto (CompleteTopologyParam.)
- (.setMockedSources mocked-sources)
- (.setStormConf storm-conf))
- results (Testing/completeTopology cluster
- topology
- complete-topology-param)]
- (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
- (Testing/readTuples results "1")))
- (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
- (read-tuples results "2")))
- (is (= [[1] [2] [3] [4]]
- (Testing/readTuples results "3")))
- (is (= [[1] [2] [3] [4]]
- (Testing/readTuples results "4")))
- ))))))
-
-(deftest test-with-tracked-cluster
- (Testing/withTrackedCluster
- (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (let [[feeder checker] (it/ack-tracking-feeder ["num"])
- tracked (Testing/mkTrackedTopology
- cluster
- (topology
- {"1" (spout-spec feeder)}
- {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
- "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
- "4" (bolt-spec
- {"2" :shuffle
- "3" :shuffle}
- (it/agg-bolt 4))}))]
- (.submitTopology cluster
- "test-acking2"
- (Config.)
- (.getTopology tracked))
- (advance-cluster-time (.getState cluster) 11)
- (.feed feeder [1])
- (Testing/trackedWait tracked (int 1))
- (checker 0)
- (.feed feeder [1])
- (Testing/trackedWait tracked (int 1))
- (checker 2)
- )))))
-
-(deftest test-advance-cluster-time
- (let [daemon-conf (doto (Config.)
- (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
- mk-cluster-param (doto (MkClusterParam.)
- (.setDaemonConf daemon-conf))]
- (Testing/withSimulatedTimeLocalCluster
- mk-cluster-param
- (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (let [feeder (feeder-spout ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
- storm-conf (doto (Config.)
- (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
- (.submitTopology cluster
- "timeout-tester"
- storm-conf
- topology)
- (.feed feeder ["a"] 1)
- (.feed feeder ["b"] 2)
- (.feed feeder ["c"] 3)
- (Testing/advanceClusterTime cluster (int 9))
- (it/assert-acked tracker 1 3)
- (is (not (.isFailed tracker 2)))
- (Testing/advanceClusterTime cluster (int 12))
- (it/assert-failed tracker 2)
- ))))))
-
-(deftest test-disable-tuple-timeout
- (let [daemon-conf (doto (Config.)
- (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
- mk-cluster-param (doto (MkClusterParam.)
- (.setDaemonConf daemon-conf))]
- (Testing/withSimulatedTimeLocalCluster
- mk-cluster-param
- (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (let [feeder (feeder-spout ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
- topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec feeder)}
- {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
- storm-conf (doto (Config.)
- (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
- (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
- (.submitTopology cluster
- "disable-timeout-tester"
- storm-conf
- topology)
- (.feed feeder ["a"] 1)
- (.feed feeder ["b"] 2)
- (.feed feeder ["c"] 3)
- (Testing/advanceClusterTime cluster (int 9))
- (it/assert-acked tracker 1 3)
- (is (not (.isFailed tracker 2)))
- (Testing/advanceClusterTime cluster (int 12))
- (is (not (.isFailed tracker 2)))
- ))))))
-
-(deftest test-test-tuple
- (letlocals
- ;; test the one-param signature
- (bind ^Tuple tuple (Testing/testTuple ["james" "bond"]))
- (is (= ["james" "bond"] (.getValues tuple)))
- (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
- (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
- (is (= "component" (.getSourceComponent tuple)))
-
- ;; test the two-params signature
- (bind mk-tuple-param (MkTupleParam.))
- (doto mk-tuple-param
- (.setStream "test-stream")
- (.setComponent "test-component")
- (.setFields (into-array String ["fname" "lname"])))
- (bind ^Tuple tuple (Testing/testTuple ["james" "bond"] mk-tuple-param))
- (is (= ["james" "bond"] (.getValues tuple)))
- (is (= "test-stream" (.getSourceStreamId tuple)))
- (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
- (is (= "test-component" (.getSourceComponent tuple)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/backtype/storm/integration_test.clj b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
new file mode 100644
index 0000000..f5fa501
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
@@ -0,0 +1,622 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.integration-test
+ (:use [clojure test])
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.topology TopologyBuilder])
+ (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
+ (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+ TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
+ (:import [backtype.storm.tuple Fields])
+ (:use [backtype.storm testing config clojure util])
+ (:use [backtype.storm.daemon common])
+ (:require [backtype.storm [thrift :as thrift]]))
+
+(deftest test-basic-topology
+ (doseq [zmq-on? [true false]]
+ (with-simulated-time-local-cluster [cluster :supervisors 4
+ :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+ "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+ "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+ })
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+ :storm-conf {TOPOLOGY-WORKERS 2})]
+ (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
+ (read-tuples results "1")))
+ (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+ (read-tuples results "2")))
+ (is (= [[1] [2] [3] [4]]
+ (read-tuples results "3")))
+ (is (= [[1] [2] [3] [4]]
+ (read-tuples results "4")))
+ ))))
+
+(defbolt emit-task-id ["tid"] {:prepare true}
+ [conf context collector]
+ (let [tid (.getThisTaskIndex context)]
+ (bolt
+ (execute [tuple]
+ (emit-bolt! collector [tid] :anchor tuple)
+ (ack! collector tuple)
+ ))))
+
+(deftest test-multi-tasks-per-executor
+ (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
+ {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
+ :parallelism-hint 3
+ :conf {TOPOLOGY-TASKS 6})
+ })
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [["a"]]})]
+ (is (ms= [[0] [1] [2] [3] [4] [5]]
+ (read-tuples results "2")))
+ )))
+
+(defbolt ack-every-other {} {:prepare true}
+ [conf context collector]
+ (let [state (atom -1)]
+ (bolt
+ (execute [tuple]
+ (let [val (swap! state -)]
+ (when (pos? val)
+ (ack! collector tuple)
+ ))))))
+
+(defn assert-loop [afn ids]
+ (while (not (every? afn ids))
+ (Thread/sleep 1)))
+
+(defn assert-acked [tracker & ids]
+ (assert-loop #(.isAcked tracker %) ids))
+
+(defn assert-failed [tracker & ids]
+ (assert-loop #(.isFailed tracker %) ids))
+
+(deftest test-timeout
+ (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]
+ (submit-local-topology (:nimbus cluster)
+ "timeout-tester"
+ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+ topology)
+ (advance-cluster-time cluster 11)
+ (.feed feeder ["a"] 1)
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (advance-cluster-time cluster 9)
+ (assert-acked tracker 1 3)
+ (is (not (.isFailed tracker 2)))
+ (advance-cluster-time cluster 12)
+ (assert-failed tracker 2)
+ )))
+
+(defn mk-validate-topology-1 []
+ (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-1 []
+ (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-2 []
+ (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-3 []
+ (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn try-complete-wc-topology [cluster topology]
+ (try (do
+ (complete-topology cluster
+ topology
+ :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+ :storm-conf {TOPOLOGY-WORKERS 2})
+ false)
+ (catch InvalidTopologyException e true)))
+
+(deftest test-validate-topology-structure
+ (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
+ any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
+ any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
+ any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
+ (is (= any-error1? false))
+ (is (= any-error2? true))
+ (is (= any-error3? true))
+ (is (= any-error4? true)))))
+
+(defbolt identity-bolt ["num"]
+ [tuple collector]
+ (emit-bolt! collector (.getValues tuple) :anchor tuple)
+ (ack! collector tuple))
+
+(deftest test-system-stream
+ ;; this test works because mocking a spout splits up the tuples evenly among the tasks
+ (with-simulated-time-local-cluster [cluster]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
+ {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
+ })
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [["a"] ["b"] ["c"]]}
+ :storm-conf {TOPOLOGY-WORKERS 2})]
+ (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
+ (read-tuples results "2")))
+ )))
+
+(defn ack-tracking-feeder [fields]
+ (let [tracker (AckTracker.)]
+ [(doto (feeder-spout fields)
+ (.setAckFailDelegate tracker))
+ (fn [val]
+ (is (= (.getNumAcks tracker) val))
+ (.resetNumAcks tracker)
+ )]
+ ))
+
+(defbolt branching-bolt ["num"]
+ {:params [amt]}
+ [tuple collector]
+ (doseq [i (range amt)]
+ (emit-bolt! collector [i] :anchor tuple))
+ (ack! collector tuple))
+
+(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
+ [conf context collector]
+ (let [seen (atom [])]
+ (bolt
+ (execute [tuple]
+ (swap! seen conj tuple)
+ (when (= (count @seen) amt)
+ (emit-bolt! collector [1] :anchor @seen)
+ (doseq [s @seen]
+ (ack! collector s))
+ (reset! seen [])
+ )))
+ ))
+
+(defbolt ack-bolt {}
+ [tuple collector]
+ (ack! collector tuple))
+
+(deftest test-acking
+ (with-tracked-cluster [cluster]
+ (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
+ [feeder2 checker2] (ack-tracking-feeder ["num"])
+ [feeder3 checker3] (ack-tracking-feeder ["num"])
+ tracked (mk-tracked-topology
+ cluster
+ (topology
+ {"1" (spout-spec feeder1)
+ "2" (spout-spec feeder2)
+ "3" (spout-spec feeder3)}
+ {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
+ "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
+ "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
+ "7" (bolt-spec
+ {"4" :shuffle
+ "5" :shuffle
+ "6" :shuffle}
+ (agg-bolt 3))
+ "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
+ "9" (bolt-spec {"8" :shuffle} ack-bolt)}
+ ))]
+ (submit-local-topology (:nimbus cluster)
+ "acking-test1"
+ {}
+ (:topology tracked))
+ (advance-cluster-time cluster 11)
+ (.feed feeder1 [1])
+ (tracked-wait tracked 1)
+ (checker1 0)
+ (.feed feeder2 [1])
+ (tracked-wait tracked 1)
+ (checker1 1)
+ (checker2 1)
+ (.feed feeder1 [1])
+ (tracked-wait tracked 1)
+ (checker1 0)
+ (.feed feeder1 [1])
+ (tracked-wait tracked 1)
+ (checker1 1)
+ (.feed feeder3 [1])
+ (tracked-wait tracked 1)
+ (checker1 0)
+ (checker3 0)
+ (.feed feeder2 [1])
+ (tracked-wait tracked 1)
+ (checker1 1)
+ (checker2 1)
+ (checker3 1)
+
+ )))
+
+(deftest test-ack-branching
+ (with-tracked-cluster [cluster]
+ (let [[feeder checker] (ack-tracking-feeder ["num"])
+ tracked (mk-tracked-topology
+ cluster
+ (topology
+ {"1" (spout-spec feeder)}
+ {"2" (bolt-spec {"1" :shuffle} identity-bolt)
+ "3" (bolt-spec {"1" :shuffle} identity-bolt)
+ "4" (bolt-spec
+ {"2" :shuffle
+ "3" :shuffle}
+ (agg-bolt 4))}))]
+ (submit-local-topology (:nimbus cluster)
+ "test-acking2"
+ {}
+ (:topology tracked))
+ (advance-cluster-time cluster 11)
+ (.feed feeder [1])
+ (tracked-wait tracked 1)
+ (checker 0)
+ (.feed feeder [1])
+ (tracked-wait tracked 1)
+ (checker 2)
+ )))
+
+(defbolt dup-anchor ["num"]
+ [tuple collector]
+ (emit-bolt! collector [1] :anchor [tuple tuple])
+ (ack! collector tuple))
+
+(def bolt-prepared? (atom false))
+(defbolt prepare-tracked-bolt [] {:prepare true}
+ [conf context collector]
+ (reset! bolt-prepared? true)
+ (bolt
+ (execute [tuple]
+ (ack! collector tuple))))
+
+(def spout-opened? (atom false))
+(defspout open-tracked-spout ["val"]
+ [conf context collector]
+ (reset! spout-opened? true)
+ (spout
+ (nextTuple [])))
+
+(deftest test-submit-inactive-topology
+ (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)
+ "2" (thrift/mk-spout-spec open-tracked-spout)}
+ {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
+ (reset! bolt-prepared? false)
+ (reset! spout-opened? false)
+
+ (submit-local-topology-with-opts (:nimbus cluster)
+ "test"
+ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+ topology
+ (SubmitOptions. TopologyInitialStatus/INACTIVE))
+ (advance-cluster-time cluster 11)
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 9)
+ (is (not @bolt-prepared?))
+ (is (not @spout-opened?))
+ (.activate (:nimbus cluster) "test")
+
+ (advance-cluster-time cluster 12)
+ (assert-acked tracker 1)
+ (is @bolt-prepared?)
+ (is @spout-opened?))))
+
+(deftest test-acking-self-anchor
+ (with-tracked-cluster [cluster]
+ (let [[feeder checker] (ack-tracking-feeder ["num"])
+ tracked (mk-tracked-topology
+ cluster
+ (topology
+ {"1" (spout-spec feeder)}
+ {"2" (bolt-spec {"1" :shuffle} dup-anchor)
+ "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
+ (submit-local-topology (:nimbus cluster)
+ "test"
+ {}
+ (:topology tracked))
+ (advance-cluster-time cluster 11)
+ (.feed feeder [1])
+ (tracked-wait tracked 1)
+ (checker 1)
+ (.feed feeder [1])
+ (.feed feeder [1])
+ (.feed feeder [1])
+ (tracked-wait tracked 3)
+ (checker 3)
+ )))
+
+;; (defspout ConstantSpout ["val"] {:prepare false}
+;; [collector]
+;; (Time/sleep 100)
+;; (emit-spout! collector [1]))
+
+;; (def errored (atom false))
+;; (def restarted (atom false))
+
+;; (defbolt local-error-checker {} [tuple collector]
+;; (when-not @errored
+;; (reset! errored true)
+;; (println "erroring")
+;; (throw (RuntimeException.)))
+;; (when-not @restarted (println "restarted"))
+;; (reset! restarted true))
+
+;; (deftest test-no-halt-local-mode
+;; (with-simulated-time-local-cluster [cluster]
+;; (let [topology (topology
+;; {1 (spout-spec ConstantSpout)}
+;; {2 (bolt-spec {1 :shuffle} local-error-checker)
+;; })]
+;; (submit-local-topology (:nimbus cluster)
+;; "test"
+;; {}
+;; topology)
+;; (while (not @restarted)
+;; (advance-time-ms! 100))
+;; )))
+
+(defspout IncSpout ["word"]
+ [conf context collector]
+ (let [state (atom 0)]
+ (spout
+ (nextTuple []
+ (Thread/sleep 100)
+ (emit-spout! collector [@state] :id 1)
+ )
+ (ack [id]
+ (swap! state inc))
+ )))
+
+
+(defspout IncSpout2 ["word"] {:params [prefix]}
+ [conf context collector]
+ (let [state (atom 0)]
+ (spout
+ (nextTuple []
+ (Thread/sleep 100)
+ (swap! state inc)
+ (emit-spout! collector [(str prefix "-" @state)])
+ )
+ )))
+
+;; (deftest test-clojure-spout
+;; (with-local-cluster [cluster]
+;; (let [nimbus (:nimbus cluster)
+;; top (topology
+;; {1 (spout-spec IncSpout)}
+;; {}
+;; )]
+;; (submit-local-topology nimbus
+;; "spout-test"
+;; {TOPOLOGY-DEBUG true
+;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
+;; top)
+;; (Thread/sleep 10000)
+;; (.killTopology nimbus "spout-test")
+;; (Thread/sleep 10000)
+;; )))
+
+(deftest test-kryo-decorators-config
+ (with-simulated-time-local-cluster [cluster
+ :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+ TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
+ (letlocals
+ (bind builder (TopologyBuilder.))
+ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+ (-> builder
+ (.setBolt "2"
+ (TestConfBolt.
+ {TOPOLOGY-KRYO-DECORATORS ["one" "two"]}))
+ (.shuffleGrouping "1"))
+
+ (bind results
+ (complete-topology cluster
+ (.createTopology builder)
+ :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
+ :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))
+ (is (= {"topology.kryo.decorators" (list "one" "two" "three")}
+ (->> (read-tuples results "2")
+ (apply concat)
+ (apply hash-map)))))))
+
+(deftest test-component-specific-config
+ (with-simulated-time-local-cluster [cluster
+ :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
+ (letlocals
+ (bind builder (TopologyBuilder.))
+ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+ (-> builder
+ (.setBolt "2"
+ (TestConfBolt.
+ {"fake.config" 123
+ TOPOLOGY-MAX-TASK-PARALLELISM 20
+ TOPOLOGY-MAX-SPOUT-PENDING 30
+ TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
+ {"fake.type2" "a.serializer"}]
+ }))
+ (.shuffleGrouping "1")
+ (.setMaxTaskParallelism (int 2))
+ (.addConfiguration "fake.config2" 987)
+ )
+
+
+ (bind results
+ (complete-topology cluster
+ (.createTopology builder)
+ :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
+ :mock-sources {"1" [["fake.config"]
+ [TOPOLOGY-MAX-TASK-PARALLELISM]
+ [TOPOLOGY-MAX-SPOUT-PENDING]
+ ["fake.config2"]
+ [TOPOLOGY-KRYO-REGISTER]
+ ]}))
+ (is (= {"fake.config" 123
+ "fake.config2" 987
+ TOPOLOGY-MAX-TASK-PARALLELISM 2
+ TOPOLOGY-MAX-SPOUT-PENDING 30
+ TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
+ "fake.type2" "a.serializer"
+ "fake.type3" "a.serializer3"}}
+ (->> (read-tuples results "2")
+ (apply concat)
+ (apply hash-map))
+ ))
+ )))
+
+(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
+ [conf context collector]
+ (let [acked (atom 0)
+ failed (atom 0)
+ executed (atom 0)
+ emitted (atom 0)]
+ (.addTaskHook context
+ (reify backtype.storm.hooks.ITaskHook
+ (prepare [this conf context]
+ )
+ (cleanup [this]
+ )
+ (emit [this info]
+ (swap! emitted inc))
+ (boltAck [this info]
+ (swap! acked inc))
+ (boltFail [this info]
+ (swap! failed inc))
+ (boltExecute [this info]
+ (swap! executed inc))
+ ))
+ (bolt
+ (execute [tuple]
+ (emit-bolt! collector [@emitted @acked @failed @executed])
+ (if (= 0 (- @acked @failed))
+ (ack! collector tuple)
+ (fail! collector tuple))
+ ))))
+
+(deftest test-hooks
+ (with-simulated-time-local-cluster [cluster]
+ (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
+ }
+ {"2" (bolt-spec {"1" :shuffle}
+ hooks-bolt)
+ })
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [[1]
+ [1]
+ [1]
+ [1]
+ ]})]
+ (is (= [[0 0 0 0]
+ [2 1 0 1]
+ [4 1 1 2]
+ [6 2 1 3]]
+ (read-tuples results "2")
+ )))))
+
+(defbolt report-errors-bolt {}
+ [tuple collector]
+ (doseq [i (range (.getValue tuple 0))]
+ (report-error! collector (RuntimeException.)))
+ (ack! collector tuple))
+
+(deftest test-throttled-errors
+ (with-simulated-time
+ (with-tracked-cluster [cluster]
+ (let [state (:storm-cluster-state cluster)
+ [feeder checker] (ack-tracking-feeder ["num"])
+ tracked (mk-tracked-topology
+ cluster
+ (topology
+ {"1" (spout-spec feeder)}
+ {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
+ _ (submit-local-topology (:nimbus cluster)
+ "test-errors"
+ {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
+ TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
+ TOPOLOGY-DEBUG true
+ }
+ (:topology tracked))
+ _ (advance-cluster-time cluster 11)
+ storm-id (get-storm-id state "test-errors")
+ errors-count (fn [] (count (.errors state storm-id "2")))]
+
+ (is (nil? (.last-error state storm-id "2")))
+
+ ;; so it launches the topology
+ (advance-cluster-time cluster 2)
+ (.feed feeder [6])
+ (tracked-wait tracked 1)
+ (is (= 4 (errors-count)))
+ (is (.last-error state storm-id "2"))
+
+ (advance-time-secs! 5)
+ (.feed feeder [2])
+ (tracked-wait tracked 1)
+ (is (= 4 (errors-count)))
+ (is (.last-error state storm-id "2"))
+
+ (advance-time-secs! 6)
+ (.feed feeder [2])
+ (tracked-wait tracked 1)
+ (is (= 6 (errors-count)))
+ (is (.last-error state storm-id "2"))
+
+ (advance-time-secs! 6)
+ (.feed feeder [3])
+ (tracked-wait tracked 1)
+ (is (= 8 (errors-count)))
+ (is (.last-error state storm-id "2"))))))
+
+
+(deftest test-acking-branching-complex
+ ;; test acking with branching in the topology
+ )
+
+
+(deftest test-fields-grouping
+ ;; 1. put a shitload of random tuples through it and test that counts are right
+ ;; 2. test that different spouts with different phints group the same way
+ )
+
+(deftest test-all-grouping
+ )
+
+(deftest test-direct-grouping
+ )
http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
new file mode 100644
index 0000000..5cdb182
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
@@ -0,0 +1,212 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.testing4j-test
+ (:use [clojure.test])
+ (:use [backtype.storm config clojure testing util])
+ (:require [integration.backtype.storm.integration-test :as it])
+ (:require [backtype.storm.thrift :as thrift])
+ (:import [backtype.storm Testing Config ILocalCluster])
+ (:import [backtype.storm.tuple Values Tuple])
+ (:import [backtype.storm.utils Time Utils])
+ (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
+ TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
+ AckFailMapTracker MkTupleParam]))
+
+(deftest test-with-simulated-time
+ (is (= false (Time/isSimulating)))
+ (Testing/withSimulatedTime (fn []
+ (is (= true (Time/isSimulating)))))
+ (is (= false (Time/isSimulating))))
+
+(deftest test-with-local-cluster
+ (let [mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 2))
+ (.setPortsPerSupervisor (int 5)))
+ daemon-conf (doto (Config.)
+ (.put SUPERVISOR-ENABLE false)
+ (.put TOPOLOGY-ACKER-EXECUTORS 0))]
+ (Testing/withLocalCluster mk-cluster-param (reify TestJob
+ (^void run [this ^ILocalCluster cluster]
+ (is (not (nil? cluster)))
+ (is (not (nil? (.getState cluster))))
+ (is (not (nil? (:nimbus (.getState cluster))))))))))
+
+(deftest test-with-simulated-time-local-cluster
+ (let [mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 2)))
+ daemon-conf (doto (Config.)
+ (.put SUPERVISOR-ENABLE false)
+ (.put TOPOLOGY-ACKER-EXECUTORS 0))]
+ (is (not (Time/isSimulating)))
+ (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
+ (^void run [this ^ILocalCluster cluster]
+ (is (not (nil? cluster)))
+ (is (not (nil? (.getState cluster))))
+ (is (not (nil? (:nimbus (.getState cluster)))))
+ (is (Time/isSimulating)))))
+ (is (not (Time/isSimulating)))))
+
+(deftest test-complete-topology
+ (doseq [zmq-on? [true false]
+ :let [daemon-conf (doto (Config.)
+ (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4))
+ (.setDaemonConf daemon-conf))]]
+ (Testing/withSimulatedTimeLocalCluster
+ (reify TestJob
+ (^void run [this ^ILocalCluster cluster]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+ "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+ "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+ })
+ mocked-sources (doto (MockedSources.)
+ (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+ (Values. (into-array ["bob"]))
+ (Values. (into-array ["joey"]))
+ (Values. (into-array ["nathan"]))])
+ ))
+ storm-conf (doto (Config.)
+ (.setNumWorkers 2))
+ complete-topology-param (doto (CompleteTopologyParam.)
+ (.setMockedSources mocked-sources)
+ (.setStormConf storm-conf))
+ results (Testing/completeTopology cluster
+ topology
+ complete-topology-param)]
+ (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
+ (Testing/readTuples results "1")))
+ (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+ (read-tuples results "2")))
+ (is (= [[1] [2] [3] [4]]
+ (Testing/readTuples results "3")))
+ (is (= [[1] [2] [3] [4]]
+ (Testing/readTuples results "4")))
+ ))))))
+
+(deftest test-with-tracked-cluster
+ (Testing/withTrackedCluster
+ (reify TestJob
+ (^void run [this ^ILocalCluster cluster]
+ (let [[feeder checker] (it/ack-tracking-feeder ["num"])
+ tracked (Testing/mkTrackedTopology
+ cluster
+ (topology
+ {"1" (spout-spec feeder)}
+ {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
+ "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
+ "4" (bolt-spec
+ {"2" :shuffle
+ "3" :shuffle}
+ (it/agg-bolt 4))}))]
+ (.submitTopology cluster
+ "test-acking2"
+ (Config.)
+ (.getTopology tracked))
+ (advance-cluster-time (.getState cluster) 11)
+ (.feed feeder [1])
+ (Testing/trackedWait tracked (int 1))
+ (checker 0)
+ (.feed feeder [1])
+ (Testing/trackedWait tracked (int 1))
+ (checker 2)
+ )))))
+
+(deftest test-advance-cluster-time
+ (let [daemon-conf (doto (Config.)
+ (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setDaemonConf daemon-conf))]
+ (Testing/withSimulatedTimeLocalCluster
+ mk-cluster-param
+ (reify TestJob
+ (^void run [this ^ILocalCluster cluster]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+ storm-conf (doto (Config.)
+ (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
+ (.submitTopology cluster
+ "timeout-tester"
+ storm-conf
+ topology)
+ (.feed feeder ["a"] 1)
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (Testing/advanceClusterTime cluster (int 9))
+ (it/assert-acked tracker 1 3)
+ (is (not (.isFailed tracker 2)))
+ (Testing/advanceClusterTime cluster (int 12))
+ (it/assert-failed tracker 2)
+ ))))))
+
+(deftest test-disable-tuple-timeout
+ (let [daemon-conf (doto (Config.)
+ (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setDaemonConf daemon-conf))]
+ (Testing/withSimulatedTimeLocalCluster
+ mk-cluster-param
+ (reify TestJob
+ (^void run [this ^ILocalCluster cluster]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+ storm-conf (doto (Config.)
+ (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
+ (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
+ (.submitTopology cluster
+ "disable-timeout-tester"
+ storm-conf
+ topology)
+ (.feed feeder ["a"] 1)
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (Testing/advanceClusterTime cluster (int 9))
+ (it/assert-acked tracker 1 3)
+ (is (not (.isFailed tracker 2)))
+ (Testing/advanceClusterTime cluster (int 12))
+ (is (not (.isFailed tracker 2)))
+ ))))))
+
+(deftest test-test-tuple
+ (letlocals
+ ;; test the one-param signature
+ (bind ^Tuple tuple (Testing/testTuple ["james" "bond"]))
+ (is (= ["james" "bond"] (.getValues tuple)))
+ (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
+ (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
+ (is (= "component" (.getSourceComponent tuple)))
+
+ ;; test the two-params signature
+ (bind mk-tuple-param (MkTupleParam.))
+ (doto mk-tuple-param
+ (.setStream "test-stream")
+ (.setComponent "test-component")
+ (.setFields (into-array String ["fname" "lname"])))
+ (bind ^Tuple tuple (Testing/testTuple ["james" "bond"] mk-tuple-param))
+ (is (= ["james" "bond"] (.getValues tuple)))
+ (is (= "test-stream" (.getSourceStreamId tuple)))
+ (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
+ (is (= "test-component" (.getSourceComponent tuple)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/storm/trident/integration_test.clj b/storm-core/test/clj/integration/storm/trident/integration_test.clj
new file mode 100644
index 0000000..bcd8173
--- /dev/null
+++ b/storm-core/test/clj/integration/storm/trident/integration_test.clj
@@ -0,0 +1,292 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.storm.trident.integration-test
+ (:use [clojure test])
+ (:require [backtype.storm [testing :as t]])
+ (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
+ MemoryMapState$Factory])
+ (:import [storm.trident.state StateSpec])
+ (:import [storm.trident.operation.impl CombinerAggStateUpdater])
+ (:use [storm.trident testing])
+ (:use [backtype.storm util]))
+
+(bootstrap-imports)
+
+(deftest test-memory-map-get-tuples
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind feeder (feeder-spout ["sentence"]))
+ (bind word-counts
+ (-> topo
+ (.newStream "tester" feeder)
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.parallelismHint 6)
+ ))
+ (-> topo
+ (.newDRPCStream "all-tuples" drpc)
+ (.broadcast)
+ (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
+ (.project (fields "word" "count")))
+ (with-topology [cluster topo]
+ (feed feeder [["hello the man said"] ["the"]])
+ (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
+ (into #{} (exec-drpc drpc "all-tuples" "man"))))
+ (feed feeder [["the foo"]])
+ (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
+ (into #{} (exec-drpc drpc "all-tuples" "man")))))))))
+
+(deftest test-word-count
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind feeder (feeder-spout ["sentence"]))
+ (bind word-counts
+ (-> topo
+ (.newStream "tester" feeder)
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.parallelismHint 6)
+ ))
+ (-> topo
+ (.newDRPCStream "words" drpc)
+ (.each (fields "args") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+ (.aggregate (fields "count") (Sum.) (fields "sum"))
+ (.project (fields "sum")))
+ (with-topology [cluster topo]
+ (feed feeder [["hello the man said"] ["the"]])
+ (is (= [[2]] (exec-drpc drpc "words" "the")))
+ (is (= [[1]] (exec-drpc drpc "words" "hello")))
+ (feed feeder [["the man on the moon"] ["where are you"]])
+ (is (= [[4]] (exec-drpc drpc "words" "the")))
+ (is (= [[2]] (exec-drpc drpc "words" "man")))
+ (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+ )))))
+
+;; this test reproduces a bug where committer spouts freeze processing when
+;; there's at least one repartitioning after the spout
+(deftest test-word-count-committer-spout
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind feeder (feeder-committer-spout ["sentence"]))
+ (.setWaitToEmit feeder false) ;;this causes lots of empty batches
+ (bind word-counts
+ (-> topo
+ (.newStream "tester" feeder)
+ (.parallelismHint 2)
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.parallelismHint 6)
+ ))
+ (-> topo
+ (.newDRPCStream "words" drpc)
+ (.each (fields "args") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+ (.aggregate (fields "count") (Sum.) (fields "sum"))
+ (.project (fields "sum")))
+ (with-topology [cluster topo]
+ (feed feeder [["hello the man said"] ["the"]])
+ (is (= [[2]] (exec-drpc drpc "words" "the")))
+ (is (= [[1]] (exec-drpc drpc "words" "hello")))
+ (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
+ (feed feeder [["the man on the moon"] ["where are you"]])
+ (is (= [[4]] (exec-drpc drpc "words" "the")))
+ (is (= [[2]] (exec-drpc drpc "words" "man")))
+ (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+ (feed feeder [["the the"]])
+ (is (= [[6]] (exec-drpc drpc "words" "the")))
+ (feed feeder [["the"]])
+ (is (= [[7]] (exec-drpc drpc "words" "the")))
+ )))))
+
+
+(deftest test-count-agg
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (-> topo
+ (.newDRPCStream "numwords" drpc)
+ (.each (fields "args") (Split.) (fields "word"))
+ (.aggregate (CountAsAggregator.) (fields "count"))
+ (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
+ (.project (fields "count")))
+ (with-topology [cluster topo]
+ (doseq [i (range 100)]
+ (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
+ (is (= [[0]] (exec-drpc drpc "numwords" "")))
+ (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
+ )))))
+
+(deftest test-split-merge
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+ (bind s1
+ (-> drpc-stream
+ (.each (fields "args") (Split.) (fields "word"))
+ (.project (fields "word"))))
+ (bind s2
+ (-> drpc-stream
+ (.each (fields "args") (StringLength.) (fields "len"))
+ (.project (fields "len"))))
+
+ (.merge topo [s1 s2])
+ (with-topology [cluster topo]
+ (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+ (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+ )))))
+
+(deftest test-multiple-groupings-same-stream
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
+ (.each (fields "args") (TrueFilter.))))
+ (bind s1
+ (-> drpc-stream
+ (.groupBy (fields "args"))
+ (.aggregate (CountAsAggregator.) (fields "count"))))
+ (bind s2
+ (-> drpc-stream
+ (.groupBy (fields "args"))
+ (.aggregate (CountAsAggregator.) (fields "count"))))
+
+ (.merge topo [s1 s2])
+ (with-topology [cluster topo]
+ (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
+ (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
+ )))))
+
+(deftest test-multi-repartition
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
+ (.each (fields "args") (Split.) (fields "word"))
+ (.localOrShuffle)
+ (.shuffle)
+ (.aggregate (CountAsAggregator.) (fields "count"))
+ ))
+ (with-topology [cluster topo]
+ (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
+ (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
+ )))))
+
+(deftest test-stream-projection-validation
+ (t/with-local-cluster [cluster]
+ (letlocals
+ (bind feeder (feeder-committer-spout ["sentence"]))
+ (bind topo (TridentTopology.))
+ ;; valid projection fields will not throw exceptions
+ (bind word-counts
+ (-> topo
+ (.newStream "tester" feeder)
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.parallelismHint 6)
+ ))
+ (bind stream (-> topo
+ (.newStream "tester" feeder)))
+ ;; test .each
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence1") (Split.) (fields "word")))))
+ ;; test .groupBy
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word1")))))
+ ;; test .aggregate
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.aggregate (fields "word1") (Count.) (fields "count")))))
+ ;; test .project
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.project (fields "sentence1")))))
+ ;; test .partitionBy
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.partitionBy (fields "sentence1")))))
+ ;; test .partitionAggregate
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
+ ;; test .persistentAggregate
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
+ ;; test .partitionPersist
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
+ (fields "non-existent")
+ (CombinerAggStateUpdater. (Count.))
+ (fields "count")))))
+ ;; test .stateQuery
+ (with-drpc [drpc]
+ (is (thrown? IllegalArgumentException
+ (-> topo
+ (.newDRPCStream "words" drpc)
+ (.each (fields "args") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
+ )))
+
+;; (deftest test-split-merge
+;; (t/with-local-cluster [cluster]
+;; (with-drpc [drpc]
+;; (letlocals
+;; (bind topo (TridentTopology.))
+;; (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+;; (bind s1
+;; (-> drpc-stream
+;; (.each (fields "args") (Split.) (fields "word"))
+;; (.project (fields "word"))))
+;; (bind s2
+;; (-> drpc-stream
+;; (.each (fields "args") (StringLength.) (fields "len"))
+;; (.project (fields "len"))))
+;;
+;; (.merge topo [s1 s2])
+;; (with-topology [cluster topo]
+;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+;; (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+;; )))))