You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2018/12/12 13:41:38 UTC
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
GitHub user srdo opened a pull request:
https://github.com/apache/storm/pull/2927
STORM-1307: Port testing4j_test.clj to Java
Follow-up to https://github.com/apache/storm/pull/2924; please review that one first.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/srdo/storm STORM-1307
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2927.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2927
----
commit a491d3e1b6a85726648d609a79c074a0ae66932c
Author: Stig Rohde Døssing <sr...@...>
Date: 2018-12-07T21:07:36Z
STORM-1289: Port integration-test.clj to Java
commit e408283fd01d6b1146859265336f794203054901
Author: Stig Rohde Døssing <sr...@...>
Date: 2018-12-12T00:36:09Z
STORM-1307: Port testing4j_test.clj to Java
----
---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2927#discussion_r242661816
--- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
@@ -0,0 +1,927 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"true", "false"})
+ public void testBasicTopology(boolean useLocalMessaging) throws Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSimulatedTime()
+ .withSupervisors(4)
+ .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
+ .build()) {
+ Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+ Map<String, BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+ new TestWordCounter(), 4));
+ boltMap.put("3",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestGlobalCount()));
+ boltMap.put("4",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("2", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestAggregatesCounter()));
+ StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
--- End diff --
Could we get a follow-on JIRA to move the code that uses Thrift to create a topology over to using TopologyBuilder? I think these APIs were used because they are a little cleaner in Clojure, but in Java they take up a lot more code, and it is harder to follow.
---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/storm/pull/2927
---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2927#discussion_r242699069
--- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
@@ -0,0 +1,927 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"true", "false"})
+ public void testBasicTopology(boolean useLocalMessaging) throws Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSimulatedTime()
+ .withSupervisors(4)
+ .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
+ .build()) {
+ Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+ Map<String, BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+ new TestWordCounter(), 4));
+ boltMap.put("3",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestGlobalCount()));
+ boltMap.put("4",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("2", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestAggregatesCounter()));
+ StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
--- End diff --
Sure, raised https://issues.apache.org/jira/browse/STORM-3306
---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2927#discussion_r242663110
--- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
@@ -0,0 +1,927 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"true", "false"})
+ public void testBasicTopology(boolean useLocalMessaging) throws Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSimulatedTime()
+ .withSupervisors(4)
+ .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
+ .build()) {
+ Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+ Map<String, BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+ new TestWordCounter(), 4));
+ boltMap.put("3",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestGlobalCount()));
+ boltMap.put("4",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("2", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestAggregatesCounter()));
+ StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ Map<String, Object> stormConf = new HashMap<>();
+ stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+ stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
+
+ List<Values> testValues = new ArrayList<>();
+ testValues.add(new Values("nathan"));
+ testValues.add(new Values("bob"));
+ testValues.add(new Values("joey"));
+ testValues.add(new Values("nathan"));
+ List<FixedTuple> testTuples = testValues.stream()
+ .map(value -> new FixedTuple(value))
+ .collect(Collectors.toList());
--- End diff --
Wouldn't it be simpler to do something like this?
```
List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
.map(name -> new FixedTuple(new Values(name)))
.collect(Collectors.toList());
```
---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2927#discussion_r242739488
--- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
@@ -0,0 +1,927 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"true", "false"})
+ public void testBasicTopology(boolean useLocalMessaging) throws Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSimulatedTime()
+ .withSupervisors(4)
+ .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
+ .build()) {
+ Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+ Map<String, BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+ new TestWordCounter(), 4));
+ boltMap.put("3",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestGlobalCount()));
+ boltMap.put("4",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("2", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestAggregatesCounter()));
+ StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ Map<String, Object> stormConf = new HashMap<>();
+ stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+ stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
+
+ List<Values> testValues = new ArrayList<>();
+ testValues.add(new Values("nathan"));
+ testValues.add(new Values("bob"));
+ testValues.add(new Values("joey"));
+ testValues.add(new Values("nathan"));
+ List<FixedTuple> testTuples = testValues.stream()
+ .map(value -> new FixedTuple(value))
+ .collect(Collectors.toList());
--- End diff --
Fixed
---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2927#discussion_r242698501
--- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
@@ -0,0 +1,927 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"true", "false"})
+ public void testBasicTopology(boolean useLocalMessaging) throws Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSimulatedTime()
+ .withSupervisors(4)
+ .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
+ .build()) {
+ Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
+ Map<String, BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+ new TestWordCounter(), 4));
+ boltMap.put("3",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestGlobalCount()));
+ boltMap.put("4",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("2", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestAggregatesCounter()));
+ StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
+
+ Map<String, Object> stormConf = new HashMap<>();
+ stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+ stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
+
+ List<Values> testValues = new ArrayList<>();
+ testValues.add(new Values("nathan"));
+ testValues.add(new Values("bob"));
+ testValues.add(new Values("joey"));
+ testValues.add(new Values("nathan"));
+ List<FixedTuple> testTuples = testValues.stream()
+ .map(value -> new FixedTuple(value))
+ .collect(Collectors.toList());
--- End diff --
Yes, absolutely. I'll change it.
---