You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/16 18:02:15 UTC

[storm] branch master updated: [STORM-3306] TopologyIntegrationTest.java use TopologyBuilder to build topologies not Thrift

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 0994303  [STORM-3306] TopologyIntegrationTest.java use TopologyBuilder to build topologies not Thrift
     new 097940d  Merge pull request #3228 from nd368/master
0994303 is described below

commit 0994303194bc9f6a8b997ae4e3d776a04b04f1fd
Author: nathanday123 <na...@hotmail.co.uk>
AuthorDate: Sat Mar 14 15:41:15 2020 +0000

    [STORM-3306] TopologyIntegrationTest.java use TopologyBuilder to build topologies not Thrift
---
 .../storm/integration/TopologyIntegrationTest.java | 224 +++++++--------------
 1 file changed, 75 insertions(+), 149 deletions(-)

diff --git a/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
index 3b71af9..fca5bcf 100644
--- a/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
+++ b/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
@@ -35,11 +35,6 @@ import java.util.stream.IntStream;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.Testing;
-import org.apache.storm.Thrift;
-import org.apache.storm.Thrift.BoltDetails;
-import org.apache.storm.Thrift.SpoutDetails;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.SubmitOptions;
@@ -72,7 +67,6 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -88,27 +82,13 @@ public class TopologyIntegrationTest {
             .withSupervisors(4)
             .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
             .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                    new TestWordCounter(), 4));
-            boltMap.put("3",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new TestGlobalCount()));
-            boltMap.put("4",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("2", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new TestAggregatesCounter()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", new TestWordSpout(true), 3);
+            builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", new Fields("word"));
+            builder.setBolt("3", new TestGlobalCount()).globalGrouping("1");
+            builder.setBolt("4", new TestAggregatesCounter()).globalGrouping("2");
+            StormTopology topology = builder.createTopology();
 
             Map<String, Object> stormConf = new HashMap<>();
             stormConf.put(Config.TOPOLOGY_WORKERS, 2);
@@ -182,15 +162,11 @@ public class TopologyIntegrationTest {
             .withSimulatedTime()
             .withSupervisors(4)
             .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true)));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareAllGrouping()),
-                    new EmitTaskIdBolt(), 3, Collections.singletonMap(Config.TOPOLOGY_TASKS, 6)));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", new TestWordSpout(true));
+            builder.setBolt("2", new EmitTaskIdBolt(), 3).allGrouping("1")
+                    .addConfigurations(Collections.singletonMap(Config.TOPOLOGY_TASKS, 6));
+            StormTopology topology = builder.createTopology();
 
             MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
 
@@ -220,15 +196,11 @@ public class TopologyIntegrationTest {
             FeederSpout feeder = new FeederSpout(new Fields("field1"));
             AckFailMapTracker tracker = new AckFailMapTracker();
             feeder.setAckFailDelegate(tracker);
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new AckEveryOtherBolt()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", feeder);
+            builder.setBolt("2", new AckEveryOtherBolt()).globalGrouping("1");
+            StormTopology topology = builder.createTopology();
 
             cluster.submitTopology("timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
 
@@ -285,15 +257,11 @@ public class TopologyIntegrationTest {
             FeederSpout feeder = new FeederSpout(new Fields("field1"));
             AckFailMapTracker tracker = new AckFailMapTracker();
             feeder.setAckFailDelegate(tracker);
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(
-                        Utils.getGlobalStreamId("1", null),
-                        Thrift.prepareGlobalGrouping()),
-                    new ResetTimeoutBolt()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", feeder);
+            builder.setBolt("2", new ResetTimeoutBolt()).globalGrouping("1");
+            StormTopology topology = builder.createTopology();
 
             cluster.submitTopology("reset-timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
 
@@ -323,47 +291,31 @@ public class TopologyIntegrationTest {
     }
 
     private StormTopology mkValidateTopology() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("1", null),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("1", new TestWordSpout(true), 3);
+        builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", new Fields("word"));
+        return builder.createTopology();
     }
 
     private StormTopology mkInvalidateTopology1() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("3", null),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("1", new TestWordSpout(true), 3);
+        builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("3", new Fields("word"));
+        return builder.createTopology();
     }
 
     private StormTopology mkInvalidateTopology2() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("1", null),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("1", new TestWordSpout(true), 3);
+        builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", new Fields("non-exists-field"));
+        return builder.createTopology();
     }
 
     private StormTopology mkInvalidateTopology3() {
-        Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-        Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-            Thrift.prepareBoltDetails(
-                Collections.singletonMap(
-                    Utils.getGlobalStreamId("1", "non-exists-stream"),
-                    Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
-                new TestWordCounter(), 4));
-        return Thrift.buildTopology(spoutMap, boltMap);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("1", new TestWordSpout(true), 3);
+        builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", "non-exists-stream", new Fields("word"));
+        return builder.createTopology();
     }
 
     private boolean tryCompleteWordCountTopology(LocalCluster cluster, StormTopology topology) throws Exception {
@@ -401,16 +353,12 @@ public class TopologyIntegrationTest {
         try (LocalCluster cluster = new LocalCluster.Builder()
             .withSimulatedTime()
             .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
-
-            Map<GlobalStreamId, Grouping> boltInputs = new HashMap<>();
-            boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
-            boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
-            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-                Thrift.prepareBoltDetails(
-                    boltInputs,
-                    new IdentityBolt(), 1));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", new TestWordSpout(true), 3);
+            builder.setBolt("2", new IdentityBolt(), 1)
+                    .fieldsGrouping("1", new Fields("word"))
+                    .globalGrouping("1", "__system");
+            StormTopology topology = builder.createTopology();
 
             Map<String, Object> stormConf = new HashMap<>();
             stormConf.put(Config.TOPOLOGY_WORKERS, 2);
@@ -502,26 +450,18 @@ public class TopologyIntegrationTest {
             AckTrackingFeeder feeder2 = new AckTrackingFeeder("num");
             AckTrackingFeeder feeder3 = new AckTrackingFeeder("num");
 
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder1.getSpout()));
-            spoutMap.put("2", Thrift.prepareSpoutDetails(feeder2.getSpout()));
-            spoutMap.put("3", Thrift.prepareSpoutDetails(feeder3.getSpout()));
-
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
-            boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
-            boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
-
-            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
-            aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
-            aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
-            aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
-            boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
-
-            boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
-            boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", feeder1.getSpout());
+            builder.setSpout("2", feeder2.getSpout());
+            builder.setSpout("3", feeder3.getSpout());
+            builder.setBolt("4", new BranchingBolt(2)).shuffleGrouping("1");
+            builder.setBolt("5", new BranchingBolt(4)).shuffleGrouping("2");
+            builder.setBolt("6", new BranchingBolt(1)).shuffleGrouping("3");
+            builder.setBolt("7", new AggBolt(3)).shuffleGrouping("4").shuffleGrouping("5").shuffleGrouping("6");
+            builder.setBolt("8", new BranchingBolt(2)).shuffleGrouping("7");
+            builder.setBolt("9", new AckBolt()).shuffleGrouping("8");
 
-            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
+            TrackedTopology tracked = new TrackedTopology(builder.createTopology(), cluster);
 
             cluster.submitTopology("acking-test1", Collections.emptyMap(), tracked);
 
@@ -559,19 +499,13 @@ public class TopologyIntegrationTest {
             .build()) {
             AckTrackingFeeder feeder = new AckTrackingFeeder("num");
 
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", feeder.getSpout());
+            builder.setBolt("2", new IdentityBolt()).shuffleGrouping("1");
+            builder.setBolt("3", new IdentityBolt()).shuffleGrouping("1");
+            builder.setBolt("4", new AggBolt(4)).shuffleGrouping("2").shuffleGrouping("3");
 
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
-            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
-
-            Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
-            aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
-            aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
-            boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
-
-            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
+            TrackedTopology tracked = new TrackedTopology(builder.createTopology(), cluster);
 
             cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
 
@@ -661,17 +595,15 @@ public class TopologyIntegrationTest {
             AckFailMapTracker tracker = new AckFailMapTracker();
             feeder.setAckFailDelegate(tracker);
 
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
-            spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
-
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", feeder);
+            builder.setSpout("2", new OpenTrackedSpout());
+            builder.setBolt("3", new PrepareTrackedBolt()).globalGrouping("1");
 
             boltPrepared = false;
             spoutOpened = false;
 
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+            StormTopology topology = builder.createTopology();
 
             cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
 
@@ -697,14 +629,11 @@ public class TopologyIntegrationTest {
             .build()) {
             AckTrackingFeeder feeder = new AckTrackingFeeder("num");
 
-            Map<String, SpoutDetails> spoutMap = new HashMap<>();
-            spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
-
-            Map<String, BoltDetails> boltMap = new HashMap<>();
-            boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
-            boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
-
-            TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", feeder.getSpout());
+            builder.setBolt("2", new DupAnchorBolt()).shuffleGrouping("1");
+            builder.setBolt("3", new AckBolt()).shuffleGrouping("2");
+            TrackedTopology tracked = new TrackedTopology(builder.createTopology(), cluster);
 
             cluster.submitTopology("test", Collections.emptyMap(), tracked);
 
@@ -872,13 +801,10 @@ public class TopologyIntegrationTest {
         try (LocalCluster cluster = new LocalCluster.Builder()
             .withSimulatedTime()
             .build()) {
-            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf"))));
-
-            Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
-                Thrift.prepareBoltDetails(
-                    Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
-                    new HooksBolt()));
-            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
+            builder.setBolt("2", new HooksBolt()).shuffleGrouping("1");
+            StormTopology topology = builder.createTopology();
 
             List<FixedTuple> testTuples = Arrays.asList(1, 1, 1, 1).stream()
                 .map(value -> new FixedTuple(new Values(value)))