You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/15 16:15:27 UTC

[3/6] storm git commit: STORM-1179: Create Maven Profiles for Integration Tests - Mark Clojure tests as integration tests

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/storm/trident/integration_test.clj b/storm-core/test/clj/storm/trident/integration_test.clj
deleted file mode 100644
index ac3bbea..0000000
--- a/storm-core/test/clj/storm/trident/integration_test.clj
+++ /dev/null
@@ -1,292 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns storm.trident.integration-test
-  (:use [clojure test])
-  (:require [backtype.storm [testing :as t]])
-  (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
-            MemoryMapState$Factory])
-  (:import [storm.trident.state StateSpec])
-  (:import [storm.trident.operation.impl CombinerAggStateUpdater])
-  (:use [storm.trident testing])
-  (:use [backtype.storm util]))
-  
-(bootstrap-imports)
-
-(deftest test-memory-map-get-tuples
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind feeder (feeder-spout ["sentence"]))
-        (bind word-counts
-          (-> topo
-              (.newStream "tester" feeder)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-              (.parallelismHint 6)
-              ))       
-        (-> topo
-            (.newDRPCStream "all-tuples" drpc)
-            (.broadcast)
-            (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
-            (.project (fields "word" "count")))
-        (with-topology [cluster topo]
-          (feed feeder [["hello the man said"] ["the"]])
-          (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
-                 (into #{} (exec-drpc drpc "all-tuples" "man"))))
-          (feed feeder [["the foo"]])
-          (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
-                 (into #{} (exec-drpc drpc "all-tuples" "man")))))))))
-
-(deftest test-word-count
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind feeder (feeder-spout ["sentence"]))
-        (bind word-counts
-          (-> topo
-              (.newStream "tester" feeder)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-              (.parallelismHint 6)
-              ))
-        (-> topo
-            (.newDRPCStream "words" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.groupBy (fields "word"))
-            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
-            (.aggregate (fields "count") (Sum.) (fields "sum"))
-            (.project (fields "sum")))
-        (with-topology [cluster topo]
-          (feed feeder [["hello the man said"] ["the"]])
-          (is (= [[2]] (exec-drpc drpc "words" "the")))
-          (is (= [[1]] (exec-drpc drpc "words" "hello")))
-          (feed feeder [["the man on the moon"] ["where are you"]])
-          (is (= [[4]] (exec-drpc drpc "words" "the")))
-          (is (= [[2]] (exec-drpc drpc "words" "man")))
-          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
-          )))))
-
-;; this test reproduces a bug where committer spouts freeze processing when 
-;; there's at least one repartitioning after the spout
-(deftest test-word-count-committer-spout
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind feeder (feeder-committer-spout ["sentence"]))
-        (.setWaitToEmit feeder false) ;;this causes lots of empty batches
-        (bind word-counts
-          (-> topo
-              (.newStream "tester" feeder)
-              (.parallelismHint 2)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-              (.parallelismHint 6)
-              ))
-        (-> topo
-            (.newDRPCStream "words" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.groupBy (fields "word"))
-            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
-            (.aggregate (fields "count") (Sum.) (fields "sum"))
-            (.project (fields "sum")))
-        (with-topology [cluster topo]
-          (feed feeder [["hello the man said"] ["the"]])
-          (is (= [[2]] (exec-drpc drpc "words" "the")))
-          (is (= [[1]] (exec-drpc drpc "words" "hello")))
-          (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
-          (feed feeder [["the man on the moon"] ["where are you"]])
-          (is (= [[4]] (exec-drpc drpc "words" "the")))
-          (is (= [[2]] (exec-drpc drpc "words" "man")))
-          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
-          (feed feeder [["the the"]])
-          (is (= [[6]] (exec-drpc drpc "words" "the")))
-          (feed feeder [["the"]])
-          (is (= [[7]] (exec-drpc drpc "words" "the")))
-          )))))
-
-
-(deftest test-count-agg
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (-> topo
-            (.newDRPCStream "numwords" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.aggregate (CountAsAggregator.) (fields "count"))
-            (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
-            (.project (fields "count")))
-        (with-topology [cluster topo]
-          (doseq [i (range 100)]
-            (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
-          (is (= [[0]] (exec-drpc drpc "numwords" "")))
-          (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
-          )))))
-          
-(deftest test-split-merge
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
-        (bind s1
-          (-> drpc-stream
-              (.each (fields "args") (Split.) (fields "word"))
-              (.project (fields "word"))))
-        (bind s2
-          (-> drpc-stream
-              (.each (fields "args") (StringLength.) (fields "len"))
-              (.project (fields "len"))))
-
-        (.merge topo [s1 s2])
-        (with-topology [cluster topo]
-          (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-          (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-          )))))
-
-(deftest test-multiple-groupings-same-stream
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
-                                   (.each (fields "args") (TrueFilter.))))
-        (bind s1
-          (-> drpc-stream
-              (.groupBy (fields "args"))
-              (.aggregate (CountAsAggregator.) (fields "count"))))
-        (bind s2
-          (-> drpc-stream
-              (.groupBy (fields "args"))
-              (.aggregate (CountAsAggregator.) (fields "count"))))
-
-        (.merge topo [s1 s2])
-        (with-topology [cluster topo]
-          (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
-          (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
-          )))))
-          
-(deftest test-multi-repartition
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
-                                   (.each (fields "args") (Split.) (fields "word"))
-                                   (.localOrShuffle)
-                                   (.shuffle)
-                                   (.aggregate (CountAsAggregator.) (fields "count"))
-                                   ))
-        (with-topology [cluster topo]
-          (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
-          (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
-          )))))
-
-(deftest test-stream-projection-validation
-  (t/with-local-cluster [cluster]
-    (letlocals
-     (bind feeder (feeder-committer-spout ["sentence"]))
-     (bind topo (TridentTopology.))
-     ;; valid projection fields will not throw exceptions
-     (bind word-counts
-           (-> topo
-               (.newStream "tester" feeder)
-               (.each (fields "sentence") (Split.) (fields "word"))
-               (.groupBy (fields "word"))
-               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-               (.parallelismHint 6)
-               ))
-     (bind stream (-> topo
-                      (.newStream "tester" feeder)))
-     ;; test .each
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence1") (Split.) (fields "word")))))
-     ;; test .groupBy
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word1")))))
-     ;; test .aggregate
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.aggregate (fields "word1") (Count.) (fields "count")))))
-     ;; test .project
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.project (fields "sentence1")))))
-     ;; test .partitionBy
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.partitionBy (fields "sentence1")))))
-     ;; test .partitionAggregate
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
-     ;; test .persistentAggregate
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
-     ;; test .partitionPersist
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
-                                         (fields "non-existent")
-                                         (CombinerAggStateUpdater. (Count.))
-                                         (fields "count")))))
-     ;; test .stateQuery
-     (with-drpc [drpc]
-       (is (thrown? IllegalArgumentException
-                    (-> topo
-                        (.newDRPCStream "words" drpc)
-                        (.each (fields "args") (Split.) (fields "word"))
-                        (.groupBy (fields "word"))
-                        (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
-     )))
-
-;; (deftest test-split-merge
-;;   (t/with-local-cluster [cluster]
-;;     (with-drpc [drpc]
-;;       (letlocals
-;;         (bind topo (TridentTopology.))
-;;         (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
-;;         (bind s1
-;;           (-> drpc-stream
-;;               (.each (fields "args") (Split.) (fields "word"))
-;;               (.project (fields "word"))))
-;;         (bind s2
-;;           (-> drpc-stream
-;;               (.each (fields "args") (StringLength.) (fields "len"))
-;;               (.project (fields "len"))))
-;; 
-;;         (.merge topo [s1 s2])
-;;         (with-topology [cluster topo]
-;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-;;           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-;;           )))))