You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/15 16:15:25 UTC

[1/6] storm git commit: STORM-1179: Create Maven Profiles for Integration Tests - Added support for the following profiles - integration-tests-only - all-tests

Repository: storm
Updated Branches:
  refs/heads/master efbcd4702 -> 8b3c20846


STORM-1179: Create Maven Profiles for Integration Tests
- Added support for the following profiles
  - integration-tests-only
  - all-tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8f1b4fba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8f1b4fba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8f1b4fba

Branch: refs/heads/master
Commit: 8f1b4fba81f791b1f0ce4025a0a77b8eb5848ca1
Parents: ceb3a0c
Author: Hugo Louro <hm...@gmail.com>
Authored: Wed Dec 9 16:43:14 2015 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Wed Dec 9 18:58:09 2015 -0800

----------------------------------------------------------------------
 DEVELOPER.md                                    | 48 ++++++++++++++++-
 pom.xml                                         | 57 ++++++++++++++++++--
 storm-core/pom.xml                              | 12 +++++
 .../backtype/storm/testing/IntegrationTest.java | 38 +++++++++++++
 storm-multilang/javascript/pom.xml              | 12 ++++-
 storm-multilang/python/pom.xml                  |  9 ++++
 storm-multilang/ruby/pom.xml                    | 11 +++-
 7 files changed, 179 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8f1b4fba/DEVELOPER.md
----------------------------------------------------------------------
diff --git a/DEVELOPER.md b/DEVELOPER.md
index 18b2ef6..7a98ead 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -118,6 +118,25 @@ GitHub.
     3. Storm committers will iterate with you on the design to make sure you are on the right track.
     4. Implement your issue, create a pull request (see below), and iterate from there.
 
+### Testing
+
+Unit tests and Integration tests are an essential part of code contributions.
+
+To mark a Java test as a Java integration test, add the annotation `@Category(IntegrationTest.class)` to the test class definition as well as to its hierarchy of superclasses. Java integration tests can be in the same package as Java unit tests.
+ 
+```java
+    @Category(IntegrationTest.class)
+    public class MyIntegrationTest {
+    ...
+    }
+```
+ 
+To mark a Clojure test as Clojure integration test, the test source must be located in a package with name prefixed by `integration.`
+
+For example, the test `test/clj/backtype.storm.drpc_test.clj` is considered a clojure unit test, whereas
+ `test/clj/integration.backtype.storm.drpc_test.clj` is considered a clojure integration test.
+
+Please refer to section <a href="#building">Build the code and run the tests</a> for how to run integration tests, and the info on the build phase each test runs. 
 
 <a name="contribute-documentation"></a>
 
@@ -258,6 +277,29 @@ sh genthrift.sh
 
 ## Testing
 
+Tests are separated in two groups, Unit tests, and Integration tests. Java unit tests, Clojure unit tests, and Clojure integration tests (for reasons inherent to the clojure-maven-plugin) run in the maven `test` phase. Java integration tests run in the maven `integration-test` or `verify` phases. 
+ 
+To run Clojure and Java unit tests but no integration tests execute the command
+ 
+    mvn test
+
+Integration tests require that you activate the profile `integration-test` and that you specify the `maven-failsafe-plugin` in the module pom file.
+ 
+To run all Java and Clojure integration tests but no unit tests execute one of the commands
+ 
+    mvn -P  integration-tests-only verify
+    mvn -P  integration-tests-only integration-test
+
+To run all unit tests plus Clojure integration tests but no Java integration tests execute the command
+ 
+    mvn -P all-tests test
+
+To run all unit tests and all integration tests execute one of the commands
+ 
+    mvn -P all-tests verify
+    mvn -P all-tests integration-test
+ 
+ 
 You can also run tests selectively via the Clojure REPL.  The following example runs the tests in
 [auth_test.clj](storm-core/test/clj/backtype/storm/security/auth/auth_test.clj), which has the namespace
 `backtype.storm.security.auth.auth-test`.
@@ -270,6 +312,10 @@ You can also run tests selectively with `-Dtest=<test_name>`.  This works for bo
 
 Unfortunately you might experience failures in clojure tests which are wrapped in the `maven-clojure-plugin` and thus doesn't provide too much useful output at first sight - you might end up with a maven test failure with an error message as unhelpful as `Clojure failed.`. In this case it's recommended to look into `target/test-reports` of the failed project to see what actual tests have failed or scroll through the maven output looking for obvious issues like missing binaries.
 
+By default integration tests are not run in the test phase. To run Java and Clojure integration tests you must enable the profile
+ 
+
+
 <a name="packaging"></a>
 
 ## Create a Storm distribution (packaging)
@@ -310,7 +356,7 @@ You can verify whether the digital signatures match their corresponding files:
 
 ## Testing
 
-Tests should never rely on timing in order to pass.  In Storm can properly test functionality that depends on time by
+Tests should never rely on timing in order to pass.  Storm can properly test functionality that depends on time by
 simulating time, which means we do not have to worry about e.g. random delays failing our tests indeterministically.
 
 If you are testing topologies that do not do full tuple acking, then you should be testing using the "tracked

http://git-wip-us.apache.org/repos/asf/storm/blob/8f1b4fba/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3f8e547..b641887 100644
--- a/pom.xml
+++ b/pom.xml
@@ -229,6 +229,13 @@
         <hdrhistogram.version>2.1.7</hdrhistogram.version>
         <calcite.version>1.4.0-incubating</calcite.version>
         <jackson.version>2.6.3</jackson.version>
+        <maven-surefire.version>2.18.1</maven-surefire.version>
+
+        <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
+        <java.unit.test.exclude>backtype.storm.testing.IntegrationTest</java.unit.test.exclude>
+        <java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include>    <!--maven surefire plugin default test list-->
+        <!-- by default the clojure test set are all clojure tests that are not integration tests. This property is overridden in the profiles -->
+        <clojure.test.set>!integration.*</clojure.test.set>
     </properties>
 
     <modules>
@@ -254,7 +261,6 @@
         <module>examples/storm-starter</module>
     </modules>
 
-
     <profiles>
         <profile>
             <id>sign</id>
@@ -334,7 +340,26 @@
                 </plugins>
             </build>
         </profile>
-
+        <profile>
+            <id>all-tests</id>
+            <properties>
+                <java.integration.test.include>**/*.java</java.integration.test.include>
+                <java.integration.test.group>backtype.storm.testing.IntegrationTest</java.integration.test.group>
+                <clojure.test.set>*.*</clojure.test.set>
+            </properties>
+        </profile>
+        <profile>
+            <id>integration-tests-only</id>
+            <properties>
+                <!--Java-->
+                <java.unit.test.include>no.unit.tests</java.unit.test.include>
+                <java.integration.test.include>**/*.java</java.integration.test.include>
+                <java.integration.test.group>backtype.storm.testing.IntegrationTest</java.integration.test.group>
+                <!--Clojure-->
+                <clojure.test.set>integration.*</clojure.test.set>
+                <clojure.test.declared.namespace.only>true</clojure.test.declared.namespace.only>
+            </properties>
+        </profile>
     </profiles>
 
     <distributionManagement>
@@ -727,16 +752,38 @@
     </repositories>
 
     <build>
-
         <pluginManagement>
             <plugins>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.18.1</version>
+                    <version>${maven-surefire.version}</version>
+                    <configuration>
+                        <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                        <excludedGroups>${java.unit.test.exclude}</excludedGroups>
+                        <includes>
+                            <include>${java.unit.test.include}</include>
+                        </includes>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>${maven-surefire.version}</version>
                     <configuration>
-                      <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                        <includes>
+                            <include>${java.integration.test.include}</include>
+                        </includes>
+                        <groups>${java.integration.test.group}</groups>  <!--set in integration-test the profile-->
                     </configuration>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>integration-test</goal>
+                                <goal>verify</goal>
+                            </goals>
+                        </execution>
+                    </executions>
                 </plugin>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/8f1b4fba/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 72c4a3a..891ef72 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -412,6 +412,11 @@
                             <testScript>test/resources/test_runner.clj</testScript>
                             <!-- argLine is set by JaCoCo for code coverage -->
                             <vmargs>${argLine} ${test.extra.args}</vmargs>
+                            <!-- Run clojure unit tests or all tests (including integration tests) depending on the profile enabled -->
+                            <testNamespaces>
+                                <testNamespace>${clojure.test.set}</testNamespace>
+                            </testNamespaces>
+                            <testDeclaredNamespaceOnly>${clojure.test.declared.namespace.only}</testDeclaredNamespaceOnly>
                         </configuration>
                     </execution>
                 </executions>
@@ -427,6 +432,13 @@
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>

http://git-wip-us.apache.org/repos/asf/storm/blob/8f1b4fba/storm-core/src/jvm/backtype/storm/testing/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/IntegrationTest.java b/storm-core/src/jvm/backtype/storm/testing/IntegrationTest.java
new file mode 100644
index 0000000..69df693
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/IntegrationTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package backtype.storm.testing;
+
+/**
+ * Marker interface used to mark integration tests. Integration tests will be run during the Maven
+ * <b><i>integration-test</i></b> phase, whereas unit tests will be run during the Maven <b><i>test</i></b> phase.
+ * <p/>
+ * Integration tests can be in the same package as unit tests. To mark a test as integration test,
+ * add the annotation @Category(IntegrationTest.class) to the class definition as well as to its hierarchy of superclasses.
+ * For example:
+ * <p/>
+ *
+ *
+ * @ Category(IntegrationTest.class)<br/>
+ * public class MyIntegrationTest {<br/>
+ *  ...<br/>
+ * }
+ *
+ */
+public interface IntegrationTest {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8f1b4fba/storm-multilang/javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/javascript/pom.xml b/storm-multilang/javascript/pom.xml
index e1cb993..592f3fd 100644
--- a/storm-multilang/javascript/pom.xml
+++ b/storm-multilang/javascript/pom.xml
@@ -16,7 +16,8 @@
  limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <artifactId>storm</artifactId>
@@ -29,4 +30,13 @@
     <packaging>jar</packaging>
     <name>multilang-javascript</name>
 
+    <dependencies>
+        <!-- The JUnit dependency is required for this submodule by the maven-surefire-plugin <excludedGroups> configuration -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/8f1b4fba/storm-multilang/python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/python/pom.xml b/storm-multilang/python/pom.xml
index 379c0bc..d140a71 100644
--- a/storm-multilang/python/pom.xml
+++ b/storm-multilang/python/pom.xml
@@ -29,4 +29,13 @@
     <packaging>jar</packaging>
     <name>multilang-python</name>
 
+    <dependencies>
+        <!-- The JUnit dependency is required for this submodule by the maven-surefire-plugin <excludedGroups> configuration -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/8f1b4fba/storm-multilang/ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/ruby/pom.xml b/storm-multilang/ruby/pom.xml
index 6b5dd0c..ec5b173 100644
--- a/storm-multilang/ruby/pom.xml
+++ b/storm-multilang/ruby/pom.xml
@@ -28,5 +28,14 @@
     <artifactId>multilang-ruby</artifactId>
     <packaging>jar</packaging>
     <name>multilang-ruby</name>
-    
+
+    <dependencies>
+        <!-- The JUnit dependency is required for this submodule by the maven-surefire-plugin <excludedGroups> configuration -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>


[5/6] storm git commit: Merge branch 'STORM-1179' of https://github.com/hmcl/storm-apache into STORM-1179

Posted by sr...@apache.org.
Merge branch 'STORM-1179' of https://github.com/hmcl/storm-apache into STORM-1179


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1382b92e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1382b92e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1382b92e

Branch: refs/heads/master
Commit: 1382b92e25254cc4c58e8b4d03ec646b7b669c8f
Parents: efbcd47 a1c159f
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Dec 15 07:00:08 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Dec 15 07:00:08 2015 -0800

----------------------------------------------------------------------
 DEVELOPER.md                                    |  48 +-
 external/storm-elasticsearch/pom.xml            |   7 +-
 .../bolt/AbstractEsBoltIntegrationTest.java     |   7 +-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |   3 +
 .../bolt/EsLookupBoltIntegrationTest.java       |  18 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java |   5 +-
 pom.xml                                         |  57 +-
 storm-core/pom.xml                              |  12 +
 .../backtype/storm/testing/IntegrationTest.java |  38 ++
 .../clj/backtype/storm/integration_test.clj     | 622 -------------------
 .../test/clj/backtype/storm/testing4j_test.clj  | 212 -------
 .../backtype/storm/integration_test.clj         | 622 +++++++++++++++++++
 .../backtype/storm/testing4j_test.clj           | 212 +++++++
 .../storm/trident/integration_test.clj          | 292 +++++++++
 .../test/clj/storm/trident/integration_test.clj | 292 ---------
 storm-multilang/javascript/pom.xml              |  12 +-
 storm-multilang/python/pom.xml                  |   9 +
 storm-multilang/ruby/pom.xml                    |  11 +-
 18 files changed, 1333 insertions(+), 1146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1382b92e/pom.xml
----------------------------------------------------------------------


[6/6] storm git commit: Added STORM-1179 to Changelog.

Posted by sr...@apache.org.
Added STORM-1179 to Changelog.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8b3c2084
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8b3c2084
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8b3c2084

Branch: refs/heads/master
Commit: 8b3c20846a026732a15108e49c5f43fcea37674e
Parents: 1382b92
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Dec 15 07:14:44 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Dec 15 07:14:44 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8b3c2084/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 28fc538..d95010b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1179: Create Maven Profiles for Integration Tests.
  * STORM-1387: workers-artifacts directory configurable, and default to be under storm.log.dir.
  * STORM-1211: Add trident state and query support for cassandra connector
  * STORM-1359: Change kryo links from google code to github


[3/6] storm git commit: STORM-1179: Create Maven Profiles for Integration Tests - Mark Clojure tests as integration tests

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/storm/trident/integration_test.clj b/storm-core/test/clj/storm/trident/integration_test.clj
deleted file mode 100644
index ac3bbea..0000000
--- a/storm-core/test/clj/storm/trident/integration_test.clj
+++ /dev/null
@@ -1,292 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns storm.trident.integration-test
-  (:use [clojure test])
-  (:require [backtype.storm [testing :as t]])
-  (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
-            MemoryMapState$Factory])
-  (:import [storm.trident.state StateSpec])
-  (:import [storm.trident.operation.impl CombinerAggStateUpdater])
-  (:use [storm.trident testing])
-  (:use [backtype.storm util]))
-  
-(bootstrap-imports)
-
-(deftest test-memory-map-get-tuples
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind feeder (feeder-spout ["sentence"]))
-        (bind word-counts
-          (-> topo
-              (.newStream "tester" feeder)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-              (.parallelismHint 6)
-              ))       
-        (-> topo
-            (.newDRPCStream "all-tuples" drpc)
-            (.broadcast)
-            (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
-            (.project (fields "word" "count")))
-        (with-topology [cluster topo]
-          (feed feeder [["hello the man said"] ["the"]])
-          (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
-                 (into #{} (exec-drpc drpc "all-tuples" "man"))))
-          (feed feeder [["the foo"]])
-          (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
-                 (into #{} (exec-drpc drpc "all-tuples" "man")))))))))
-
-(deftest test-word-count
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind feeder (feeder-spout ["sentence"]))
-        (bind word-counts
-          (-> topo
-              (.newStream "tester" feeder)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-              (.parallelismHint 6)
-              ))
-        (-> topo
-            (.newDRPCStream "words" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.groupBy (fields "word"))
-            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
-            (.aggregate (fields "count") (Sum.) (fields "sum"))
-            (.project (fields "sum")))
-        (with-topology [cluster topo]
-          (feed feeder [["hello the man said"] ["the"]])
-          (is (= [[2]] (exec-drpc drpc "words" "the")))
-          (is (= [[1]] (exec-drpc drpc "words" "hello")))
-          (feed feeder [["the man on the moon"] ["where are you"]])
-          (is (= [[4]] (exec-drpc drpc "words" "the")))
-          (is (= [[2]] (exec-drpc drpc "words" "man")))
-          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
-          )))))
-
-;; this test reproduces a bug where committer spouts freeze processing when 
-;; there's at least one repartitioning after the spout
-(deftest test-word-count-committer-spout
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind feeder (feeder-committer-spout ["sentence"]))
-        (.setWaitToEmit feeder false) ;;this causes lots of empty batches
-        (bind word-counts
-          (-> topo
-              (.newStream "tester" feeder)
-              (.parallelismHint 2)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-              (.parallelismHint 6)
-              ))
-        (-> topo
-            (.newDRPCStream "words" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.groupBy (fields "word"))
-            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
-            (.aggregate (fields "count") (Sum.) (fields "sum"))
-            (.project (fields "sum")))
-        (with-topology [cluster topo]
-          (feed feeder [["hello the man said"] ["the"]])
-          (is (= [[2]] (exec-drpc drpc "words" "the")))
-          (is (= [[1]] (exec-drpc drpc "words" "hello")))
-          (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
-          (feed feeder [["the man on the moon"] ["where are you"]])
-          (is (= [[4]] (exec-drpc drpc "words" "the")))
-          (is (= [[2]] (exec-drpc drpc "words" "man")))
-          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
-          (feed feeder [["the the"]])
-          (is (= [[6]] (exec-drpc drpc "words" "the")))
-          (feed feeder [["the"]])
-          (is (= [[7]] (exec-drpc drpc "words" "the")))
-          )))))
-
-
-(deftest test-count-agg
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (-> topo
-            (.newDRPCStream "numwords" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.aggregate (CountAsAggregator.) (fields "count"))
-            (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
-            (.project (fields "count")))
-        (with-topology [cluster topo]
-          (doseq [i (range 100)]
-            (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
-          (is (= [[0]] (exec-drpc drpc "numwords" "")))
-          (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
-          )))))
-          
-(deftest test-split-merge
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
-        (bind s1
-          (-> drpc-stream
-              (.each (fields "args") (Split.) (fields "word"))
-              (.project (fields "word"))))
-        (bind s2
-          (-> drpc-stream
-              (.each (fields "args") (StringLength.) (fields "len"))
-              (.project (fields "len"))))
-
-        (.merge topo [s1 s2])
-        (with-topology [cluster topo]
-          (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-          (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-          )))))
-
-(deftest test-multiple-groupings-same-stream
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
-                                   (.each (fields "args") (TrueFilter.))))
-        (bind s1
-          (-> drpc-stream
-              (.groupBy (fields "args"))
-              (.aggregate (CountAsAggregator.) (fields "count"))))
-        (bind s2
-          (-> drpc-stream
-              (.groupBy (fields "args"))
-              (.aggregate (CountAsAggregator.) (fields "count"))))
-
-        (.merge topo [s1 s2])
-        (with-topology [cluster topo]
-          (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
-          (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
-          )))))
-          
-(deftest test-multi-repartition
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
-      (letlocals
-        (bind topo (TridentTopology.))
-        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
-                                   (.each (fields "args") (Split.) (fields "word"))
-                                   (.localOrShuffle)
-                                   (.shuffle)
-                                   (.aggregate (CountAsAggregator.) (fields "count"))
-                                   ))
-        (with-topology [cluster topo]
-          (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
-          (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
-          )))))
-
-(deftest test-stream-projection-validation
-  (t/with-local-cluster [cluster]
-    (letlocals
-     (bind feeder (feeder-committer-spout ["sentence"]))
-     (bind topo (TridentTopology.))
-     ;; valid projection fields will not throw exceptions
-     (bind word-counts
-           (-> topo
-               (.newStream "tester" feeder)
-               (.each (fields "sentence") (Split.) (fields "word"))
-               (.groupBy (fields "word"))
-               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
-               (.parallelismHint 6)
-               ))
-     (bind stream (-> topo
-                      (.newStream "tester" feeder)))
-     ;; test .each
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence1") (Split.) (fields "word")))))
-     ;; test .groupBy
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word1")))))
-     ;; test .aggregate
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.aggregate (fields "word1") (Count.) (fields "count")))))
-     ;; test .project
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.project (fields "sentence1")))))
-     ;; test .partitionBy
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.partitionBy (fields "sentence1")))))
-     ;; test .partitionAggregate
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
-     ;; test .persistentAggregate
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
-     ;; test .partitionPersist
-     (is (thrown? IllegalArgumentException
-                  (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
-                                         (fields "non-existent")
-                                         (CombinerAggStateUpdater. (Count.))
-                                         (fields "count")))))
-     ;; test .stateQuery
-     (with-drpc [drpc]
-       (is (thrown? IllegalArgumentException
-                    (-> topo
-                        (.newDRPCStream "words" drpc)
-                        (.each (fields "args") (Split.) (fields "word"))
-                        (.groupBy (fields "word"))
-                        (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
-     )))
-
-;; (deftest test-split-merge
-;;   (t/with-local-cluster [cluster]
-;;     (with-drpc [drpc]
-;;       (letlocals
-;;         (bind topo (TridentTopology.))
-;;         (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
-;;         (bind s1
-;;           (-> drpc-stream
-;;               (.each (fields "args") (Split.) (fields "word"))
-;;               (.project (fields "word"))))
-;;         (bind s2
-;;           (-> drpc-stream
-;;               (.each (fields "args") (StringLength.) (fields "len"))
-;;               (.project (fields "len"))))
-;; 
-;;         (.merge topo [s1 s2])
-;;         (with-topology [cluster topo]
-;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-;;           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-;;           )))))


[4/6] storm git commit: STORM-1179: Create Maven Profiles for Integration Tests - Mark Clojure tests as integration tests

Posted by sr...@apache.org.
STORM-1179: Create Maven Profiles for Integration Tests
- Mark Clojure tests as integration tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d4fcc0fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d4fcc0fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d4fcc0fd

Branch: refs/heads/master
Commit: d4fcc0fde96c667d1763fe5626ddf1284e4a7f70
Parents: 8f1b4fb
Author: Hugo Louro <hm...@gmail.com>
Authored: Wed Dec 9 16:50:28 2015 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Wed Dec 9 18:58:22 2015 -0800

----------------------------------------------------------------------
 .../clj/backtype/storm/integration_test.clj     | 622 -------------------
 .../test/clj/backtype/storm/testing4j_test.clj  | 212 -------
 .../backtype/storm/integration_test.clj         | 622 +++++++++++++++++++
 .../backtype/storm/testing4j_test.clj           | 212 +++++++
 .../storm/trident/integration_test.clj          | 292 +++++++++
 .../test/clj/storm/trident/integration_test.clj | 292 ---------
 6 files changed, 1126 insertions(+), 1126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/integration_test.clj b/storm-core/test/clj/backtype/storm/integration_test.clj
deleted file mode 100644
index cc0208d..0000000
--- a/storm-core/test/clj/backtype/storm/integration_test.clj
+++ /dev/null
@@ -1,622 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.integration-test
-  (:use [clojure test])
-  (:import [backtype.storm Config])
-  (:import [backtype.storm.topology TopologyBuilder])
-  (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
-  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
-            TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
-  (:import [backtype.storm.tuple Fields])
-  (:use [backtype.storm testing config clojure util])
-  (:use [backtype.storm.daemon common])
-  (:require [backtype.storm [thrift :as thrift]]))
-
-(deftest test-basic-topology
-  (doseq [zmq-on? [true false]]
-    (with-simulated-time-local-cluster [cluster :supervisors 4
-                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
-      (let [topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                      {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
-                       "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
-                       "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
-                       })
-            results (complete-topology cluster
-                                       topology
-                                       :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
-                                       :storm-conf {TOPOLOGY-WORKERS 2})]
-        (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                 (read-tuples results "1")))
-        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
-                 (read-tuples results "2")))
-        (is (= [[1] [2] [3] [4]]
-               (read-tuples results "3")))
-        (is (= [[1] [2] [3] [4]]
-               (read-tuples results "4")))
-        ))))
-
-(defbolt emit-task-id ["tid"] {:prepare true}
-  [conf context collector]
-  (let [tid (.getThisTaskIndex context)]
-    (bolt
-      (execute [tuple]
-        (emit-bolt! collector [tid] :anchor tuple)
-        (ack! collector tuple)
-        ))))
-
-(deftest test-multi-tasks-per-executor
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
-                    {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
-                      :parallelism-hint 3
-                      :conf {TOPOLOGY-TASKS 6})
-                     })
-          results (complete-topology cluster
-                                     topology
-                                     :mock-sources {"1" [["a"]]})]
-      (is (ms= [[0] [1] [2] [3] [4] [5]]
-               (read-tuples results "2")))
-      )))
-
-(defbolt ack-every-other {} {:prepare true}
-  [conf context collector]
-  (let [state (atom -1)]
-    (bolt
-      (execute [tuple]
-        (let [val (swap! state -)]
-          (when (pos? val)
-            (ack! collector tuple)
-            ))))))
-
-(defn assert-loop [afn ids]
-  (while (not (every? afn ids))
-    (Thread/sleep 1)))
-
-(defn assert-acked [tracker & ids]
-  (assert-loop #(.isAcked tracker %) ids))
-
-(defn assert-failed [tracker & ids]
-  (assert-loop #(.isFailed tracker %) ids))
-
-(deftest test-timeout
-  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec feeder)}
-                     {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]      
-      (submit-local-topology (:nimbus cluster)
-                             "timeout-tester"
-                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-                             topology)
-      (advance-cluster-time cluster 11)
-      (.feed feeder ["a"] 1)
-      (.feed feeder ["b"] 2)
-      (.feed feeder ["c"] 3)
-      (advance-cluster-time cluster 9)
-      (assert-acked tracker 1 3)
-      (is (not (.isFailed tracker 2)))
-      (advance-cluster-time cluster 12)
-      (assert-failed tracker 2)
-      )))
-
-(defn mk-validate-topology-1 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-1 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-2 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn mk-invalidate-topology-3 []
-  (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                    {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
-
-(defn try-complete-wc-topology [cluster topology]
-  (try (do
-         (complete-topology cluster
-                            topology
-                            :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
-                            :storm-conf {TOPOLOGY-WORKERS 2})
-         false)
-       (catch InvalidTopologyException e true)))
-
-(deftest test-validate-topology-structure
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
-          any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
-          any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
-          any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
-      (is (= any-error1? false))
-      (is (= any-error2? true))
-      (is (= any-error3? true))
-      (is (= any-error4? true)))))
-
-(defbolt identity-bolt ["num"]
-  [tuple collector]
-  (emit-bolt! collector (.getValues tuple) :anchor tuple)
-  (ack! collector tuple))
-
-(deftest test-system-stream
-  ;; this test works because mocking a spout splits up the tuples evenly among the tasks
-  (with-simulated-time-local-cluster [cluster]
-      (let [topology (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
-                      {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
-                       })
-            results (complete-topology cluster
-                                       topology
-                                       :mock-sources {"1" [["a"] ["b"] ["c"]]}
-                                       :storm-conf {TOPOLOGY-WORKERS 2})]
-        (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
-                 (read-tuples results "2")))
-        )))
-
-(defn ack-tracking-feeder [fields]
-  (let [tracker (AckTracker.)]
-    [(doto (feeder-spout fields)
-       (.setAckFailDelegate tracker))
-     (fn [val]
-       (is (= (.getNumAcks tracker) val))
-       (.resetNumAcks tracker)
-       )]
-    ))
-
-(defbolt branching-bolt ["num"]
-  {:params [amt]}
-  [tuple collector]
-  (doseq [i (range amt)]
-    (emit-bolt! collector [i] :anchor tuple))
-  (ack! collector tuple))
-
-(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
-  [conf context collector]
-  (let [seen (atom [])]
-    (bolt
-      (execute [tuple]
-        (swap! seen conj tuple)
-        (when (= (count @seen) amt)
-          (emit-bolt! collector [1] :anchor @seen)
-          (doseq [s @seen]
-            (ack! collector s))
-          (reset! seen [])
-          )))
-      ))
-
-(defbolt ack-bolt {}
-  [tuple collector]
-  (ack! collector tuple))
-
-(deftest test-acking
-  (with-tracked-cluster [cluster]
-    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
-          [feeder2 checker2] (ack-tracking-feeder ["num"])
-          [feeder3 checker3] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
-                   (topology
-                     {"1" (spout-spec feeder1)
-                      "2" (spout-spec feeder2)
-                      "3" (spout-spec feeder3)}
-                     {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
-                      "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
-                      "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
-                      "7" (bolt-spec
-                            {"4" :shuffle
-                            "5" :shuffle
-                            "6" :shuffle}
-                            (agg-bolt 3))
-                      "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
-                      "9" (bolt-spec {"8" :shuffle} ack-bolt)}
-                     ))]
-      (submit-local-topology (:nimbus cluster)
-                             "acking-test1"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
-      (.feed feeder1 [1])
-      (tracked-wait tracked 1)
-      (checker1 0)
-      (.feed feeder2 [1])
-      (tracked-wait tracked 1)
-      (checker1 1)
-      (checker2 1)
-      (.feed feeder1 [1])
-      (tracked-wait tracked 1)
-      (checker1 0)
-      (.feed feeder1 [1])
-      (tracked-wait tracked 1)
-      (checker1 1)
-      (.feed feeder3 [1])
-      (tracked-wait tracked 1)
-      (checker1 0)
-      (checker3 0)
-      (.feed feeder2 [1])
-      (tracked-wait tracked 1)
-      (checker1 1)
-      (checker2 1)
-      (checker3 1)
-      
-      )))
-
-(deftest test-ack-branching
-  (with-tracked-cluster [cluster]
-    (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
-                   (topology
-                     {"1" (spout-spec feeder)}
-                     {"2" (bolt-spec {"1" :shuffle} identity-bolt)
-                      "3" (bolt-spec {"1" :shuffle} identity-bolt)
-                      "4" (bolt-spec
-                            {"2" :shuffle
-                             "3" :shuffle}
-                             (agg-bolt 4))}))]
-      (submit-local-topology (:nimbus cluster)
-                             "test-acking2"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
-      (.feed feeder [1])
-      (tracked-wait tracked 1)
-      (checker 0)
-      (.feed feeder [1])
-      (tracked-wait tracked 1)
-      (checker 2)
-      )))
-
-(defbolt dup-anchor ["num"]
-  [tuple collector]
-  (emit-bolt! collector [1] :anchor [tuple tuple])
-  (ack! collector tuple))
-
-(def bolt-prepared? (atom false))
-(defbolt prepare-tracked-bolt [] {:prepare true}
-  [conf context collector]  
-  (reset! bolt-prepared? true)
-  (bolt
-   (execute [tuple]
-            (ack! collector tuple))))
-
-(def spout-opened? (atom false))
-(defspout open-tracked-spout ["val"]
-  [conf context collector]
-  (reset! spout-opened? true)
-  (spout
-   (nextTuple [])))
-
-(deftest test-submit-inactive-topology
-  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec feeder)
-                     "2" (thrift/mk-spout-spec open-tracked-spout)}
-                    {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
-      (reset! bolt-prepared? false)
-      (reset! spout-opened? false)      
-      
-      (submit-local-topology-with-opts (:nimbus cluster)
-        "test"
-        {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-        topology
-        (SubmitOptions. TopologyInitialStatus/INACTIVE))
-      (advance-cluster-time cluster 11)
-      (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 9)
-      (is (not @bolt-prepared?))
-      (is (not @spout-opened?))        
-      (.activate (:nimbus cluster) "test")              
-      
-      (advance-cluster-time cluster 12)
-      (assert-acked tracker 1)
-      (is @bolt-prepared?)
-      (is @spout-opened?))))
-
-(deftest test-acking-self-anchor
-  (with-tracked-cluster [cluster]
-    (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
-                   (topology
-                     {"1" (spout-spec feeder)}
-                     {"2" (bolt-spec {"1" :shuffle} dup-anchor)
-                      "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
-      (submit-local-topology (:nimbus cluster)
-                             "test"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
-      (.feed feeder [1])
-      (tracked-wait tracked 1)
-      (checker 1)
-      (.feed feeder [1])
-      (.feed feeder [1])
-      (.feed feeder [1])
-      (tracked-wait tracked 3)
-      (checker 3)
-      )))
-
-;; (defspout ConstantSpout ["val"] {:prepare false}
-;;   [collector]
-;;   (Time/sleep 100)
-;;   (emit-spout! collector [1]))
-
-;; (def errored (atom false))
-;; (def restarted (atom false))
-
-;; (defbolt local-error-checker {} [tuple collector]
-;;   (when-not @errored
-;;     (reset! errored true)
-;;     (println "erroring")
-;;     (throw (RuntimeException.)))
-;;   (when-not @restarted (println "restarted"))
-;;   (reset! restarted true))
-
-;; (deftest test-no-halt-local-mode
-;;   (with-simulated-time-local-cluster [cluster]
-;;       (let [topology (topology
-;;                       {1 (spout-spec ConstantSpout)}
-;;                       {2 (bolt-spec {1 :shuffle} local-error-checker)
-;;                        })]
-;;         (submit-local-topology (:nimbus cluster)
-;;                                "test"
-;;                                {}
-;;                                topology)
-;;         (while (not @restarted)
-;;           (advance-time-ms! 100))
-;;         )))
-
-(defspout IncSpout ["word"]
-  [conf context collector]
-  (let [state (atom 0)]
-    (spout
-     (nextTuple []
-       (Thread/sleep 100)
-       (emit-spout! collector [@state] :id 1)         
-       )
-     (ack [id]
-       (swap! state inc))
-     )))
-
-
-(defspout IncSpout2 ["word"] {:params [prefix]}
-  [conf context collector]
-  (let [state (atom 0)]
-    (spout
-     (nextTuple []
-       (Thread/sleep 100)
-       (swap! state inc)
-       (emit-spout! collector [(str prefix "-" @state)])         
-       )
-     )))
-
-;; (deftest test-clojure-spout
-;;   (with-local-cluster [cluster]
-;;     (let [nimbus (:nimbus cluster)
-;;           top (topology
-;;                {1 (spout-spec IncSpout)}
-;;                {}
-;;                )]
-;;       (submit-local-topology nimbus
-;;                              "spout-test"
-;;                              {TOPOLOGY-DEBUG true
-;;                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
-;;                              top)
-;;       (Thread/sleep 10000)
-;;       (.killTopology nimbus "spout-test")
-;;       (Thread/sleep 10000)
-;;       )))
-
-(deftest test-kryo-decorators-config
-  (with-simulated-time-local-cluster [cluster
-                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
-                                                    TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
-    (letlocals
-     (bind builder (TopologyBuilder.))
-     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
-     (-> builder
-         (.setBolt "2"
-                   (TestConfBolt.
-                    {TOPOLOGY-KRYO-DECORATORS ["one" "two"]}))
-         (.shuffleGrouping "1"))
-     
-     (bind results
-           (complete-topology cluster
-                              (.createTopology builder)
-                              :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
-                              :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))
-     (is (= {"topology.kryo.decorators" (list "one" "two" "three")}            
-            (->> (read-tuples results "2")
-                 (apply concat)
-                 (apply hash-map)))))))
-
-(deftest test-component-specific-config
-  (with-simulated-time-local-cluster [cluster
-                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
-    (letlocals
-     (bind builder (TopologyBuilder.))
-     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
-     (-> builder
-         (.setBolt "2"
-                   (TestConfBolt.
-                    {"fake.config" 123
-                     TOPOLOGY-MAX-TASK-PARALLELISM 20
-                     TOPOLOGY-MAX-SPOUT-PENDING 30
-                     TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
-                                             {"fake.type2" "a.serializer"}]
-                     }))
-         (.shuffleGrouping "1")
-         (.setMaxTaskParallelism (int 2))
-         (.addConfiguration "fake.config2" 987)
-         )
-     
-
-     (bind results
-           (complete-topology cluster
-                              (.createTopology builder)
-                              :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
-                              :mock-sources {"1" [["fake.config"]
-                                                  [TOPOLOGY-MAX-TASK-PARALLELISM]
-                                                  [TOPOLOGY-MAX-SPOUT-PENDING]
-                                                  ["fake.config2"]
-                                                  [TOPOLOGY-KRYO-REGISTER]
-                                                  ]}))
-     (is (= {"fake.config" 123
-             "fake.config2" 987
-             TOPOLOGY-MAX-TASK-PARALLELISM 2
-             TOPOLOGY-MAX-SPOUT-PENDING 30
-             TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
-                                     "fake.type2" "a.serializer"
-                                     "fake.type3" "a.serializer3"}}
-            (->> (read-tuples results "2")
-                 (apply concat)
-                 (apply hash-map))
-            ))
-     )))
-
-(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
-  [conf context collector]
-  (let [acked (atom 0)
-        failed (atom 0)
-        executed (atom 0)
-        emitted (atom 0)]
-    (.addTaskHook context
-                  (reify backtype.storm.hooks.ITaskHook
-                    (prepare [this conf context]
-                      )
-                    (cleanup [this]
-                      )
-                    (emit [this info]
-                      (swap! emitted inc))
-                    (boltAck [this info]
-                      (swap! acked inc))
-                    (boltFail [this info]
-                      (swap! failed inc))
-                    (boltExecute [this info]
-                      (swap! executed inc))
-                      ))
-    (bolt
-     (execute [tuple]
-        (emit-bolt! collector [@emitted @acked @failed @executed])
-        (if (= 0 (- @acked @failed))
-          (ack! collector tuple)
-          (fail! collector tuple))
-        ))))
-
-(deftest test-hooks
-  (with-simulated-time-local-cluster [cluster]
-    (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
-                              }
-                             {"2" (bolt-spec {"1" :shuffle}
-                                             hooks-bolt)
-                              })
-          results (complete-topology cluster
-                                     topology
-                                     :mock-sources {"1" [[1]
-                                                         [1]
-                                                         [1]
-                                                         [1]
-                                                         ]})]
-      (is (= [[0 0 0 0]
-              [2 1 0 1]
-              [4 1 1 2]
-              [6 2 1 3]]
-             (read-tuples results "2")
-             )))))
-
-(defbolt report-errors-bolt {}
-  [tuple collector]
-  (doseq [i (range (.getValue tuple 0))]
-    (report-error! collector (RuntimeException.)))
-  (ack! collector tuple))
-
-(deftest test-throttled-errors
-  (with-simulated-time
-    (with-tracked-cluster [cluster]
-      (let [state (:storm-cluster-state cluster)
-            [feeder checker] (ack-tracking-feeder ["num"])
-            tracked (mk-tracked-topology
-                     cluster
-                     (topology
-                       {"1" (spout-spec feeder)}
-                       {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
-            _       (submit-local-topology (:nimbus cluster)
-                                             "test-errors"
-                                             {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
-                                              TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
-                                              TOPOLOGY-DEBUG true
-                                              }
-                                             (:topology tracked))
-            _ (advance-cluster-time cluster 11)
-            storm-id (get-storm-id state "test-errors")
-            errors-count (fn [] (count (.errors state storm-id "2")))]
-
-        (is (nil? (.last-error state storm-id "2")))
-
-        ;; so it launches the topology
-        (advance-cluster-time cluster 2)
-        (.feed feeder [6])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 5)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 6 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [3])
-        (tracked-wait tracked 1)
-        (is (= 8 (errors-count)))
-        (is (.last-error state storm-id "2"))))))
-
-
-(deftest test-acking-branching-complex
-  ;; test acking with branching in the topology
-  )
-
-
-(deftest test-fields-grouping
-  ;; 1. put a shitload of random tuples through it and test that counts are right
-  ;; 2. test that different spouts with different phints group the same way
-  )
-
-(deftest test-all-grouping
-  )
-
-(deftest test-direct-grouping
-  )

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/backtype/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/testing4j_test.clj b/storm-core/test/clj/backtype/storm/testing4j_test.clj
deleted file mode 100644
index b504f28..0000000
--- a/storm-core/test/clj/backtype/storm/testing4j_test.clj
+++ /dev/null
@@ -1,212 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.testing4j-test
-  (:use [clojure.test])
-  (:use [backtype.storm config clojure testing util])
-  (:require [backtype.storm.integration-test :as it])
-  (:require [backtype.storm.thrift :as thrift])
-  (:import [backtype.storm Testing Config ILocalCluster])
-  (:import [backtype.storm.tuple Values Tuple])
-  (:import [backtype.storm.utils Time Utils])
-  (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
-            TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
-            AckFailMapTracker MkTupleParam]))
-
-(deftest test-with-simulated-time
-  (is (= false (Time/isSimulating)))
-  (Testing/withSimulatedTime (fn []
-                               (is (= true (Time/isSimulating)))))
-  (is (= false (Time/isSimulating))))
-
-(deftest test-with-local-cluster
-  (let [mk-cluster-param (doto (MkClusterParam.)
-                           (.setSupervisors (int 2))
-                           (.setPortsPerSupervisor (int 5)))
-        daemon-conf (doto (Config.)
-                      (.put SUPERVISOR-ENABLE false)
-                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
-    (Testing/withLocalCluster mk-cluster-param (reify TestJob
-                                                 (^void run [this ^ILocalCluster cluster]
-                                                   (is (not (nil? cluster)))
-                                                   (is (not (nil? (.getState cluster))))
-                                                   (is (not (nil? (:nimbus (.getState cluster))))))))))
-
-(deftest test-with-simulated-time-local-cluster
-  (let [mk-cluster-param (doto (MkClusterParam.)
-                           (.setSupervisors (int 2)))
-        daemon-conf (doto (Config.)
-                      (.put SUPERVISOR-ENABLE false)
-                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
-    (is (not (Time/isSimulating)))
-    (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
-                                                              (^void run [this ^ILocalCluster cluster]
-                                                                (is (not (nil? cluster)))
-                                                                (is (not (nil? (.getState cluster))))
-                                                                (is (not (nil? (:nimbus (.getState cluster)))))
-                                                                (is (Time/isSimulating)))))
-    (is (not (Time/isSimulating)))))
-
-(deftest test-complete-topology
-  (doseq [zmq-on? [true false]
-          :let [daemon-conf (doto (Config.)
-                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
-                mk-cluster-param (doto (MkClusterParam.)
-                                   (.setSupervisors (int 4))
-                                   (.setDaemonConf daemon-conf))]]
-    (Testing/withSimulatedTimeLocalCluster
-     (reify TestJob
-       (^void run [this ^ILocalCluster cluster]
-         (let [topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-                         {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
-                          "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
-                          "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
-                          })
-               mocked-sources (doto (MockedSources.)
-                                (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
-                                                                      (Values. (into-array ["bob"]))
-                                                                      (Values. (into-array ["joey"]))
-                                                                      (Values. (into-array ["nathan"]))])
-                                              ))
-               storm-conf (doto (Config.)
-                            (.setNumWorkers 2))
-               complete-topology-param (doto (CompleteTopologyParam.)
-                                         (.setMockedSources mocked-sources)
-                                         (.setStormConf storm-conf))
-               results (Testing/completeTopology cluster
-                                                 topology
-                                                 complete-topology-param)]
-           (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                           (Testing/readTuples results "1")))
-           (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
-                           (read-tuples results "2")))
-           (is (= [[1] [2] [3] [4]]
-                  (Testing/readTuples results "3")))
-           (is (= [[1] [2] [3] [4]]
-                  (Testing/readTuples results "4")))
-           ))))))
-
-(deftest test-with-tracked-cluster
-  (Testing/withTrackedCluster
-   (reify TestJob
-     (^void run [this ^ILocalCluster cluster]
-       (let [[feeder checker] (it/ack-tracking-feeder ["num"])
-             tracked (Testing/mkTrackedTopology
-                      cluster
-                      (topology
-                       {"1" (spout-spec feeder)}
-                       {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
-                        "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
-                        "4" (bolt-spec
-                             {"2" :shuffle
-                              "3" :shuffle}
-                             (it/agg-bolt 4))}))]
-         (.submitTopology cluster
-                          "test-acking2"
-                          (Config.)
-                          (.getTopology tracked))
-         (advance-cluster-time (.getState cluster) 11)
-         (.feed feeder [1])
-         (Testing/trackedWait tracked (int 1))
-         (checker 0)
-         (.feed feeder [1])
-         (Testing/trackedWait tracked (int 1))
-         (checker 2)
-         )))))
-
-(deftest test-advance-cluster-time
-  (let [daemon-conf (doto (Config.)
-                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
-        mk-cluster-param (doto (MkClusterParam.)
-                           (.setDaemonConf daemon-conf))]
-    (Testing/withSimulatedTimeLocalCluster
-     mk-cluster-param
-     (reify TestJob
-       (^void run [this ^ILocalCluster cluster]
-         (let [feeder (feeder-spout ["field1"])
-               tracker (AckFailMapTracker.)
-               _ (.setAckFailDelegate feeder tracker)
-               topology (thrift/mk-topology
-                         {"1" (thrift/mk-spout-spec feeder)}
-                         {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
-               storm-conf (doto (Config.)
-                            (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
-           (.submitTopology cluster
-                            "timeout-tester"
-                            storm-conf
-                            topology)
-           (.feed feeder ["a"] 1)
-           (.feed feeder ["b"] 2)
-           (.feed feeder ["c"] 3)
-           (Testing/advanceClusterTime cluster (int 9))
-           (it/assert-acked tracker 1 3)
-           (is (not (.isFailed tracker 2)))
-           (Testing/advanceClusterTime cluster (int 12))
-           (it/assert-failed tracker 2)
-           ))))))
-
-(deftest test-disable-tuple-timeout
-  (let [daemon-conf (doto (Config.)
-                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
-        mk-cluster-param (doto (MkClusterParam.)
-                           (.setDaemonConf daemon-conf))]
-    (Testing/withSimulatedTimeLocalCluster
-      mk-cluster-param
-      (reify TestJob
-        (^void run [this ^ILocalCluster cluster]
-          (let [feeder (feeder-spout ["field1"])
-                tracker (AckFailMapTracker.)
-                _ (.setAckFailDelegate feeder tracker)
-                topology (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec feeder)}
-                           {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
-                storm-conf (doto (Config.)
-                             (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
-                             (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
-            (.submitTopology cluster
-              "disable-timeout-tester"
-              storm-conf
-              topology)
-            (.feed feeder ["a"] 1)
-            (.feed feeder ["b"] 2)
-            (.feed feeder ["c"] 3)
-            (Testing/advanceClusterTime cluster (int 9))
-            (it/assert-acked tracker 1 3)
-            (is (not (.isFailed tracker 2)))
-            (Testing/advanceClusterTime cluster (int 12))
-            (is (not (.isFailed tracker 2)))
-            ))))))
-
-(deftest test-test-tuple
-  (letlocals
-   ;; test the one-param signature
-   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"]))
-   (is (= ["james" "bond"] (.getValues tuple)))
-   (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
-   (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
-   (is (= "component" (.getSourceComponent tuple)))
-
-   ;; test the two-params signature
-   (bind mk-tuple-param (MkTupleParam.))
-   (doto mk-tuple-param
-     (.setStream "test-stream")
-     (.setComponent "test-component")
-     (.setFields (into-array String ["fname" "lname"])))
-   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"] mk-tuple-param))
-   (is (= ["james" "bond"] (.getValues tuple)))
-   (is (= "test-stream" (.getSourceStreamId tuple)))
-   (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
-   (is (= "test-component" (.getSourceComponent tuple)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/backtype/storm/integration_test.clj b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
new file mode 100644
index 0000000..f5fa501
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
@@ -0,0 +1,622 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.integration-test
+  (:use [clojure test])
+  (:import [backtype.storm Config])
+  (:import [backtype.storm.topology TopologyBuilder])
+  (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
+  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+            TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
+  (:import [backtype.storm.tuple Fields])
+  (:use [backtype.storm testing config clojure util])
+  (:use [backtype.storm.daemon common])
+  (:require [backtype.storm [thrift :as thrift]]))
+
+(deftest test-basic-topology
+  (doseq [zmq-on? [true false]]
+    (with-simulated-time-local-cluster [cluster :supervisors 4
+                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
+      (let [topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                      {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+                       "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+                       "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+                       })
+            results (complete-topology cluster
+                                       topology
+                                       :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+                                       :storm-conf {TOPOLOGY-WORKERS 2})]
+        (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
+                 (read-tuples results "1")))
+        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+                 (read-tuples results "2")))
+        (is (= [[1] [2] [3] [4]]
+               (read-tuples results "3")))
+        (is (= [[1] [2] [3] [4]]
+               (read-tuples results "4")))
+        ))))
+
+(defbolt emit-task-id ["tid"] {:prepare true}
+  [conf context collector]
+  (let [tid (.getThisTaskIndex context)]
+    (bolt
+      (execute [tuple]
+        (emit-bolt! collector [tid] :anchor tuple)
+        (ack! collector tuple)
+        ))))
+
+(deftest test-multi-tasks-per-executor
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
+                    {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
+                      :parallelism-hint 3
+                      :conf {TOPOLOGY-TASKS 6})
+                     })
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"1" [["a"]]})]
+      (is (ms= [[0] [1] [2] [3] [4] [5]]
+               (read-tuples results "2")))
+      )))
+
+(defbolt ack-every-other {} {:prepare true}
+  [conf context collector]
+  (let [state (atom -1)]
+    (bolt
+      (execute [tuple]
+        (let [val (swap! state -)]
+          (when (pos? val)
+            (ack! collector tuple)
+            ))))))
+
+(defn assert-loop [afn ids]
+  (while (not (every? afn ids))
+    (Thread/sleep 1)))
+
+(defn assert-acked [tracker & ids]
+  (assert-loop #(.isAcked tracker %) ids))
+
+(defn assert-failed [tracker & ids]
+  (assert-loop #(.isFailed tracker %) ids))
+
+(deftest test-timeout
+  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec feeder)}
+                     {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]      
+      (submit-local-topology (:nimbus cluster)
+                             "timeout-tester"
+                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                             topology)
+      (advance-cluster-time cluster 11)
+      (.feed feeder ["a"] 1)
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 9)
+      (assert-acked tracker 1 3)
+      (is (not (.isFailed tracker 2)))
+      (advance-cluster-time cluster 12)
+      (assert-failed tracker 2)
+      )))
+
+(defn mk-validate-topology-1 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-1 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-2 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn mk-invalidate-topology-3 []
+  (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                    {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))
+
+(defn try-complete-wc-topology [cluster topology]
+  (try (do
+         (complete-topology cluster
+                            topology
+                            :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+                            :storm-conf {TOPOLOGY-WORKERS 2})
+         false)
+       (catch InvalidTopologyException e true)))
+
+(deftest test-validate-topology-structure
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
+          any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
+          any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
+          any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
+      (is (= any-error1? false))
+      (is (= any-error2? true))
+      (is (= any-error3? true))
+      (is (= any-error4? true)))))
+
+(defbolt identity-bolt ["num"]
+  [tuple collector]
+  (emit-bolt! collector (.getValues tuple) :anchor tuple)
+  (ack! collector tuple))
+
+(deftest test-system-stream
+  ;; this test works because mocking a spout splits up the tuples evenly among the tasks
+  (with-simulated-time-local-cluster [cluster]
+      (let [topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
+                      {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
+                       })
+            results (complete-topology cluster
+                                       topology
+                                       :mock-sources {"1" [["a"] ["b"] ["c"]]}
+                                       :storm-conf {TOPOLOGY-WORKERS 2})]
+        (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
+                 (read-tuples results "2")))
+        )))
+
+(defn ack-tracking-feeder [fields]
+  (let [tracker (AckTracker.)]
+    [(doto (feeder-spout fields)
+       (.setAckFailDelegate tracker))
+     (fn [val]
+       (is (= (.getNumAcks tracker) val))
+       (.resetNumAcks tracker)
+       )]
+    ))
+
+(defbolt branching-bolt ["num"]
+  {:params [amt]}
+  [tuple collector]
+  (doseq [i (range amt)]
+    (emit-bolt! collector [i] :anchor tuple))
+  (ack! collector tuple))
+
+(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
+  [conf context collector]
+  (let [seen (atom [])]
+    (bolt
+      (execute [tuple]
+        (swap! seen conj tuple)
+        (when (= (count @seen) amt)
+          (emit-bolt! collector [1] :anchor @seen)
+          (doseq [s @seen]
+            (ack! collector s))
+          (reset! seen [])
+          )))
+      ))
+
+(defbolt ack-bolt {}
+  [tuple collector]
+  (ack! collector tuple))
+
+(deftest test-acking
+  (with-tracked-cluster [cluster]
+    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
+          [feeder2 checker2] (ack-tracking-feeder ["num"])
+          [feeder3 checker3] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder1)
+                      "2" (spout-spec feeder2)
+                      "3" (spout-spec feeder3)}
+                     {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
+                      "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
+                      "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
+                      "7" (bolt-spec
+                            {"4" :shuffle
+                            "5" :shuffle
+                            "6" :shuffle}
+                            (agg-bolt 3))
+                      "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
+                      "9" (bolt-spec {"8" :shuffle} ack-bolt)}
+                     ))]
+      (submit-local-topology (:nimbus cluster)
+                             "acking-test1"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      (.feed feeder2 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (checker2 1)
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (.feed feeder3 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      (checker3 0)
+      (.feed feeder2 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (checker2 1)
+      (checker3 1)
+      
+      )))
+
+(deftest test-ack-branching
+  (with-tracked-cluster [cluster]
+    (let [[feeder checker] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder)}
+                     {"2" (bolt-spec {"1" :shuffle} identity-bolt)
+                      "3" (bolt-spec {"1" :shuffle} identity-bolt)
+                      "4" (bolt-spec
+                            {"2" :shuffle
+                             "3" :shuffle}
+                             (agg-bolt 4))}))]
+      (submit-local-topology (:nimbus cluster)
+                             "test-acking2"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 0)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 2)
+      )))
+
+(defbolt dup-anchor ["num"]
+  [tuple collector]
+  (emit-bolt! collector [1] :anchor [tuple tuple])
+  (ack! collector tuple))
+
+(def bolt-prepared? (atom false))
+(defbolt prepare-tracked-bolt [] {:prepare true}
+  [conf context collector]  
+  (reset! bolt-prepared? true)
+  (bolt
+   (execute [tuple]
+            (ack! collector tuple))))
+
+(def spout-opened? (atom false))
+(defspout open-tracked-spout ["val"]
+  [conf context collector]
+  (reset! spout-opened? true)
+  (spout
+   (nextTuple [])))
+
+(deftest test-submit-inactive-topology
+  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec feeder)
+                     "2" (thrift/mk-spout-spec open-tracked-spout)}
+                    {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
+      (reset! bolt-prepared? false)
+      (reset! spout-opened? false)      
+      
+      (submit-local-topology-with-opts (:nimbus cluster)
+        "test"
+        {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+        topology
+        (SubmitOptions. TopologyInitialStatus/INACTIVE))
+      (advance-cluster-time cluster 11)
+      (.feed feeder ["a"] 1)
+      (advance-cluster-time cluster 9)
+      (is (not @bolt-prepared?))
+      (is (not @spout-opened?))        
+      (.activate (:nimbus cluster) "test")              
+      
+      (advance-cluster-time cluster 12)
+      (assert-acked tracker 1)
+      (is @bolt-prepared?)
+      (is @spout-opened?))))
+
+(deftest test-acking-self-anchor
+  (with-tracked-cluster [cluster]
+    (let [[feeder checker] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder)}
+                     {"2" (bolt-spec {"1" :shuffle} dup-anchor)
+                      "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
+      (submit-local-topology (:nimbus cluster)
+                             "test"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 1)
+      (.feed feeder [1])
+      (.feed feeder [1])
+      (.feed feeder [1])
+      (tracked-wait tracked 3)
+      (checker 3)
+      )))
+
+;; (defspout ConstantSpout ["val"] {:prepare false}
+;;   [collector]
+;;   (Time/sleep 100)
+;;   (emit-spout! collector [1]))
+
+;; (def errored (atom false))
+;; (def restarted (atom false))
+
+;; (defbolt local-error-checker {} [tuple collector]
+;;   (when-not @errored
+;;     (reset! errored true)
+;;     (println "erroring")
+;;     (throw (RuntimeException.)))
+;;   (when-not @restarted (println "restarted"))
+;;   (reset! restarted true))
+
+;; (deftest test-no-halt-local-mode
+;;   (with-simulated-time-local-cluster [cluster]
+;;       (let [topology (topology
+;;                       {1 (spout-spec ConstantSpout)}
+;;                       {2 (bolt-spec {1 :shuffle} local-error-checker)
+;;                        })]
+;;         (submit-local-topology (:nimbus cluster)
+;;                                "test"
+;;                                {}
+;;                                topology)
+;;         (while (not @restarted)
+;;           (advance-time-ms! 100))
+;;         )))
+
+(defspout IncSpout ["word"]
+  [conf context collector]
+  (let [state (atom 0)]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (emit-spout! collector [@state] :id 1)         
+       )
+     (ack [id]
+       (swap! state inc))
+     )))
+
+
+(defspout IncSpout2 ["word"] {:params [prefix]}
+  [conf context collector]
+  (let [state (atom 0)]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (swap! state inc)
+       (emit-spout! collector [(str prefix "-" @state)])         
+       )
+     )))
+
+;; (deftest test-clojure-spout
+;;   (with-local-cluster [cluster]
+;;     (let [nimbus (:nimbus cluster)
+;;           top (topology
+;;                {1 (spout-spec IncSpout)}
+;;                {}
+;;                )]
+;;       (submit-local-topology nimbus
+;;                              "spout-test"
+;;                              {TOPOLOGY-DEBUG true
+;;                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
+;;                              top)
+;;       (Thread/sleep 10000)
+;;       (.killTopology nimbus "spout-test")
+;;       (Thread/sleep 10000)
+;;       )))
+
+(deftest test-kryo-decorators-config
+  (with-simulated-time-local-cluster [cluster
+                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+                                                    TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
+    (letlocals
+     (bind builder (TopologyBuilder.))
+     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+     (-> builder
+         (.setBolt "2"
+                   (TestConfBolt.
+                    {TOPOLOGY-KRYO-DECORATORS ["one" "two"]}))
+         (.shuffleGrouping "1"))
+     
+     (bind results
+           (complete-topology cluster
+                              (.createTopology builder)
+                              :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
+                              :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))
+     (is (= {"topology.kryo.decorators" (list "one" "two" "three")}            
+            (->> (read-tuples results "2")
+                 (apply concat)
+                 (apply hash-map)))))))
+
+(deftest test-component-specific-config
+  (with-simulated-time-local-cluster [cluster
+                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
+    (letlocals
+     (bind builder (TopologyBuilder.))
+     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+     (-> builder
+         (.setBolt "2"
+                   (TestConfBolt.
+                    {"fake.config" 123
+                     TOPOLOGY-MAX-TASK-PARALLELISM 20
+                     TOPOLOGY-MAX-SPOUT-PENDING 30
+                     TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
+                                             {"fake.type2" "a.serializer"}]
+                     }))
+         (.shuffleGrouping "1")
+         (.setMaxTaskParallelism (int 2))
+         (.addConfiguration "fake.config2" 987)
+         )
+     
+
+     (bind results
+           (complete-topology cluster
+                              (.createTopology builder)
+                              :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
+                              :mock-sources {"1" [["fake.config"]
+                                                  [TOPOLOGY-MAX-TASK-PARALLELISM]
+                                                  [TOPOLOGY-MAX-SPOUT-PENDING]
+                                                  ["fake.config2"]
+                                                  [TOPOLOGY-KRYO-REGISTER]
+                                                  ]}))
+     (is (= {"fake.config" 123
+             "fake.config2" 987
+             TOPOLOGY-MAX-TASK-PARALLELISM 2
+             TOPOLOGY-MAX-SPOUT-PENDING 30
+             TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
+                                     "fake.type2" "a.serializer"
+                                     "fake.type3" "a.serializer3"}}
+            (->> (read-tuples results "2")
+                 (apply concat)
+                 (apply hash-map))
+            ))
+     )))
+
+(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
+  [conf context collector]
+  (let [acked (atom 0)
+        failed (atom 0)
+        executed (atom 0)
+        emitted (atom 0)]
+    (.addTaskHook context
+                  (reify backtype.storm.hooks.ITaskHook
+                    (prepare [this conf context]
+                      )
+                    (cleanup [this]
+                      )
+                    (emit [this info]
+                      (swap! emitted inc))
+                    (boltAck [this info]
+                      (swap! acked inc))
+                    (boltFail [this info]
+                      (swap! failed inc))
+                    (boltExecute [this info]
+                      (swap! executed inc))
+                      ))
+    (bolt
+     (execute [tuple]
+        (emit-bolt! collector [@emitted @acked @failed @executed])
+        (if (= 0 (- @acked @failed))
+          (ack! collector tuple)
+          (fail! collector tuple))
+        ))))
+
+(deftest test-hooks
+  (with-simulated-time-local-cluster [cluster]
+    (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
+                              }
+                             {"2" (bolt-spec {"1" :shuffle}
+                                             hooks-bolt)
+                              })
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"1" [[1]
+                                                         [1]
+                                                         [1]
+                                                         [1]
+                                                         ]})]
+      (is (= [[0 0 0 0]
+              [2 1 0 1]
+              [4 1 1 2]
+              [6 2 1 3]]
+             (read-tuples results "2")
+             )))))
+
+(defbolt report-errors-bolt {}
+  [tuple collector]
+  (doseq [i (range (.getValue tuple 0))]
+    (report-error! collector (RuntimeException.)))
+  (ack! collector tuple))
+
+(deftest test-throttled-errors
+  (with-simulated-time
+    (with-tracked-cluster [cluster]
+      (let [state (:storm-cluster-state cluster)
+            [feeder checker] (ack-tracking-feeder ["num"])
+            tracked (mk-tracked-topology
+                     cluster
+                     (topology
+                       {"1" (spout-spec feeder)}
+                       {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
+            _       (submit-local-topology (:nimbus cluster)
+                                             "test-errors"
+                                             {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
+                                              TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
+                                              TOPOLOGY-DEBUG true
+                                              }
+                                             (:topology tracked))
+            _ (advance-cluster-time cluster 11)
+            storm-id (get-storm-id state "test-errors")
+            errors-count (fn [] (count (.errors state storm-id "2")))]
+
+        (is (nil? (.last-error state storm-id "2")))
+
+        ;; so it launches the topology
+        (advance-cluster-time cluster 2)
+        (.feed feeder [6])
+        (tracked-wait tracked 1)
+        (is (= 4 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        
+        (advance-time-secs! 5)
+        (.feed feeder [2])
+        (tracked-wait tracked 1)
+        (is (= 4 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        
+        (advance-time-secs! 6)
+        (.feed feeder [2])
+        (tracked-wait tracked 1)
+        (is (= 6 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        
+        (advance-time-secs! 6)
+        (.feed feeder [3])
+        (tracked-wait tracked 1)
+        (is (= 8 (errors-count)))
+        (is (.last-error state storm-id "2"))))))
+
+
+(deftest test-acking-branching-complex
+  ;; test acking with branching in the topology
+  )
+
+
+(deftest test-fields-grouping
+  ;; 1. put a shitload of random tuples through it and test that counts are right
+  ;; 2. test that different spouts with different phints group the same way
+  )
+
+(deftest test-all-grouping
+  )
+
+(deftest test-direct-grouping
+  )

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
new file mode 100644
index 0000000..5cdb182
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
@@ -0,0 +1,212 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.testing4j-test
+  (:use [clojure.test])
+  (:use [backtype.storm config clojure testing util])
+  (:require [integration.backtype.storm.integration-test :as it])
+  (:require [backtype.storm.thrift :as thrift])
+  (:import [backtype.storm Testing Config ILocalCluster])
+  (:import [backtype.storm.tuple Values Tuple])
+  (:import [backtype.storm.utils Time Utils])
+  (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
+            TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
+            AckFailMapTracker MkTupleParam]))
+
+(deftest test-with-simulated-time
+  (is (= false (Time/isSimulating)))
+  (Testing/withSimulatedTime (fn []
+                               (is (= true (Time/isSimulating)))))
+  (is (= false (Time/isSimulating))))
+
+(deftest test-with-local-cluster
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                           (.setSupervisors (int 2))
+                           (.setPortsPerSupervisor (int 5)))
+        daemon-conf (doto (Config.)
+                      (.put SUPERVISOR-ENABLE false)
+                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
+    (Testing/withLocalCluster mk-cluster-param (reify TestJob
+                                                 (^void run [this ^ILocalCluster cluster]
+                                                   (is (not (nil? cluster)))
+                                                   (is (not (nil? (.getState cluster))))
+                                                   (is (not (nil? (:nimbus (.getState cluster))))))))))
+
+(deftest test-with-simulated-time-local-cluster
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                           (.setSupervisors (int 2)))
+        daemon-conf (doto (Config.)
+                      (.put SUPERVISOR-ENABLE false)
+                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
+    (is (not (Time/isSimulating)))
+    (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
+                                                              (^void run [this ^ILocalCluster cluster]
+                                                                (is (not (nil? cluster)))
+                                                                (is (not (nil? (.getState cluster))))
+                                                                (is (not (nil? (:nimbus (.getState cluster)))))
+                                                                (is (Time/isSimulating)))))
+    (is (not (Time/isSimulating)))))
+
+(deftest test-complete-topology
+  (doseq [zmq-on? [true false]
+          :let [daemon-conf (doto (Config.)
+                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
+                mk-cluster-param (doto (MkClusterParam.)
+                                   (.setSupervisors (int 4))
+                                   (.setDaemonConf daemon-conf))]]
+    (Testing/withSimulatedTimeLocalCluster
+     (reify TestJob
+       (^void run [this ^ILocalCluster cluster]
+         (let [topology (thrift/mk-topology
+                         {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                         {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+                          "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+                          "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+                          })
+               mocked-sources (doto (MockedSources.)
+                                (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+                                                                      (Values. (into-array ["bob"]))
+                                                                      (Values. (into-array ["joey"]))
+                                                                      (Values. (into-array ["nathan"]))])
+                                              ))
+               storm-conf (doto (Config.)
+                            (.setNumWorkers 2))
+               complete-topology-param (doto (CompleteTopologyParam.)
+                                         (.setMockedSources mocked-sources)
+                                         (.setStormConf storm-conf))
+               results (Testing/completeTopology cluster
+                                                 topology
+                                                 complete-topology-param)]
+           (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
+                           (Testing/readTuples results "1")))
+           (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+                           (read-tuples results "2")))
+           (is (= [[1] [2] [3] [4]]
+                  (Testing/readTuples results "3")))
+           (is (= [[1] [2] [3] [4]]
+                  (Testing/readTuples results "4")))
+           ))))))
+
+(deftest test-with-tracked-cluster
+  (Testing/withTrackedCluster
+   (reify TestJob
+     (^void run [this ^ILocalCluster cluster]
+       (let [[feeder checker] (it/ack-tracking-feeder ["num"])
+             tracked (Testing/mkTrackedTopology
+                      cluster
+                      (topology
+                       {"1" (spout-spec feeder)}
+                       {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
+                        "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
+                        "4" (bolt-spec
+                             {"2" :shuffle
+                              "3" :shuffle}
+                             (it/agg-bolt 4))}))]
+         (.submitTopology cluster
+                          "test-acking2"
+                          (Config.)
+                          (.getTopology tracked))
+         (advance-cluster-time (.getState cluster) 11)
+         (.feed feeder [1])
+         (Testing/trackedWait tracked (int 1))
+         (checker 0)
+         (.feed feeder [1])
+         (Testing/trackedWait tracked (int 1))
+         (checker 2)
+         )))))
+
+(deftest test-advance-cluster-time
+  (let [daemon-conf (doto (Config.)
+                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
+        mk-cluster-param (doto (MkClusterParam.)
+                           (.setDaemonConf daemon-conf))]
+    (Testing/withSimulatedTimeLocalCluster
+     mk-cluster-param
+     (reify TestJob
+       (^void run [this ^ILocalCluster cluster]
+         (let [feeder (feeder-spout ["field1"])
+               tracker (AckFailMapTracker.)
+               _ (.setAckFailDelegate feeder tracker)
+               topology (thrift/mk-topology
+                         {"1" (thrift/mk-spout-spec feeder)}
+                         {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+               storm-conf (doto (Config.)
+                            (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
+           (.submitTopology cluster
+                            "timeout-tester"
+                            storm-conf
+                            topology)
+           (.feed feeder ["a"] 1)
+           (.feed feeder ["b"] 2)
+           (.feed feeder ["c"] 3)
+           (Testing/advanceClusterTime cluster (int 9))
+           (it/assert-acked tracker 1 3)
+           (is (not (.isFailed tracker 2)))
+           (Testing/advanceClusterTime cluster (int 12))
+           (it/assert-failed tracker 2)
+           ))))))
+
+(deftest test-disable-tuple-timeout
+  (let [daemon-conf (doto (Config.)
+                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
+        mk-cluster-param (doto (MkClusterParam.)
+                           (.setDaemonConf daemon-conf))]
+    (Testing/withSimulatedTimeLocalCluster
+      mk-cluster-param
+      (reify TestJob
+        (^void run [this ^ILocalCluster cluster]
+          (let [feeder (feeder-spout ["field1"])
+                tracker (AckFailMapTracker.)
+                _ (.setAckFailDelegate feeder tracker)
+                topology (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec feeder)}
+                           {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+                storm-conf (doto (Config.)
+                             (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
+                             (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
+            (.submitTopology cluster
+              "disable-timeout-tester"
+              storm-conf
+              topology)
+            (.feed feeder ["a"] 1)
+            (.feed feeder ["b"] 2)
+            (.feed feeder ["c"] 3)
+            (Testing/advanceClusterTime cluster (int 9))
+            (it/assert-acked tracker 1 3)
+            (is (not (.isFailed tracker 2)))
+            (Testing/advanceClusterTime cluster (int 12))
+            (is (not (.isFailed tracker 2)))
+            ))))))
+
+(deftest test-test-tuple
+  (letlocals
+   ;; test the one-param signature
+   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"]))
+   (is (= ["james" "bond"] (.getValues tuple)))
+   (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
+   (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
+   (is (= "component" (.getSourceComponent tuple)))
+
+   ;; test the two-params signature
+   (bind mk-tuple-param (MkTupleParam.))
+   (doto mk-tuple-param
+     (.setStream "test-stream")
+     (.setComponent "test-component")
+     (.setFields (into-array String ["fname" "lname"])))
+   (bind ^Tuple tuple (Testing/testTuple ["james" "bond"] mk-tuple-param))
+   (is (= ["james" "bond"] (.getValues tuple)))
+   (is (= "test-stream" (.getSourceStreamId tuple)))
+   (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
+   (is (= "test-component" (.getSourceComponent tuple)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d4fcc0fd/storm-core/test/clj/integration/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/storm/trident/integration_test.clj b/storm-core/test/clj/integration/storm/trident/integration_test.clj
new file mode 100644
index 0000000..bcd8173
--- /dev/null
+++ b/storm-core/test/clj/integration/storm/trident/integration_test.clj
@@ -0,0 +1,292 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.storm.trident.integration-test
+  (:use [clojure test])
+  (:require [backtype.storm [testing :as t]])
+  (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
+            MemoryMapState$Factory])
+  (:import [storm.trident.state StateSpec])
+  (:import [storm.trident.operation.impl CombinerAggStateUpdater])
+  (:use [storm.trident testing])
+  (:use [backtype.storm util]))
+  
+(bootstrap-imports)
+
+(deftest test-memory-map-get-tuples
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-spout ["sentence"]))
+        (bind word-counts
+          (-> topo
+              (.newStream "tester" feeder)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)
+              ))       
+        (-> topo
+            (.newDRPCStream "all-tuples" drpc)
+            (.broadcast)
+            (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
+            (.project (fields "word" "count")))
+        (with-topology [cluster topo]
+          (feed feeder [["hello the man said"] ["the"]])
+          (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
+                 (into #{} (exec-drpc drpc "all-tuples" "man"))))
+          (feed feeder [["the foo"]])
+          (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
+                 (into #{} (exec-drpc drpc "all-tuples" "man")))))))))
+
+(deftest test-word-count
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-spout ["sentence"]))
+        (bind word-counts
+          (-> topo
+              (.newStream "tester" feeder)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)
+              ))
+        (-> topo
+            (.newDRPCStream "words" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.groupBy (fields "word"))
+            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+            (.aggregate (fields "count") (Sum.) (fields "sum"))
+            (.project (fields "sum")))
+        (with-topology [cluster topo]
+          (feed feeder [["hello the man said"] ["the"]])
+          (is (= [[2]] (exec-drpc drpc "words" "the")))
+          (is (= [[1]] (exec-drpc drpc "words" "hello")))
+          (feed feeder [["the man on the moon"] ["where are you"]])
+          (is (= [[4]] (exec-drpc drpc "words" "the")))
+          (is (= [[2]] (exec-drpc drpc "words" "man")))
+          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+          )))))
+
+;; this test reproduces a bug where committer spouts freeze processing when 
+;; there's at least one repartitioning after the spout
+(deftest test-word-count-committer-spout
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-committer-spout ["sentence"]))
+        (.setWaitToEmit feeder false) ;;this causes lots of empty batches
+        (bind word-counts
+          (-> topo
+              (.newStream "tester" feeder)
+              (.parallelismHint 2)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)
+              ))
+        (-> topo
+            (.newDRPCStream "words" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.groupBy (fields "word"))
+            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+            (.aggregate (fields "count") (Sum.) (fields "sum"))
+            (.project (fields "sum")))
+        (with-topology [cluster topo]
+          (feed feeder [["hello the man said"] ["the"]])
+          (is (= [[2]] (exec-drpc drpc "words" "the")))
+          (is (= [[1]] (exec-drpc drpc "words" "hello")))
+          (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
+          (feed feeder [["the man on the moon"] ["where are you"]])
+          (is (= [[4]] (exec-drpc drpc "words" "the")))
+          (is (= [[2]] (exec-drpc drpc "words" "man")))
+          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+          (feed feeder [["the the"]])
+          (is (= [[6]] (exec-drpc drpc "words" "the")))
+          (feed feeder [["the"]])
+          (is (= [[7]] (exec-drpc drpc "words" "the")))
+          )))))
+
+
+(deftest test-count-agg
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (-> topo
+            (.newDRPCStream "numwords" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.aggregate (CountAsAggregator.) (fields "count"))
+            (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
+            (.project (fields "count")))
+        (with-topology [cluster topo]
+          (doseq [i (range 100)]
+            (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
+          (is (= [[0]] (exec-drpc drpc "numwords" "")))
+          (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
+          )))))
+          
+(deftest test-split-merge
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+        (bind s1
+          (-> drpc-stream
+              (.each (fields "args") (Split.) (fields "word"))
+              (.project (fields "word"))))
+        (bind s2
+          (-> drpc-stream
+              (.each (fields "args") (StringLength.) (fields "len"))
+              (.project (fields "len"))))
+
+        (.merge topo [s1 s2])
+        (with-topology [cluster topo]
+          (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+          (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+          )))))
+
+(deftest test-multiple-groupings-same-stream
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
+                                   (.each (fields "args") (TrueFilter.))))
+        (bind s1
+          (-> drpc-stream
+              (.groupBy (fields "args"))
+              (.aggregate (CountAsAggregator.) (fields "count"))))
+        (bind s2
+          (-> drpc-stream
+              (.groupBy (fields "args"))
+              (.aggregate (CountAsAggregator.) (fields "count"))))
+
+        (.merge topo [s1 s2])
+        (with-topology [cluster topo]
+          (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
+          (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
+          )))))
+          
+(deftest test-multi-repartition
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
+                                   (.each (fields "args") (Split.) (fields "word"))
+                                   (.localOrShuffle)
+                                   (.shuffle)
+                                   (.aggregate (CountAsAggregator.) (fields "count"))
+                                   ))
+        (with-topology [cluster topo]
+          (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
+          (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
+          )))))
+
+(deftest test-stream-projection-validation
+  (t/with-local-cluster [cluster]
+    (letlocals
+     (bind feeder (feeder-committer-spout ["sentence"]))
+     (bind topo (TridentTopology.))
+     ;; valid projection fields will not throw exceptions
+     (bind word-counts
+           (-> topo
+               (.newStream "tester" feeder)
+               (.each (fields "sentence") (Split.) (fields "word"))
+               (.groupBy (fields "word"))
+               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+               (.parallelismHint 6)
+               ))
+     (bind stream (-> topo
+                      (.newStream "tester" feeder)))
+     ;; test .each
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence1") (Split.) (fields "word")))))
+     ;; test .groupBy
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word1")))))
+     ;; test .aggregate
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.aggregate (fields "word1") (Count.) (fields "count")))))
+     ;; test .project
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.project (fields "sentence1")))))
+     ;; test .partitionBy
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.partitionBy (fields "sentence1")))))
+     ;; test .partitionAggregate
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
+     ;; test .persistentAggregate
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
+     ;; test .partitionPersist
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
+                                         (fields "non-existent")
+                                         (CombinerAggStateUpdater. (Count.))
+                                         (fields "count")))))
+     ;; test .stateQuery
+     (with-drpc [drpc]
+       (is (thrown? IllegalArgumentException
+                    (-> topo
+                        (.newDRPCStream "words" drpc)
+                        (.each (fields "args") (Split.) (fields "word"))
+                        (.groupBy (fields "word"))
+                        (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
+     )))
+
+;; (deftest test-split-merge
+;;   (t/with-local-cluster [cluster]
+;;     (with-drpc [drpc]
+;;       (letlocals
+;;         (bind topo (TridentTopology.))
+;;         (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+;;         (bind s1
+;;           (-> drpc-stream
+;;               (.each (fields "args") (Split.) (fields "word"))
+;;               (.project (fields "word"))))
+;;         (bind s2
+;;           (-> drpc-stream
+;;               (.each (fields "args") (StringLength.) (fields "len"))
+;;               (.project (fields "len"))))
+;; 
+;;         (.merge topo [s1 s2])
+;;         (with-topology [cluster topo]
+;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+;;           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+;;           )))))


[2/6] storm git commit: STORM-1179: Create Maven Profiles for Integration Tests - Mark Java Unit Test as Integration Test

Posted by sr...@apache.org.
STORM-1179: Create Maven Profiles for Integration Tests
- Mark Java Unit Test as Integration Test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1c159fe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1c159fe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1c159fe

Branch: refs/heads/master
Commit: a1c159fe19dc28c90f5d27534add765a6520daf7
Parents: d4fcc0f
Author: Hugo Louro <hm...@gmail.com>
Authored: Wed Dec 9 16:51:34 2015 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Wed Dec 9 18:58:22 2015 -0800

----------------------------------------------------------------------
 external/storm-elasticsearch/pom.xml              |  7 ++++++-
 .../bolt/AbstractEsBoltIntegrationTest.java       |  7 +++++--
 .../storm/elasticsearch/bolt/EsIndexBoltTest.java |  3 +++
 .../bolt/EsLookupBoltIntegrationTest.java         | 18 ++++++++++--------
 .../elasticsearch/bolt/EsPercolateBoltTest.java   |  5 ++++-
 5 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1c159fe/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index fd263e6..afe2e69 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -15,7 +15,8 @@
  See the License for the specific language governing permissions and
  limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -102,6 +103,10 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a1c159fe/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
index 9bed459..5121b6a 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -17,8 +17,7 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
-import java.io.File;
-
+import backtype.storm.testing.IntegrationTest;
 import org.apache.commons.io.FileUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
@@ -33,10 +32,14 @@ import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@Category(IntegrationTest.class)
 public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
 
     protected static Node node;

http://git-wip-us.apache.org/repos/asf/storm/blob/a1c159fe/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index ba87616..13fade6 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import backtype.storm.testing.IntegrationTest;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
@@ -25,9 +26,11 @@ import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.mockito.Mockito.verify;
 
+@Category(IntegrationTest.class)
 public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/a1c159fe/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
index d100b84..e5016c8 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
@@ -17,10 +17,11 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-
+import backtype.storm.testing.IntegrationTest;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.ElasticsearchGetRequest;
 import org.apache.storm.elasticsearch.EsLookupResultOutput;
 import org.apache.storm.elasticsearch.common.EsConfig;
@@ -29,21 +30,22 @@ import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.ITuple;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.verify;
 
+@Category(IntegrationTest.class)
 @RunWith(MockitoJUnitRunner.class)
 public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<EsLookupBolt> {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a1c159fe/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index ef3d6bb..e4f2be0 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import backtype.storm.testing.IntegrationTest;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
@@ -24,10 +25,12 @@ import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.elasticsearch.action.percolate.PercolateResponse;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import static org.mockito.Mockito.verify;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
 
+@Category(IntegrationTest.class)
 public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
 
     @Override