Posted to commits@storm.apache.org by sr...@apache.org on 2019/01/05 15:02:54 UTC

[4/7] storm git commit: STORM-1307: Port testing4j_test.clj to Java

STORM-1307: Port testing4j_test.clj to Java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02e0e75c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02e0e75c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02e0e75c

Branch: refs/heads/master
Commit: 02e0e75c6bbbb580b6a62dbe394ade7298a9a1dd
Parents: d6a1e73
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Dec 12 01:36:09 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Dec 19 16:48:38 2018 +0100

----------------------------------------------------------------------
 .../org/apache/storm/testing4j_test.clj         |  243 -----
 .../apache/storm/TopologyIntegrationTest.java   | 1011 ------------------
 .../storm/integration/AckEveryOtherBolt.java    |   48 +
 .../storm/integration/AckTrackingFeeder.java    |   59 +
 .../org/apache/storm/integration/AggBolt.java   |   59 +
 .../apache/storm/integration/AssertLoop.java    |   51 +
 .../apache/storm/integration/IdentityBolt.java  |   47 +
 .../apache/storm/integration/TestingTest.java   |  214 ++++
 .../integration/TopologyIntegrationTest.java    |  904 ++++++++++++++++
 9 files changed, 1382 insertions(+), 1254 deletions(-)
----------------------------------------------------------------------
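
Note for reviewers following the port: the Clojure tests below translate almost mechanically to JUnit 5. As a rough illustration only (the actual port lives in the new TestingTest.java, which is not shown in this part of the series), the first deftest, test-with-simulated-time, could look like the sketch below in Java, assuming Testing.withSimulatedTime accepts a Runnable, as the Clojure (Testing/withSimulatedTime (fn [] ...)) usage suggests:

    // Hypothetical sketch of a JUnit 5 port of test-with-simulated-time.
    // Not the actual contents of the new TestingTest.java.
    import static org.hamcrest.CoreMatchers.is;
    import static org.junit.Assert.assertThat;

    import org.apache.storm.Testing;
    import org.apache.storm.utils.Time;
    import org.junit.jupiter.api.Test;

    public class SimulatedTimeSketch {

        @Test
        public void testWithSimulatedTime() {
            // Simulated time is off outside the callback...
            assertThat(Time.isSimulating(), is(false));
            // ...on inside it...
            Testing.withSimulatedTime(() -> assertThat(Time.isSimulating(), is(true)));
            // ...and restored afterwards.
            assertThat(Time.isSimulating(), is(false));
        }
    }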


http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
deleted file mode 100644
index 75a0432..0000000
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ /dev/null
@@ -1,243 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns integration.org.apache.storm.testing4j-test
-  (:use [clojure.test])
-  (:use [org.apache.storm daemon-config config util])
-  (:use [org.apache.storm clojure])
-  (:import [org.apache.storm Testing Config]
-           [org.apache.storm.generated GlobalStreamId])
-  (:import [org.apache.storm.tuple Values])
-  (:import [org.apache.storm.utils Time])
-  (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout FeederSpout
-            TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
-            AckFailMapTracker AckTracker MkTupleParam])
-  (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm Thrift ILocalCluster]))
-
-; If you are working on porting this test to Java, be aware that there are ports of these methods/bolts in TopologyIntegrationTest.java.
-(defn assert-loop 
-([afn ids] (assert-loop afn ids 10))
-([afn ids timeout-secs]
-  (loop [remaining-time (* timeout-secs 1000)]
-    (let [start-time (System/currentTimeMillis)
-          assertion-is-true (every? afn ids)]
-      (if (or assertion-is-true (neg? remaining-time))
-        (is assertion-is-true)
-        (do
-          (Thread/sleep 1)
-          (recur (- remaining-time (- (System/currentTimeMillis) start-time)))
-        ))))))
-
-(defn assert-acked [tracker & ids]
-  (assert-loop #(.isAcked tracker %) ids))
-
-(defn assert-failed [tracker & ids]
-  (assert-loop #(.isFailed tracker %) ids))
-  
-(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
-  [conf context collector]
-  (let [seen (atom [])]
-    (bolt
-      (execute [tuple]
-        (swap! seen conj tuple)
-        (when (= (count @seen) amt)
-          (emit-bolt! collector [1] :anchor @seen)
-          (doseq [s @seen]
-            (ack! collector s))
-          (reset! seen [])
-          )))
-      ))
-      
-(defbolt identity-bolt ["num"]
-  [tuple collector]
-  (emit-bolt! collector (.getValues tuple) :anchor tuple)
-  (ack! collector tuple))
-      
-(defbolt ack-every-other {} {:prepare true}
-  [conf context collector]
-  (let [state (atom -1)]
-    (bolt
-      (execute [tuple]
-        (let [val (swap! state -)]
-          (when (pos? val)
-            (ack! collector tuple)
-            ))))))
-
-(defn ack-tracking-feeder [fields]
-  (let [tracker (AckTracker.)]
-    [(doto (FeederSpout. fields)
-       (.setAckFailDelegate tracker))
-     (fn [val]
-       (is (= (.getNumAcks tracker) val))
-       (.resetNumAcks tracker)
-       )]
-    ))
-; End section with methods/bolts that also exist in Java.
-  
-(deftest test-with-simulated-time
-  (is (= false (Time/isSimulating)))
-  (Testing/withSimulatedTime (fn []
-                               (is (= true (Time/isSimulating)))))
-  (is (= false (Time/isSimulating))))
-
-(deftest test-with-local-cluster
-  (let [mk-cluster-param (doto (MkClusterParam.)
-                           (.setSupervisors (int 2))
-                           (.setPortsPerSupervisor (int 5)))
-        daemon-conf (doto (Config.)
-                      (.put SUPERVISOR-ENABLE false)
-                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
-    (Testing/withLocalCluster mk-cluster-param (reify TestJob
-                                                 (^void run [this ^ILocalCluster cluster]
-                                                   (is (not (nil? cluster)))
-                                                   (is (not (nil? (.getNimbus cluster)))))))))
-
-(deftest test-with-simulated-time-local-cluster
-  (let [mk-cluster-param (doto (MkClusterParam.)
-                           (.setSupervisors (int 2)))
-        daemon-conf (doto (Config.)
-                      (.put SUPERVISOR-ENABLE false)
-                      (.put TOPOLOGY-ACKER-EXECUTORS 0))]
-    (is (not (Time/isSimulating)))
-    (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
-                                                              (^void run [this ^ILocalCluster cluster]
-                                                                (is (not (nil? cluster)))
-                                                                (is (not (nil? (.getNimbus cluster))))
-                                                                (is (Time/isSimulating)))))
-    (is (not (Time/isSimulating)))))
-
-(deftest test-with-tracked-cluster
-  (Testing/withTrackedCluster
-   (reify TestJob
-     (^void run [this ^ILocalCluster cluster]
-       (let [[feeder checker] (ack-tracking-feeder ["num"])
-             tracked (Testing/mkTrackedTopology
-                      cluster
-                      (Thrift/buildTopology
-                       {"1" (Thrift/prepareSpoutDetails feeder)}
-                       {"2" (Thrift/prepareBoltDetails
-                              {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
-                               (Thrift/prepareShuffleGrouping)}
-                              identity-bolt)
-                        "3" (Thrift/prepareBoltDetails
-                              {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
-                               (Thrift/prepareShuffleGrouping)}
-                              identity-bolt)
-                        "4" (Thrift/prepareBoltDetails
-                             {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
-                              (Thrift/prepareShuffleGrouping)
-                              (GlobalStreamId. "3" Utils/DEFAULT_STREAM_ID)
-                              (Thrift/prepareShuffleGrouping)}
-                             (agg-bolt 4))}))]
-         (.submitTopology cluster
-                          "test-acking2"
-                          (Config.)
-                          (.getTopology tracked))
-         (.advanceClusterTime cluster (int 11))
-         (.feed feeder [1])
-         (Testing/trackedWait tracked (int 1))
-         (checker 0)
-         (.feed feeder [1])
-         (Testing/trackedWait tracked (int 1))
-         (checker 2)
-         )))))
-
-(deftest test-advance-cluster-time
-  (let [daemon-conf (doto (Config.)
-                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
-        mk-cluster-param (doto (MkClusterParam.)
-                           (.setDaemonConf daemon-conf))]
-    (Testing/withSimulatedTimeLocalCluster
-     mk-cluster-param
-     (reify TestJob
-       (^void run [this ^ILocalCluster cluster]
-         (let [feeder (FeederSpout. ["field1"])
-               tracker (AckFailMapTracker.)
-               _ (.setAckFailDelegate feeder tracker)
-               topology (Thrift/buildTopology
-                         {"1" (Thrift/prepareSpoutDetails feeder)}
-                         {"2" (Thrift/prepareBoltDetails
-                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
-                                 (Thrift/prepareGlobalGrouping)}
-                                ack-every-other)})
-               storm-conf (doto (Config.)
-                            (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
-           (.submitTopology cluster
-                            "timeout-tester"
-                            storm-conf
-                            topology)
-           (.feed feeder ["a"] 1)
-           (.feed feeder ["b"] 2)
-           (.feed feeder ["c"] 3)
-           (Testing/advanceClusterTime cluster (int 9))
-           (assert-acked tracker 1 3)
-           (is (not (.isFailed tracker 2)))
-           (Testing/advanceClusterTime cluster (int 12))
-           (assert-failed tracker 2)
-           ))))))
-
-(deftest test-disable-tuple-timeout
-  (let [daemon-conf (doto (Config.)
-                      (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
-        mk-cluster-param (doto (MkClusterParam.)
-                           (.setDaemonConf daemon-conf))]
-    (Testing/withSimulatedTimeLocalCluster
-      mk-cluster-param
-      (reify TestJob
-        (^void run [this ^ILocalCluster cluster]
-          (let [feeder (FeederSpout. ["field1"])
-                tracker (AckFailMapTracker.)
-                _ (.setAckFailDelegate feeder tracker)
-                topology (Thrift/buildTopology
-                           {"1" (Thrift/prepareSpoutDetails feeder)}
-                           {"2" (Thrift/prepareBoltDetails
-                                  {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
-                                   (Thrift/prepareGlobalGrouping)}
-                                  ack-every-other)})
-                storm-conf (doto (Config.)
-                             (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
-                             (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
-            (.submitTopology cluster
-              "disable-timeout-tester"
-              storm-conf
-              topology)
-            (.feed feeder ["a"] 1)
-            (.feed feeder ["b"] 2)
-            (.feed feeder ["c"] 3)
-            (Testing/advanceClusterTime cluster (int 9))
-            (assert-acked tracker 1 3)
-            (is (not (.isFailed tracker 2)))
-            (Testing/advanceClusterTime cluster (int 12))
-            (is (not (.isFailed tracker 2)))
-            ))))))
-
-(deftest test-test-tuple
-  (testing "one-param signature"
-    (let [tuple (Testing/testTuple ["james" "bond"])]
-      (is (= ["james" "bond"] (.getValues tuple)))
-      (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
-      (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
-      (is (= "component" (.getSourceComponent tuple)))))
-   (testing "two-params signature"
-    (let [mk-tuple-param (doto (MkTupleParam.)
-                           (.setStream "test-stream")
-                           (.setComponent "test-component")
-                           (.setFields (into-array String ["fname" "lname"])))
-          tuple (Testing/testTuple ["james" "bond"] mk-tuple-param)]
-      (is (= ["james" "bond"] (.getValues tuple)))
-      (is (= "test-stream" (.getSourceStreamId tuple)))
-      (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
-      (is (= "test-component" (.getSourceComponent tuple))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
deleted file mode 100644
index 7cc7f65..0000000
--- a/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
+++ /dev/null
@@ -1,1011 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm;
-
-import static org.apache.storm.utils.PredicateMatcher.matchesPredicate;
-import static org.hamcrest.CoreMatchers.everyItem;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.storm.Thrift.BoltDetails;
-import org.apache.storm.Thrift.SpoutDetails;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.Grouping;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.TopologyInitialStatus;
-import org.apache.storm.hooks.BaseTaskHook;
-import org.apache.storm.hooks.info.BoltAckInfo;
-import org.apache.storm.hooks.info.BoltExecuteInfo;
-import org.apache.storm.hooks.info.BoltFailInfo;
-import org.apache.storm.hooks.info.EmitInfo;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.testing.AckFailMapTracker;
-import org.apache.storm.testing.AckTracker;
-import org.apache.storm.testing.CompleteTopologyParam;
-import org.apache.storm.testing.FeederSpout;
-import org.apache.storm.testing.FixedTuple;
-import org.apache.storm.testing.IntegrationTest;
-import org.apache.storm.testing.MockedSources;
-import org.apache.storm.testing.TestAggregatesCounter;
-import org.apache.storm.testing.TestConfBolt;
-import org.apache.storm.testing.TestGlobalCount;
-import org.apache.storm.testing.TestPlannerSpout;
-import org.apache.storm.testing.TestWordCounter;
-import org.apache.storm.testing.TestWordSpout;
-import org.apache.storm.testing.TrackedTopology;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionTimeoutException;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-@IntegrationTest
-public class TopologyIntegrationTest {
-
-    @ParameterizedTest
-    @ValueSource(strings = {"true", "false"})
-    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withSupervisors(4)
-            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
-            .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                    new TestWordCounter(), 4));
-            boltMap.put("3",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new TestGlobalCount()));
-            boltMap.put("4",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("2", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new TestAggregatesCounter()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
-            Map<String, Object> stormConf = new HashMap<>();
-            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
-            stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
-
-            List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
-                .map(value -> new FixedTuple(new Values(value)))
-                .collect(Collectors.toList());
-
-            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
-            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
-            completeTopologyParams.setMockedSources(mockedSources);
-            completeTopologyParams.setStormConf(stormConf);
-
-            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-
-            assertThat(Testing.readTuples(results, "1"), containsInAnyOrder(
-                new Values("nathan"),
-                new Values("nathan"),
-                new Values("bob"),
-                new Values("joey")));
-            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
-                new Values("nathan", 1),
-                new Values("nathan", 2),
-                new Values("bob", 1),
-                new Values("joey", 1)
-            ));
-            assertThat(Testing.readTuples(results, "3"), contains(
-                new Values(1),
-                new Values(2),
-                new Values(3),
-                new Values(4)
-            ));
-            assertThat(Testing.readTuples(results, "4"), contains(
-                new Values(1),
-                new Values(2),
-                new Values(3),
-                new Values(4)
-            ));
-        }
-    }
-
-    private static class EmitTaskIdBolt extends BaseRichBolt {
-
-        private int taskIndex;
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("tid"));
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-            this.taskIndex = context.getThisTaskIndex();
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            collector.emit(input, new Values(taskIndex));
-            collector.ack(input);
-        }
-
-    }
-
-    @Test
-    public void testMultiTasksPerCluster() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withSupervisors(4)
-            .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true)));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareAllGrouping()),
-                    new EmitTaskIdBolt(), 3, Collections.singletonMap(Config.TOPOLOGY_TASKS, 6)));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
-            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
-
-            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
-            completeTopologyParams.setMockedSources(mockedSources);
-
-            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-
-            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
-                new Values(0),
-                new Values(1),
-                new Values(2),
-                new Values(3),
-                new Values(4),
-                new Values(5)
-            ));
-        }
-    }
-
-    private static class AckEveryOtherBolt extends BaseRichBolt {
-
-        private boolean state = true;
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            if (state) {
-                collector.ack(input);
-            }
-            state = !state;
-        }
-
-    }
-
-    private void assertLoop(Predicate<Object> condition, Object... conditionParams) {
-        try {
-            Awaitility.with()
-                .pollInterval(1, TimeUnit.MILLISECONDS)
-                .atMost(10, TimeUnit.SECONDS)
-                .untilAsserted(() -> assertThat(Arrays.asList(conditionParams), everyItem(matchesPredicate(condition))));
-        } catch (ConditionTimeoutException e) {
-            throw new AssertionError(e.getMessage());
-        }
-    }
-
-    private void assertAcked(AckFailMapTracker tracker, Object... ids) {
-        assertLoop(tracker::isAcked, ids);
-    }
-
-    private void assertFailed(AckFailMapTracker tracker, Object... ids) {
-        assertLoop(tracker::isFailed, ids);
-    }
-
-    @Test
-    public void testTimeout() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withSupervisors(4)
-            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
-            .build()) {
-            FeederSpout feeder = new FeederSpout(new Fields("field1"));
-            AckFailMapTracker tracker = new AckFailMapTracker();
-            feeder.setAckFailDelegate(tracker);
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new AckEveryOtherBolt()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
-            cluster.submitTopology("timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
-
-            cluster.advanceClusterTime(11);
-            feeder.feed(new Values("a"), 1);
-            feeder.feed(new Values("b"), 2);
-            feeder.feed(new Values("c"), 3);
-            cluster.advanceClusterTime(9);
-            assertAcked(tracker, 1, 3);
-            assertThat(tracker.isFailed(2), is(false));
-            cluster.advanceClusterTime(12);
-            assertFailed(tracker, 2);
-        }
-    }
-
-    private static class ResetTimeoutBolt extends BaseRichBolt {
-
-        private int tupleCounter = 1;
-        private Tuple firstTuple = null;
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            if (tupleCounter == 1) {
-                firstTuple = input;
-            } else if (tupleCounter == 2) {
-                collector.resetTimeout(firstTuple);
-            } else if (tupleCounter == 5) {
-                collector.ack(firstTuple);
-                collector.ack(input);
-            } else {
-                collector.resetTimeout(firstTuple);
-                collector.ack(input);
-            }
-            tupleCounter++;
-        }
-    }
-
-    @Test
-    public void testResetTimeout() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
-            .build()) {
-            FeederSpout feeder = new FeederSpout(new Fields("field1"));
-            AckFailMapTracker tracker = new AckFailMapTracker();
-            feeder.setAckFailDelegate(tracker);
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new ResetTimeoutBolt()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
-            cluster.submitTopology("reset-timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
-
-            //The first tuple will be used to check timeout reset
-            feeder.feed(new Values("a"), 1);
-            //The second tuple is used to wait for the spout to rotate its pending map
-            feeder.feed(new Values("b"), 2);
-            cluster.advanceClusterTime(9);
-            //The other tuples are used to reset the first tuple's timeout,
-            //and to wait for the message to get through to the spout (acks use the same path as timeout resets)
-            feeder.feed(new Values("c"), 3);
-            assertAcked(tracker, 3);
-            cluster.advanceClusterTime(9);
-            feeder.feed(new Values("d"), 4);
-            assertAcked(tracker, 4);
-            cluster.advanceClusterTime(2);
-            //The time is now twice the message timeout, so the second tuple should expire since it was not acked
-            //Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work
-            assertFailed(tracker, 2);
-            //Put in a tuple to cause the first tuple to be acked
-            feeder.feed(new Values("e"), 5);
-            assertAcked(tracker, 5);
-            //The first tuple should be acked, and should not have failed
-            assertThat(tracker.isFailed(1), is(false));
-            assertAcked(tracker, 1);
-        }
-    }
-
-    private StormTopology mkValidateTopology() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("1", null),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
-    }
-
-    private StormTopology mkInvalidateTopology1() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("3", null),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
-    }
-
-    private StormTopology mkInvalidateTopology2() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("1", null),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
-    }
-
-    private StormTopology mkInvalidateTopology3() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("1", "non-exists-stream"),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
-    }
-
-    private boolean tryCompleteWordCountTopology(LocalCluster cluster, StormTopology topology) throws Exception {
-        try {
-            List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
-                .map(value -> new FixedTuple(new Values(value)))
-                .collect(Collectors.toList());
-            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-            CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
-            completeTopologyParam.setMockedSources(mockedSources);
-            completeTopologyParam.setStormConf(Collections.singletonMap(Config.TOPOLOGY_WORKERS, 2));
-            Testing.completeTopology(cluster, topology, completeTopologyParam);
-            return false;
-        } catch (InvalidTopologyException e) {
-            return true;
-        }
-    }
-
-    @Test
-    public void testValidateTopologystructure() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
-            .build()) {
-            assertThat(tryCompleteWordCountTopology(cluster, mkValidateTopology()), is(false));
-            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology1()), is(true));
-            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology2()), is(true));
-            assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology3()), is(true));
-        }
-    }
-
-    private static class IdentityBolt extends BaseRichBolt {
-
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("num"));
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            collector.emit(input, input.getValues());
-            collector.ack(input);
-        }
-
-    }
-
-    @Test
-    public void testSystemStream() throws Exception {
-        //this test works because mocking a spout splits up the tuples evenly among the tasks
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-
-            Map<GlobalStreamId, Grouping> boltInputs = new HashMap<>();
-            boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
-            boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
-            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-                Thrift.prepareBoltDetails(
-                    boltInputs,
-                    new IdentityBolt(), 1));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
-            Map<String, Object> stormConf = new HashMap<>();
-            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
-
-            List<FixedTuple> testTuples = Arrays.asList("a", "b", "c").stream()
-                .map(value -> new FixedTuple(new Values(value)))
-                .collect(Collectors.toList());
-
-            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
-            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
-            completeTopologyParams.setMockedSources(mockedSources);
-            completeTopologyParams.setStormConf(stormConf);
-
-            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-
-            assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
-                new Values("a"),
-                new Values("b"),
-                new Values("c")
-            ));
-        }
-    }
-
-    private static class SpoutAndChecker {
-
-        private final FeederSpout spout;
-        private final Consumer<Integer> checker;
-
-        public SpoutAndChecker(FeederSpout spout, Consumer<Integer> checker) {
-            this.spout = spout;
-            this.checker = checker;
-        }
-    }
-
-    private SpoutAndChecker ackTrackingFeeder(String... fields) {
-        AckTracker tracker = new AckTracker();
-        FeederSpout spout = new FeederSpout(new Fields(fields));
-        spout.setAckFailDelegate(tracker);
-        return new SpoutAndChecker(spout, expectedNumAcks -> {
-            assertThat(tracker.getNumAcks(), is(expectedNumAcks));
-            tracker.resetNumAcks();
-        });
-    }
-
-    private static class BranchingBolt extends BaseRichBolt {
-
-        private final int branches;
-        private OutputCollector collector;
-
-        public BranchingBolt(int branches) {
-            this.branches = branches;
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("num"));
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            IntStream.range(0, branches)
-                .forEach(i -> collector.emit(input, new Values(i)));
-            collector.ack(input);
-        }
-    }
-
-    private static class AggBolt extends BaseRichBolt {
-
-        private final int branches;
-        private final List<Tuple> seen = new ArrayList<>();
-        private OutputCollector collector;
-
-        public AggBolt(int branches) {
-            this.branches = branches;
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("num"));
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            seen.add(input);
-            if (seen.size() == branches) {
-                collector.emit(seen, new Values(1));
-                seen.forEach(t -> collector.ack(t));
-                seen.clear();
-            }
-        }
-    }
-
-    private static class AckBolt extends BaseRichBolt {
-
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            collector.ack(input);
-        }
-    }
-
-    @Test
-    public void testAcking() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withTracked()
-            .build()) {
-            SpoutAndChecker feeder1 = ackTrackingFeeder("num");
-            SpoutAndChecker feeder2 = ackTrackingFeeder("num");
-            SpoutAndChecker feeder3 = ackTrackingFeeder("num");
-
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder1.spout));
-            spoutMap.put("2", Thrift.prepareSpoutDetails(feeder2.spout));
-            spoutMap.put("3", Thrift.prepareSpoutDetails(feeder3.spout));
-
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
-            boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
-            boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
-
-            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
-            aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
-            aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
-            aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
-            boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
-
-            boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
-            boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
-
-            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
-
-            cluster.submitTopology("acking-test1", Collections.emptyMap(), tracked);
-
-            cluster.advanceClusterTime(11);
-            feeder1.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder1.checker.accept(0);
-            feeder2.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder1.checker.accept(1);
-            feeder2.checker.accept(1);
-            feeder1.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder1.checker.accept(0);
-            feeder1.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder1.checker.accept(1);
-            feeder3.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder1.checker.accept(0);
-            feeder3.checker.accept(0);
-            feeder2.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder1.spout.feed(new Values(1));
-            feeder2.spout.feed(new Values(1));
-            feeder3.spout.feed(new Values(1));
-        }
-    }
-
-    @Test
-    public void testAckBranching() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withTracked()
-            .build()) {
-            SpoutAndChecker feeder = ackTrackingFeeder("num");
-
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.spout));
-
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
-            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
-
-            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
-            aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
-            aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
-            boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
-
-            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
-
-            cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
-
-            cluster.advanceClusterTime(11);
-            feeder.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder.checker.accept(0);
-            feeder.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder.checker.accept(2);
-        }
-    }
-
-    private static class DupAnchorBolt extends BaseRichBolt {
-
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("num"));
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            ArrayList<Tuple> anchors = new ArrayList<>();
-            anchors.add(input);
-            anchors.add(input);
-            collector.emit(anchors, new Values(1));
-            collector.ack(input);
-        }
-    }
-
-    private static boolean boltPrepared = false;
-
-    private static class PrepareTrackedBolt extends BaseRichBolt {
-
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            boltPrepared = true;
-            collector.ack(input);
-        }
-    }
-
-    private static boolean spoutOpened = false;
-
-    private static class OpenTrackedSpout extends BaseRichSpout {
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("val"));
-        }
-
-        @Override
-        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-        }
-
-        @Override
-        public void nextTuple() {
-            spoutOpened = true;
-        }
-
-    }
-
-    @Test
-    public void testSubmitInactiveTopology() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
-            .build()) {
-            FeederSpout feeder = new FeederSpout(new Fields("field1"));
-            AckFailMapTracker tracker = new AckFailMapTracker();
-            feeder.setAckFailDelegate(tracker);
-
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
-            spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
-
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
-
-            boltPrepared = false;
-            spoutOpened = false;
-
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
-            cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
-
-            cluster.advanceClusterTime(11);
-            feeder.feed(new Values("a"), 1);
-            cluster.advanceClusterTime(9);
-            assertThat(boltPrepared, is(false));
-            assertThat(spoutOpened, is(false));
-            cluster.getNimbus().activate("test");
-
-            cluster.advanceClusterTime(12);
-            assertAcked(tracker, 1);
-            assertThat(boltPrepared, is(true));
-            assertThat(spoutOpened, is(true));
-        }
-    }
-
-    @Test
-    public void testAckingSelfAnchor() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withTracked()
-            .build()) {
-            SpoutAndChecker feeder = ackTrackingFeeder("num");
-
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.spout));
-
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
-            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
-
-            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
-
-            cluster.submitTopology("test", Collections.emptyMap(), tracked);
-
-            cluster.advanceClusterTime(11);
-            feeder.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 1);
-            feeder.checker.accept(1);
-            feeder.spout.feed(new Values(1));
-            feeder.spout.feed(new Values(1));
-            feeder.spout.feed(new Values(1));
-            Testing.trackedWait(tracked, 3);
-            feeder.checker.accept(3);
-        }
-    }
-
-    private Map<Object, Object> listToMap(List<Object> list) {
-        assertThat(list.size() % 2, is(0));
-        Map<Object, Object> res = new HashMap<>();
-        for (int i = 0; i < list.size(); i += 2) {
-            res.put(list.get(i), list.get(i + 1));
-        }
-        return res;
-    }
-
-    @Test
-    public void testKryoDecoratorsConfig() throws Exception {
-        Map<String, Object> daemonConf = new HashMap<>();
-        daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
-        daemonConf.put(Config.TOPOLOGY_KRYO_DECORATORS, "this-is-overridden");
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withDaemonConf(daemonConf)
-            .build()) {
-            TopologyBuilder topologyBuilder = new TopologyBuilder();
-            topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
-            topologyBuilder.setBolt("2", new TestConfBolt(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "two"))))
-                .shuffleGrouping("1");
-
-            List<FixedTuple> testTuples = Arrays.asList(new Values(Config.TOPOLOGY_KRYO_DECORATORS)).stream()
-                .map(value -> new FixedTuple(value))
-                .collect(Collectors.toList());
-
-            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
-            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
-            completeTopologyParams.setMockedSources(mockedSources);
-            completeTopologyParams.setStormConf(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "three")));
-
-            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
-
-            List<Object> concatValues = Testing.readTuples(results, "2").stream()
-                .flatMap(values -> values.stream())
-                .collect(Collectors.toList());
-            assertThat(concatValues.get(0), is(Config.TOPOLOGY_KRYO_DECORATORS));
-            assertThat(concatValues.get(1), is(Arrays.asList("one", "two", "three")));
-        }
-    }
-
-    @Test
-    public void testComponentSpecificConfig() throws Exception {
-        Map<String, Object> daemonConf = new HashMap<>();
-        daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .withDaemonConf(daemonConf)
-            .build()) {
-            TopologyBuilder topologyBuilder = new TopologyBuilder();
-            topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
-            Map<String, Object> componentConf = new HashMap<>();
-            componentConf.put("fake.config", 123);
-            componentConf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 20);
-            componentConf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30);
-            componentConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(Collections.singletonMap("fake.type", "bad.serializer"), Collections.singletonMap("fake.type2", "a.serializer")));
-            topologyBuilder.setBolt("2", new TestConfBolt(componentConf))
-                .shuffleGrouping("1")
-                .setMaxTaskParallelism(2)
-                .addConfiguration("fake.config2", 987);
-
-            List<FixedTuple> testTuples = Arrays.asList("fake.config", Config.TOPOLOGY_MAX_TASK_PARALLELISM, Config.TOPOLOGY_MAX_SPOUT_PENDING, "fake.config2", Config.TOPOLOGY_KRYO_REGISTER).stream()
-                .map(value -> new FixedTuple(new Values(value)))
-                .collect(Collectors.toList());
-            Map<String, String> kryoRegister = new HashMap<>();
-            kryoRegister.put("fake.type", "good.serializer");
-            kryoRegister.put("fake.type3", "a.serializer3");
-            Map<String, Object> stormConf = new HashMap<>();
-            stormConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(kryoRegister));
-
-            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
-            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
-            completeTopologyParams.setMockedSources(mockedSources);
-            completeTopologyParams.setStormConf(stormConf);
-
-            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
-
-            Map<String, Object> expectedValues = new HashMap<>();
-            expectedValues.put("fake.config", 123L);
-            expectedValues.put("fake.config2", 987L);
-            expectedValues.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 2L);
-            expectedValues.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30L);
-            Map<String, String> expectedKryoRegister = new HashMap<>();
-            expectedKryoRegister.putAll(kryoRegister);
-            expectedKryoRegister.put("fake.type2", "a.serializer");
-            expectedValues.put(Config.TOPOLOGY_KRYO_REGISTER, expectedKryoRegister);
-            List<Object> concatValues = Testing.readTuples(results, "2").stream()
-                .flatMap(values -> values.stream())
-                .collect(Collectors.toList());
-            assertThat(listToMap(concatValues), is(expectedValues));
-        }
-    }
-    
-    private static class HooksBolt extends BaseRichBolt {
-
-        private int acked = 0;
-        private int failed = 0;
-        private int executed = 0;
-        private int emitted = 0;
-        private OutputCollector collector;
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("emit", "ack", "fail", "executed"));
-        }
-
-        @Override
-        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-            context.addTaskHook(new BaseTaskHook() {
-                @Override
-                public void boltExecute(BoltExecuteInfo info) {
-                    executed++;
-                }
-
-                @Override
-                public void boltFail(BoltFailInfo info) {
-                    failed++;
-                }
-
-                @Override
-                public void boltAck(BoltAckInfo info) {
-                    acked++;
-                }
-
-                @Override
-                public void emit(EmitInfo info) {
-                    emitted++;
-                }
-                
-            });
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            collector.emit(new Values(emitted, acked, failed, executed));
-            if (acked - failed == 0) {
-                collector.ack(input);
-            } else {
-                collector.fail(input);
-            }
-        }
-    }
-    
-    @Test
-    public void testHooks() throws Exception {
-        try (LocalCluster cluster = new LocalCluster.Builder()
-            .withSimulatedTime()
-            .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf"))));
-
-            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
-                    new HooksBolt()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
-            List<FixedTuple> testTuples = Arrays.asList(1, 1, 1, 1).stream()
-                .map(value -> new FixedTuple(new Values(value)))
-                .collect(Collectors.toList());
-
-            MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
-            CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
-            completeTopologyParams.setMockedSources(mockedSources);
-
-            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-            
-            List<List<Object>> expectedTuples = Arrays.asList(
-                Arrays.asList(0, 0, 0, 0),
-                Arrays.asList(2, 1, 0, 1),
-                Arrays.asList(4, 1, 1, 2),
-                Arrays.asList(6, 2, 1, 3));
-
-            assertThat(Testing.readTuples(results, "2"), is(expectedTuples));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java b/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java
new file mode 100644
index 0000000..6d9db7f
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
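+/**
+ * A terminal bolt that acks every other tuple it receives (the first, third,
+ * and so on) and leaves the rest un-acked so they can time out.
+ */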
+class AckEveryOtherBolt extends BaseRichBolt {
+
+    private boolean state = true;
+    private OutputCollector collector;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (state) {
+            collector.ack(input);
+        }
+        state = !state;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java b/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java
new file mode 100644
index 0000000..19c5776
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.List;
+import org.apache.storm.testing.AckTracker;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.tuple.Fields;
+
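+/**
+ * Couples a {@link FeederSpout} with an {@link AckTracker}, letting tests feed
+ * tuples and assert on the number of acks seen so far. Note that
+ * {@link #assertNumAcks(int)} resets the tracker's count after asserting.
+ */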
+class AckTrackingFeeder {
+
+    private final AckTracker tracker;
+    private final FeederSpout spout;
+
+    public AckTrackingFeeder(String... fields) {
+        tracker = new AckTracker();
+        spout = new FeederSpout(new Fields(fields));
+        spout.setAckFailDelegate(tracker);
+    }
+
+    public FeederSpout getSpout() {
+        return spout;
+    }
+
+    public void feed(List<Object> tuple) {
+        spout.feed(tuple);
+    }
+
+    public void feed(List<Object> tuple, Object msgId) {
+        spout.feed(tuple, msgId);
+    }
+
+    public void feedNoWait(List<Object> tuple, Object msgId) {
+        spout.feedNoWait(tuple, msgId);
+    }
+    
+    public void assertNumAcks(int expectedNumAcks) {
+        assertThat(tracker.getNumAcks(), is(expectedNumAcks));
+        tracker.resetNumAcks();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java b/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java
new file mode 100644
index 0000000..99b355c
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
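+/**
+ * Buffers incoming tuples until one has arrived per expected branch, then emits
+ * a single tuple anchored to the whole batch and acks every buffered input.
+ */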
+class AggBolt extends BaseRichBolt {
+
+    private final int branches;
+    private final List<Tuple> seen = new ArrayList<>();
+    private OutputCollector collector;
+
+    public AggBolt(int branches) {
+        this.branches = branches;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("num"));
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        seen.add(input);
+        if (seen.size() == branches) {
+            collector.emit(seen, new Values(1));
+            seen.forEach(t -> collector.ack(t));
+            seen.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java b/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java
new file mode 100644
index 0000000..5ac8f1e
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.utils.PredicateMatcher.matchesPredicate;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+
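+/**
+ * Polling assertion helpers that retry a predicate against each given id until
+ * it holds for all of them or a timeout expires.
+ */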
+class AssertLoop {
+
+    public static void assertLoop(Predicate<Object> condition, Object... conditionParams) {
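+        // Poll every millisecond for up to 10 seconds; rethrow a timeout as a
+        // plain AssertionError so it is reported as a test failure.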
+        try {
+            Awaitility.with()
+                .pollInterval(1, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertThat(Arrays.asList(conditionParams), everyItem(matchesPredicate(condition))));
+        } catch (ConditionTimeoutException e) {
+            throw new AssertionError(e.getMessage());
+        }
+    }
+
+    public static void assertAcked(AckFailMapTracker tracker, Object... ids) {
+        assertLoop(tracker::isAcked, ids);
+    }
+
+    public static void assertFailed(AckFailMapTracker tracker, Object... ids) {
+        assertLoop(tracker::isFailed, ids);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java b/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java
new file mode 100644
index 0000000..38aad5b
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
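+/**
+ * Passes each input tuple through unchanged: re-emits its values (declared as
+ * the single field "num"), anchored to the input, and acks it.
+ */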
+class IdentityBolt extends BaseRichBolt {
+
+    private OutputCollector collector;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("num"));
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        collector.emit(input, input.getValues());
+        collector.ack(input);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java b/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java
new file mode 100644
index 0000000..bd8d229
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+
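+/**
+ * Tests for the {@link Testing} utilities, ported from testing4j_test.clj as
+ * part of STORM-1307.
+ */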
+public class TestingTest {
+
+    @Test
+    public void testSimulatedTime() throws Exception {
+        assertThat(Time.isSimulating(), is(false));
+        try (SimulatedTime time = new SimulatedTime()) {
+            assertThat(Time.isSimulating(), is(true));
+        }
+    }
+    
+    @Test
+    public void testWithLocalCluster() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSupervisors(2)
+            .withPortsPerSupervisor(5)
+            .build()) {
+            assertThat(cluster, notNullValue());
+            assertThat(cluster.getNimbus(), notNullValue());
+        }
+    }
+    
+    @Test
+    public void testWithSimulatedTimeLocalCluster() throws Exception {
+        assertThat(Time.isSimulating(), is(false));
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSupervisors(2)
+            .withPortsPerSupervisor(5)
+            .withSimulatedTime()
+            .build()) {
+            assertThat(cluster, notNullValue());
+            assertThat(cluster.getNimbus(), notNullValue());
+            assertThat(Time.isSimulating(), is(true));
+        }
+    }
+    
+    @Test
+    public void testWithTrackedCluster() throws Exception {
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withSimulatedTime()
+            .withTracked()
+            .build()) {
+            AckTrackingFeeder feeder = new AckTrackingFeeder("num");
+
+            Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
+
+            Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+
+            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
+            aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
+            aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
+            boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
+
+            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);
+
+            cluster.submitTopology("test-acking2", new Config(), tracked);
+
+            cluster.advanceClusterTime(11);
+            feeder.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
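+            // Each feed reaches AggBolt twice (once via each identity bolt), and
+            // AggBolt only acks once it has buffered 4 tuples, so the first feed
+            // completes nothing.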
+            feeder.assertNumAcks(0);
+            feeder.feed(new Values(1));
+            Testing.trackedWait(tracked, 1);
+            feeder.assertNumAcks(2);
+        }
+    }
+    
+    @Test
+    public void testAdvanceClusterTime() throws Exception {
+        Config daemonConf = new Config();
+        daemonConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true);
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withDaemonConf(daemonConf)
+            .withSimulatedTime()
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+
+            Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
+
+            Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new AckEveryOtherBolt()));
+
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+            
+            Config stormConf = new Config();
+            stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);
+
+            cluster.submitTopology("timeout-tester", stormConf, topology);
+
+            feeder.feed(new Values("a"), 1);
+            feeder.feed(new Values("b"), 2);
+            feeder.feed(new Values("c"), 3);
+            cluster.advanceClusterTime(9);
+            assertAcked(tracker, 1, 3);
+            assertThat(tracker.isFailed(2), is(false));
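+            // Crossing the 10 second timeout fails the remaining un-acked tuple.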
+            cluster.advanceClusterTime(12);
+            assertFailed(tracker, 2);
+        }
+    }
+    
+    @Test
+    public void testDisableTupleTimeout() throws Exception {
+        Config daemonConf = new Config();
+        daemonConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
+        try (LocalCluster cluster = new LocalCluster.Builder()
+            .withDaemonConf(daemonConf)
+            .withSimulatedTime()
+            .build()) {
+            FeederSpout feeder = new FeederSpout(new Fields("field1"));
+            AckFailMapTracker tracker = new AckFailMapTracker();
+            feeder.setAckFailDelegate(tracker);
+
+            Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
+
+            Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new AckEveryOtherBolt()));
+
+            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+            
+            Config stormConf = new Config();
+            stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);
+            stormConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
+
+            cluster.submitTopology("disable-timeout-tester", stormConf, topology);
+
+            feeder.feed(new Values("a"), 1);
+            feeder.feed(new Values("b"), 2);
+            feeder.feed(new Values("c"), 3);
+            cluster.advanceClusterTime(9);
+            assertAcked(tracker, 1, 3);
+            assertThat(tracker.isFailed(2), is(false));
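+            // With message timeouts disabled, advancing well past the configured
+            // timeout still must not fail the un-acked tuple.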
+            cluster.advanceClusterTime(12);
+            assertThat(tracker.isFailed(2), is(false));
+        }
+    }
+    
+    @Test
+    public void testTestTuple() throws Exception {
+        Tuple tuple = Testing.testTuple(new Values("james", "bond"));
+        assertThat(tuple.getValues(), is(new Values("james", "bond")));
+        assertThat(tuple.getSourceStreamId(), is(Utils.DEFAULT_STREAM_ID));
+        assertThat(tuple.getFields().toList(), is(Arrays.asList("field1", "field2")));
+        assertThat(tuple.getSourceComponent(), is("component"));
+    }
+    
+    @Test
+    public void testTestTupleWithMkTupleParam() throws Exception {
+        MkTupleParam mkTupleParam = new MkTupleParam();
+        mkTupleParam.setStream("test-stream");
+        mkTupleParam.setComponent("test-component");
+        mkTupleParam.setFields("fname", "lname");
+        Tuple tuple = Testing.testTuple(new Values("james", "bond"), mkTupleParam);
+        assertThat(tuple.getValues(), is(new Values("james", "bond")));
+        assertThat(tuple.getSourceStreamId(), is("test-stream"));
+        assertThat(tuple.getFields().toList(), is(Arrays.asList("fname", "lname")));
+        assertThat(tuple.getSourceComponent(), is("test-component"));
+    }
+    
+}