You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/15 16:15:27 UTC
[3/6] storm git commit: STORM-1179: Create Maven Profiles for
Integration Tests - Mark Clojure tests as integration tests
http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/storm/trident/integration_test.clj b/storm-core/test/clj/storm/trident/integration_test.clj
deleted file mode 100644
index ac3bbea..0000000
--- a/storm-core/test/clj/storm/trident/integration_test.clj
+++ /dev/null
@@ -1,292 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns storm.trident.integration-test
- (:use [clojure test])
- (:require [backtype.storm [testing :as t]])
- (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
- MemoryMapState$Factory])
- (:import [storm.trident.state StateSpec])
- (:import [storm.trident.operation.impl CombinerAggStateUpdater])
- (:use [storm.trident testing])
- (:use [backtype.storm util]))
-
-(bootstrap-imports)
-
-(deftest test-memory-map-get-tuples
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
- (letlocals
- (bind topo (TridentTopology.))
- (bind feeder (feeder-spout ["sentence"]))
- (bind word-counts
- (-> topo
- (.newStream "tester" feeder)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
- (.parallelismHint 6)
- ))
- (-> topo
- (.newDRPCStream "all-tuples" drpc)
- (.broadcast)
- (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
- (.project (fields "word" "count")))
- (with-topology [cluster topo]
- (feed feeder [["hello the man said"] ["the"]])
- (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
- (into #{} (exec-drpc drpc "all-tuples" "man"))))
- (feed feeder [["the foo"]])
- (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
- (into #{} (exec-drpc drpc "all-tuples" "man")))))))))
-
-(deftest test-word-count
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
- (letlocals
- (bind topo (TridentTopology.))
- (bind feeder (feeder-spout ["sentence"]))
- (bind word-counts
- (-> topo
- (.newStream "tester" feeder)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
- (.parallelismHint 6)
- ))
- (-> topo
- (.newDRPCStream "words" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
- (.aggregate (fields "count") (Sum.) (fields "sum"))
- (.project (fields "sum")))
- (with-topology [cluster topo]
- (feed feeder [["hello the man said"] ["the"]])
- (is (= [[2]] (exec-drpc drpc "words" "the")))
- (is (= [[1]] (exec-drpc drpc "words" "hello")))
- (feed feeder [["the man on the moon"] ["where are you"]])
- (is (= [[4]] (exec-drpc drpc "words" "the")))
- (is (= [[2]] (exec-drpc drpc "words" "man")))
- (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
- )))))
-
-;; this test reproduces a bug where committer spouts freeze processing when
-;; there's at least one repartitioning after the spout
-(deftest test-word-count-committer-spout
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
- (letlocals
- (bind topo (TridentTopology.))
- (bind feeder (feeder-committer-spout ["sentence"]))
- (.setWaitToEmit feeder false) ;;this causes lots of empty batches
- (bind word-counts
- (-> topo
- (.newStream "tester" feeder)
- (.parallelismHint 2)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
- (.parallelismHint 6)
- ))
- (-> topo
- (.newDRPCStream "words" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
- (.aggregate (fields "count") (Sum.) (fields "sum"))
- (.project (fields "sum")))
- (with-topology [cluster topo]
- (feed feeder [["hello the man said"] ["the"]])
- (is (= [[2]] (exec-drpc drpc "words" "the")))
- (is (= [[1]] (exec-drpc drpc "words" "hello")))
- (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
- (feed feeder [["the man on the moon"] ["where are you"]])
- (is (= [[4]] (exec-drpc drpc "words" "the")))
- (is (= [[2]] (exec-drpc drpc "words" "man")))
- (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
- (feed feeder [["the the"]])
- (is (= [[6]] (exec-drpc drpc "words" "the")))
- (feed feeder [["the"]])
- (is (= [[7]] (exec-drpc drpc "words" "the")))
- )))))
-
-
-(deftest test-count-agg
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
- (letlocals
- (bind topo (TridentTopology.))
- (-> topo
- (.newDRPCStream "numwords" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.aggregate (CountAsAggregator.) (fields "count"))
- (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
- (.project (fields "count")))
- (with-topology [cluster topo]
- (doseq [i (range 100)]
- (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
- (is (= [[0]] (exec-drpc drpc "numwords" "")))
- (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
- )))))
-
-(deftest test-split-merge
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
- (letlocals
- (bind topo (TridentTopology.))
- (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
- (bind s1
- (-> drpc-stream
- (.each (fields "args") (Split.) (fields "word"))
- (.project (fields "word"))))
- (bind s2
- (-> drpc-stream
- (.each (fields "args") (StringLength.) (fields "len"))
- (.project (fields "len"))))
-
- (.merge topo [s1 s2])
- (with-topology [cluster topo]
- (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
- (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
- )))))
-
-(deftest test-multiple-groupings-same-stream
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
- (letlocals
- (bind topo (TridentTopology.))
- (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
- (.each (fields "args") (TrueFilter.))))
- (bind s1
- (-> drpc-stream
- (.groupBy (fields "args"))
- (.aggregate (CountAsAggregator.) (fields "count"))))
- (bind s2
- (-> drpc-stream
- (.groupBy (fields "args"))
- (.aggregate (CountAsAggregator.) (fields "count"))))
-
- (.merge topo [s1 s2])
- (with-topology [cluster topo]
- (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
- (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
- )))))
-
-(deftest test-multi-repartition
- (t/with-local-cluster [cluster]
- (with-drpc [drpc]
- (letlocals
- (bind topo (TridentTopology.))
- (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.localOrShuffle)
- (.shuffle)
- (.aggregate (CountAsAggregator.) (fields "count"))
- ))
- (with-topology [cluster topo]
- (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
- (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
- )))))
-
-(deftest test-stream-projection-validation
- (t/with-local-cluster [cluster]
- (letlocals
- (bind feeder (feeder-committer-spout ["sentence"]))
- (bind topo (TridentTopology.))
- ;; valid projection fields will not throw exceptions
- (bind word-counts
- (-> topo
- (.newStream "tester" feeder)
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
- (.parallelismHint 6)
- ))
- (bind stream (-> topo
- (.newStream "tester" feeder)))
- ;; test .each
- (is (thrown? IllegalArgumentException
- (-> stream
- (.each (fields "sentence1") (Split.) (fields "word")))))
- ;; test .groupBy
- (is (thrown? IllegalArgumentException
- (-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word1")))))
- ;; test .aggregate
- (is (thrown? IllegalArgumentException
- (-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.aggregate (fields "word1") (Count.) (fields "count")))))
- ;; test .project
- (is (thrown? IllegalArgumentException
- (-> stream
- (.project (fields "sentence1")))))
- ;; test .partitionBy
- (is (thrown? IllegalArgumentException
- (-> stream
- (.partitionBy (fields "sentence1")))))
- ;; test .partitionAggregate
- (is (thrown? IllegalArgumentException
- (-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
- ;; test .persistentAggregate
- (is (thrown? IllegalArgumentException
- (-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
- ;; test .partitionPersist
- (is (thrown? IllegalArgumentException
- (-> stream
- (.each (fields "sentence") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
- (fields "non-existent")
- (CombinerAggStateUpdater. (Count.))
- (fields "count")))))
- ;; test .stateQuery
- (with-drpc [drpc]
- (is (thrown? IllegalArgumentException
- (-> topo
- (.newDRPCStream "words" drpc)
- (.each (fields "args") (Split.) (fields "word"))
- (.groupBy (fields "word"))
- (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
- )))
-
-;; (deftest test-split-merge
-;; (t/with-local-cluster [cluster]
-;; (with-drpc [drpc]
-;; (letlocals
-;; (bind topo (TridentTopology.))
-;; (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
-;; (bind s1
-;; (-> drpc-stream
-;; (.each (fields "args") (Split.) (fields "word"))
-;; (.project (fields "word"))))
-;; (bind s2
-;; (-> drpc-stream
-;; (.each (fields "args") (StringLength.) (fields "len"))
-;; (.project (fields "len"))))
-;;
-;; (.merge topo [s1 s2])
-;; (with-topology [cluster topo]
-;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-;; (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-;; )))))