You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2018/12/12 13:41:38 UTC

[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2927

    STORM-1307: Port testing4j_test.clj to Java

    Follow-up to https://github.com/apache/storm/pull/2924; please review that one first.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-1307

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2927.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2927
    
----
commit a491d3e1b6a85726648d609a79c074a0ae66932c
Author: Stig Rohde Døssing <sr...@...>
Date:   2018-12-07T21:07:36Z

    STORM-1289: Port integration-test.clj to Java

commit e408283fd01d6b1146859265336f794203054901
Author: Stig Rohde Døssing <sr...@...>
Date:   2018-12-12T00:36:09Z

    STORM-1307: Port testing4j_test.clj to Java

----


---

[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2927#discussion_r242661816
  
    --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
    @@ -0,0 +1,927 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.integration;
    +
    +import static org.apache.storm.integration.AssertLoop.assertAcked;
    +import static org.apache.storm.integration.AssertLoop.assertFailed;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.hamcrest.Matchers.contains;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.assertThat;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.Testing;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.Thrift.BoltDetails;
    +import org.apache.storm.Thrift.SpoutDetails;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.hooks.BaseTaskHook;
    +import org.apache.storm.hooks.info.BoltAckInfo;
    +import org.apache.storm.hooks.info.BoltExecuteInfo;
    +import org.apache.storm.hooks.info.BoltFailInfo;
    +import org.apache.storm.hooks.info.EmitInfo;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.testing.AckFailMapTracker;
    +import org.apache.storm.testing.CompleteTopologyParam;
    +import org.apache.storm.testing.FeederSpout;
    +import org.apache.storm.testing.FixedTuple;
    +import org.apache.storm.testing.IntegrationTest;
    +import org.apache.storm.testing.MockedSources;
    +import org.apache.storm.testing.TestAggregatesCounter;
    +import org.apache.storm.testing.TestConfBolt;
    +import org.apache.storm.testing.TestGlobalCount;
    +import org.apache.storm.testing.TestPlannerSpout;
    +import org.apache.storm.testing.TestWordCounter;
    +import org.apache.storm.testing.TestWordSpout;
    +import org.apache.storm.testing.TrackedTopology;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.junit.jupiter.api.Test;
    +import org.junit.jupiter.params.ParameterizedTest;
    +import org.junit.jupiter.params.provider.ValueSource;
    +
    +@IntegrationTest
    +public class TopologyIntegrationTest {
    +
    +    @ParameterizedTest
    +    @ValueSource(strings = {"true", "false"})
    +    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
    +        try (LocalCluster cluster = new LocalCluster.Builder()
    +            .withSimulatedTime()
    +            .withSupervisors(4)
    +            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
    +            .build()) {
    +            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
    +            Map<String, BoltDetails> boltMap = new HashMap<>();
    +            boltMap.put("2",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
    +                    new TestWordCounter(), 4));
    +            boltMap.put("3",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestGlobalCount()));
    +            boltMap.put("4",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("2", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestAggregatesCounter()));
    +            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
    --- End diff --
    
    Could we get a follow-on JIRA to move the code that uses Thrift to create a topology over to using TopologyBuilder?  I think these APIs were used because they are a little cleaner in Clojure, but in Java they take up a lot more code and are harder to follow.


---

[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2927


---

[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2927#discussion_r242699069
  
    --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
    @@ -0,0 +1,927 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.integration;
    +
    +import static org.apache.storm.integration.AssertLoop.assertAcked;
    +import static org.apache.storm.integration.AssertLoop.assertFailed;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.hamcrest.Matchers.contains;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.assertThat;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.Testing;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.Thrift.BoltDetails;
    +import org.apache.storm.Thrift.SpoutDetails;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.hooks.BaseTaskHook;
    +import org.apache.storm.hooks.info.BoltAckInfo;
    +import org.apache.storm.hooks.info.BoltExecuteInfo;
    +import org.apache.storm.hooks.info.BoltFailInfo;
    +import org.apache.storm.hooks.info.EmitInfo;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.testing.AckFailMapTracker;
    +import org.apache.storm.testing.CompleteTopologyParam;
    +import org.apache.storm.testing.FeederSpout;
    +import org.apache.storm.testing.FixedTuple;
    +import org.apache.storm.testing.IntegrationTest;
    +import org.apache.storm.testing.MockedSources;
    +import org.apache.storm.testing.TestAggregatesCounter;
    +import org.apache.storm.testing.TestConfBolt;
    +import org.apache.storm.testing.TestGlobalCount;
    +import org.apache.storm.testing.TestPlannerSpout;
    +import org.apache.storm.testing.TestWordCounter;
    +import org.apache.storm.testing.TestWordSpout;
    +import org.apache.storm.testing.TrackedTopology;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.junit.jupiter.api.Test;
    +import org.junit.jupiter.params.ParameterizedTest;
    +import org.junit.jupiter.params.provider.ValueSource;
    +
    +@IntegrationTest
    +public class TopologyIntegrationTest {
    +
    +    @ParameterizedTest
    +    @ValueSource(strings = {"true", "false"})
    +    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
    +        try (LocalCluster cluster = new LocalCluster.Builder()
    +            .withSimulatedTime()
    +            .withSupervisors(4)
    +            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
    +            .build()) {
    +            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
    +            Map<String, BoltDetails> boltMap = new HashMap<>();
    +            boltMap.put("2",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
    +                    new TestWordCounter(), 4));
    +            boltMap.put("3",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestGlobalCount()));
    +            boltMap.put("4",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("2", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestAggregatesCounter()));
    +            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
    --- End diff --
    
    Sure, raised https://issues.apache.org/jira/browse/STORM-3306


---

[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2927#discussion_r242663110
  
    --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
    @@ -0,0 +1,927 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.integration;
    +
    +import static org.apache.storm.integration.AssertLoop.assertAcked;
    +import static org.apache.storm.integration.AssertLoop.assertFailed;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.hamcrest.Matchers.contains;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.assertThat;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.Testing;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.Thrift.BoltDetails;
    +import org.apache.storm.Thrift.SpoutDetails;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.hooks.BaseTaskHook;
    +import org.apache.storm.hooks.info.BoltAckInfo;
    +import org.apache.storm.hooks.info.BoltExecuteInfo;
    +import org.apache.storm.hooks.info.BoltFailInfo;
    +import org.apache.storm.hooks.info.EmitInfo;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.testing.AckFailMapTracker;
    +import org.apache.storm.testing.CompleteTopologyParam;
    +import org.apache.storm.testing.FeederSpout;
    +import org.apache.storm.testing.FixedTuple;
    +import org.apache.storm.testing.IntegrationTest;
    +import org.apache.storm.testing.MockedSources;
    +import org.apache.storm.testing.TestAggregatesCounter;
    +import org.apache.storm.testing.TestConfBolt;
    +import org.apache.storm.testing.TestGlobalCount;
    +import org.apache.storm.testing.TestPlannerSpout;
    +import org.apache.storm.testing.TestWordCounter;
    +import org.apache.storm.testing.TestWordSpout;
    +import org.apache.storm.testing.TrackedTopology;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.junit.jupiter.api.Test;
    +import org.junit.jupiter.params.ParameterizedTest;
    +import org.junit.jupiter.params.provider.ValueSource;
    +
    +@IntegrationTest
    +public class TopologyIntegrationTest {
    +
    +    @ParameterizedTest
    +    @ValueSource(strings = {"true", "false"})
    +    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
    +        try (LocalCluster cluster = new LocalCluster.Builder()
    +            .withSimulatedTime()
    +            .withSupervisors(4)
    +            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
    +            .build()) {
    +            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
    +            Map<String, BoltDetails> boltMap = new HashMap<>();
    +            boltMap.put("2",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
    +                    new TestWordCounter(), 4));
    +            boltMap.put("3",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestGlobalCount()));
    +            boltMap.put("4",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("2", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestAggregatesCounter()));
    +            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
    +
    +            Map<String, Object> stormConf = new HashMap<>();
    +            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
    +            stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
    +
    +            List<Values> testValues = new ArrayList<>();
    +            testValues.add(new Values("nathan"));
    +            testValues.add(new Values("bob"));
    +            testValues.add(new Values("joey"));
    +            testValues.add(new Values("nathan"));
    +            List<FixedTuple> testTuples = testValues.stream()
    +                .map(value -> new FixedTuple(value))
    +                .collect(Collectors.toList());
    --- End diff --
    
    Wouldn't it be simpler to do something like this?
    
    ```
    List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
      .map(name -> new FixedTuple(new Values(name)))
      .collect(Collectors.toList());
    ```


---

[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2927#discussion_r242739488
  
    --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
    @@ -0,0 +1,927 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.integration;
    +
    +import static org.apache.storm.integration.AssertLoop.assertAcked;
    +import static org.apache.storm.integration.AssertLoop.assertFailed;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.hamcrest.Matchers.contains;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.assertThat;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.Testing;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.Thrift.BoltDetails;
    +import org.apache.storm.Thrift.SpoutDetails;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.hooks.BaseTaskHook;
    +import org.apache.storm.hooks.info.BoltAckInfo;
    +import org.apache.storm.hooks.info.BoltExecuteInfo;
    +import org.apache.storm.hooks.info.BoltFailInfo;
    +import org.apache.storm.hooks.info.EmitInfo;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.testing.AckFailMapTracker;
    +import org.apache.storm.testing.CompleteTopologyParam;
    +import org.apache.storm.testing.FeederSpout;
    +import org.apache.storm.testing.FixedTuple;
    +import org.apache.storm.testing.IntegrationTest;
    +import org.apache.storm.testing.MockedSources;
    +import org.apache.storm.testing.TestAggregatesCounter;
    +import org.apache.storm.testing.TestConfBolt;
    +import org.apache.storm.testing.TestGlobalCount;
    +import org.apache.storm.testing.TestPlannerSpout;
    +import org.apache.storm.testing.TestWordCounter;
    +import org.apache.storm.testing.TestWordSpout;
    +import org.apache.storm.testing.TrackedTopology;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.junit.jupiter.api.Test;
    +import org.junit.jupiter.params.ParameterizedTest;
    +import org.junit.jupiter.params.provider.ValueSource;
    +
    +@IntegrationTest
    +public class TopologyIntegrationTest {
    +
    +    @ParameterizedTest
    +    @ValueSource(strings = {"true", "false"})
    +    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
    +        try (LocalCluster cluster = new LocalCluster.Builder()
    +            .withSimulatedTime()
    +            .withSupervisors(4)
    +            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
    +            .build()) {
    +            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
    +            Map<String, BoltDetails> boltMap = new HashMap<>();
    +            boltMap.put("2",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
    +                    new TestWordCounter(), 4));
    +            boltMap.put("3",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestGlobalCount()));
    +            boltMap.put("4",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("2", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestAggregatesCounter()));
    +            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
    +
    +            Map<String, Object> stormConf = new HashMap<>();
    +            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
    +            stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
    +
    +            List<Values> testValues = new ArrayList<>();
    +            testValues.add(new Values("nathan"));
    +            testValues.add(new Values("bob"));
    +            testValues.add(new Values("joey"));
    +            testValues.add(new Values("nathan"));
    +            List<FixedTuple> testTuples = testValues.stream()
    +                .map(value -> new FixedTuple(value))
    +                .collect(Collectors.toList());
    --- End diff --
    
    Fixed


---

[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2927#discussion_r242698501
  
    --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java ---
    @@ -0,0 +1,927 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.integration;
    +
    +import static org.apache.storm.integration.AssertLoop.assertAcked;
    +import static org.apache.storm.integration.AssertLoop.assertFailed;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.hamcrest.Matchers.contains;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.assertThat;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.Testing;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.Thrift.BoltDetails;
    +import org.apache.storm.Thrift.SpoutDetails;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.hooks.BaseTaskHook;
    +import org.apache.storm.hooks.info.BoltAckInfo;
    +import org.apache.storm.hooks.info.BoltExecuteInfo;
    +import org.apache.storm.hooks.info.BoltFailInfo;
    +import org.apache.storm.hooks.info.EmitInfo;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.testing.AckFailMapTracker;
    +import org.apache.storm.testing.CompleteTopologyParam;
    +import org.apache.storm.testing.FeederSpout;
    +import org.apache.storm.testing.FixedTuple;
    +import org.apache.storm.testing.IntegrationTest;
    +import org.apache.storm.testing.MockedSources;
    +import org.apache.storm.testing.TestAggregatesCounter;
    +import org.apache.storm.testing.TestConfBolt;
    +import org.apache.storm.testing.TestGlobalCount;
    +import org.apache.storm.testing.TestPlannerSpout;
    +import org.apache.storm.testing.TestWordCounter;
    +import org.apache.storm.testing.TestWordSpout;
    +import org.apache.storm.testing.TrackedTopology;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.junit.jupiter.api.Test;
    +import org.junit.jupiter.params.ParameterizedTest;
    +import org.junit.jupiter.params.provider.ValueSource;
    +
    +@IntegrationTest
    +public class TopologyIntegrationTest {
    +
    +    @ParameterizedTest
    +    @ValueSource(strings = {"true", "false"})
    +    public void testBasicTopology(boolean useLocalMessaging) throws Exception {
    +        try (LocalCluster cluster = new LocalCluster.Builder()
    +            .withSimulatedTime()
    +            .withSupervisors(4)
    +            .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
    +            .build()) {
    +            Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
    +            Map<String, BoltDetails> boltMap = new HashMap<>();
    +            boltMap.put("2",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
    +                    new TestWordCounter(), 4));
    +            boltMap.put("3",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestGlobalCount()));
    +            boltMap.put("4",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("2", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestAggregatesCounter()));
    +            StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
    +
    +            Map<String, Object> stormConf = new HashMap<>();
    +            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
    +            stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
    +
    +            List<Values> testValues = new ArrayList<>();
    +            testValues.add(new Values("nathan"));
    +            testValues.add(new Values("bob"));
    +            testValues.add(new Values("joey"));
    +            testValues.add(new Values("nathan"));
    +            List<FixedTuple> testTuples = testValues.stream()
    +                .map(value -> new FixedTuple(value))
    +                .collect(Collectors.toList());
    --- End diff --
    
    Yes, absolutely. I'll change it.


---