You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/16 18:02:15 UTC
[storm] branch master updated: [STORM-3306]
TopologyIntegrationTest.java use TopologyBuilder to build topologies not
Thrift
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 0994303 [STORM-3306] TopologyIntegrationTest.java use TopologyBuilder to build topologies not Thrift
new 097940d Merge pull request #3228 from nd368/master
0994303 is described below
commit 0994303194bc9f6a8b997ae4e3d776a04b04f1fd
Author: nathanday123 <na...@hotmail.co.uk>
AuthorDate: Sat Mar 14 15:41:15 2020 +0000
[STORM-3306] TopologyIntegrationTest.java use TopologyBuilder to build topologies not Thrift
---
.../storm/integration/TopologyIntegrationTest.java | 224 +++++++--------------
1 file changed, 75 insertions(+), 149 deletions(-)
diff --git a/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
index 3b71af9..fca5bcf 100644
--- a/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
+++ b/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
@@ -35,11 +35,6 @@ import java.util.stream.IntStream;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.Testing;
-import org.apache.storm.Thrift;
-import org.apache.storm.Thrift.BoltDetails;
-import org.apache.storm.Thrift.SpoutDetails;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
@@ -72,7 +67,6 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -88,27 +82,13 @@ public class TopologyIntegrationTest {
.withSupervisors(4)
.withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
.build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- boltMap.put("3",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareGlobalGrouping()),
- new TestGlobalCount()));
- boltMap.put("4",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("2", null),
- Thrift.prepareGlobalGrouping()),
- new TestAggregatesCounter()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true), 3);
+ builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", new Fields("word"));
+ builder.setBolt("3", new TestGlobalCount()).globalGrouping("1");
+ builder.setBolt("4", new TestAggregatesCounter()).globalGrouping("2");
+ StormTopology topology = builder.createTopology();
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_WORKERS, 2);
@@ -182,15 +162,11 @@ public class TopologyIntegrationTest {
.withSimulatedTime()
.withSupervisors(4)
.build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true)));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareAllGrouping()),
- new EmitTaskIdBolt(), 3, Collections.singletonMap(Config.TOPOLOGY_TASKS, 6)));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true));
+ builder.setBolt("2", new EmitTaskIdBolt(), 3).allGrouping("1")
+ .addConfigurations(Collections.singletonMap(Config.TOPOLOGY_TASKS, 6));
+ StormTopology topology = builder.createTopology();
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
@@ -220,15 +196,11 @@ public class TopologyIntegrationTest {
FeederSpout feeder = new FeederSpout(new Fields("field1"));
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareGlobalGrouping()),
- new AckEveryOtherBolt()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", feeder);
+ builder.setBolt("2", new AckEveryOtherBolt()).globalGrouping("1");
+ StormTopology topology = builder.createTopology();
cluster.submitTopology("timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
@@ -285,15 +257,11 @@ public class TopologyIntegrationTest {
FeederSpout feeder = new FeederSpout(new Fields("field1"));
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareGlobalGrouping()),
- new ResetTimeoutBolt()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", feeder);
+ builder.setBolt("2", new ResetTimeoutBolt()).globalGrouping("1");
+ StormTopology topology = builder.createTopology();
cluster.submitTopology("reset-timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
@@ -323,47 +291,31 @@ public class TopologyIntegrationTest {
}
private StormTopology mkValidateTopology() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true), 3);
+ builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", new Fields("word"));
+ return builder.createTopology();
}
private StormTopology mkInvalidateTopology1() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("3", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true), 3);
+ builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("3", new Fields("word"));
+ return builder.createTopology();
}
private StormTopology mkInvalidateTopology2() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", null),
- Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true), 3);
+ builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", new Fields("non-exists-field"));
+ return builder.createTopology();
}
private StormTopology mkInvalidateTopology3() {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(
- Utils.getGlobalStreamId("1", "non-exists-stream"),
- Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
- new TestWordCounter(), 4));
- return Thrift.buildTopology(spoutMap, boltMap);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true), 3);
+ builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", "non-exists-stream", new Fields("word"));
+ return builder.createTopology();
}
private boolean tryCompleteWordCountTopology(LocalCluster cluster, StormTopology topology) throws Exception {
@@ -401,16 +353,12 @@ public class TopologyIntegrationTest {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-
- Map<GlobalStreamId, Grouping> boltInputs = new HashMap<>();
- boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
- boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- boltInputs,
- new IdentityBolt(), 1));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true), 3);
+ builder.setBolt("2", new IdentityBolt(), 1)
+ .fieldsGrouping("1", new Fields("word"))
+ .globalGrouping("1", "__system");
+ StormTopology topology = builder.createTopology();
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_WORKERS, 2);
@@ -502,26 +450,18 @@ public class TopologyIntegrationTest {
AckTrackingFeeder feeder2 = new AckTrackingFeeder("num");
AckTrackingFeeder feeder3 = new AckTrackingFeeder("num");
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder1.getSpout()));
- spoutMap.put("2", Thrift.prepareSpoutDetails(feeder2.getSpout()));
- spoutMap.put("3", Thrift.prepareSpoutDetails(feeder3.getSpout()));
-
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
- boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
- boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
-
- Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
- aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
- aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
- aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
- boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
-
- boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
- boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", feeder1.getSpout());
+ builder.setSpout("2", feeder2.getSpout());
+ builder.setSpout("3", feeder3.getSpout());
+ builder.setBolt("4", new BranchingBolt(2)).shuffleGrouping("1");
+ builder.setBolt("5", new BranchingBolt(4)).shuffleGrouping("2");
+ builder.setBolt("6", new BranchingBolt(1)).shuffleGrouping("3");
+ builder.setBolt("7", new AggBolt(3)).shuffleGrouping("4").shuffleGrouping("5").shuffleGrouping("6");
+ builder.setBolt("8", new BranchingBolt(2)).shuffleGrouping("7");
+ builder.setBolt("9", new AckBolt()).shuffleGrouping("8");
- TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
+ TrackedTopology tracked = new TrackedTopology(builder.createTopology(), cluster);
cluster.submitTopology("acking-test1", Collections.emptyMap(), tracked);
@@ -559,19 +499,13 @@ public class TopologyIntegrationTest {
.build()) {
AckTrackingFeeder feeder = new AckTrackingFeeder("num");
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", feeder.getSpout());
+ builder.setBolt("2", new IdentityBolt()).shuffleGrouping("1");
+ builder.setBolt("3", new IdentityBolt()).shuffleGrouping("1");
+ builder.setBolt("4", new AggBolt(4)).shuffleGrouping("2").shuffleGrouping("3");
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
- boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
-
- Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
- aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
- aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
- boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
-
- TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
+ TrackedTopology tracked = new TrackedTopology(builder.createTopology(), cluster);
cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
@@ -661,17 +595,15 @@ public class TopologyIntegrationTest {
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
- spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
-
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", feeder);
+ builder.setSpout("2", new OpenTrackedSpout());
+ builder.setBolt("3", new PrepareTrackedBolt()).globalGrouping("1");
boltPrepared = false;
spoutOpened = false;
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+ StormTopology topology = builder.createTopology();
cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
@@ -697,14 +629,11 @@ public class TopologyIntegrationTest {
.build()) {
AckTrackingFeeder feeder = new AckTrackingFeeder("num");
- Map<String, SpoutDetails> spoutMap = new HashMap<>();
- spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
-
- Map<String, BoltDetails> boltMap = new HashMap<>();
- boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
- boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
-
- TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", feeder.getSpout());
+ builder.setBolt("2", new DupAnchorBolt()).shuffleGrouping("1");
+ builder.setBolt("3", new AckBolt()).shuffleGrouping("2");
+ TrackedTopology tracked = new TrackedTopology(builder.createTopology(), cluster);
cluster.submitTopology("test", Collections.emptyMap(), tracked);
@@ -872,13 +801,10 @@ public class TopologyIntegrationTest {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.build()) {
- Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf"))));
-
- Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
- Thrift.prepareBoltDetails(
- Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
- new HooksBolt()));
- StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
+ builder.setBolt("2", new HooksBolt()).shuffleGrouping("1");
+ StormTopology topology = builder.createTopology();
List<FixedTuple> testTuples = Arrays.asList(1, 1, 1, 1).stream()
.map(value -> new FixedTuple(new Values(value)))