You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/01/05 15:02:54 UTC
[4/7] storm git commit: STORM-1307: Port testing4j_test.clj to Java
STORM-1307: Port testing4j_test.clj to Java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02e0e75c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02e0e75c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02e0e75c
Branch: refs/heads/master
Commit: 02e0e75c6bbbb580b6a62dbe394ade7298a9a1dd
Parents: d6a1e73
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Dec 12 01:36:09 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Dec 19 16:48:38 2018 +0100
----------------------------------------------------------------------
.../org/apache/storm/testing4j_test.clj | 243 -----
.../apache/storm/TopologyIntegrationTest.java | 1011 ------------------
.../storm/integration/AckEveryOtherBolt.java | 48 +
.../storm/integration/AckTrackingFeeder.java | 59 +
.../org/apache/storm/integration/AggBolt.java | 59 +
.../apache/storm/integration/AssertLoop.java | 51 +
.../apache/storm/integration/IdentityBolt.java | 47 +
.../apache/storm/integration/TestingTest.java | 214 ++++
.../integration/TopologyIntegrationTest.java | 904 ++++++++++++++++
9 files changed, 1382 insertions(+), 1254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
deleted file mode 100644
index 75a0432..0000000
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ /dev/null
@@ -1,243 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns integration.org.apache.storm.testing4j-test
- (:use [clojure.test])
- (:use [org.apache.storm daemon-config config util])
- (:use [org.apache.storm clojure])
- (:import [org.apache.storm Testing Config]
- [org.apache.storm.generated GlobalStreamId])
- (:import [org.apache.storm.tuple Values])
- (:import [org.apache.storm.utils Time])
- (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout FeederSpout
- TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
- AckFailMapTracker AckTracker MkTupleParam])
- (:import [org.apache.storm.utils Utils])
- (:import [org.apache.storm Thrift ILocalCluster]))
-
-; If you are working on porting this test to Java, be aware that there are ports of these methods/bolts in TopologyIntegrationTest.java.
-(defn assert-loop
-([afn ids] (assert-loop afn ids 10))
-([afn ids timeout-secs]
- (loop [remaining-time (* timeout-secs 1000)]
- (let [start-time (System/currentTimeMillis)
- assertion-is-true (every? afn ids)]
- (if (or assertion-is-true (neg? remaining-time))
- (is assertion-is-true)
- (do
- (Thread/sleep 1)
- (recur (- remaining-time (- (System/currentTimeMillis) start-time)))
- ))))))
-
-(defn assert-acked [tracker & ids]
- (assert-loop #(.isAcked tracker %) ids))
-
-(defn assert-failed [tracker & ids]
- (assert-loop #(.isFailed tracker %) ids))
-
-(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
- [conf context collector]
- (let [seen (atom [])]
- (bolt
- (execute [tuple]
- (swap! seen conj tuple)
- (when (= (count @seen) amt)
- (emit-bolt! collector [1] :anchor @seen)
- (doseq [s @seen]
- (ack! collector s))
- (reset! seen [])
- )))
- ))
-
-(defbolt identity-bolt ["num"]
- [tuple collector]
- (emit-bolt! collector (.getValues tuple) :anchor tuple)
- (ack! collector tuple))
-
-(defbolt ack-every-other {} {:prepare true}
- [conf context collector]
- (let [state (atom -1)]
- (bolt
- (execute [tuple]
- (let [val (swap! state -)]
- (when (pos? val)
- (ack! collector tuple)
- ))))))
-
-(defn ack-tracking-feeder [fields]
- (let [tracker (AckTracker.)]
- [(doto (FeederSpout. fields)
- (.setAckFailDelegate tracker))
- (fn [val]
- (is (= (.getNumAcks tracker) val))
- (.resetNumAcks tracker)
- )]
- ))
-; End section with methods/bolts that also exist in Java.
-
-(deftest test-with-simulated-time
- (is (= false (Time/isSimulating)))
- (Testing/withSimulatedTime (fn []
- (is (= true (Time/isSimulating)))))
- (is (= false (Time/isSimulating))))
-
-(deftest test-with-local-cluster
- (let [mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 2))
- (.setPortsPerSupervisor (int 5)))
- daemon-conf (doto (Config.)
- (.put SUPERVISOR-ENABLE false)
- (.put TOPOLOGY-ACKER-EXECUTORS 0))]
- (Testing/withLocalCluster mk-cluster-param (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (is (not (nil? cluster)))
- (is (not (nil? (.getNimbus cluster)))))))))
-
-(deftest test-with-simulated-time-local-cluster
- (let [mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 2)))
- daemon-conf (doto (Config.)
- (.put SUPERVISOR-ENABLE false)
- (.put TOPOLOGY-ACKER-EXECUTORS 0))]
- (is (not (Time/isSimulating)))
- (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (is (not (nil? cluster)))
- (is (not (nil? (.getNimbus cluster))))
- (is (Time/isSimulating)))))
- (is (not (Time/isSimulating)))))
-
-(deftest test-with-tracked-cluster
- (Testing/withTrackedCluster
- (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (let [[feeder checker] (ack-tracking-feeder ["num"])
- tracked (Testing/mkTrackedTopology
- cluster
- (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails feeder)}
- {"2" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
- (Thrift/prepareShuffleGrouping)}
- identity-bolt)
- "3" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
- (Thrift/prepareShuffleGrouping)}
- identity-bolt)
- "4" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID)
- (Thrift/prepareShuffleGrouping)
- (GlobalStreamId. "3" Utils/DEFAULT_STREAM_ID)
- (Thrift/prepareShuffleGrouping)}
- (agg-bolt 4))}))]
- (.submitTopology cluster
- "test-acking2"
- (Config.)
- (.getTopology tracked))
- (.advanceClusterTime cluster (int 11))
- (.feed feeder [1])
- (Testing/trackedWait tracked (int 1))
- (checker 0)
- (.feed feeder [1])
- (Testing/trackedWait tracked (int 1))
- (checker 2)
- )))))
-
-(deftest test-advance-cluster-time
- (let [daemon-conf (doto (Config.)
- (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
- mk-cluster-param (doto (MkClusterParam.)
- (.setDaemonConf daemon-conf))]
- (Testing/withSimulatedTimeLocalCluster
- mk-cluster-param
- (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (let [feeder (FeederSpout. ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
- topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails feeder)}
- {"2" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
- (Thrift/prepareGlobalGrouping)}
- ack-every-other)})
- storm-conf (doto (Config.)
- (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
- (.submitTopology cluster
- "timeout-tester"
- storm-conf
- topology)
- (.feed feeder ["a"] 1)
- (.feed feeder ["b"] 2)
- (.feed feeder ["c"] 3)
- (Testing/advanceClusterTime cluster (int 9))
- (assert-acked tracker 1 3)
- (is (not (.isFailed tracker 2)))
- (Testing/advanceClusterTime cluster (int 12))
- (assert-failed tracker 2)
- ))))))
-
-(deftest test-disable-tuple-timeout
- (let [daemon-conf (doto (Config.)
- (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
- mk-cluster-param (doto (MkClusterParam.)
- (.setDaemonConf daemon-conf))]
- (Testing/withSimulatedTimeLocalCluster
- mk-cluster-param
- (reify TestJob
- (^void run [this ^ILocalCluster cluster]
- (let [feeder (FeederSpout. ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
- topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails feeder)}
- {"2" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
- (Thrift/prepareGlobalGrouping)}
- ack-every-other)})
- storm-conf (doto (Config.)
- (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
- (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
- (.submitTopology cluster
- "disable-timeout-tester"
- storm-conf
- topology)
- (.feed feeder ["a"] 1)
- (.feed feeder ["b"] 2)
- (.feed feeder ["c"] 3)
- (Testing/advanceClusterTime cluster (int 9))
- (assert-acked tracker 1 3)
- (is (not (.isFailed tracker 2)))
- (Testing/advanceClusterTime cluster (int 12))
- (is (not (.isFailed tracker 2)))
- ))))))
-
-(deftest test-test-tuple
- (testing "one-param signature"
- (let [tuple (Testing/testTuple ["james" "bond"])]
- (is (= ["james" "bond"] (.getValues tuple)))
- (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
- (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
- (is (= "component" (.getSourceComponent tuple)))))
- (testing "two-params signature"
- (let [mk-tuple-param (doto (MkTupleParam.)
- (.setStream "test-stream")
- (.setComponent "test-component")
- (.setFields (into-array String ["fname" "lname"])))
- tuple (Testing/testTuple ["james" "bond"] mk-tuple-param)]
- (is (= ["james" "bond"] (.getValues tuple)))
- (is (= "test-stream" (.getSourceStreamId tuple)))
- (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
- (is (= "test-component" (.getSourceComponent tuple))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
deleted file mode 100644
index 7cc7f65..0000000
--- a/storm-core/test/jvm/org/apache/storm/TopologyIntegrationTest.java
+++ /dev/null
@@ -1,1011 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm;
-
-import static org.apache.storm.utils.PredicateMatcher.matchesPredicate;
-import static org.hamcrest.CoreMatchers.everyItem;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.storm.Thrift.BoltDetails;
-import org.apache.storm.Thrift.SpoutDetails;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.Grouping;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.TopologyInitialStatus;
-import org.apache.storm.hooks.BaseTaskHook;
-import org.apache.storm.hooks.info.BoltAckInfo;
-import org.apache.storm.hooks.info.BoltExecuteInfo;
-import org.apache.storm.hooks.info.BoltFailInfo;
-import org.apache.storm.hooks.info.EmitInfo;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.testing.AckFailMapTracker;
-import org.apache.storm.testing.AckTracker;
-import org.apache.storm.testing.CompleteTopologyParam;
-import org.apache.storm.testing.FeederSpout;
-import org.apache.storm.testing.FixedTuple;
-import org.apache.storm.testing.IntegrationTest;
-import org.apache.storm.testing.MockedSources;
-import org.apache.storm.testing.TestAggregatesCounter;
-import org.apache.storm.testing.TestConfBolt;
-import org.apache.storm.testing.TestGlobalCount;
-import org.apache.storm.testing.TestPlannerSpout;
-import org.apache.storm.testing.TestWordCounter;
-import org.apache.storm.testing.TestWordSpout;
-import org.apache.storm.testing.TrackedTopology;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionTimeoutException;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-@IntegrationTest
-public class TopologyIntegrationTest {
-
- @ParameterizedTest
- @ValueSource(strings = {"true", "false"})
- public void testBasicTopology(boolean useLocalMessaging) throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withSupervisors(4)
- .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
- .build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- boltMap.put("3",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareGlobalGrouping()),
- new TestGlobalCount()));
- boltMap.put("4",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("2", null),
- Thrift.prepareGlobalGrouping()),
- new TestAggregatesCounter()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
- Map<String, Object> stormConf = new HashMap<>();
- stormConf.put(Config.TOPOLOGY_WORKERS, 2);
- stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
-
- List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
- .map(value -> new FixedTuple(new Values(value)))
- .collect(Collectors.toList());
-
- MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
- CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
- completeTopologyParams.setMockedSources(mockedSources);
- completeTopologyParams.setStormConf(stormConf);
-
- Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-
- assertThat(Testing.readTuples(results, "1"), containsInAnyOrder(
- new Values("nathan"),
- new Values("nathan"),
- new Values("bob"),
- new Values("joey")));
- assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
- new Values("nathan", 1),
- new Values("nathan", 2),
- new Values("bob", 1),
- new Values("joey", 1)
- ));
- assertThat(Testing.readTuples(results, "3"), contains(
- new Values(1),
- new Values(2),
- new Values(3),
- new Values(4)
- ));
- assertThat(Testing.readTuples(results, "4"), contains(
- new Values(1),
- new Values(2),
- new Values(3),
- new Values(4)
- ));
- }
- }
-
- private static class EmitTaskIdBolt extends BaseRichBolt {
-
- private int taskIndex;
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tid"));
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- this.taskIndex = context.getThisTaskIndex();
- }
-
- @Override
- public void execute(Tuple input) {
- collector.emit(input, new Values(taskIndex));
- collector.ack(input);
- }
-
- }
-
- @Test
- public void testMultiTasksPerCluster() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withSupervisors(4)
- .build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true)));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareAllGrouping()),
- new EmitTaskIdBolt(), 3, Collections.singletonMap(Config.TOPOLOGY_TASKS, 6)));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
- MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
-
- CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
- completeTopologyParams.setMockedSources(mockedSources);
-
- Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-
- assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
- new Values(0),
- new Values(1),
- new Values(2),
- new Values(3),
- new Values(4),
- new Values(5)
- ));
- }
- }
-
- private static class AckEveryOtherBolt extends BaseRichBolt {
-
- private boolean state = true;
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- if (state) {
- collector.ack(input);
- }
- state = !state;
- }
-
- }
-
- private void assertLoop(Predicate<Object> condition, Object... conditionParams) {
- try {
- Awaitility.with()
- .pollInterval(1, TimeUnit.MILLISECONDS)
- .atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() -> assertThat(Arrays.asList(conditionParams), everyItem(matchesPredicate(condition))));
- } catch (ConditionTimeoutException e) {
- throw new AssertionError(e.getMessage());
- }
- }
-
- private void assertAcked(AckFailMapTracker tracker, Object... ids) {
- assertLoop(tracker::isAcked, ids);
- }
-
- private void assertFailed(AckFailMapTracker tracker, Object... ids) {
- assertLoop(tracker::isFailed, ids);
- }
-
- @Test
- public void testTimeout() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withSupervisors(4)
- .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
- .build()) {
- FeederSpout feeder = new FeederSpout(new Fields("field1"));
- AckFailMapTracker tracker = new AckFailMapTracker();
- feeder.setAckFailDelegate(tracker);
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareGlobalGrouping()),
- new AckEveryOtherBolt()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
- cluster.submitTopology("timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
-
- cluster.advanceClusterTime(11);
- feeder.feed(new Values("a"), 1);
- feeder.feed(new Values("b"), 2);
- feeder.feed(new Values("c"), 3);
- cluster.advanceClusterTime(9);
- assertAcked(tracker, 1, 3);
- assertThat(tracker.isFailed(2), is(false));
- cluster.advanceClusterTime(12);
- assertFailed(tracker, 2);
- }
- }
-
- private static class ResetTimeoutBolt extends BaseRichBolt {
-
- private int tupleCounter = 1;
- private Tuple firstTuple = null;
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- if (tupleCounter == 1) {
- firstTuple = input;
- } else if (tupleCounter == 2) {
- collector.resetTimeout(firstTuple);
- } else if (tupleCounter == 5) {
- collector.ack(firstTuple);
- collector.ack(input);
- } else {
- collector.resetTimeout(firstTuple);
- collector.ack(input);
- }
- tupleCounter++;
- }
- }
-
- @Test
- public void testResetTimeout() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
- .build()) {
- FeederSpout feeder = new FeederSpout(new Fields("field1"));
- AckFailMapTracker tracker = new AckFailMapTracker();
- feeder.setAckFailDelegate(tracker);
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareGlobalGrouping()),
- new ResetTimeoutBolt()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
- cluster.submitTopology("reset-timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
-
- //The first tuple wil be used to check timeout reset
- feeder.feed(new Values("a"), 1);
- //The second tuple is used to wait for the spout to rotate its pending map
- feeder.feed(new Values("b"), 2);
- cluster.advanceClusterTime(9);
- //The other tuples are used to reset the first tuple's timeout,
- //and to wait for the message to get through to the spout (acks use the same path as timeout resets)
- feeder.feed(new Values("c"), 3);
- assertAcked(tracker, 3);
- cluster.advanceClusterTime(9);
- feeder.feed(new Values("d"), 4);
- assertAcked(tracker, 4);
- cluster.advanceClusterTime(2);
- //The time is now twice the message timeout, the second tuple should expire since it was not acked
- //Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work
- assertFailed(tracker, 2);
- //Put in a tuple to cause the first tuple to be acked
- feeder.feed(new Values("e"), 5);
- assertAcked(tracker, 5);
- //The first tuple should be acked, and should not have failed
- assertThat(tracker.isFailed(1), is(false));
- assertAcked(tracker, 1);
- }
- }
-
- private StormTopology mkValidateTopology() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
- }
-
- private StormTopology mkInvalidateTopology1() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("3", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
- }
-
- private StormTopology mkInvalidateTopology2() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
- }
-
- private StormTopology mkInvalidateTopology3() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", "non-exists-stream"),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
- }
-
- private boolean tryCompleteWordCountTopology(LocalCluster cluster, StormTopology topology) throws Exception {
- try {
- List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
- .map(value -> new FixedTuple(new Values(value)))
- .collect(Collectors.toList());
- MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
- CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
- completeTopologyParam.setMockedSources(mockedSources);
- completeTopologyParam.setStormConf(Collections.singletonMap(Config.TOPOLOGY_WORKERS, 2));
- Testing.completeTopology(cluster, topology, completeTopologyParam);
- return false;
- } catch (InvalidTopologyException e) {
- return true;
- }
- }
-
- @Test
- public void testValidateTopologystructure() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
- .build()) {
- assertThat(tryCompleteWordCountTopology(cluster, mkValidateTopology()), is(false));
- assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology1()), is(true));
- assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology2()), is(true));
- assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology3()), is(true));
- }
- }
-
- private static class IdentityBolt extends BaseRichBolt {
-
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("num"));
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- collector.emit(input, input.getValues());
- collector.ack(input);
- }
-
- }
-
- @Test
- public void testSystemStream() throws Exception {
- //this test works because mocking a spout splits up the tuples evenly among the tasks
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-
- Map<GlobalStreamId, Grouping> boltInputs = new HashMap<>();
- boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
- boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- boltInputs,
- new IdentityBolt(), 1));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
- Map<String, Object> stormConf = new HashMap<>();
- stormConf.put(Config.TOPOLOGY_WORKERS, 2);
-
- List<FixedTuple> testTuples = Arrays.asList("a", "b", "c").stream()
- .map(value -> new FixedTuple(new Values(value)))
- .collect(Collectors.toList());
-
- MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
- CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
- completeTopologyParams.setMockedSources(mockedSources);
- completeTopologyParams.setStormConf(stormConf);
-
- Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-
- assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
- new Values("a"),
- new Values("b"),
- new Values("c")
- ));
- }
- }
-
- private static class SpoutAndChecker {
-
- private final FeederSpout spout;
- private final Consumer<Integer> checker;
-
- public SpoutAndChecker(FeederSpout spout, Consumer<Integer> checker) {
- this.spout = spout;
- this.checker = checker;
- }
- }
-
- private SpoutAndChecker ackTrackingFeeder(String... fields) {
- AckTracker tracker = new AckTracker();
- FeederSpout spout = new FeederSpout(new Fields(fields));
- spout.setAckFailDelegate(tracker);
- return new SpoutAndChecker(spout, expectedNumAcks -> {
- assertThat(tracker.getNumAcks(), is(expectedNumAcks));
- tracker.resetNumAcks();
- });
- }
-
- private static class BranchingBolt extends BaseRichBolt {
-
- private final int branches;
- private OutputCollector collector;
-
- public BranchingBolt(int branches) {
- this.branches = branches;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("num"));
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- IntStream.range(0, branches)
- .forEach(i -> collector.emit(input, new Values(i)));
- collector.ack(input);
- }
- }
-
- private static class AggBolt extends BaseRichBolt {
-
- private final int branches;
- private final List<Tuple> seen = new ArrayList<>();
- private OutputCollector collector;
-
- public AggBolt(int branches) {
- this.branches = branches;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("num"));
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- seen.add(input);
- if (seen.size() == branches) {
- collector.emit(seen, new Values(1));
- seen.forEach(t -> collector.ack(t));
- seen.clear();
- }
- }
- }
-
- private static class AckBolt extends BaseRichBolt {
-
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- collector.ack(input);
- }
- }
-
- @Test
- public void testAcking() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withTracked()
- .build()) {
- SpoutAndChecker feeder1 = ackTrackingFeeder("num");
- SpoutAndChecker feeder2 = ackTrackingFeeder("num");
- SpoutAndChecker feeder3 = ackTrackingFeeder("num");
-
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder1.spout));
- spoutMap.put("2", Thrift.prepareSpoutDetails(feeder2.spout));
- spoutMap.put("3", Thrift.prepareSpoutDetails(feeder3.spout));
-
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
- boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
- boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
-
- Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
- aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
- aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
- aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
- boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
-
- boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
- boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
-
- TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
-
- cluster.submitTopology("acking-test1", Collections.emptyMap(), tracked);
-
- cluster.advanceClusterTime(11);
- feeder1.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder1.checker.accept(0);
- feeder2.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder1.checker.accept(1);
- feeder2.checker.accept(1);
- feeder1.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder1.checker.accept(0);
- feeder1.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder1.checker.accept(1);
- feeder3.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder1.checker.accept(0);
- feeder3.checker.accept(0);
- feeder2.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder1.spout.feed(new Values(1));
- feeder2.spout.feed(new Values(1));
- feeder3.spout.feed(new Values(1));
- }
- }
-
- @Test
- public void testAckBranching() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withTracked()
- .build()) {
- SpoutAndChecker feeder = ackTrackingFeeder("num");
-
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.spout));
-
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
- boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
-
- Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
- aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
- aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
- boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
-
- TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
-
- cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
-
- cluster.advanceClusterTime(11);
- feeder.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder.checker.accept(0);
- feeder.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder.checker.accept(2);
- }
- }
-
- private static class DupAnchorBolt extends BaseRichBolt {
-
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("num"));
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- ArrayList<Tuple> anchors = new ArrayList<>();
- anchors.add(input);
- anchors.add(input);
- collector.emit(anchors, new Values(1));
- collector.ack(input);
- }
- }
-
- private static boolean boltPrepared = false;
-
- private static class PrepareTrackedBolt extends BaseRichBolt {
-
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- boltPrepared = true;
- collector.ack(input);
- }
- }
-
- private static boolean spoutOpened = false;
-
- private static class OpenTrackedSpout extends BaseRichSpout {
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("val"));
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- }
-
- @Override
- public void nextTuple() {
- spoutOpened = true;
- }
-
- }
-
- @Test
- public void testSubmitInactiveTopology() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
- .build()) {
- FeederSpout feeder = new FeederSpout(new Fields("field1"));
- AckFailMapTracker tracker = new AckFailMapTracker();
- feeder.setAckFailDelegate(tracker);
-
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
- spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
-
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
-
- boltPrepared = false;
- spoutOpened = false;
-
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
- cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
-
- cluster.advanceClusterTime(11);
- feeder.feed(new Values("a"), 1);
- cluster.advanceClusterTime(9);
- assertThat(boltPrepared, is(false));
- assertThat(spoutOpened, is(false));
- cluster.getNimbus().activate("test");
-
- cluster.advanceClusterTime(12);
- assertAcked(tracker, 1);
- assertThat(boltPrepared, is(true));
- assertThat(spoutOpened, is(true));
- }
- }
-
- @Test
- public void testAckingSelfAnchor() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withTracked()
- .build()) {
- SpoutAndChecker feeder = ackTrackingFeeder("num");
-
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.spout));
-
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
- boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
-
- TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
-
- cluster.submitTopology("test", Collections.emptyMap(), tracked);
-
- cluster.advanceClusterTime(11);
- feeder.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 1);
- feeder.checker.accept(1);
- feeder.spout.feed(new Values(1));
- feeder.spout.feed(new Values(1));
- feeder.spout.feed(new Values(1));
- Testing.trackedWait(tracked, 3);
- feeder.checker.accept(3);
- }
- }
-
- private Map<Object, Object> listToMap(List<Object> list) {
- assertThat(list.size() % 2, is(0));
- Map<Object, Object> res = new HashMap<>();
- for (int i = 0; i < list.size(); i += 2) {
- res.put(list.get(i), list.get(i + 1));
- }
- return res;
- }
-
- @Test
- public void testKryoDecoratorsConfig() throws Exception {
- Map<String, Object> daemonConf = new HashMap<>();
- daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
- daemonConf.put(Config.TOPOLOGY_KRYO_DECORATORS, "this-is-overridden");
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withDaemonConf(daemonConf)
- .build()) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
- topologyBuilder.setBolt("2", new TestConfBolt(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "two"))))
- .shuffleGrouping("1");
-
- List<FixedTuple> testTuples = Arrays.asList(new Values(Config.TOPOLOGY_KRYO_DECORATORS)).stream()
- .map(value -> new FixedTuple(value))
- .collect(Collectors.toList());
-
- MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
- CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
- completeTopologyParams.setMockedSources(mockedSources);
- completeTopologyParams.setStormConf(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "three")));
-
- Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
-
- List<Object> concatValues = Testing.readTuples(results, "2").stream()
- .flatMap(values -> values.stream())
- .collect(Collectors.toList());
- assertThat(concatValues.get(0), is(Config.TOPOLOGY_KRYO_DECORATORS));
- assertThat(concatValues.get(1), is(Arrays.asList("one", "two", "three")));
- }
- }
-
- @Test
- public void testComponentSpecificConfig() throws Exception {
- Map<String, Object> daemonConf = new HashMap<>();
- daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .withDaemonConf(daemonConf)
- .build()) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
- Map<String, Object> componentConf = new HashMap<>();
- componentConf.put("fake.config", 123);
- componentConf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 20);
- componentConf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30);
- componentConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(Collections.singletonMap("fake.type", "bad.serializer"), Collections.singletonMap("fake.type2", "a.serializer")));
- topologyBuilder.setBolt("2", new TestConfBolt(componentConf))
- .shuffleGrouping("1")
- .setMaxTaskParallelism(2)
- .addConfiguration("fake.config2", 987);
-
- List<FixedTuple> testTuples = Arrays.asList("fake.config", Config.TOPOLOGY_MAX_TASK_PARALLELISM, Config.TOPOLOGY_MAX_SPOUT_PENDING, "fake.config2", Config.TOPOLOGY_KRYO_REGISTER).stream()
- .map(value -> new FixedTuple(new Values(value)))
- .collect(Collectors.toList());
- Map<String, String> kryoRegister = new HashMap<>();
- kryoRegister.put("fake.type", "good.serializer");
- kryoRegister.put("fake.type3", "a.serializer3");
- Map<String, Object> stormConf = new HashMap<>();
- stormConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(kryoRegister));
-
- MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
- CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
- completeTopologyParams.setMockedSources(mockedSources);
- completeTopologyParams.setStormConf(stormConf);
-
- Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
-
- Map<String, Object> expectedValues = new HashMap<>();
- expectedValues.put("fake.config", 123L);
- expectedValues.put("fake.config2", 987L);
- expectedValues.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 2L);
- expectedValues.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30L);
- Map<String, String> expectedKryoRegister = new HashMap<>();
- expectedKryoRegister.putAll(kryoRegister);
- expectedKryoRegister.put("fake.type2", "a.serializer");
- expectedValues.put(Config.TOPOLOGY_KRYO_REGISTER, expectedKryoRegister);
- List<Object> concatValues = Testing.readTuples(results, "2").stream()
- .flatMap(values -> values.stream())
- .collect(Collectors.toList());
- assertThat(listToMap(concatValues), is(expectedValues));
- }
- }
-
- private static class HooksBolt extends BaseRichBolt {
-
- private int acked = 0;
- private int failed = 0;
- private int executed = 0;
- private int emitted = 0;
- private OutputCollector collector;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("emit", "ack", "fail", "executed"));
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- context.addTaskHook(new BaseTaskHook() {
- @Override
- public void boltExecute(BoltExecuteInfo info) {
- executed++;
- }
-
- @Override
- public void boltFail(BoltFailInfo info) {
- failed++;
- }
-
- @Override
- public void boltAck(BoltAckInfo info) {
- acked++;
- }
-
- @Override
- public void emit(EmitInfo info) {
- emitted++;
- }
-
- });
- }
-
- @Override
- public void execute(Tuple input) {
- collector.emit(new Values(emitted, acked, failed, executed));
- if (acked - failed == 0) {
- collector.ack(input);
- } else {
- collector.fail(input);
- }
- }
- }
-
- @Test
- public void testHooks() throws Exception {
- try (LocalCluster cluster = new LocalCluster.Builder()
- .withSimulatedTime()
- .build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf"))));
-
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
- new HooksBolt()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
-
- List<FixedTuple> testTuples = Arrays.asList(1, 1, 1, 1).stream()
- .map(value -> new FixedTuple(new Values(value)))
- .collect(Collectors.toList());
-
- MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
-
- CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
- completeTopologyParams.setMockedSources(mockedSources);
-
- Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
-
- List<List<Object>> expectedTuples = Arrays.asList(
- Arrays.asList(0, 0, 0, 0),
- Arrays.asList(2, 1, 0, 1),
- Arrays.asList(4, 1, 1, 2),
- Arrays.asList(6, 2, 1, 3));
-
- assertThat(Testing.readTuples(results, "2"), is(expectedTuples));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java b/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java
new file mode 100644
index 0000000..6d9db7f
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AckEveryOtherBolt.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+class AckEveryOtherBolt extends BaseRichBolt {
+
+ private boolean state = true;
+ private OutputCollector collector;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (state) {
+ collector.ack(input);
+ }
+ state = !state;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java b/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java
new file mode 100644
index 0000000..19c5776
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AckTrackingFeeder.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.List;
+import org.apache.storm.testing.AckTracker;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.tuple.Fields;
+
+class AckTrackingFeeder {
+
+ private final AckTracker tracker;
+ private final FeederSpout spout;
+
+ public AckTrackingFeeder(String... fields) {
+ tracker = new AckTracker();
+ spout = new FeederSpout(new Fields(fields));
+ spout.setAckFailDelegate(tracker);
+ }
+
+ public FeederSpout getSpout() {
+ return spout;
+ }
+
+ public void feed(List<Object> tuple) {
+ spout.feed(tuple);
+ }
+
+ public void feed(List<Object> tuple, Object msgId) {
+ spout.feed(tuple, msgId);
+ }
+
+ public void feedNoWait(List<Object> tuple, Object msgId) {
+ spout.feedNoWait(tuple, msgId);
+ }
+
+ public void assertNumAcks(int expectedNumAcks) {
+ assertThat(tracker.getNumAcks(), is(expectedNumAcks));
+ tracker.resetNumAcks();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java b/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java
new file mode 100644
index 0000000..99b355c
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AggBolt.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+class AggBolt extends BaseRichBolt {
+
+ private final int branches;
+ private final List<Tuple> seen = new ArrayList<>();
+ private OutputCollector collector;
+
+ public AggBolt(int branches) {
+ this.branches = branches;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("num"));
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ seen.add(input);
+ if (seen.size() == branches) {
+ collector.emit(seen, new Values(1));
+ seen.forEach(t -> collector.ack(t));
+ seen.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java b/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java
new file mode 100644
index 0000000..5ac8f1e
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/AssertLoop.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.utils.PredicateMatcher.matchesPredicate;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+
+class AssertLoop {
+
+ public static void assertLoop(Predicate<Object> condition, Object... conditionParams) {
+ try {
+ Awaitility.with()
+ .pollInterval(1, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertThat(Arrays.asList(conditionParams), everyItem(matchesPredicate(condition))));
+ } catch (ConditionTimeoutException e) {
+ throw new AssertionError(e.getMessage());
+ }
+ }
+
+ public static void assertAcked(AckFailMapTracker tracker, Object... ids) {
+ assertLoop(tracker::isAcked, ids);
+ }
+
+ public static void assertFailed(AckFailMapTracker tracker, Object... ids) {
+ assertLoop(tracker::isFailed, ids);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java b/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java
new file mode 100644
index 0000000..38aad5b
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/IdentityBolt.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+class IdentityBolt extends BaseRichBolt {
+
+ private OutputCollector collector;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("num"));
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ collector.emit(input, input.getValues());
+ collector.ack(input);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/02e0e75c/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java b/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java
new file mode 100644
index 0000000..bd8d229
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/integration/TestingTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+
+import org.apache.storm.LocalCluster;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+public class TestingTest {
+
+ @Test
+ public void testSimulatedTime() throws Exception {
+ assertThat(Time.isSimulating(), is(false));
+ try(SimulatedTime time = new SimulatedTime()) {
+ assertThat(Time.isSimulating(), is(true));
+ }
+ }
+
+ @Test
+ public void testWithLocalCluster() throws Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSupervisors(2)
+ .withPortsPerSupervisor(5)
+ .build()) {
+ assertThat(cluster, notNullValue());
+ assertThat(cluster.getNimbus(), notNullValue());
+ }
+ }
+
+ @Test
+ public void testWithSimulatedTimeLocalCluster() throws Exception {
+ assertThat(Time.isSimulating(), is(false));
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSupervisors(2)
+ .withPortsPerSupervisor(5)
+ .withSimulatedTime()
+ .build()) {
+ assertThat(cluster, notNullValue());
+ assertThat(cluster.getNimbus(), notNullValue());
+ assertThat(Time.isSimulating(), is(true));
+ }
+ }
+
+ @Test
+ public void testWithTrackedCluster() throws Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSimulatedTime()
+ .withTracked()
+ .build()) {
+ AckTrackingFeeder feeder = new AckTrackingFeeder("num");
+
+ Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+ spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
+
+ Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+ boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
+
+ Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
+ aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
+ aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
+ boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
+
+ TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
+
+ cluster.submitTopology("test-acking2", new Config(), tracked);
+
+ cluster.advanceClusterTime(11);
+ feeder.feed(new Values(1));
+ Testing.trackedWait(tracked, 1);
+ feeder.assertNumAcks(0);
+ feeder.feed(new Values(1));
+ Testing.trackedWait(tracked, 1);
+ feeder.assertNumAcks(2);
+ }
+ }
+
+ @Test
+ public void testAdvanceClusterTime() throws Exception {
+ Config daemonConf = new Config();
+ daemonConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true);
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withDaemonConf(daemonConf)
+ .withSimulatedTime()
+ .build()) {
+ FeederSpout feeder = new FeederSpout(new Fields("field1"));
+ AckFailMapTracker tracker = new AckFailMapTracker();
+ feeder.setAckFailDelegate(tracker);
+
+ Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+ spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
+
+ Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new AckEveryOtherBolt()));
+
+ StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ Config stormConf = new Config();
+ stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);
+
+ cluster.submitTopology("timeout-tester", stormConf, topology);
+
+ feeder.feed(new Values("a"), 1);
+ feeder.feed(new Values("b"), 2);
+ feeder.feed(new Values("c"), 3);
+ cluster.advanceClusterTime(9);
+ assertAcked(tracker, 1, 3);
+ assertThat(tracker.isFailed(2), is(false));
+ cluster.advanceClusterTime(12);
+ assertFailed(tracker, 2);
+ }
+ }
+
+ @Test
+ public void testDisableTupleTimeout() throws Exception {
+ Config daemonConf = new Config();
+ daemonConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withDaemonConf(daemonConf)
+ .withSimulatedTime()
+ .build()) {
+ FeederSpout feeder = new FeederSpout(new Fields("field1"));
+ AckFailMapTracker tracker = new AckFailMapTracker();
+ feeder.setAckFailDelegate(tracker);
+
+ Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+ spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
+
+ Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new AckEveryOtherBolt()));
+
+ StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ Config stormConf = new Config();
+ stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);
+ stormConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
+
+ cluster.submitTopology("disable-timeout-tester", stormConf, topology);
+
+ feeder.feed(new Values("a"), 1);
+ feeder.feed(new Values("b"), 2);
+ feeder.feed(new Values("c"), 3);
+ cluster.advanceClusterTime(9);
+ assertAcked(tracker, 1, 3);
+ assertThat(tracker.isFailed(2), is(false));
+ cluster.advanceClusterTime(12);
+ assertThat(tracker.isFailed(2), is(false));
+ }
+ }
+
+ @Test
+ public void testTestTuple() throws Exception {
+ Tuple tuple = Testing.testTuple(new Values("james", "bond"));
+ assertThat(tuple.getValues(), is(new Values("james", "bond")));
+ assertThat(tuple.getSourceStreamId(), is(Utils.DEFAULT_STREAM_ID));
+ assertThat(tuple.getFields().toList(), is(Arrays.asList("field1", "field2")));
+ assertThat(tuple.getSourceComponent(), is("component"));
+ }
+
+ @Test
+ public void testTestTupleWithMkTupleParam() throws Exception {
+ MkTupleParam mkTupleParam = new MkTupleParam();
+ mkTupleParam.setStream("test-stream");
+ mkTupleParam.setComponent("test-component");
+ mkTupleParam.setFields("fname", "lname");
+ Tuple tuple = Testing.testTuple(new Values("james", "bond"), mkTupleParam);
+ assertThat(tuple.getValues(), is(new Values("james", "bond")));
+ assertThat(tuple.getSourceStreamId(), is("test-stream"));
+ assertThat(tuple.getFields().toList(), is(Arrays.asList("fname", "lname")));
+ assertThat(tuple.getSourceComponent(), is("test-component"));
+ }
+
+}