You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/04/13 14:54:30 UTC
[storm] branch master updated: [STORM-1293] port
storm.messaging.netty-integration-test to java (#3233)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 1a382d4 [STORM-1293] port storm.messaging.netty-integration-test to java (#3233)
1a382d4 is described below
commit 1a382d477b550723984e46acf24dd957fd043aa3
Author: nd368 <na...@hotmail.co.uk>
AuthorDate: Mon Apr 13 15:54:13 2020 +0100
[STORM-1293] port storm.messaging.netty-integration-test to java (#3233)
---
.../storm/messaging/netty_integration_test.clj | 66 ---------------
.../storm/messaging/NettyIntegrationTest.java | 98 ++++++++++++++++++++++
2 files changed, 98 insertions(+), 66 deletions(-)
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
deleted file mode 100644
index 8e813af..0000000
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ /dev/null
@@ -1,66 +0,0 @@
-
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.messaging.netty-integration-test
- (:use [clojure test])
- (:import [org.apache.storm.messaging TransportFactory]
- [org.apache.storm Thrift Testing LocalCluster$Builder])
- (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordSpout TestGlobalCount])
- (:import [org.apache.storm.utils Utils])
- (:use [org.apache.storm util config]))
-
-(deftest test-integration
- (with-open [cluster (.build (doto (LocalCluster$Builder.)
- (.withSimulatedTime)
- (.withSupervisors 4)
- (.withSupervisorSlotPortMin 6710)
- (.withDaemonConf {STORM-LOCAL-MODE-ZMQ true
- STORM-MESSAGING-TRANSPORT "org.apache.storm.messaging.netty.Context"
- STORM-MESSAGING-NETTY-AUTHENTICATION false
- STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
- STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
- STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1})))]
- (let [topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestWordSpout. true) (Integer. 4))}
- {"2" (Thrift/prepareBoltDetails
- {(Utils/getGlobalStreamId "1" nil)
- (Thrift/prepareShuffleGrouping)}
- (TestGlobalCount.) (Integer. 6))})
- results (Testing/completeTopology cluster
- topology
- (doto (CompleteTopologyParam.)
- ;; important for test that
- ;; #tuples = multiple of 4 and 6
- (.setStormConf {TOPOLOGY-WORKERS 3})
- (.setMockedSources
- (MockedSources. {"1" [["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ]}
- ))))]
- (is (= (* 6 4) (.size (Testing/readTuples results "2")))))))
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/NettyIntegrationTest.java b/storm-core/test/jvm/org/apache/storm/messaging/NettyIntegrationTest.java
new file mode 100644
index 0000000..ddb8663
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/messaging/NettyIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.messaging;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.Builder;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Values;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+@IntegrationTest
+public class NettyIntegrationTest {
+
+ @Test
+ public void testIntegration() throws Exception {
+ Map<String, Object> daemonConf = new HashMap<>();
+ daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
+ daemonConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+ daemonConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, false);
+ daemonConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024000);
+ daemonConf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
+ daemonConf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);
+ daemonConf.put(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS, 1);
+ daemonConf.put(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS, 1);
+
+ Builder builder = new Builder()
+ .withSimulatedTime()
+ .withSupervisors(4)
+ .withSupervisorSlotPortMin(6710)
+ .withDaemonConf(daemonConf);
+
+ try (LocalCluster cluster = builder.build()) {
+
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout("1", new TestWordSpout(true), 4);
+ topologyBuilder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1");
+ StormTopology topology = topologyBuilder.createTopology();
+
+ // important for test that tuples = multiple of 4 and 6
+ List<FixedTuple> testTuples = Stream.of("a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b",
+ "a", "b")
+ .map(value -> new FixedTuple(new Values(value)))
+ .collect(Collectors.toList());
+
+ MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
+
+ CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
+ completeTopologyParams.setStormConf(Collections.singletonMap(Config.TOPOLOGY_WORKERS, 3));
+ completeTopologyParams.setMockedSources(mockedSources);
+
+ Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
+
+ assertEquals(6 * 4, Testing.readTuples(results, "2").size());
+ }
+ }
+}